Back to home page

Enduro/X

 
 

    


0001 /* ********************************
0002  * Author:       Johan Hanssen Seferidis
0003  * License:      MIT
0004  * Description:  Library providing a threading pool where you can add
0005  *               work. For usage, check the thpool.h file or README.md
0006  *
0007  *//** @file thpool.h *//*
0008  *
0009  ********************************/
0010 
0011 /* #define _POSIX_C_SOURCE 200809L */
0012 #include <unistd.h>
0013 #include <signal.h>
0014 #include <stdio.h>
0015 #include <stdlib.h>
0016 #include <pthread.h>
0017 #include <errno.h>
0018 #include <sys/time.h> 
0019 #include <ndebug.h>
0020 #include <time.h>
0021 #include <ndrstandard.h>
0022 #ifdef EX_OS_LINUX
0023 #include <sys/prctl.h>
0024 #endif
0025 
0026 #include <exthpool.h>
0027 
0028 #include <nstdutil.h>
0029 #include <ndrxdiag.h>
0030 #include <nstopwatch.h>
0031 
0032 #ifdef THPOOL_DEBUG
0033 #define THPOOL_DEBUG 1
0034 #else
0035 #define THPOOL_DEBUG 0
0036 #endif
0037 
0038 #if !defined(DISABLE_PRINT) || defined(THPOOL_DEBUG)
0039 #define err(str) NDRX_LOG(log_error, str)
0040 #else
0041 #define err(str)
0042 #endif
0043 
0044 /* ========================== STRUCTURES ============================ */
0045 
0046 
0047 /* Binary semaphore */
0048 typedef struct bsem 
0049 {
0050     pthread_mutex_t mutex;
0051     pthread_cond_t   cond;
0052     int v;
0053 } bsem;
0054 
0055 
0056 /* Job */
0057 typedef struct job
0058 {
0059     struct job*  prev;                   /* pointer to previous job   */
0060     void  (*function)(void* arg, int *p_finish_off);/* function pointer*/
0061     void*  arg;                          /* function's argument       */
0062 } job;
0063 
0064 
0065 /* Job queue */
0066 typedef struct jobqueue
0067 {
0068     job  *front;                         /* pointer to front of queue */
0069     job  *rear;                          /* pointer to rear  of queue */
0070     bsem *has_jobs;                      /* flag as binary semaphore  */
0071     int   len;                           /* number of jobs in queue   */
0072 } jobqueue;
0073 
0074 
0075 /* Thread */
0076 typedef struct poolthread
0077 {
0078     int       id;                        /* friendly id               */
0079     pthread_t pthread;                   /* pointer to actual thread  */
0080     struct thpool_* thpool_p;            /* access to thpool          */
0081 } poolthread;
0082 
0083 
0084 /* Threadpool */
0085 typedef struct thpool_
0086 {
0087     poolthread**   threads;                 /**< pointer to threads         */
0088     volatile int num_threads_alive;         /**< threads currently alive    */
0089     volatile int num_threads_working;       /**< threads currently working  */
0090     pthread_mutex_t  thcount_lock;          /**< used for thread count etc  */
0091     pthread_cond_t  threads_all_idle;       /**< signal to thpool_wait      */
0092     pthread_cond_t  threads_one_idle;       /**< signal to thpool_wait_one  */
0093     pthread_cond_t  proc_one;               /**< One job is processed       */
0094     int threads_keepalive;
0095     int num_threads;                        /**< total number of threads    */
0096     int num_threads_allocd;                 /**< total number of threads    */
0097     int thread_status;                      /**< if EXTRUE, init OK         */
0098     jobqueue  jobqueue;                     /**< job queue                  */
0099     ndrx_thpool_tpsvrthrinit_t pf_init;    /**< init function, if any      */
0100     ndrx_thpool_tpsvrthrdone_t pf_done;    /**< done function, if any      */
0101     int argc;                               /**< cli argument count         */
0102     char **argv;                            /**< cli arguments              */
0103 } thpool_;
0104 
0105 
0106 /* ========================== PROTOTYPES ============================ */
0107 
0108 static int  poolthread_init(thpool_* thpool_p, struct poolthread** thread_p, int id);
0109 static void* poolthread_do(struct poolthread* thread_p);
0110 static void  poolthread_hold(int sig_id);
0111 static void  poolthread_destroy(struct poolthread* thread_p);
0112 
0113 static int   jobqueue_init(jobqueue* jobqueue_p);
0114 static void  jobqueue_clear(jobqueue* jobqueue_p);
0115 static void  jobqueue_push(jobqueue* jobqueue_p, struct job* newjob_p);
0116 static struct job* jobqueue_pull(jobqueue* jobqueue_p);
0117 static void  jobqueue_destroy(jobqueue* jobqueue_p);
0118 
0119 static void  bsem_init(struct bsem *bsem_p, int value);
0120 static void  bsem_reset(struct bsem *bsem_p);
0121 static void  bsem_post(struct bsem *bsem_p);
0122 static void  bsem_post_all(struct bsem *bsem_p);
0123 static void  bsem_wait(struct bsem *bsem_p);
0124 
0125 
0126 /* ========================== THREADPOOL ============================ */
0127 
0128 
0129 /* Initialise thread pool 
0130  * @param num_threads number of threads
0131  * @param p_ret return status;
0132  * @param pf_init thread init func
0133  * @param pf_done thread done func
0134  * @param argc command line arguments for thread init func
0135  * @param argv commanc line arguments for thread init func
0136  * @return thread pool (contain 0 or more intitialized threads) or NULL on fail
0137  */
0138 struct thpool_* ndrx_thpool_init(int num_threads, int *p_ret,
0139         ndrx_thpool_tpsvrthrinit_t pf_init, ndrx_thpool_tpsvrthrdone_t pf_done,
0140         int argc, char **argv)
0141 {
0142 
0143     thpool_* thpool_p;
0144     if (num_threads < 0)
0145     {
0146         num_threads = 0;
0147     }
0148 
0149     /* Make new thread pool */
0150     thpool_p = (struct thpool_*)NDRX_FPMALLOC(sizeof(struct thpool_), 0);
0151     if (thpool_p == NULL)
0152     {
0153         err("thpool_init(): Could not allocate memory for thread pool\n");
0154         return NULL;
0155     }
0156     thpool_p->num_threads   = 0;
0157     thpool_p->num_threads_allocd   = 0;
0158     thpool_p->threads_keepalive = 1;
0159     thpool_p->num_threads_alive   = 0;
0160     thpool_p->num_threads_working = 0;
0161     thpool_p->pf_init = pf_init;
0162     thpool_p->pf_done = pf_done;
0163     thpool_p->argc = argc;
0164     thpool_p->argv = argv;
0165 
0166     /* Initialise the job queue */
0167     if (jobqueue_init(&thpool_p->jobqueue) == -1)
0168     {
0169         err("thpool_init(): Could not allocate memory for job queue\n");
0170         NDRX_FPFREE(thpool_p);
0171         return NULL;
0172     }
0173 
0174     /* Make threads in pool */
0175     thpool_p->threads = (struct poolthread**)NDRX_FPMALLOC(num_threads * sizeof(struct poolthread *), 0);
0176     
0177     if (thpool_p->threads == NULL)
0178     {
0179         err("thpool_init(): Could not allocate memory for threads\n");
0180         jobqueue_destroy(&thpool_p->jobqueue);
0181         NDRX_FPFREE(thpool_p);
0182         return NULL;
0183     }
0184 
0185     pthread_mutex_init(&(thpool_p->thcount_lock), NULL);
0186     pthread_cond_init(&thpool_p->threads_all_idle, NULL);
0187     pthread_cond_init(&thpool_p->threads_one_idle, NULL);
0188     pthread_cond_init(&thpool_p->proc_one, NULL);
0189 
0190     /* Thread init, lets do one by one... and check the final counts... */
0191     int n;
0192     for (n=0; n<num_threads; n++)
0193     {
0194         /* lock here */
0195         thpool_p->thread_status = EXSUCCEED;
0196         MUTEX_LOCK_V(thpool_p->thcount_lock);
0197         
0198         /* run off the thread */
0199         if (EXSUCCEED!=poolthread_init(thpool_p, &thpool_p->threads[n], n))
0200         {
0201             if (NULL!=p_ret)
0202             {
0203                 *p_ret=EXFAIL;
0204             }
0205         
0206             /* unlock the mutex, as thread failed to init... */
0207             MUTEX_UNLOCK_V(thpool_p->thcount_lock);
0208             goto out;
0209         }
0210         
0211         /* wait for init complete */
0212         pthread_cond_wait(&thpool_p->threads_one_idle, &thpool_p->thcount_lock);
0213         
0214         MUTEX_UNLOCK_V(thpool_p->thcount_lock);
0215         
0216         /* Check the status... */
0217         if (EXFAIL==thpool_p->thread_status)
0218         {
0219             /* join this one... and terminate all */
0220             pthread_join(thpool_p->threads[n]->pthread, NULL);
0221             /* indicate that we finish off */
0222             if (NULL!=p_ret)
0223             {
0224                 *p_ret=EXFAIL;
0225         /* no need to init those others... */
0226         break;
0227             }
0228         }
0229         
0230 #if THPOOL_DEBUG
0231             printf("THPOOL_DEBUG: Created thread %d in pool \n", n);
0232 #endif
0233     }
0234 
0235     /* TODO: Wait for threads to initialize 
0236     while (thpool_p->num_threads_alive != num_threads) {sched_yield();}
0237      * */
0238     
0239 out:
0240     return thpool_p;
0241 }
0242 
0243 int ndrx_thpool_add_work2(thpool_* thpool_p, void (*function_p)(void*, int *), void* arg_p, long flags, int max_len)
0244 {
0245     int ret = EXSUCCEED;
0246     job* newjob;
0247 
0248     newjob=(struct job*)NDRX_FPMALLOC(sizeof(struct job), 0);
0249     
0250     if (newjob==NULL)
0251     {
0252         err("thpool_add_work(): Could not allocate memory for new job\n");
0253         return -1;
0254     }
0255 
0256     /* add function and argument */
0257     newjob->function=function_p;
0258     newjob->arg=arg_p;
0259     
0260     /* add job to queue */
0261     MUTEX_LOCK_V(thpool_p->thcount_lock);
0262     
0263     if (flags & NDRX_THPOOL_ONEJOB && thpool_p->jobqueue.len > 0)
0264     {
0265         NDRX_LOG(log_debug, "NDRX_THPOOL_ONEJOB set and queue len is %d - skip this job",
0266                 thpool_p->jobqueue.len);
0267         
0268         /* WARNING !!!! Early return! */
0269         NDRX_FPFREE(newjob);
0270         MUTEX_UNLOCK_V(thpool_p->thcount_lock);
0271         return NDRX_THPOOL_ONEJOB;
0272     }
0273 
0274     /* wait for pool to process some amount of jobs, before we continue....
0275      * otherwise this might cause us to buffer lot of outgoing messages
0276      */
0277     if (max_len > 0)
0278     {
0279         while (thpool_p->jobqueue.len > max_len)
0280         {
0281             /* wait for 1 sec... so that we release some control */
0282             struct timespec wait_time;
0283             struct timeval now;
0284 
0285             gettimeofday(&now,NULL);
0286 
0287             wait_time.tv_sec = now.tv_sec+1;
0288             wait_time.tv_nsec = now.tv_usec*1000;
0289 
0290             if (EXSUCCEED!=(ret=pthread_cond_timedwait(&thpool_p->proc_one, 
0291                     &thpool_p->thcount_lock, &wait_time)))
0292             {
0293                 NDRX_LOG(log_error, "Waiting for %d jobs (current: %d) but expired... (err: %s)", 
0294                         max_len, thpool_p->jobqueue.len, strerror(ret));            
0295                 
0296                 /* allow to continue... */
0297                 break;
0298             }
0299         }
0300     }
0301 
0302     jobqueue_push(&thpool_p->jobqueue, newjob);
0303     
0304     MUTEX_UNLOCK_V(thpool_p->thcount_lock);
0305 
0306     return 0;
0307 }
0308 
0309 /* Add work to the thread pool 
0310  * default version
0311  */
0312 int ndrx_thpool_add_work(thpool_* thpool_p, void (*function_p)(void*, int *), void* arg_p)
0313 {
0314     return ndrx_thpool_add_work2(thpool_p, function_p, arg_p, 0, 0);
0315 }
0316 
0317 /**
0318  *  Wait until all jobs have finished 
0319  * called by dispatch tread
0320  */
0321 void ndrx_thpool_wait(thpool_* thpool_p)
0322 {
0323     MUTEX_LOCK_V(thpool_p->thcount_lock);
0324     
0325     while (thpool_p->jobqueue.len || thpool_p->num_threads_working)
0326     {
0327         pthread_cond_wait(&thpool_p->threads_all_idle, &thpool_p->thcount_lock);
0328     }
0329     MUTEX_UNLOCK_V(thpool_p->thcount_lock);
0330 }
0331 
0332 /**
0333  * External signal as free
0334  * @param thpool_p
0335  */
0336 void ndrx_thpool_signal_one(thpool_* thpool_p)
0337 {
0338     pthread_cond_signal(&thpool_p->threads_one_idle);
0339 }
0340 
0341 /**
0342  * Wait for jobs to be less than given number
0343  * @param thpool_p thread pool which to work
0344  * @param less_than jobs/working threads to be less than this number
0345  * @param timeout number of milliseconds to wait
0346  * @param cond pointer to condition when becomes, true terminate the loop
0347  * @return EXFAIL (something bad has happended), EXSUCCEED (timedout), EXTRUE (got
0348  *  condition)
0349  */
0350 int ndrx_thpool_timedwait_less(thpool_* thpool_p, int less_than, long timeout,
0351             int *cond)
0352 {
0353     int ret = EXSUCCEED;
0354     struct timespec wait_time;
0355     struct timeval now;
0356     long delta, sleep_time;
0357     
0358     ndrx_stopwatch_t w;
0359     
0360     ndrx_stopwatch_reset(&w);
0361     MUTEX_LOCK_V(thpool_p->thcount_lock);
0362     
0363     while (thpool_p->jobqueue.len + thpool_p->num_threads_working>=less_than &&
0364             (delta=ndrx_stopwatch_get_delta(&w)) < timeout && !*cond)
0365     {
0366 
0367         gettimeofday(&now, NULL);
0368 
0369         wait_time.tv_sec = now.tv_sec;
0370         /* convert to ms: */
0371         wait_time.tv_nsec = now.tv_usec*1000;
0372 
0373         sleep_time = timeout - delta;
0374         
0375         if (sleep_time<=0)
0376         {
0377             /* timed-out */
0378             break;
0379         }
0380         
0381         ndrx_timespec_plus(&wait_time, sleep_time);
0382         pthread_cond_timedwait(&thpool_p->threads_one_idle, &thpool_p->thcount_lock, &wait_time); 
0383     }
0384     
0385     /* OK, check the condition */
0386     if (thpool_p->jobqueue.len + thpool_p->num_threads_working < less_than)
0387     {
0388         ret = EXTRUE;
0389     }
0390     
0391     MUTEX_UNLOCK_V(thpool_p->thcount_lock);
0392     
0393     
0394     return ret;
0395 }
0396 
0397 /**
0398  * Wait until one thread is free.
0399  * Called by dispatch thread.
0400  * @param thpool_p 
0401  */
0402 void ndrx_thpool_wait_one(thpool_* thpool_p)
0403 {
0404     MUTEX_LOCK_V(thpool_p->thcount_lock);
0405 
0406     /* Wait for at-leat one free thread (i.e.) no job found... */
0407     while ( (thpool_p->jobqueue.len - 
0408             (thpool_p->num_threads-thpool_p->num_threads_working) >= 0 ))
0409     {
0410         pthread_cond_wait(&thpool_p->threads_one_idle, &thpool_p->thcount_lock);
0411     }
0412 
0413     MUTEX_UNLOCK_V(thpool_p->thcount_lock);
0414 }
0415 
0416 
0417 /**
0418  * Get number of non working and non job scheduled threads
0419  * @param thpool_p thread pool
0420  * @return number of threads fully free (no job scheduled)
0421  */
0422 int ndrx_thpool_nr_not_working(thpool_* thpool_p)
0423 {
0424     int nr;
0425     
0426     MUTEX_LOCK_V(thpool_p->thcount_lock);
0427 
0428     nr = thpool_p->num_threads - thpool_p->num_threads_working - thpool_p->jobqueue.len;
0429     
0430     MUTEX_UNLOCK_V(thpool_p->thcount_lock);
0431     
0432     return nr;
0433 }
0434 
0435 /* Destroy the threadpool */
0436 void ndrx_thpool_destroy(thpool_* thpool_p)
0437 {
0438     int n;
0439     volatile int threads_total = thpool_p->num_threads;
0440     /* Give one second to kill idle threads */
0441     double TIMEOUT = 1.0;
0442     time_t start, end;
0443     double tpassed = 0.0;
0444     time (&start);
0445     
0446     /* No need to destory if it's NULL */
0447     if (thpool_p == NULL) return ;
0448     /* End each thread 's infinite loop */
0449     thpool_p->threads_keepalive = 0;
0450     
0451     /* num_threads_alive - reads are atomic... as the same as writes. */
0452     while (tpassed < TIMEOUT && thpool_p->num_threads_alive)
0453     {
0454         bsem_post_all(thpool_p->jobqueue.has_jobs);
0455         time (&end);
0456         tpassed = difftime(end,start);
0457     }
0458     
0459     /* Poll remaining threads */
0460     while (thpool_p->num_threads_alive)
0461     {
0462         bsem_post_all(thpool_p->jobqueue.has_jobs);
0463         sleep(1);
0464     }
0465     
0466     /* join threads... */
0467     for (n=0; n<thpool_p->num_threads; n++)
0468     {
0469         pthread_join(thpool_p->threads[n]->pthread, NULL);
0470     }
0471     
0472     /* Job queue cleanup */
0473     jobqueue_destroy(&thpool_p->jobqueue);
0474     
0475     /* avoid mem leak #250 */
0476     for (n=0; n < thpool_p->num_threads_allocd; n++)
0477     {
0478         poolthread_destroy(thpool_p->threads[n]);
0479     }
0480     
0481     NDRX_FPFREE(thpool_p->threads);
0482     NDRX_FPFREE(thpool_p);
0483 }
0484 
0485 /* ============================ THREAD ============================== */
0486 
0487 
0488 /* Initialize a thread in the thread pool
0489  *
0490  * @param thread        address to the pointer of the thread to be created
0491  * @param id            id to be given to the thread
0492  * @return 0 on success, -1 otherwise.
0493  */
0494 static int poolthread_init (thpool_* thpool_p, struct poolthread** thread_p, int id)
0495 {
0496     int ret = EXSUCCEED;
0497     pthread_attr_t pthread_custom_attr;
0498     pthread_attr_init(&pthread_custom_attr);
0499 
0500     *thread_p = (struct poolthread*)NDRX_FPMALLOC(sizeof(struct poolthread), 0);
0501 
0502     if (*thread_p == NULL)
0503     {
0504         err("poolthread_init(): Could not allocate memory for thread\n");
0505         return -1;
0506     }
0507 
0508     (*thread_p)->thpool_p = thpool_p;
0509     (*thread_p)->id       = id;
0510     thpool_p->num_threads_allocd++;
0511     
0512     /* have some stack space... */
0513     ndrx_platf_stack_set(&pthread_custom_attr);
0514 
0515     if (EXSUCCEED!=pthread_create(&(*thread_p)->pthread, &pthread_custom_attr,
0516                     (void *)poolthread_do, (*thread_p)))
0517     {
0518         NDRX_PLATF_DIAG(NDRX_DIAG_PTHREAD_CREATE, errno, "poolthread_init");
0519         EXFAIL_OUT(ret);
0520     }
0521     /* pthread_detach((*thread_p)->pthread); */
0522 out:
0523     return ret;
0524 }
0525 
0526 /* What each thread is doing
0527 *
0528 * In principle this is an endless loop. The only time this loop gets interuppted is once
0529 * thpool_destroy() is invoked or the program exits.
0530 *
0531 * @param  thread        thread that will run this function
0532 * @return nothing
0533 */
0534 static void* poolthread_do(struct poolthread* thread_p)
0535 {
0536 
0537     /* Set thread name for profiling and debugging */
0538     int finish_off = 0;
0539     int all_idle;
0540     int one_idle;
0541     int ret = EXSUCCEED;
0542     char thread_name[128] = {0};
0543     snprintf(thread_name, sizeof(thread_name), "thread-pool-%d", thread_p->id);
0544 
0545 #ifdef EX_OS_LINUX
0546     prctl(PR_SET_NAME, thread_name);
0547 #endif
0548 
0549     /* Assure all threads have been created before starting serving */
0550     thpool_* thpool_p = thread_p->thpool_p;
0551     
0552     /* run off the init, if any... */
0553     if (NULL!=thread_p->thpool_p->pf_init)
0554     {
0555         NDRX_LOG(log_debug, "About to call tpsvrthrinit()");
0556         ret = thread_p->thpool_p->pf_init(thread_p->thpool_p->argc, 
0557                 thread_p->thpool_p->argv);
0558         
0559         if (EXSUCCEED!=ret)
0560         {
0561             NDRX_LOG(log_error, "tpsvrthrinit() failed %d", ret);
0562             userlog("tpsvrthrinit() failed %d", ret);
0563         }
0564         else
0565         {
0566             NDRX_LOG(log_debug, "tpsvrthrinit() OK");
0567         }
0568     }
0569     
0570     /* 
0571      * Signal that we are ready
0572      */
0573     MUTEX_LOCK_V(thpool_p->thcount_lock);
0574     /* Mark thread as alive (initialized) 
0575      * We could use mutex here, as these are called only at init.
0576      */
0577     if (EXSUCCEED==ret)
0578     {
0579         thpool_p->num_threads_alive += 1;
0580         thpool_p->num_threads+=1;
0581     }
0582     else
0583     {
0584         thpool_p->thread_status = EXFAIL;
0585         pthread_cond_signal(&thpool_p->threads_one_idle);
0586         MUTEX_UNLOCK_V(thpool_p->thcount_lock);
0587         return NULL;
0588     }
0589     
0590     pthread_cond_signal(&thpool_p->threads_one_idle);
0591     MUTEX_UNLOCK_V(thpool_p->thcount_lock);
0592     
0593     while(thread_p->thpool_p->threads_keepalive && !finish_off)
0594     {
0595         bsem_wait(thpool_p->jobqueue.has_jobs);
0596 
0597         if (thread_p->thpool_p->threads_keepalive)
0598         {
0599             void(*func_buff)(void* arg, int *p_finish_off);
0600             void*  arg_buff;
0601             job* job_p;
0602             
0603             MUTEX_LOCK_V(thpool_p->thcount_lock);
0604             thpool_p->num_threads_working++;
0605             
0606 
0607             /* Read job from queue and execute it */
0608             job_p = jobqueue_pull(&thpool_p->jobqueue);
0609             
0610             /* FOR WAIT ONE.. HAVE FIXED LEN */
0611             MUTEX_UNLOCK_V(thpool_p->thcount_lock);
0612             if (job_p)
0613             {
0614                 func_buff = job_p->function;
0615                 arg_buff  = job_p->arg;
0616                 func_buff(arg_buff, &finish_off);
0617                 NDRX_FPFREE(job_p);
0618             }
0619 
0620             /* if no threads working, wake up the shutdown
0621              * waiter...
0622              * It will check that there are no any jobs
0623              * To get atomic signaling to waiter, we must lock
0624              * the decision and signal are. So that we catch any
0625              * waiter which might stepping in for signal wait, 
0626              * but we here get faster and trigger the signal.
0627              * and the waiter 
0628              */
0629             all_idle=EXFALSE;
0630             one_idle=EXFALSE;
0631             MUTEX_LOCK_V(thpool_p->thcount_lock);
0632 
0633             /* get the final change before trigger */
0634             thpool_p->num_threads_working--;
0635             
0636             if (!thpool_p->num_threads_working)
0637             {
0638                 all_idle=EXTRUE;
0639             }
0640             
0641             if ((thpool_p->jobqueue.len - 
0642                 (thpool_p->num_threads-thpool_p->num_threads_working) < 0 ))
0643             {
0644                 one_idle=EXTRUE;
0645             }
0646             
0647             if (all_idle)
0648             {  
0649                 pthread_cond_signal(&thpool_p->threads_all_idle);
0650             } 
0651             
0652             if (one_idle)
0653             {  
0654                 pthread_cond_signal(&thpool_p->threads_one_idle);
0655             }
0656             
0657             /* one job is processed... */
0658             pthread_cond_signal(&thpool_p->proc_one);
0659 
0660             MUTEX_UNLOCK_V(thpool_p->thcount_lock);
0661 
0662         }
0663     }
0664     
0665     /* terminate it off... */
0666     if (NULL!=thread_p->thpool_p->pf_done)
0667     {
0668         thread_p->thpool_p->pf_done();
0669     }
0670     
0671     MUTEX_LOCK_V(thpool_p->thcount_lock);
0672     thpool_p->num_threads_alive --;
0673     MUTEX_UNLOCK_V(thpool_p->thcount_lock);
0674 
0675     return NULL;
0676 }
0677 
0678 
0679 /* Frees a thread  */
0680 static void poolthread_destroy (poolthread* thread_p)
0681 {
0682     NDRX_FPFREE(thread_p);
0683 }
0684 
0685 /* ============================ JOB QUEUE =========================== */
0686 
0687 
0688 /* Initialize queue */
0689 static int jobqueue_init(jobqueue* jobqueue_p)
0690 {
0691     jobqueue_p->len = 0;
0692     jobqueue_p->front = NULL;
0693     jobqueue_p->rear  = NULL;
0694 
0695     jobqueue_p->has_jobs = (struct bsem*)NDRX_FPMALLOC(sizeof(struct bsem), 0);
0696     
0697     if (jobqueue_p->has_jobs == NULL)
0698     {
0699         return -1;
0700     }
0701 
0702     bsem_init(jobqueue_p->has_jobs, 0);
0703 
0704     return 0;
0705 }
0706 
0707 
0708 /* 
0709  * Clear the queue
0710  * The caller must ensure that all worker threads have exit.
0711  * Called by dispatch thread
0712  */
0713 static void jobqueue_clear(jobqueue* jobqueue_p)
0714 {
0715     while(jobqueue_p->len)
0716     {
0717         NDRX_FPFREE(jobqueue_pull(jobqueue_p));
0718     }
0719 
0720     jobqueue_p->front = NULL;
0721     jobqueue_p->rear  = NULL;
0722     bsem_reset(jobqueue_p->has_jobs);
0723     jobqueue_p->len = 0;
0724 
0725 }
0726 
0727 /* Add (allocated) job to queue
0728  *! must be protected by outside thcount_lock
0729  * This is called by dispatcher thread
0730  */
0731 static void jobqueue_push(jobqueue* jobqueue_p, struct job* newjob)
0732 {
0733     newjob->prev = NULL;
0734 
0735     switch(jobqueue_p->len)
0736     {
0737 
0738         case 0:  /* if no jobs in queue */
0739             jobqueue_p->front = newjob;
0740             jobqueue_p->rear  = newjob;
0741             break;
0742 
0743         default: /* if jobs in queue */
0744             jobqueue_p->rear->prev = newjob;
0745             jobqueue_p->rear = newjob;
0746 
0747     }
0748     jobqueue_p->len++;
0749 
0750     bsem_post(jobqueue_p->has_jobs);
0751 }
0752 
0753 
0754 /* Get first job from queue(removes it from queue)
0755  *
0756  * Notice: Caller MUST hold a mutex
0757  * ! must be protected by outside thcount_lock
0758  * This is done by worker
0759  */
0760 static struct job* jobqueue_pull(jobqueue* jobqueue_p)
0761 {
0762     job* job_p = jobqueue_p->front;
0763 
0764     switch(jobqueue_p->len)
0765     {
0766         case 0:  /* if no jobs in queue */
0767             break;
0768 
0769         case 1:  /* if one job in queue */
0770             jobqueue_p->front = NULL;
0771             jobqueue_p->rear  = NULL;
0772             jobqueue_p->len = 0;
0773             break;
0774 
0775         default: /* if >1 jobs in queue */
0776             jobqueue_p->front = job_p->prev;
0777             jobqueue_p->len--;
0778             /* more than one job in queue -> post it */
0779             bsem_post(jobqueue_p->has_jobs);
0780 
0781     }
0782     return job_p;
0783 }
0784 
0785 
0786 /* Free all queue resources back to the system 
0787  * Called by dispatch thread
0788  */
0789 static void jobqueue_destroy(jobqueue* jobqueue_p)
0790 {
0791     jobqueue_clear(jobqueue_p);
0792     NDRX_FPFREE(jobqueue_p->has_jobs);
0793 }
0794 
0795 
0796 /* ======================== SYNCHRONISATION ========================= */
0797 
0798 
0799 /* Init semaphore to 1 or 0 */
0800 static void bsem_init(bsem *bsem_p, int value)
0801 {
0802     if (value < 0 || value > 1)
0803     {
0804         err("bsem_init(): Binary semaphore can take only values 1 or 0");
0805         exit(1);
0806     }
0807     pthread_mutex_init(&(bsem_p->mutex), NULL);
0808     pthread_cond_init(&(bsem_p->cond), NULL);
0809     bsem_p->v = value;
0810 }
0811 
0812 
0813 /* Reset semaphore to 0 */
0814 static void bsem_reset(bsem *bsem_p)
0815 {
0816     bsem_init(bsem_p, 0);
0817 }
0818 
0819 
0820 /* Post to at least one thread */
0821 static void bsem_post(bsem *bsem_p)
0822 {
0823     MUTEX_LOCK_V(bsem_p->mutex);
0824     bsem_p->v = 1;
0825     pthread_cond_signal(&bsem_p->cond);
0826     MUTEX_UNLOCK_V(bsem_p->mutex);
0827 }
0828 
0829 
0830 /* Post to all threads */
0831 static void bsem_post_all(bsem *bsem_p)
0832 {
0833     MUTEX_LOCK_V(bsem_p->mutex);
0834     bsem_p->v = 1;
0835     pthread_cond_broadcast(&bsem_p->cond);
0836     MUTEX_UNLOCK_V(bsem_p->mutex);
0837 }
0838 
0839 
0840 /* Wait on semaphore until semaphore has value 0 */
0841 static void bsem_wait(bsem* bsem_p)
0842 {
0843     MUTEX_LOCK_V(bsem_p->mutex);
0844     while (bsem_p->v != 1) {
0845             pthread_cond_wait(&bsem_p->cond, &bsem_p->mutex);
0846     }
0847     bsem_p->v = 0;
0848     MUTEX_UNLOCK_V(bsem_p->mutex);
0849 }