0001
0002
0003
0004
0005
0006
0007
0008
0009
0010
0011
0012 #include <unistd.h>
0013 #include <signal.h>
0014 #include <stdio.h>
0015 #include <stdlib.h>
0016 #include <pthread.h>
0017 #include <errno.h>
0018 #include <sys/time.h>
0019 #include <ndebug.h>
0020 #include <time.h>
0021 #include <ndrstandard.h>
0022 #ifdef EX_OS_LINUX
0023 #include <sys/prctl.h>
0024 #endif
0025
0026 #include <exthpool.h>
0027
0028 #include <nstdutil.h>
0029 #include <ndrxdiag.h>
0030 #include <nstopwatch.h>
0031
/*
 * Normalise THPOOL_DEBUG to a plain 0/1 value so it can be tested with #if.
 * The #undef avoids a conflicting redefinition when the build passes
 * something like -DTHPOOL_DEBUG=2.
 */
#ifdef THPOOL_DEBUG
#undef THPOOL_DEBUG
#define THPOOL_DEBUG 1
#else
#define THPOOL_DEBUG 0
#endif

/*
 * err(str): report an error message through NDRX_LOG at log_error level.
 * Compiled out when DISABLE_PRINT is defined and debugging is off.
 *
 * THPOOL_DEBUG is always defined at this point (see above), so it has to be
 * tested by value; `defined(THPOOL_DEBUG)` would always be true and
 * DISABLE_PRINT could never take effect.
 */
#if !defined(DISABLE_PRINT) || THPOOL_DEBUG
#define err(str) NDRX_LOG(log_error, str)
#else
#define err(str)
#endif
0043
0044
0045
0046
0047
/**
 * Binary semaphore built from a mutex / condition variable pair.
 */
typedef struct bsem
{
    pthread_mutex_t mutex;  /**< taken around every access to v            */
    pthread_cond_t  cond;   /**< signalled/broadcast when v is set to 1    */
    int             v;      /**< semaphore value, bsem_init() allows 0 or 1 */
} bsem;
0054
0055
0056
/**
 * Single unit of work stored in the job queue.
 */
typedef struct job
{
    /** job queued right after this one (NULL while this is the rear) */
    struct job *prev;
    /** callback to run; it may set *p_finish_off to make the worker exit */
    void (*function)(void *arg, int *p_finish_off);
    /** opaque argument handed to the callback */
    void *arg;
} job;
0063
0064
0065
0066 typedef struct jobqueue
0067 {
0068 job *front;
0069 job *rear;
0070 bsem *has_jobs;
0071 int len;
0072 } jobqueue;
0073
0074
0075
/**
 * Per-worker bookkeeping record.
 */
typedef struct poolthread
{
    int             id;        /**< index given at creation, used in the thread name */
    pthread_t       pthread;   /**< handle filled in by pthread_create()             */
    struct thpool_ *thpool_p;  /**< pool this worker belongs to                      */
} poolthread;
0082
0083
0084
0085 typedef struct thpool_
0086 {
0087 poolthread** threads;
0088 volatile int num_threads_alive;
0089 volatile int num_threads_working;
0090 pthread_mutex_t thcount_lock;
0091 pthread_cond_t threads_all_idle;
0092 pthread_cond_t threads_one_idle;
0093 pthread_cond_t proc_one;
0094 int threads_keepalive;
0095 int num_threads;
0096 int num_threads_allocd;
0097 int thread_status;
0098 jobqueue jobqueue;
0099 ndrx_thpool_tpsvrthrinit_t pf_init;
0100 ndrx_thpool_tpsvrthrdone_t pf_done;
0101 int argc;
0102 char **argv;
0103 } thpool_;
0104
0105
0106
0107
0108 static int poolthread_init(thpool_* thpool_p, struct poolthread** thread_p, int id);
0109 static void* poolthread_do(struct poolthread* thread_p);
0110 static void poolthread_hold(int sig_id);
0111 static void poolthread_destroy(struct poolthread* thread_p);
0112
0113 static int jobqueue_init(jobqueue* jobqueue_p);
0114 static void jobqueue_clear(jobqueue* jobqueue_p);
0115 static void jobqueue_push(jobqueue* jobqueue_p, struct job* newjob_p);
0116 static struct job* jobqueue_pull(jobqueue* jobqueue_p);
0117 static void jobqueue_destroy(jobqueue* jobqueue_p);
0118
0119 static void bsem_init(struct bsem *bsem_p, int value);
0120 static void bsem_reset(struct bsem *bsem_p);
0121 static void bsem_post(struct bsem *bsem_p);
0122 static void bsem_post_all(struct bsem *bsem_p);
0123 static void bsem_wait(struct bsem *bsem_p);
0124
0125
0126
0127
0128
0129
0130
0131
0132
0133
0134
0135
0136
0137
0138 struct thpool_* ndrx_thpool_init(int num_threads, int *p_ret,
0139 ndrx_thpool_tpsvrthrinit_t pf_init, ndrx_thpool_tpsvrthrdone_t pf_done,
0140 int argc, char **argv)
0141 {
0142
0143 thpool_* thpool_p;
0144 if (num_threads < 0)
0145 {
0146 num_threads = 0;
0147 }
0148
0149
0150 thpool_p = (struct thpool_*)NDRX_FPMALLOC(sizeof(struct thpool_), 0);
0151 if (thpool_p == NULL)
0152 {
0153 err("thpool_init(): Could not allocate memory for thread pool\n");
0154 return NULL;
0155 }
0156 thpool_p->num_threads = 0;
0157 thpool_p->num_threads_allocd = 0;
0158 thpool_p->threads_keepalive = 1;
0159 thpool_p->num_threads_alive = 0;
0160 thpool_p->num_threads_working = 0;
0161 thpool_p->pf_init = pf_init;
0162 thpool_p->pf_done = pf_done;
0163 thpool_p->argc = argc;
0164 thpool_p->argv = argv;
0165
0166
0167 if (jobqueue_init(&thpool_p->jobqueue) == -1)
0168 {
0169 err("thpool_init(): Could not allocate memory for job queue\n");
0170 NDRX_FPFREE(thpool_p);
0171 return NULL;
0172 }
0173
0174
0175 thpool_p->threads = (struct poolthread**)NDRX_FPMALLOC(num_threads * sizeof(struct poolthread *), 0);
0176
0177 if (thpool_p->threads == NULL)
0178 {
0179 err("thpool_init(): Could not allocate memory for threads\n");
0180 jobqueue_destroy(&thpool_p->jobqueue);
0181 NDRX_FPFREE(thpool_p);
0182 return NULL;
0183 }
0184
0185 pthread_mutex_init(&(thpool_p->thcount_lock), NULL);
0186 pthread_cond_init(&thpool_p->threads_all_idle, NULL);
0187 pthread_cond_init(&thpool_p->threads_one_idle, NULL);
0188 pthread_cond_init(&thpool_p->proc_one, NULL);
0189
0190
0191 int n;
0192 for (n=0; n<num_threads; n++)
0193 {
0194
0195 thpool_p->thread_status = EXSUCCEED;
0196 MUTEX_LOCK_V(thpool_p->thcount_lock);
0197
0198
0199 if (EXSUCCEED!=poolthread_init(thpool_p, &thpool_p->threads[n], n))
0200 {
0201 if (NULL!=p_ret)
0202 {
0203 *p_ret=EXFAIL;
0204 }
0205
0206
0207 MUTEX_UNLOCK_V(thpool_p->thcount_lock);
0208 goto out;
0209 }
0210
0211
0212 pthread_cond_wait(&thpool_p->threads_one_idle, &thpool_p->thcount_lock);
0213
0214 MUTEX_UNLOCK_V(thpool_p->thcount_lock);
0215
0216
0217 if (EXFAIL==thpool_p->thread_status)
0218 {
0219
0220 pthread_join(thpool_p->threads[n]->pthread, NULL);
0221
0222 if (NULL!=p_ret)
0223 {
0224 *p_ret=EXFAIL;
0225
0226 break;
0227 }
0228 }
0229
0230 #if THPOOL_DEBUG
0231 printf("THPOOL_DEBUG: Created thread %d in pool \n", n);
0232 #endif
0233 }
0234
0235
0236
0237
0238
0239 out:
0240 return thpool_p;
0241 }
0242
0243 int ndrx_thpool_add_work2(thpool_* thpool_p, void (*function_p)(void*, int *), void* arg_p, long flags, int max_len)
0244 {
0245 int ret = EXSUCCEED;
0246 job* newjob;
0247
0248 newjob=(struct job*)NDRX_FPMALLOC(sizeof(struct job), 0);
0249
0250 if (newjob==NULL)
0251 {
0252 err("thpool_add_work(): Could not allocate memory for new job\n");
0253 return -1;
0254 }
0255
0256
0257 newjob->function=function_p;
0258 newjob->arg=arg_p;
0259
0260
0261 MUTEX_LOCK_V(thpool_p->thcount_lock);
0262
0263 if (flags & NDRX_THPOOL_ONEJOB && thpool_p->jobqueue.len > 0)
0264 {
0265 NDRX_LOG(log_debug, "NDRX_THPOOL_ONEJOB set and queue len is %d - skip this job",
0266 thpool_p->jobqueue.len);
0267
0268
0269 NDRX_FPFREE(newjob);
0270 MUTEX_UNLOCK_V(thpool_p->thcount_lock);
0271 return NDRX_THPOOL_ONEJOB;
0272 }
0273
0274
0275
0276
0277 if (max_len > 0)
0278 {
0279 while (thpool_p->jobqueue.len > max_len)
0280 {
0281
0282 struct timespec wait_time;
0283 struct timeval now;
0284
0285 gettimeofday(&now,NULL);
0286
0287 wait_time.tv_sec = now.tv_sec+1;
0288 wait_time.tv_nsec = now.tv_usec*1000;
0289
0290 if (EXSUCCEED!=(ret=pthread_cond_timedwait(&thpool_p->proc_one,
0291 &thpool_p->thcount_lock, &wait_time)))
0292 {
0293 NDRX_LOG(log_error, "Waiting for %d jobs (current: %d) but expired... (err: %s)",
0294 max_len, thpool_p->jobqueue.len, strerror(ret));
0295
0296
0297 break;
0298 }
0299 }
0300 }
0301
0302 jobqueue_push(&thpool_p->jobqueue, newjob);
0303
0304 MUTEX_UNLOCK_V(thpool_p->thcount_lock);
0305
0306 return 0;
0307 }
0308
0309
0310
0311
0312 int ndrx_thpool_add_work(thpool_* thpool_p, void (*function_p)(void*, int *), void* arg_p)
0313 {
0314 return ndrx_thpool_add_work2(thpool_p, function_p, arg_p, 0, 0);
0315 }
0316
0317
0318
0319
0320
0321 void ndrx_thpool_wait(thpool_* thpool_p)
0322 {
0323 MUTEX_LOCK_V(thpool_p->thcount_lock);
0324
0325 while (thpool_p->jobqueue.len || thpool_p->num_threads_working)
0326 {
0327 pthread_cond_wait(&thpool_p->threads_all_idle, &thpool_p->thcount_lock);
0328 }
0329 MUTEX_UNLOCK_V(thpool_p->thcount_lock);
0330 }
0331
0332
0333
0334
0335
0336 void ndrx_thpool_signal_one(thpool_* thpool_p)
0337 {
0338 pthread_cond_signal(&thpool_p->threads_one_idle);
0339 }
0340
0341
0342
0343
0344
0345
0346
0347
0348
0349
0350 int ndrx_thpool_timedwait_less(thpool_* thpool_p, int less_than, long timeout,
0351 int *cond)
0352 {
0353 int ret = EXSUCCEED;
0354 struct timespec wait_time;
0355 struct timeval now;
0356 long delta, sleep_time;
0357
0358 ndrx_stopwatch_t w;
0359
0360 ndrx_stopwatch_reset(&w);
0361 MUTEX_LOCK_V(thpool_p->thcount_lock);
0362
0363 while (thpool_p->jobqueue.len + thpool_p->num_threads_working>=less_than &&
0364 (delta=ndrx_stopwatch_get_delta(&w)) < timeout && !*cond)
0365 {
0366
0367 gettimeofday(&now, NULL);
0368
0369 wait_time.tv_sec = now.tv_sec;
0370
0371 wait_time.tv_nsec = now.tv_usec*1000;
0372
0373 sleep_time = timeout - delta;
0374
0375 if (sleep_time<=0)
0376 {
0377
0378 break;
0379 }
0380
0381 ndrx_timespec_plus(&wait_time, sleep_time);
0382 pthread_cond_timedwait(&thpool_p->threads_one_idle, &thpool_p->thcount_lock, &wait_time);
0383 }
0384
0385
0386 if (thpool_p->jobqueue.len + thpool_p->num_threads_working < less_than)
0387 {
0388 ret = EXTRUE;
0389 }
0390
0391 MUTEX_UNLOCK_V(thpool_p->thcount_lock);
0392
0393
0394 return ret;
0395 }
0396
0397
0398
0399
0400
0401
0402 void ndrx_thpool_wait_one(thpool_* thpool_p)
0403 {
0404 MUTEX_LOCK_V(thpool_p->thcount_lock);
0405
0406
0407 while ( (thpool_p->jobqueue.len -
0408 (thpool_p->num_threads-thpool_p->num_threads_working) >= 0 ))
0409 {
0410 pthread_cond_wait(&thpool_p->threads_one_idle, &thpool_p->thcount_lock);
0411 }
0412
0413 MUTEX_UNLOCK_V(thpool_p->thcount_lock);
0414 }
0415
0416
0417
0418
0419
0420
0421
0422 int ndrx_thpool_nr_not_working(thpool_* thpool_p)
0423 {
0424 int nr;
0425
0426 MUTEX_LOCK_V(thpool_p->thcount_lock);
0427
0428 nr = thpool_p->num_threads - thpool_p->num_threads_working - thpool_p->jobqueue.len;
0429
0430 MUTEX_UNLOCK_V(thpool_p->thcount_lock);
0431
0432 return nr;
0433 }
0434
0435
0436 void ndrx_thpool_destroy(thpool_* thpool_p)
0437 {
0438 int n;
0439 volatile int threads_total = thpool_p->num_threads;
0440
0441 double TIMEOUT = 1.0;
0442 time_t start, end;
0443 double tpassed = 0.0;
0444 time (&start);
0445
0446
0447 if (thpool_p == NULL) return ;
0448
0449 thpool_p->threads_keepalive = 0;
0450
0451
0452 while (tpassed < TIMEOUT && thpool_p->num_threads_alive)
0453 {
0454 bsem_post_all(thpool_p->jobqueue.has_jobs);
0455 time (&end);
0456 tpassed = difftime(end,start);
0457 }
0458
0459
0460 while (thpool_p->num_threads_alive)
0461 {
0462 bsem_post_all(thpool_p->jobqueue.has_jobs);
0463 sleep(1);
0464 }
0465
0466
0467 for (n=0; n<thpool_p->num_threads; n++)
0468 {
0469 pthread_join(thpool_p->threads[n]->pthread, NULL);
0470 }
0471
0472
0473 jobqueue_destroy(&thpool_p->jobqueue);
0474
0475
0476 for (n=0; n < thpool_p->num_threads_allocd; n++)
0477 {
0478 poolthread_destroy(thpool_p->threads[n]);
0479 }
0480
0481 NDRX_FPFREE(thpool_p->threads);
0482 NDRX_FPFREE(thpool_p);
0483 }
0484
0485
0486
0487
0488
0489
0490
0491
0492
0493
0494 static int poolthread_init (thpool_* thpool_p, struct poolthread** thread_p, int id)
0495 {
0496 int ret = EXSUCCEED;
0497 pthread_attr_t pthread_custom_attr;
0498 pthread_attr_init(&pthread_custom_attr);
0499
0500 *thread_p = (struct poolthread*)NDRX_FPMALLOC(sizeof(struct poolthread), 0);
0501
0502 if (*thread_p == NULL)
0503 {
0504 err("poolthread_init(): Could not allocate memory for thread\n");
0505 return -1;
0506 }
0507
0508 (*thread_p)->thpool_p = thpool_p;
0509 (*thread_p)->id = id;
0510 thpool_p->num_threads_allocd++;
0511
0512
0513 ndrx_platf_stack_set(&pthread_custom_attr);
0514
0515 if (EXSUCCEED!=pthread_create(&(*thread_p)->pthread, &pthread_custom_attr,
0516 (void *)poolthread_do, (*thread_p)))
0517 {
0518 NDRX_PLATF_DIAG(NDRX_DIAG_PTHREAD_CREATE, errno, "poolthread_init");
0519 EXFAIL_OUT(ret);
0520 }
0521
0522 out:
0523 return ret;
0524 }
0525
0526
0527
0528
0529
0530
0531
0532
0533
0534 static void* poolthread_do(struct poolthread* thread_p)
0535 {
0536
0537
0538 int finish_off = 0;
0539 int all_idle;
0540 int one_idle;
0541 int ret = EXSUCCEED;
0542 char thread_name[128] = {0};
0543 snprintf(thread_name, sizeof(thread_name), "thread-pool-%d", thread_p->id);
0544
0545 #ifdef EX_OS_LINUX
0546 prctl(PR_SET_NAME, thread_name);
0547 #endif
0548
0549
0550 thpool_* thpool_p = thread_p->thpool_p;
0551
0552
0553 if (NULL!=thread_p->thpool_p->pf_init)
0554 {
0555 NDRX_LOG(log_debug, "About to call tpsvrthrinit()");
0556 ret = thread_p->thpool_p->pf_init(thread_p->thpool_p->argc,
0557 thread_p->thpool_p->argv);
0558
0559 if (EXSUCCEED!=ret)
0560 {
0561 NDRX_LOG(log_error, "tpsvrthrinit() failed %d", ret);
0562 userlog("tpsvrthrinit() failed %d", ret);
0563 }
0564 else
0565 {
0566 NDRX_LOG(log_debug, "tpsvrthrinit() OK");
0567 }
0568 }
0569
0570
0571
0572
0573 MUTEX_LOCK_V(thpool_p->thcount_lock);
0574
0575
0576
0577 if (EXSUCCEED==ret)
0578 {
0579 thpool_p->num_threads_alive += 1;
0580 thpool_p->num_threads+=1;
0581 }
0582 else
0583 {
0584 thpool_p->thread_status = EXFAIL;
0585 pthread_cond_signal(&thpool_p->threads_one_idle);
0586 MUTEX_UNLOCK_V(thpool_p->thcount_lock);
0587 return NULL;
0588 }
0589
0590 pthread_cond_signal(&thpool_p->threads_one_idle);
0591 MUTEX_UNLOCK_V(thpool_p->thcount_lock);
0592
0593 while(thread_p->thpool_p->threads_keepalive && !finish_off)
0594 {
0595 bsem_wait(thpool_p->jobqueue.has_jobs);
0596
0597 if (thread_p->thpool_p->threads_keepalive)
0598 {
0599 void(*func_buff)(void* arg, int *p_finish_off);
0600 void* arg_buff;
0601 job* job_p;
0602
0603 MUTEX_LOCK_V(thpool_p->thcount_lock);
0604 thpool_p->num_threads_working++;
0605
0606
0607
0608 job_p = jobqueue_pull(&thpool_p->jobqueue);
0609
0610
0611 MUTEX_UNLOCK_V(thpool_p->thcount_lock);
0612 if (job_p)
0613 {
0614 func_buff = job_p->function;
0615 arg_buff = job_p->arg;
0616 func_buff(arg_buff, &finish_off);
0617 NDRX_FPFREE(job_p);
0618 }
0619
0620
0621
0622
0623
0624
0625
0626
0627
0628
0629 all_idle=EXFALSE;
0630 one_idle=EXFALSE;
0631 MUTEX_LOCK_V(thpool_p->thcount_lock);
0632
0633
0634 thpool_p->num_threads_working--;
0635
0636 if (!thpool_p->num_threads_working)
0637 {
0638 all_idle=EXTRUE;
0639 }
0640
0641 if ((thpool_p->jobqueue.len -
0642 (thpool_p->num_threads-thpool_p->num_threads_working) < 0 ))
0643 {
0644 one_idle=EXTRUE;
0645 }
0646
0647 if (all_idle)
0648 {
0649 pthread_cond_signal(&thpool_p->threads_all_idle);
0650 }
0651
0652 if (one_idle)
0653 {
0654 pthread_cond_signal(&thpool_p->threads_one_idle);
0655 }
0656
0657
0658 pthread_cond_signal(&thpool_p->proc_one);
0659
0660 MUTEX_UNLOCK_V(thpool_p->thcount_lock);
0661
0662 }
0663 }
0664
0665
0666 if (NULL!=thread_p->thpool_p->pf_done)
0667 {
0668 thread_p->thpool_p->pf_done();
0669 }
0670
0671 MUTEX_LOCK_V(thpool_p->thcount_lock);
0672 thpool_p->num_threads_alive --;
0673 MUTEX_UNLOCK_V(thpool_p->thcount_lock);
0674
0675 return NULL;
0676 }
0677
0678
0679
0680 static void poolthread_destroy (poolthread* thread_p)
0681 {
0682 NDRX_FPFREE(thread_p);
0683 }
0684
0685
0686
0687
0688
0689 static int jobqueue_init(jobqueue* jobqueue_p)
0690 {
0691 jobqueue_p->len = 0;
0692 jobqueue_p->front = NULL;
0693 jobqueue_p->rear = NULL;
0694
0695 jobqueue_p->has_jobs = (struct bsem*)NDRX_FPMALLOC(sizeof(struct bsem), 0);
0696
0697 if (jobqueue_p->has_jobs == NULL)
0698 {
0699 return -1;
0700 }
0701
0702 bsem_init(jobqueue_p->has_jobs, 0);
0703
0704 return 0;
0705 }
0706
0707
0708
0709
0710
0711
0712
0713 static void jobqueue_clear(jobqueue* jobqueue_p)
0714 {
0715 while(jobqueue_p->len)
0716 {
0717 NDRX_FPFREE(jobqueue_pull(jobqueue_p));
0718 }
0719
0720 jobqueue_p->front = NULL;
0721 jobqueue_p->rear = NULL;
0722 bsem_reset(jobqueue_p->has_jobs);
0723 jobqueue_p->len = 0;
0724
0725 }
0726
0727
0728
0729
0730
0731 static void jobqueue_push(jobqueue* jobqueue_p, struct job* newjob)
0732 {
0733 newjob->prev = NULL;
0734
0735 switch(jobqueue_p->len)
0736 {
0737
0738 case 0:
0739 jobqueue_p->front = newjob;
0740 jobqueue_p->rear = newjob;
0741 break;
0742
0743 default:
0744 jobqueue_p->rear->prev = newjob;
0745 jobqueue_p->rear = newjob;
0746
0747 }
0748 jobqueue_p->len++;
0749
0750 bsem_post(jobqueue_p->has_jobs);
0751 }
0752
0753
0754
0755
0756
0757
0758
0759
0760 static struct job* jobqueue_pull(jobqueue* jobqueue_p)
0761 {
0762 job* job_p = jobqueue_p->front;
0763
0764 switch(jobqueue_p->len)
0765 {
0766 case 0:
0767 break;
0768
0769 case 1:
0770 jobqueue_p->front = NULL;
0771 jobqueue_p->rear = NULL;
0772 jobqueue_p->len = 0;
0773 break;
0774
0775 default:
0776 jobqueue_p->front = job_p->prev;
0777 jobqueue_p->len--;
0778
0779 bsem_post(jobqueue_p->has_jobs);
0780
0781 }
0782 return job_p;
0783 }
0784
0785
0786
0787
0788
0789 static void jobqueue_destroy(jobqueue* jobqueue_p)
0790 {
0791 jobqueue_clear(jobqueue_p);
0792 NDRX_FPFREE(jobqueue_p->has_jobs);
0793 }
0794
0795
0796
0797
0798
0799
0800 static void bsem_init(bsem *bsem_p, int value)
0801 {
0802 if (value < 0 || value > 1)
0803 {
0804 err("bsem_init(): Binary semaphore can take only values 1 or 0");
0805 exit(1);
0806 }
0807 pthread_mutex_init(&(bsem_p->mutex), NULL);
0808 pthread_cond_init(&(bsem_p->cond), NULL);
0809 bsem_p->v = value;
0810 }
0811
0812
0813
0814 static void bsem_reset(bsem *bsem_p)
0815 {
0816 bsem_init(bsem_p, 0);
0817 }
0818
0819
0820
0821 static void bsem_post(bsem *bsem_p)
0822 {
0823 MUTEX_LOCK_V(bsem_p->mutex);
0824 bsem_p->v = 1;
0825 pthread_cond_signal(&bsem_p->cond);
0826 MUTEX_UNLOCK_V(bsem_p->mutex);
0827 }
0828
0829
0830
0831 static void bsem_post_all(bsem *bsem_p)
0832 {
0833 MUTEX_LOCK_V(bsem_p->mutex);
0834 bsem_p->v = 1;
0835 pthread_cond_broadcast(&bsem_p->cond);
0836 MUTEX_UNLOCK_V(bsem_p->mutex);
0837 }
0838
0839
0840
0841 static void bsem_wait(bsem* bsem_p)
0842 {
0843 MUTEX_LOCK_V(bsem_p->mutex);
0844 while (bsem_p->v != 1) {
0845 pthread_cond_wait(&bsem_p->cond, &bsem_p->mutex);
0846 }
0847 bsem_p->v = 0;
0848 MUTEX_UNLOCK_V(bsem_p->mutex);
0849 }