Back to home page

Enduro/X

 
 

    


0001 /**
0002  * @brief System V Queue Time-out and event handling
0003  *
0004  * @file sys_svqevent.c
0005  */
0006 /* -----------------------------------------------------------------------------
0007  * Enduro/X Middleware Platform for Distributed Transaction Processing
0008  * Copyright (C) 2009-2016, ATR Baltic, Ltd. All Rights Reserved.
0009  * Copyright (C) 2017-2023, Mavimax, Ltd. All Rights Reserved.
0010  * This software is released under one of the following licenses:
0011  * AGPL (with Java and Go exceptions) or Mavimax's license for commercial use.
0012  * See LICENSE file for full text.
0013  * -----------------------------------------------------------------------------
0014  * AGPL license:
0015  *
0016  * This program is free software; you can redistribute it and/or modify it under
0017  * the terms of the GNU Affero General Public License, version 3 as published
0018  * by the Free Software Foundation;
0019  *
0020  * This program is distributed in the hope that it will be useful, but WITHOUT ANY
0021  * WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR A
0022  * PARTICULAR PURPOSE. See the GNU Affero General Public License, version 3
0023  * for more details.
0024  *
0025  * You should have received a copy of the GNU Affero General Public License along 
0026  * with this program; if not, write to the Free Software Foundation, Inc.,
0027  * 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
0028  *
0029  * -----------------------------------------------------------------------------
0030  * A commercial use license is available from Mavimax, Ltd
0031  * contact@mavimax.com
0032  * -----------------------------------------------------------------------------
0033  */
0034 
0035 /*---------------------------Includes-----------------------------------*/
0036 #include <stdio.h>
0037 #include <stdlib.h>
0038 #include <time.h>
0039 
0040 #include <unistd.h>
0041 #include <stdarg.h>
0042 #include <ctype.h>
0043 #include <memory.h>
0044 #include <errno.h>
0045 #include <signal.h>
0046 #include <limits.h>
0047 #include <pthread.h>
0048 #include <string.h>
0049 #include <fcntl.h>
0050 #include <sys/time.h>
0051 #include <sys/msg.h>
0052 #include <pthread.h>
0053 #include <ndrxdiag.h>
0054 #include <ndrstandard.h>
0055 #include <ndebug.h>
0056 #include <nstdutil.h>
0057 #include <limits.h>
0058 
0059 #include <sys_unix.h>
0060 
0061 #include <utlist.h>
0062 #include <sys_svq.h>
0063 
0064 /*---------------------------Externs------------------------------------*/
0065 /*---------------------------Macros-------------------------------------*/
/** Index of the read end in a pipe() descriptor pair */
#define READ                    0
/** Index of the write end in a pipe() descriptor pair */
#define WRITE                   1
/** Slot of the wakeup/command pipe in the poll() table */
#define PIPE_POLL_IDX           0
/**
 * Default timeout used when no thread has been registered for processing
 * (unit not visible here, presumably seconds — TODO confirm)
 */
#define DFLT_TOUT               60
0073 
0074 /*---------------------------Enums--------------------------------------*/
0075 /*---------------------------Typedefs-----------------------------------*/
0076 
0077 typedef struct ndrx_svq_mqd_hash ndrx_svq_mqd_hash_t;
0078 /**
0079  * Hash of queues and their timeouts assigned and stamps...
0080  */
0081 struct ndrx_svq_mqd_hash
0082 {
0083     void *mqd;                  /**< Queue handler hashed               */
0084     
0085     ndrx_stopwatch_t stamp_time;/**< timestamp for timeout waiting      */
0086     unsigned long stamp_seq;    /**< stamp sequence                     */
0087     
0088     struct timespec abs_timeout;/**< actual future timeout time         */
0089 
0090     EX_hash_handle hh;          /**< make hashable                      */
0091 };
0092 
0093 typedef struct ndrx_svq_fd_hash ndrx_svq_fd_hash_t;
0094 /**
0095  * Hash of queues and their timeouts assigned and stamps...
0096  */
0097 struct ndrx_svq_fd_hash
0098 {
0099     int fd;                     /**< hashed file descriptor             */
0100     mqd_t mqd;                  /**< Queue handler hashed               */
0101 
0102     EX_hash_handle hh;          /**< make hashable                      */
0103 };
0104 
0105 /**
0106  * System V Event monitor data
0107  */
0108 typedef struct
0109 {
0110     int nrfds;              /**< number of FDs in poll                  */
0111     struct pollfd *fdtab;   /**< poll() structure, pre-allocated        */
0112     struct pollfd *fdtabmo; /**< Real monitoirng table                  */
0113     
0114     int evpipe[2];          /**< wakeup pipe from notification thread   */
0115     pthread_t evthread;     /**< Event thread                           */
0116     
0117     /** 
0118      * Message queue descriptor hash list 
0119      * used for timeout processing and queue removal.
0120      */
0121     ndrx_svq_mqd_hash_t *mqdhash; 
0122     
0123     ndrx_svq_fd_hash_t *fdhash;  /**< File descriptor hash list         */
0124     
0125 } ndrx_svq_evmon_t;
0126 
0127 /*---------------------------Globals------------------------------------*/
0128 
0129 /* have a thread handler for tout monitoring thread! */
0130 
0131 exprivate ndrx_svq_evmon_t M_mon = {.evpipe[0]=EXFAIL, 
0132                                     .evpipe[1]=EXFAIL,
0133                                     .fdtab = NULL,
0134                                     .fdtabmo = NULL};
0135 exprivate int M_shutdown = EXFALSE;      /**< is shutdown requested?      */
0136 exprivate int volatile M_alive = EXFALSE;         /**< is monitoring thread alive? */
0137 exprivate int volatile __thread M_signalled = EXFALSE;/**< Did we got a signal?    */
0138 
0139 exprivate MUTEX_LOCKDECL(M_mon_lock_mq); /**< Mutex lock for shared M_mon access, mq  */
0140 exprivate MUTEX_LOCKDECL(M_mon_lock_fd); /**< Mutex lock for shared M_mon access, fd  */
0141 
0142 exprivate int M_scanunit = CONF_NDRX_SCANUNIT_DFLT;  /**< ms for wait on poll q */
0143 exprivate int M_scanunit_was_init = EXFALSE;         /**< i.e used by ndrxd     */
0144 
0145 /* we need two hash lists
0146  * - the one goes by mqd to list/update timeout registrations
0147  *   from this list we calculate the next wakeup too...
0148  * - needs a hash with file descriptors, as in this list there
0149  *   will file descriptors. We need to know to which thread we shall
0150  *   deliver this event for wakeup...
0151  * 
0152  * For both hashes we need following API:
0153  * - add/update
0154  * - delete mqd or fd (loop over...)
0155  */
0156 /*---------------------------Statics------------------------------------*/
0157 /*---------------------------Prototypes---------------------------------*/
0158 
0159 /**
0160  * Add FD to polling structure
0161  * FD Locked by outer caller.
0162  * @param fd file descriptor to add for polling
0163  * @return EXSUCCEED/EXFAIL
0164  */
0165 exprivate int ndrx_svq_fd_hash_addpoll(int fd, uint32_t events)
0166 {
0167     int ret = EXSUCCEED;
0168     
0169     /* resize/realloc events list, add fd */
0170     M_mon.nrfds++;
0171 
0172     NDRX_LOG(log_info, "set nrfds incremented to %d, events: %d", 
0173         M_mon.nrfds, (int)events);
0174 
0175     if (NULL==(M_mon.fdtab=NDRX_REALLOC(M_mon.fdtab, sizeof(struct pollfd)*M_mon.nrfds)))
0176     {
0177         NDRX_LOG(log_error, "Failed to realloc %d/%d", 
0178                 M_mon.nrfds, sizeof(struct pollfd)*M_mon.nrfds);
0179         EXFAIL_OUT(ret);
0180     }
0181     
0182     if (NULL==(M_mon.fdtabmo=NDRX_REALLOC(M_mon.fdtabmo, sizeof(struct pollfd)*M_mon.nrfds)))
0183     {
0184         NDRX_LOG(log_error, "Failed to realloc (mo) %d/%d", 
0185                 M_mon.nrfds, sizeof(struct pollfd)*M_mon.nrfds);
0186         EXFAIL_OUT(ret);
0187     }
0188 
0189     M_mon.fdtab[M_mon.nrfds-1].fd = fd;
0190     M_mon.fdtab[M_mon.nrfds-1].events = events;
0191     M_mon.fdtab[M_mon.nrfds-1].revents = 0;
0192     
0193 out:
0194     return ret;
0195 }
0196 
0197 /**
0198  * Remove FD from polling struct. Must be fd locked by caller
0199  * @param fd file descriptor
0200  * @return EXSUCCEED/EXFAIL
0201  */
0202 exprivate int ndrx_svq_fd_hash_delpoll(int fd)
0203 {
0204     int ret = EXSUCCEED;
0205     int i;
0206    
0207     for (i = 0; i < M_mon.nrfds; i++)
0208     {
0209         if (M_mon.fdtab[i].fd == fd)
0210         {
0211             /* kill the element */
0212             if (i!=M_mon.nrfds-1 && M_mon.nrfds>1)
0213             {
0214                 memmove(&M_mon.fdtab[i], &M_mon.fdtab[i+1], 
0215                         sizeof(struct pollfd)*(M_mon.nrfds-i-1));
0216             }
0217 
0218             M_mon.nrfds--;
0219 
0220             NDRX_LOG(log_info, "set nrfds decremented to %d fdtab=%p", 
0221                      M_mon.nrfds, M_mon.fdtab);
0222 
0223             if (0==M_mon.nrfds)
0224             {
0225                 NDRX_LOG(log_warn, "set->nrfds == 0, => free");
0226                 NDRX_FREE((char *)M_mon.fdtab);
0227                 NDRX_FREE((char *)M_mon.fdtabmo);
0228             }
0229             else if (NULL==(M_mon.fdtab=NDRX_REALLOC(M_mon.fdtab, 
0230                     sizeof(struct pollfd)*M_mon.nrfds)))
0231             {
0232                 int err = errno;
0233                 userlog("Failed to realloc %d/%d: %s", 
0234                         M_mon.nrfds, sizeof(struct pollfd)*M_mon.nrfds, 
0235                         strerror(err));
0236 
0237                 NDRX_LOG(log_error, "Failed to realloc %d/%d: %s", 
0238                         M_mon.nrfds, sizeof(struct pollfd)*M_mon.nrfds, 
0239                         strerror(err));
0240 
0241                 EXFAIL_OUT(ret);
0242             }
0243             else if (NULL==(M_mon.fdtabmo=NDRX_REALLOC(M_mon.fdtabmo, 
0244                     sizeof(struct pollfd)*M_mon.nrfds)))
0245             {
0246                 int err = errno;
0247                 userlog("Failed to realloc mo %d/%d: %s", 
0248                         M_mon.nrfds, sizeof(struct pollfd)*M_mon.nrfds, 
0249                         strerror(err));
0250 
0251                 NDRX_LOG(log_error, "Failed to realloc mo %d/%d: %s", 
0252                         M_mon.nrfds, sizeof(struct pollfd)*M_mon.nrfds, 
0253                         strerror(err));
0254 
0255                 EXFAIL_OUT(ret);
0256             }
0257             break;
0258         }
0259     }
0260     
0261 out:
0262    
0263     return ret;
0264 }
0265 
0266 /**
0267  * Check queue for FD presence in hash
0268  * @param fd queue descriptor ptr
0269  * @return q descriptor or NULL
0270  */
0271 exprivate ndrx_svq_fd_hash_t * ndrx_svq_fd_hash_find(int fd)
0272 {
0273     ndrx_svq_fd_hash_t *ret = NULL;
0274     
0275     EXHASH_FIND_INT( (M_mon.fdhash), &fd, ret);
0276     
0277     NDRX_LOG(log_dump, "checking fd %d -> %p", fd, ret);
0278     
0279     return ret;
0280 }
0281 
0282 /**
0283  * Add queue to timeout monitor
0284  * @param fd message queue descriptor
0285  * @param stamp_time timestamp when we are going to expire
0286  * @param stamp_seq sequence number for expiry
0287  * @return EXSUCCEED/EXFAIL
0288  */
0289 exprivate int ndrx_svq_fd_hash_add(int fd, mqd_t mqd, uint32_t events)
0290 {
0291     int ret = EXSUCCEED;
0292     ndrx_svq_fd_hash_t * el;
0293     
0294     MUTEX_LOCK_V(M_mon_lock_fd);
0295     
0296     el = ndrx_svq_fd_hash_find(fd);
0297     
0298     if (NULL==el)
0299     {
0300         el = NDRX_FPMALLOC(sizeof(ndrx_svq_fd_hash_t), 0);
0301 
0302         NDRX_LOG(log_dump, "Registering %d as int", fd);
0303 
0304         if (NULL==el)
0305         {
0306             int err = errno;
0307 
0308             NDRX_LOG(log_error, "Failed to alloc: %s", strerror(err));
0309             userlog("Failed to alloc: %s", strerror(err));
0310 
0311             EXFAIL_OUT(ret);
0312         }
0313 
0314         el->fd  = fd;
0315         el->mqd = mqd;
0316         
0317         ndrx_svq_fd_hash_addpoll(fd, events);
0318         EXHASH_ADD_INT((M_mon.fdhash), fd, el);
0319     }
0320     else
0321     {
0322         /* just update entry...  */
0323         el->mqd = mqd;
0324     }
0325     
0326 out:
0327     MUTEX_UNLOCK_V(M_mon_lock_fd);
0328     return ret;
0329 }
0330 
0331 /**
0332  * Number of file descriptors currently monitored
0333  * @return Number of FDs
0334  */
0335 expublic int ndrx_svq_fd_nrof(void)
0336 {
0337     return M_mon.nrfds;
0338 }
0339 /**
0340  * Delete single entry from queue hash
0341  * @param fd queue ptr
0342  * @return EXSUCCEED/EXFAIL
0343  */
0344 exprivate int ndrx_svq_fd_hash_del(int fd)
0345 {
0346     int ret = EXSUCCEED;
0347     ndrx_svq_fd_hash_t *el;
0348 
0349     MUTEX_LOCK_V(M_mon_lock_fd);
0350     
0351     /* remove from polling array!  */
0352     if (EXSUCCEED!=ndrx_svq_fd_hash_delpoll(fd))
0353     {
0354         EXFAIL_OUT(ret);
0355     }
0356     
0357     /* remove form FD registry */
0358     EXHASH_FIND_INT( (M_mon.fdhash), &fd, el);
0359     
0360     if (NULL!=el)
0361     {
0362         EXHASH_DEL((M_mon.fdhash), el);
0363         NDRX_FPFREE(el);
0364     }
0365     
0366 out:
0367     MUTEX_UNLOCK_V(M_mon_lock_fd);
0368     return ret;
0369 }
0370 
0371 /**
0372  * Delete record from fdhash by matching the queue descriptor
0373  * @param mqd queue descriptor ptr
0374  * @return EXSUCCEED/EXFAIL
0375  */
0376 exprivate int ndrx_svq_fd_hash_delbymqd(mqd_t mqd)
0377 {
0378     int ret = EXSUCCEED;
0379     ndrx_svq_fd_hash_t *e=NULL, *et=NULL;
0380     
0381     MUTEX_LOCK_V(M_mon_lock_fd);
0382     /* remove from polling array */
0383     EXHASH_ITER(hh, M_mon.fdhash, e, et)
0384     {
0385         if (e->mqd == mqd)
0386         {
0387             /* delete from polling struct */
0388             if (EXSUCCEED!=ndrx_svq_fd_hash_delpoll(e->fd))
0389             {
0390                 EXFAIL_OUT(ret);
0391             }
0392             
0393             /* delete the entry by it self */
0394             EXHASH_DEL(M_mon.fdhash, e);
0395             NDRX_FPFREE(e);
0396             break;
0397         }
0398     }
0399     
0400 out:
0401     MUTEX_UNLOCK_V(M_mon_lock_fd);
0402     return ret;
0403 }
0404 
0405 /**
0406  * Check queue descriptor presence in hash
0407  * @param mqd queue descriptor ptr
0408  * @return q descriptor or NULL
0409  */
0410 exprivate ndrx_svq_mqd_hash_t * ndrx_svq_mqd_hash_find(mqd_t mqd)
0411 {
0412     ndrx_svq_mqd_hash_t *ret = NULL;
0413     
0414     EXHASH_FIND_PTR( (M_mon.mqdhash), ((void **)&mqd), ret);
0415     
0416     NDRX_LOG(log_dump, "checking mqd %p -> %p", mqd, ret);
0417     
0418     return ret;
0419 }
0420 
0421 /**
0422  * Add queue to timeout monitor
0423  * @param mqd message queue descriptor
0424  * @param stamp_time timestamp when we are going to expire
0425  * @param stamp_seq sequence number for expiry
0426  * @return EXSUCCEED/EXFAIL
0427  */
0428 expublic int ndrx_svq_mqd_hash_add(mqd_t mqd, ndrx_stopwatch_t *stamp_time, 
0429         unsigned long stamp_seq, struct timespec *abs_timeout)
0430 {
0431     int ret = EXSUCCEED;
0432     ndrx_svq_mqd_hash_t * el;
0433     
0434     MUTEX_LOCK_V(M_mon_lock_mq);
0435     el = ndrx_svq_mqd_hash_find(mqd);
0436     
0437     if (NULL==el)
0438     {
0439         el = NDRX_FPMALLOC(sizeof(ndrx_svq_mqd_hash_t), 0);
0440 
0441         NDRX_LOG(log_dump, "Registering %p as mqd_t", mqd);
0442 
0443         if (NULL==el)
0444         {
0445             int err = errno;
0446 
0447             NDRX_LOG(log_error, "Failed to alloc: %s", strerror(err));
0448             userlog("Failed to alloc: %s", strerror(err));
0449 
0450             EXFAIL_OUT(ret);
0451         }
0452 
0453         el->mqd  = (void *)mqd;
0454         el->stamp_seq = stamp_seq;
0455         el->stamp_time = *stamp_time;
0456         el->abs_timeout = *abs_timeout;
0457         EXHASH_ADD_PTR((M_mon.mqdhash), mqd, el);
0458     }
0459     else
0460     {
0461         /* just update entry...  */
0462         el->stamp_seq = stamp_seq;
0463         el->stamp_time = *stamp_time;
0464         el->abs_timeout = *abs_timeout;
0465     }
0466     
0467 out:
0468     
0469     MUTEX_UNLOCK_V(M_mon_lock_mq);
0470 
0471     return ret;
0472 }
0473 
0474 /**
0475  * Delete single entry from queue hash
0476  * @param mqd queue ptr
0477  */
0478 expublic void ndrx_svq_mqd_hash_del(mqd_t mqd)
0479 {
0480     ndrx_svq_mqd_hash_t *ret = NULL;
0481     
0482     /* Remove queue completely */
0483     
0484     NDRX_LOG(log_debug, "Closing SV descr queue %p qstr:[%s] qid:%d", 
0485             mqd, mqd->qstr, mqd->qid);
0486     
0487     /* for service queues, the ndrxd will which are subject for removal
0488      * and for which timestamp is older than configure time frame
0489      * this is due to fact, that queue might be open, but not yet reported
0490      * to ndrxd or shm.
0491      * thus here we just remove directly.
0492      */
0493             
0494     /*
0495     NDRX_LOG(log_dump, "Unregistering %p as mqd_t from timeout mon", mqd);
0496     */
0497     
0498     /* do this first, so that afterwards we clean up all events possible... */
0499     MUTEX_LOCK_V(M_mon_lock_mq);
0500     /* remove from timeout hash */
0501     EXHASH_FIND_PTR( (M_mon.mqdhash), ((void **)&mqd), ret);
0502     
0503     if (NULL!=ret)
0504     {
0505         EXHASH_DEL((M_mon.mqdhash), ret);
0506         NDRX_FPFREE(ret);
0507     }
0508     MUTEX_UNLOCK_V(M_mon_lock_mq);
0509     
0510 }
0511 
0512 /**
0513  * Remove from MQ hash and remove linked file descriptors 
0514  * @param mqd queue descriptor to remove
0515  */
0516 expublic int ndrx_svq_mqd_close(mqd_t mqd)
0517 {
0518     int ret = EXSUCCEED;
0519     ndrx_svq_ev_t *elt, *tmp;
0520     
0521     /* remove FD firstly, if any */
0522     if (EXSUCCEED!=ndrx_svq_fd_hash_delbymqd(mqd))
0523     {
0524         EXFAIL_OUT(ret);
0525     }
0526     
0527     ndrx_svq_mqd_hash_del(mqd);
0528     
0529     /* zap any events.. */
0530     MUTEX_LOCK_V(mqd->qlock);
0531     /* Remove any un-processed queued events... */
0532     DL_FOREACH_SAFE(mqd->eventq,elt,tmp)
0533     {
0534         DL_DELETE(mqd->eventq, elt);
0535         NDRX_FPFREE(elt);
0536     }
0537     MUTEX_UNLOCK_V(mqd->qlock);
0538     
0539     NDRX_SPIN_DESTROY_V(mqd->rcvlock);
0540     NDRX_SPIN_DESTROY_V(mqd->rcvlockb4);
0541 /*
0542     why?
0543     MUTEX_TRYLOCK_V(mqd->barrier);
0544     MUTEX_TRYLOCK_V(mqd->qlock);
0545 */
0546     
0547 out:
0548     return ret;
0549 }
0550 
0551 /**
0552  * Dispatch any pending timeouts
0553  * @return EXSUCCEED/EXFAIL
0554  */
0555 exprivate int ndrx_svq_mqd_hash_dispatch(void)
0556 {
0557     int ret = EXSUCCEED;
0558     ndrx_svq_mqd_hash_t * r, *rt;
0559     long delta;
0560     struct timespec abs_timeout;
0561     struct timeval  timeval;
0562     ndrx_svq_ev_t * ev = NULL;
0563     int err;
0564     
0565     /* TODO: might want to reduce this region of locking... */
0566     MUTEX_LOCK_V(M_mon_lock_mq);
0567     
0568     EXHASH_ITER(hh, (M_mon.mqdhash), r, rt)
0569     {
0570         
0571         gettimeofday (&timeval, NULL);
0572         abs_timeout.tv_sec = timeval.tv_sec;
0573         abs_timeout.tv_nsec = timeval.tv_usec*1000;
0574 
0575         delta = ndrx_timespec_get_delta(&(r->abs_timeout), &abs_timeout);
0576         
0577         if (delta <= 0)
0578         {
0579             int wait_matched;
0580             NDRX_SPIN_LOCK_V(( ((mqd_t)r->mqd)->stamplock));
0581             
0582             if (NDRX_SVQ_TOUT_MATCH((r), ((mqd_t)r->mqd)))
0583             {
0584                 wait_matched = EXTRUE;
0585             }
0586             else
0587             {
0588                 wait_matched = EXFALSE;
0589             }
0590             NDRX_SPIN_UNLOCK_V((((mqd_t)r->mqd)->stamplock));
0591             
0592             NDRX_LOG(log_debug, "Timeout condition: mqd %p time spent: %ld "
0593                     "matched: %d seq: %lu", 
0594                         r->mqd, delta, wait_matched, r->stamp_seq);
0595             
0596             /* lets put the event to the message queue... 
0597              * firstly we need to allocate the event.
0598              */            
0599             
0600             /* In case if not matched, why send event? Bug #537 */
0601            
0602             if (wait_matched)
0603             {
0604                 if (NULL==(ev = NDRX_FPMALLOC(sizeof(ndrx_svq_ev_t), 0)))
0605                 {
0606                     err = errno;
0607                     NDRX_LOG(log_error, "Failed to allocate ndrx_svq_ev_t: %s", 
0608                                 strerror(err));
0609                     userlog("Failed to allocate ndrx_svq_ev_t: %s", 
0610                                 strerror(err));
0611                     EXFAIL_OUT(ret);
0612                 }
0613 
0614                 ev->ev = NDRX_SVQ_EV_TOUT;
0615                 ev->data = NULL;
0616                 ev->stamp_seq = r->stamp_seq;
0617                 ev->stamp_time = r->stamp_time;
0618                 ev->prev = NULL;
0619                 ev->next = NULL;
0620                 ret=ndrx_svq_mqd_put_event(r->mqd, &ev);
0621                 
0622                 if (NULL!=ev)
0623                 {
0624                     NDRX_FPFREE(ev);
0625                 }
0626                 
0627                 if (EXSUCCEED!=ret)
0628                 {
0629                     NDRX_LOG(log_error, "Failed to put event for %p typ %d",
0630                             r->mqd, NDRX_SVQ_EV_TOUT);
0631                     EXFAIL_OUT(ret);
0632                 }
0633             }
0634             else
0635             {
0636                 NDRX_LOG(log_debug, "Wait not matched -> do not put event...");
0637             }
0638             
0639             /* delete timeout object from hash as no more relevant... */
0640             EXHASH_DEL((M_mon.mqdhash), r);
0641             NDRX_FPFREE(r);
0642             
0643         }
0644     }
0645 out:
0646     
0647     MUTEX_UNLOCK_V(M_mon_lock_mq);
0648 
0649     return ret;
0650 }
0651 
0652 /**
0653  * What about admin msg thread? If we get admin message, then
0654  * we need to forward the message to main thread, not?
0655  * this down mixing we will do at poller level..
0656  * Also... we might want to allow only single access to this function.
0657  * as it might cause some unpredictable deadlocks...
0658  * @param mqd message queue descriptor
0659  * @param ev allocate event structure, cleared if used.
0660  * @return EXSUCCEED/EXFAIL
0661  */
0662 expublic int ndrx_svq_mqd_put_event(mqd_t mqd, ndrx_svq_ev_t **ev)
0663 {
0664     int ret = EXSUCCEED;
0665     int l2, l1;
0666     
0667     /* now emit the wakeup call */
0668     /* put barrier, this will also prevent other threads for sending
0669      * the event to main thread during operations of this thread
0670      */
0671     MUTEX_LOCK_V(mqd->barrier);
0672     
0673     /* Append messages to Q: */
0674     MUTEX_LOCK_V(mqd->qlock);
0675     DL_APPEND(mqd->eventq, (*ev));
0676     MUTEX_UNLOCK_V(mqd->qlock);
0677     *ev=NULL;
0678     
0679     l1=NDRX_SPIN_TRYLOCK_V(mqd->rcvlockb4);
0680     l2=NDRX_SPIN_TRYLOCK_V(mqd->rcvlock);
0681 
0682     if (0==l1)
0683     {
0684         /* nothing todo, not locked by main thread
0685          * thus......
0686          * assume that it will pick the msg up in next loop
0687          */
0688         /* fprintf(stderr, "we locked b4 area, thus process is in workload\n"); */
0689         NDRX_SPIN_UNLOCK_V(mqd->rcvlockb4);
0690 
0691         if (0==l2)
0692         {
0693             NDRX_SPIN_UNLOCK_V(mqd->rcvlock);
0694         }
0695     }
0696     else
0697     {
0698         
0699         /* Seems that main thread is doing something within send/receive block
0700          * unlock our friend, so that it can step forward
0701          */
0702         /*
0703          * from above if (0==l1) we know that this is else it is not 0.
0704         if (0==l1)
0705         {
0706             pthread_spin_unlock(&(mqd->rcvlockb4));
0707         }
0708         */
0709 
0710         if (0==l2)
0711         {
0712             NDRX_SPIN_UNLOCK_V(mqd->rcvlock);
0713         }
0714 
0715         /* reseync on Q lock so that we know that main thread is close
0716          * to send/receive blocked state
0717          */
0718         /* rcvlockb4 is locked here! and next step for the main thread is lock qlock! */
0719         MUTEX_LOCK_V(mqd->qlock);
0720         MUTEX_UNLOCK_V(mqd->qlock);
0721 
0722         /* now loop the locking until we get both locked 
0723          * or both non locked
0724          */
0725         while (1)
0726         {
0727             /* try lock again... */
0728             l1=NDRX_SPIN_TRYLOCK_V(mqd->rcvlockb4);
0729             l2=NDRX_SPIN_TRYLOCK_V(mqd->rcvlock);
0730 
0731             /* both not locked, then we need to interrupt the thread */
0732 
0733             if (0==l1 && 0==l2)
0734             {
0735                 /* then it will process or queued event anyway... */
0736 
0737                 NDRX_SPIN_UNLOCK_V(mqd->rcvlockb4);
0738                 NDRX_SPIN_UNLOCK_V(mqd->rcvlock);
0739 
0740                 break;
0741             }
0742             else if (0!=l1 && 0!=l2)
0743             {
0744                 /*Maybe try to send only 1 signal? 
0745 
0746                 if (0==sigs)
0747                 {*/
0748                     if (0!=pthread_kill(mqd->thread, NDRX_SVQ_SIG))
0749                     {
0750                         int err = errno;
0751                         NDRX_LOG(log_error, "pthread_kill(%d) failed: %s", 
0752                                 NDRX_SVQ_SIG, strerror(err));
0753                         userlog("pthread_kill(%d) failed: %s", 
0754                                 NDRX_SVQ_SIG, strerror(err));
0755                         EXFAIL_OUT(ret);
0756                     }
0757                     break;
0758                     /*
0759                     sigs++;
0760                      * 
0761                 }*/
0762             }
0763 
0764             if (0==l1)
0765             {
0766                 NDRX_SPIN_UNLOCK_V(mqd->rcvlockb4);
0767             }
0768 
0769             if (0==l2)
0770             {
0771                 NDRX_SPIN_UNLOCK_V(mqd->rcvlock);
0772             }
0773 
0774             /* Maybe do yeald... */
0775             sched_yield();
0776             /* usleep(1); */
0777         }
0778     }
0779 
0780 out:
0781     MUTEX_UNLOCK_V(mqd->barrier);
0782 
0783     return ret;        
0784 }
0785 
0786 /**
0787  * Wakup signal handling
0788  * in worst case if spin locks will consume the wakup signals
0789  * we could put in some queue the mqds waiting for signals,
0790  * and then update the mqds in queue that we got a signal
0791  * thus we shall not enter into msgrcv..
0792  * @param sig
0793  */
0794 exprivate void ndrx_svq_signal_action(int sig)
0795 {
0796     /* nothing todo, just ignore  
0797     NDRX_LOG(log_debug, "Signal action");
0798      * !!!! Bug #530 Signal handler - not safe functions used.
0799      * */
0800     M_signalled = sig;
0801     return;
0802 }
0803 
0804 /**
0805  * Thread used for monitoring the pipe of incoming timeout requests
0806  * doing polling for the given time period and sending the events
0807  * to queue threads.
0808  * @param arg
0809  * @return 
0810  */
0811 exprivate void * ndrx_svq_timeout_thread(void* arg)
0812 {
0813     int retpoll;
0814     int ret = EXSUCCEED;
0815     int i, moc, donext;
0816     int err;
0817     ndrx_svq_mon_cmd_t cmd;
0818     ndrx_svq_ev_t *ev;
0819     sigset_t set;
0820     struct pollfd *fdtabmo_tmp = NULL; /**< Real monitoirng table       */
0821     
0822     /* init the TLS thread */
0823     _Nunset_error();
0824     
0825     /* mask all signals except user signal */
0826     
0827     if (EXSUCCEED!=sigfillset(&set))
0828     {
0829         err = errno;
0830         NDRX_LOG(log_error, "Failed to fill signal array: %s", strerror(err));
0831         userlog("Failed to fill signal array: %s", strerror(err));
0832         EXFAIL_OUT(ret);
0833     }
0834     
0835     if (EXSUCCEED!=sigdelset(&set, NDRX_SVQ_SIG))
0836     {
0837         err = errno;
0838         NDRX_LOG(log_error, "Failed to delete signal %d: %s", 
0839                 NDRX_SVQ_SIG, strerror(err));
0840         userlog("Failed to delete signal %d: %s", 
0841                 NDRX_SVQ_SIG, strerror(err));
0842         EXFAIL_OUT(ret);
0843     }
0844     
0845     if (EXSUCCEED!=pthread_sigmask(SIG_BLOCK, &set, NULL))
0846     {
0847         err = errno;
0848         NDRX_LOG(log_error, "Failed to block all signals but %d for even thread: %s", 
0849                 NDRX_SVQ_SIG, strerror(err));
0850         userlog("Failed to block all signals but %d for even thread: %s", 
0851                 NDRX_SVQ_SIG, strerror(err));
0852         EXFAIL_OUT(ret);
0853     }
0854     
0855     /**
0856      * Perform waiting for file descriptor events.
0857      * while the main thread is busy, we do not expect any FD monitoring
0858      * because then it will generate lots of async events as FDs will not be
0859      * flushed and events will be triggered again and again. Thus
0860      * when the XATMI server's main thread goes into wait for message mode, then
0861      * we mark the syncfd as TRUE. Only at that particular time other XATMI
0862      * client threads might doe some message sending too (set timeout for
0863      * their operations), thus that might reloop and will make lost of syncfd.
0864      * Thus we reset the syncfd flag only when we have sent event to main
0865      * thread.
0866      */
0867     int syncfd = EXFALSE;
0868     /* we shall receive unnamed pipe
0869      * in thread.
0870      * 
0871      * During the operations, via pipe we might receive following things:
0872      * 1) request for timeout monitoring.
0873      * 2) request for adding particular FD in monitoring sub-set
0874      * 3) request for deleting particular FD from monitoring sub-set
0875      */
0876     
0877     /* wait for event... */
0878     while (!M_shutdown)
0879     {
0880 
0881         NDRX_LOG(6, "About to poll for: %d ms nrfds=%d",
0882                 M_scanunit, M_mon.nrfds);
0883         
0884         moc=0;
0885         donext=EXTRUE;
0886         /* this needs to be locked... */
0887         
0888         MUTEX_LOCK_V(M_mon_lock_fd);
0889         
0890         for (i=0; i<M_mon.nrfds; i++)
0891         {
0892             /* M_mon.fdtab[i].revents = 0; 
0893              
0894              we know that first is command pipe the others are FDs
0895              */
0896             if (syncfd || i<1)
0897             {
0898                 M_mon.fdtabmo[moc].revents = 0;
0899                 M_mon.fdtabmo[moc].fd = M_mon.fdtab[i].fd;
0900                 M_mon.fdtabmo[moc].events = M_mon.fdtab[i].events;
0901                 
0902                 NDRX_LOG(6, "moc=%d %p %p fd=%d", moc, 
0903                         M_mon.fdtabmo, M_mon.fdtab, M_mon.fdtabmo[moc].fd);
0904                 moc++;
0905             }
0906         }
0907         
0908         /* copy the FDs to temporary buffer..., realloc */
0909         if (NULL!=fdtabmo_tmp)
0910         {
0911             NDRX_FPFREE(fdtabmo_tmp);
0912         }
0913         
0914         fdtabmo_tmp = NDRX_FPMALLOC(sizeof(struct pollfd)*moc, 0);
0915         
0916         if (NULL==fdtabmo_tmp)
0917         {
0918             NDRX_LOG(log_error, "Failed to malloc %d: %s", 
0919                     sizeof(struct pollfd)*moc, strerror(errno));
0920             userlog("Failed to malloc %d: %s", 
0921                     sizeof(struct pollfd)*moc, strerror(errno));
0922             
0923             MUTEX_UNLOCK_V(M_mon_lock_fd);
0924             EXFAIL_OUT(ret);
0925         }
0926         
0927         /* set off the data */
0928         memcpy(fdtabmo_tmp, M_mon.fdtabmo, sizeof(struct pollfd)*moc);
0929         
0930         MUTEX_UNLOCK_V(M_mon_lock_fd);
0931         
0932         retpoll = poll( fdtabmo_tmp, moc, M_scanunit);
0933         
0934         NDRX_LOG(6, "poll() ret = %d", retpoll);
0935         if (EXFAIL==retpoll)
0936         {
0937             err = errno;
0938             
0939             if (err==EINTR)
0940             {
0941                 NDRX_LOG(log_warn, "poll() interrupted... ignore...: %s", 
0942                         strerror(err));
0943             }
0944             else
0945             {
0946                 NDRX_LOG(log_error, "System V auxiliary event poll() got error: %s", 
0947                         strerror(err));
0948                 userlog("System V auxiliary event poll() got error: %s", 
0949                         strerror(err));
0950                 EXFAIL_OUT(ret);
0951             }
0952         }
0953         else if (0==retpoll)
0954         {
0955             /* 
0956              * we got timeout, so lets scan the hash lists to detect
0957              */
0958             NDRX_LOG(6, "Got timeout...");
0959             
0960             /* 
0961              * now detect queues/threads to which this event shall be submitted
0962              */
0963             if (EXSUCCEED!=ndrx_svq_mqd_hash_dispatch())
0964             {
0965                 NDRX_LOG(log_error, "Failed to dispatch events - general failure "
0966                         "- reboot to avoid lockups!");
0967                 userlog("Failed to dispatch events - general failure "
0968                         "- reboot to avoid lockups!");
0969                 
0970                 /* better reboot process, so that we avoid any lockups! */
0971                 exit(EXFAIL);
0972             }
0973             
0974         }
0975         else for (i=0; i<moc && donext; i++)
0976         {
0977             if (fdtabmo_tmp[i].revents)
0978             {
0979                 NDRX_LOG(log_debug, "%d fd=%d revents=%d", i, 
0980                         fdtabmo_tmp[i].fd, (int)fdtabmo_tmp[i].revents);
0981                 if (PIPE_POLL_IDX==i)
0982                 {
0983                     /* We got something from command pipe, thus 
0984                      * we alter our selves
0985                      */
0986                     
0987                     /* receive the command first... */
0988                     if (EXFAIL==read(M_mon.evpipe[READ], &cmd, sizeof(cmd)))
0989                     {
0990                         err = errno;
0991                         
0992                         NDRX_LOG(log_error, "Failed to receive command block by "
0993                                 "System V monitoring thread: %s", strerror(err));
0994                         userlog("Failed to receive command block by "
0995                                 "System V monitoring thread: %s", strerror(err));
0996                         EXFAIL_OUT(ret);
0997                     }
0998                     
0999                     NDRX_LOG(log_debug, "Got command: %d %p flags=%d", 
1000                             cmd.cmd, cmd.mqd, cmd.flags);
1001                     
1002                     /* next time monitor file descriptors too */
1003                     if (NDRX_SVQ_MONF_SYNCFD & cmd.flags)
1004                     {
1005                         syncfd = EXTRUE;
1006                     }
1007                     
1008                     switch (cmd.cmd)
1009                     {
1010                         case NDRX_SVQ_MON_TOUT:
1011                             
1012                             if (cmd.abs_timeout.tv_sec > 0 || 
1013                                     cmd.abs_timeout.tv_nsec > 0)
1014                             {
1015                                 NDRX_LOG(log_debug, "register timeout %p "
1016                                         "stamp_seq=%ld sec %ld nsec %ld",
1017                                         cmd.mqd, cmd.stamp_seq, 
1018                                         (long)cmd.abs_timeout.tv_sec,
1019                                         (long)cmd.abs_timeout.tv_nsec);
1020 
1021                                 if (EXSUCCEED!=ndrx_svq_mqd_hash_add(cmd.mqd, 
1022                                         &(cmd.stamp_time), cmd.stamp_seq, &(cmd.abs_timeout)))
1023                                 {
1024                                     NDRX_LOG(log_error, "Failed to register timeout for %p!",
1025                                             cmd.mqd);
1026                                     userlog("Failed to register timeout for %p!",
1027                                             cmd.mqd);
1028                                     EXFAIL_OUT(ret);
1029                                 }
1030                             }
1031                             
1032                             break;
1033                         case NDRX_SVQ_MON_ADDFD:
1034                             
1035                             NDRX_LOG(log_info, "register fd %d for %p events %ld", 
1036                                     cmd.fd, cmd.mqd, (long)cmd.events);
1037                             if (EXSUCCEED!=ndrx_svq_fd_hash_add(cmd.fd, cmd.mqd, 
1038                                     cmd.events))
1039                             {
1040                                 NDRX_LOG(log_error, "Failed to register timeout for %p!",
1041                                         cmd.mqd);
1042                                 userlog("Failed to register timeout for %p!",
1043                                         cmd.mqd);
1044                                 EXFAIL_OUT(ret);
1045                             }
1046                             
1047                             break;
1048                             
1049                         case NDRX_SVQ_MON_RMFD:
1050                             NDRX_LOG(log_info, "Deregister fd %d from polling", 
1051                                     cmd.fd);
1052                             ndrx_svq_fd_hash_del(cmd.fd);
1053                             /* if we get some more events, we shall break this loop and try next poll... */
1054                             donext=EXFALSE;
1055                             break;
1056                         case NDRX_SVQ_MON_TERM:
1057                             NDRX_LOG(log_info, "Terminate request...");
1058                             goto out;
1059                             break;
1060                     }
1061                     
1062                 }
1063                 else
1064                 {
1065                     ndrx_svq_fd_hash_t *fdh =  ndrx_svq_fd_hash_find(fdtabmo_tmp[i].fd);
1066                     
1067                     if (NULL==fdh)
1068                     {
1069                         NDRX_LOG(log_error, "File descriptor %d not registered"
1070                                 " int System V poller - FAIL", fdtabmo_tmp[i].fd);
1071                         userlog("File descriptor %d not registered"
1072                                 " int System V poller - FAIL", fdtabmo_tmp[i].fd);
1073                         EXFAIL_OUT(ret);
1074                     }
1075                             
1076                     /* this one is from related file descriptor... 
1077                      * thus needs to put an event.
1078                      */
1079                    if (NULL==(ev = NDRX_FPMALLOC(sizeof(*ev), 0)))
1080                    {
1081                        err = errno;
1082                        NDRX_LOG(log_error, "Failed to malloc ndrx_svq_ev_t: %s", 
1083                                strerror(err));
1084                        userlog("Failed to malloc ndrx_svq_ev_t: %s", 
1085                                strerror(err));
1086                        EXFAIL_OUT(ret);
1087                    }
1088                    
1089                    ev->data = NULL;
1090                    ev->datalen = 0;
1091                    ev->ev = NDRX_SVQ_EV_FD;
1092                    ev->fd = fdtabmo_tmp[i].fd;
1093                    ev->revents = fdtabmo_tmp[i].revents;
1094                    ev->prev = NULL;
1095                    ev->next = NULL;
1096                    /* get queue descriptor  
1097                     * the data is deallocated by target thread
1098                     */
1099                    ret = ndrx_svq_mqd_put_event(fdh->mqd, &ev);
1100                    
1101                    if (NULL!=ev)
1102                    {
1103                        NDRX_FPFREE(ev);
1104                    }
1105                    
1106                    if (EXSUCCEED!=ret)
1107                    {
1108                        NDRX_LOG(log_error, "Failed to put FD event for %p - FAIL", 
1109                                fdh->mqd);
1110                        EXFAIL_OUT(ret);
1111                    }
1112                    
1113                    /* At this point we reset the fd handler... */
1114                    syncfd = EXFALSE;
1115                 }
1116             } /* if got revents */
1117         } /* for events... */
1118     } /* while not shutdown... */
1119     
1120 out:
1121     
1122     MUTEX_LOCK_V(M_mon_lock_fd);
1123     if (NULL!=M_mon.fdtab)
1124     {
1125         NDRX_FREE(M_mon.fdtab);
1126     }
1127 
1128     if (NULL!=M_mon.fdtabmo)
1129     {
1130         NDRX_FREE(M_mon.fdtabmo);
1131     }
1132 
1133     MUTEX_UNLOCK_V(M_mon_lock_fd);
1134 
1135     if (NULL!=fdtabmo_tmp)
1136     {
1137         NDRX_FPFREE(fdtabmo_tmp);
1138     }
1139 
1140     /* if we get "unknown" error here, then we have to shutdown the whole app
1141      * nothing todo, as we are totally corrupted
1142      */
1143     
1144     M_alive = EXFALSE;
1145 
1146     if (EXSUCCEED!=ret)
1147     {
1148         NDRX_LOG(log_error, "System V Queue monitoring thread faced "
1149                 "unhandled error - terminate!");
1150         userlog("System V Queue monitoring thread faced unhandled error - terminate!");
1151         exit(ret);
1152     }
1153     
1154     return NULL;
1155 }
1156 
1157 /**
1158  * Terminate the poller thread
1159  */
1160 expublic void ndrx_svq_event_exit(int detach)
1161 {
1162     NDRX_LOG(log_debug, "Terminating event thread...");
1163     
1164     if (M_alive)
1165     {
1166         ndrx_svq_moncmd_term();
1167         
1168         /* if equals to 0 then threads are not the same...  */
1169         if (0==pthread_equal(pthread_self(), M_mon.evthread))
1170         {
1171             NDRX_LOG(log_debug, "Join evthread...");
1172             pthread_join(M_mon.evthread, NULL);
1173             NDRX_LOG(log_debug, "Join evthread... (done)");
1174         }
1175     }
1176     
1177     if (detach)
1178     {
1179         /* detach resources */
1180         ndrx_svqshm_detach();
1181     }
1182 }
1183 
1184 /**
1185  * Terminate resources..
1186  */
1187 expublic void ndrx_svq_event_atexit(void)
1188 {
1189     ndrx_svq_event_exit(EXTRUE);
1190 }
1191 
1192 /**
1193  * Prepare event thread for forking
1194  * This will terminate the event thread
1195  */
1196 expublic void ndrx_svq_fork_prepare(void)
1197 {
1198     NDRX_LOG(log_debug, "Preparing System V Aux thread for fork");
1199     
1200     if (EXFAIL==M_mon.evpipe[READ] && EXFAIL==M_mon.evpipe[WRITE])
1201     {
1202         NDRX_LOG(log_debug, "evpipe not open -> nothing to close");
1203         goto out;
1204     }
1205     
1206     ndrx_svq_event_exit(EXFALSE);
1207     
1208     /* Close pipes */
1209     if (EXSUCCEED!=close(M_mon.evpipe[READ]))
1210     {
1211         NDRX_LOG(log_error, "Failed to close READ PIPE %d: %s",
1212                 M_mon.evpipe[READ], strerror(errno));
1213     }
1214     else
1215     {
1216         M_mon.evpipe[READ] = EXFAIL;
1217     }
1218     
1219     if (EXSUCCEED!=close(M_mon.evpipe[WRITE]))
1220     {
1221         NDRX_LOG(log_error, "Failed to close WRITE PIPE %d: %s",
1222                 M_mon.evpipe[WRITE], strerror(errno));
1223     }
1224     else
1225     {
1226         M_mon.evpipe[WRITE] = EXFAIL;
1227     }
1228     
1229     /* Lock the reference list to avoid partially locked by other threads
1230      * in child process
1231      
1232     ndrx_svq_delref_lock();
1233     */
1234     
1235 out:
1236     return;
1237 }
1238 
1239 /**
1240  * Child resume after forking
1241  */
1242 exprivate void ndrx_svq_fork_resume_child(void)
1243 {
1244     /* ndrx_svq_delref_unlock(); */
1245 }
1246 
1247 /**
1248  * Resume after fork 
1249  */
1250 exprivate void ndrx_svq_fork_resume(void)
1251 {
1252     int err;
1253     int ret=EXSUCCEED;
1254     pthread_attr_t pthread_custom_attr;
1255     
1256     NDRX_LOG(log_debug, "Restoring System V Aux thread after fork %d", (int)getpid());
1257     
1258     
1259     /* ndrx_svq_delref_unlock(); */
1260     
1261     /* create pipes */
1262     /* O_NONBLOCK */
1263     if (EXFAIL==pipe(M_mon.evpipe))
1264     {
1265         err = errno;
1266         NDRX_LOG(log_error, "pipe failed: %s", strerror(err));
1267         EXFAIL_OUT(ret);
1268     }
1269 
1270     if (EXFAIL==fcntl(M_mon.evpipe[READ], F_SETFL, 
1271                 fcntl(M_mon.evpipe[READ], F_GETFL) | O_NONBLOCK))
1272     {
1273         err = errno;
1274         NDRX_LOG(log_error, "fcntl READ pipe set O_NONBLOCK failed: %s", 
1275                 strerror(err));
1276         EXFAIL_OUT(ret);
1277     }
1278 
1279     if (NULL==(M_mon.fdtab = NDRX_CALLOC(M_mon.nrfds, sizeof(struct pollfd))))
1280     {
1281         err = errno;
1282         NDRX_LOG(log_error, "calloc for pollfd failed: %s", strerror(err));
1283         EXFAIL_OUT(ret);
1284     }
1285     
1286     if (NULL==(M_mon.fdtabmo = NDRX_CALLOC(M_mon.nrfds, sizeof(struct pollfd))))
1287     {
1288         err = errno;
1289         NDRX_LOG(log_error, "calloc for pollfd mo failed: %s", strerror(err));
1290         EXFAIL_OUT(ret);
1291     }
1292     
1293     /* create thread */
1294     
1295     /* So wait for events here in the pip form Q thread */
1296     M_mon.fdtab[PIPE_POLL_IDX].fd = M_mon.evpipe[READ];
1297     M_mon.fdtab[PIPE_POLL_IDX].events = POLLIN;
1298     
1299     M_mon.nrfds = 1; /* initially only pipe wait */
1300     
1301     /* startup tup the thread */
1302     NDRX_LOG(log_debug, "System V Monitoring pipes fd read:%d write:%d",
1303                             M_mon.evpipe[READ], M_mon.evpipe[WRITE]);
1304     
1305     pthread_attr_init(&pthread_custom_attr);
1306     ndrx_platf_stack_set(&pthread_custom_attr);
1307     M_alive=EXTRUE;
1308     if (EXSUCCEED!=(ret=pthread_create(&(M_mon.evthread), &pthread_custom_attr, 
1309             ndrx_svq_timeout_thread, NULL)))
1310     {
1311         M_alive=EXFALSE;
1312         
1313         NDRX_PLATF_DIAG(NDRX_DIAG_PTHREAD_CREATE, errno, "SystemV Event thread");
1314         
1315         EXFAIL_OUT(ret);
1316     }
1317     
1318 out:
1319                             
1320     if (EXSUCCEED!=ret)
1321     {
1322         NDRX_LOG(log_error, "System V AUX thread resume after fork failed - abort!");
1323         userlog("System V AUX thread resume after fork failed - abort!");
1324         /*abort();*/
1325         exit(EXFAIL); 
1326     }
1327 
1328 }
1329 
1330 /**
1331  * Set the timeout-thread scan unit
1332  * @param ms milliseconds to set
1333  */
1334 expublic int ndrx_svq_scanunit_set(int ms)
1335 {
1336     int prev_val = M_scanunit;
1337     
1338     M_scanunit_was_init=EXTRUE;
1339     M_scanunit=ms;
1340     
1341     return prev_val;
1342 }
1343 
1344 /**
1345  * Setup basics for Event handling for System V queues
1346  * @return EXSUCCEED/EXFAIL
1347  */
1348 expublic int ndrx_svq_event_init(void)
1349 {
1350     int err;
1351     int ret = EXSUCCEED;
1352     static int first = EXTRUE;
1353     pthread_attr_t pthread_custom_attr;
1354    /* 
1355     * Signal handling 
1356     */
1357     struct sigaction act;
1358     
1359     NDRX_LOG(log_debug, "About to init System V Eventing sub-system");
1360     memset(&act, 0, sizeof(act));
1361     /* define the signal action */
1362 
1363     /* make "act" an empty signal set */
1364     sigemptyset(&act.sa_mask);
1365 
1366     act.sa_handler = ndrx_svq_signal_action;
1367     act.sa_flags = SA_NODEFER;
1368 
1369     if (EXSUCCEED!=sigaction(NDRX_SVQ_SIG, &act, 0))
1370     {
1371         err = errno;
1372         NDRX_LOG(log_error, "Failed to init sigaction: %s" ,strerror(err));
1373         EXFAIL_OUT(ret);
1374     }
1375     
1376     /* if ndrxd applied the setting, do not use the env anymore. */
1377     if (first)
1378     {
1379         if (!M_scanunit_was_init)
1380         {
1381             char *p = getenv(CONF_NDRX_SCANUNIT);
1382 
1383             if (NULL!=p)
1384             {
1385                 if (!ndrx_is_numberic(p))
1386                 {
1387                     NDRX_LOG(log_error, "ERROR ! %s is not number! Got [%s]", 
1388                             CONF_NDRX_SCANUNIT, p);
1389 
1390                     userlog("ERROR ! %s is not number! Got [%s]", 
1391                             CONF_NDRX_SCANUNIT, p);
1392                     EXFAIL_OUT(ret);
1393                 }
1394 
1395                 M_scanunit = atoi(p);
1396 
1397                 if (M_scanunit < 1)
1398                 {
1399                     NDRX_LOG(log_error, "ERROR ! %s is than min %d ms! Got %d", 
1400                             CONF_NDRX_SCANUNIT, CONF_NDRX_SCANUNIT_MIN, M_scanunit);
1401 
1402                     userlog("ERROR ! %s is than min %d ms! Got %d", 
1403                             CONF_NDRX_SCANUNIT, CONF_NDRX_SCANUNIT_MIN, M_scanunit);
1404                     EXFAIL_OUT(ret);
1405                 }
1406             }
1407         }
1408         NDRX_LOG(log_info, "System-V Scan unit set to %d ms", M_scanunit);
1409     }
1410 
1411     /* At this moment we need to bootstrap a timeout monitoring thread.. 
1412      * the stack size of this thread could be small one because it
1413      * will mostly work with dynamically allocated memory..
1414      * need to open unnamed pipe too for incoming commands to thread.
1415      */
1416     memset(&M_mon, 0, sizeof(M_mon));
1417     
1418     /* O_NONBLOCK */
1419     if (EXFAIL==pipe(M_mon.evpipe))
1420     {
1421         err = errno;
1422         NDRX_LOG(log_error, "pipe failed: %s", strerror(err));
1423         EXFAIL_OUT(ret);
1424     }
1425 
1426     if (EXFAIL==fcntl(M_mon.evpipe[READ], F_SETFL, 
1427                 fcntl(M_mon.evpipe[READ], F_GETFL) | O_NONBLOCK))
1428     {
1429         err = errno;
1430         NDRX_LOG(log_error, "fcntl READ pipe set O_NONBLOCK failed: %s", 
1431                 strerror(err));
1432         EXFAIL_OUT(ret);
1433     }
1434 
1435     M_mon.nrfds = 1; /* initially only pipe wait */
1436     if (NULL==(M_mon.fdtab = NDRX_CALLOC(M_mon.nrfds, sizeof(struct pollfd))))
1437     {
1438         err = errno;
1439         NDRX_LOG(log_error, "calloc for pollfd failed: %s", strerror(err));
1440         EXFAIL_OUT(ret);
1441     }
1442     
1443     if (NULL==(M_mon.fdtabmo = NDRX_CALLOC(M_mon.nrfds, sizeof(struct pollfd))))
1444     {
1445         err = errno;
1446         NDRX_LOG(log_error, "calloc for pollfd mo failed: %s", strerror(err));
1447         EXFAIL_OUT(ret);
1448     }
1449 
1450     /* So wait for events here in the pip form Q thread */
1451     M_mon.fdtab[PIPE_POLL_IDX].fd = M_mon.evpipe[READ];
1452     M_mon.fdtab[PIPE_POLL_IDX].events = POLLIN;
1453     
1454     /* startup up the thread */
1455     NDRX_LOG(log_debug, "System V Monitoring pipes fd read:%d write:%d",
1456                             M_mon.evpipe[READ], M_mon.evpipe[WRITE]);
1457     
1458     pthread_attr_init(&pthread_custom_attr);
1459     ndrx_platf_stack_set(&pthread_custom_attr);
1460     
1461     M_alive=EXTRUE;
1462     if (EXSUCCEED!=(ret=pthread_create(&(M_mon.evthread), &pthread_custom_attr, 
1463         ndrx_svq_timeout_thread, NULL)))
1464     {
1465         M_alive=EXFALSE;
1466         NDRX_PLATF_DIAG(NDRX_DIAG_PTHREAD_CREATE, errno, "SystemV Event thread");
1467         EXFAIL_OUT(ret);
1468     }
1469     
1470     /* register fork handlers */
1471         
1472     if (first)
1473     {
1474 #if 0
1475         - OS will remove all the resources used.
1476           this code causes core dumps, if some process performs exit() in the middle
1477           of processing.
1478         /* have exit handler */
1479         if (EXSUCCEED!=atexit(ndrx_svq_event_atexit))
1480         {
1481             err = errno;
1482             NDRX_LOG(log_error, "Failed to register ndrx_svq_event_exit with atexit(): %s",
1483                     strerror(err));
1484             userlog("Failed to register ndrx_svq_event_exit with atexit(): %s",
1485                     strerror(err));
1486             EXFAIL_OUT(ret);
1487         }
1488 #endif
1489     
1490         if (EXSUCCEED!=(ret=ndrx_atfork(ndrx_svq_fork_prepare, 
1491                 /* no need for child resume! */
1492                 ndrx_svq_fork_resume, ndrx_svq_fork_resume_child)))
1493         {
1494             M_alive=EXFALSE;
1495             NDRX_LOG(log_error, "Failed to register fork handlers: %s", 
1496                     strerror(ret));
1497             userlog("Failed to register fork handlers: %s", strerror(ret));
1498             EXFAIL_OUT(ret);
1499         }
1500         /* after xadmin un-inits we might get some extra threads...
1501          * due to multiple calls... */
1502         first = EXFALSE;
1503     }
1504 
1505 out:
1506     errno = err;
1507     return ret;
1508 }
1509 
1510 /**
1511  * Submit monitor command
1512  * @param cmd filled command block
1513  * @return EXSUCCEED/EXFAIL
1514  */
1515 exprivate int ndrx_svq_moncmd_send(ndrx_svq_mon_cmd_t *cmd)
1516 {
1517     int ret = EXSUCCEED;
1518     int err = 0;
1519     
1520     if (M_mon.evpipe[WRITE] != EXFAIL)
1521     {
1522         if (EXFAIL==write (M_mon.evpipe[WRITE], (char *)cmd, 
1523                 sizeof(ndrx_svq_mon_cmd_t)))
1524         {
1525             err = errno;
1526             NDRX_LOG(log_error, "Error ! write fail: %s", strerror(err));
1527             userlog("Error ! write fail: %s", strerror(errno));
1528             EXFAIL_OUT(ret);
1529         }
1530     }
1531     else
1532     {
1533         NDRX_LOG(log_info, "No event thread -> pipe closed.");
1534     }
1535     
1536 out:
1537     errno = err;
1538     return ret;
1539 }
1540 
1541 /**
1542  * register timeout for the monitor
1543  * @param mqd message queue ptr
1544  * @param stamp_time timestamp
1545  * @param stamp_seq sequence number
1546  * @param abs_timeout absolute timestamp
1547  * @param syncfd monitor the file descriptors
1548  * @return EXSUCCEED/EXFAIL
1549  */
1550 expublic int ndrx_svq_moncmd_tout(mqd_t mqd, ndrx_stopwatch_t *stamp_time, 
1551         unsigned long stamp_seq, struct timespec *abs_timeout, int syncfd)
1552 {
1553     ndrx_svq_mon_cmd_t cmd;
1554     
1555     memset(&cmd, 0, sizeof(cmd));
1556     
1557     cmd.cmd = NDRX_SVQ_MON_TOUT;
1558     cmd.mqd = mqd;
1559     cmd.stamp_time = *stamp_time;
1560     cmd.stamp_seq = stamp_seq;
1561     cmd.abs_timeout = *abs_timeout;
1562     
1563     if (syncfd)
1564     {
1565         cmd.flags|=NDRX_SVQ_MONF_SYNCFD;
1566     }
1567     
1568     return ndrx_svq_moncmd_send(&cmd);
1569 }
1570 
1571 /**
1572  * Add file descriptor to event monitor
1573  * @param mqd message queue ptr
1574  * @param fd file descriptor
1575  * @param[in] events to poll for
1576  * @return EXSUCCEED/EXFAIL
1577  */
1578 expublic int ndrx_svq_moncmd_addfd(mqd_t mqd, int fd, uint32_t events)
1579 {
1580     ndrx_svq_mon_cmd_t cmd;
1581     
1582     memset(&cmd, 0, sizeof(cmd));
1583     
1584     cmd.cmd = NDRX_SVQ_MON_ADDFD;
1585     cmd.mqd = mqd;
1586     cmd.fd = fd;
1587     cmd.events = events;
1588     
1589     return ndrx_svq_moncmd_send(&cmd);
1590 }
1591 
1592 /**
1593  * Remove file descriptor.
1594  * @param fd file descriptor to remove from monitor
1595  * @return EXSUCCEED/EXFAIL
1596  */
1597 expublic int ndrx_svq_moncmd_rmfd(int fd)
1598 {
1599     ndrx_svq_mon_cmd_t cmd;
1600     
1601     memset(&cmd, 0, sizeof(cmd));
1602     
1603     cmd.cmd = NDRX_SVQ_MON_RMFD;
1604     cmd.fd = fd;
1605     
1606     return ndrx_svq_moncmd_send(&cmd);
1607 }
1608 
1609 /**
1610  * Terminate the monitor thread
1611  * @return EXSUCCEED/EXFAIL
1612  */
1613 expublic int ndrx_svq_moncmd_term(void)
1614 {
1615     ndrx_svq_mon_cmd_t cmd;
1616     int ret;
1617     
1618     memset(&cmd, 0, sizeof(cmd));
1619     
1620     cmd.cmd = NDRX_SVQ_MON_TERM;
1621     
1622     ret=ndrx_svq_moncmd_send(&cmd);
1623     
1624     /* wait for thread to kill up */
1625     
1626     return ret;
1627 }
1628 
1629 /**
1630  * Send/Receive data with timeout and other events option
1631  * @param mqd queue descriptor (already validated)
1632  * @param ptr data ptr 
1633  * @param maxlen buffer size for receive. For sending it is data len
1634  * @param abs_timeout absolute timeout passed to posix q
1635  * @param ev if event received, then pointer is set to dequeued event.
1636  * @param p_ptr allocate data buffer for NDRX_SVQ_EV_DATA event.
1637  * @param is_send do we send? if EXFALSE, we do receive
1638  * @param[in] syncfd Tell poller thread that we start to wait for FD events
1639  *  otherwise sync thread won't monitor FDs, due to fact that while we are
1640  *  not processed the FD, we will gate again FD wakups for the same events.
1641  * @return EXSUCCEED (got message from q)/EXFAIL (some event received)
1642  */
1643 expublic int ndrx_svq_event_sndrcv(mqd_t mqd, char *ptr, ssize_t *maxlen, 
1644         struct timespec *abs_timeout, ndrx_svq_ev_t **ev, int is_send, int syncfd)
1645 {
1646     int ret = EXSUCCEED;
1647     int err=0;
1648     int msgflg;
1649     int len;
1650     ndrx_svq_ev_t cur_stamp;
1651     
1652     /* set the flag value */
1653     if (mqd->attr.mq_flags & O_NONBLOCK)
1654     {
1655         NDRX_LOG(log_debug, "O_NONBLOCK set");
1656         msgflg = IPC_NOWAIT;
1657     }
1658     else
1659     {
1660         msgflg = 0;
1661     }
1662     
1663     *ev = NULL; /* no event... */
1664     len = *maxlen;
1665     /* to avoid the deadlocks, we shall register the timeout here...
1666      * if we do it after the "rcvlockb4" then we might 
1667      * but then if we got any applied event, we shall change the
1668      * timestamp and sequence as the timeout request is not more
1669      * valid and shall be discarded next time...
1670      * or just do no thing, as on next entry we will update the stamp
1671      * and the processed one will be invalid
1672      */
1673     
1674     /* update time stamps */
1675     ndrx_stopwatch_reset(&(mqd->stamp_time));
1676     
1677     NDRX_SPIN_LOCK_V((mqd->stamplock));
1678     
1679     mqd->stamp_seq++;
1680     cur_stamp.stamp_seq = mqd->stamp_seq;
1681     
1682     memcpy(&(mqd->stamp_time), &cur_stamp.stamp_time, sizeof(cur_stamp.stamp_time));
1683     
1684     NDRX_SPIN_UNLOCK_V(mqd->stamplock);
1685     
1686     /* register timeout: */
1687 
1688     /* if abs timeout is set to zero, then there is no timeout expected.. */
1689     NDRX_LOG(log_debug, "timeout tv_sec=%ld tv_nsec=%ld", 
1690         (long)abs_timeout->tv_sec, (long)abs_timeout->tv_nsec);
1691     
1692     if (0!=abs_timeout->tv_nsec || 0!=abs_timeout->tv_sec || syncfd)
1693     {
1694         /* one FD is used by internal pipe, thus we are interested in
1695          * polled fds added by Enduro/X API
1696          * TODO: Might consider to move syncfd after existing event Q is processed
1697          * otherwise if some FD triggered events, they might be in Q, but
1698          * with this syncfd call we activate generations of the events again (thus
1699          * we get duplication...).
1700          * Can consider to process the event Q twice, before syncfd
1701          * and after the syncfd (anyway those are mem ops only). This
1702          * can reduce the possiblity of twice processing.
1703          */
1704         if (syncfd)
1705         {
1706             /* send via long channel -> pipe */
1707             if (EXSUCCEED!=ndrx_svq_moncmd_tout(mqd, &(mqd->stamp_time), mqd->stamp_seq, 
1708                 abs_timeout, syncfd))
1709             {
1710                 err = EFAULT;
1711                 NDRX_LOG(log_error, "Failed to request timeout to ndrx_svq_moncmd_tout()");
1712                 userlog("Failed to request timeout to ndrx_svq_moncmd_tout()");
1713                 EXFAIL_OUT(ret);
1714             }
1715         }
1716         else
1717         {
1718             /* set-time out directly in the hash list
1719              * so that event thread checks it every sec...
1720              */
1721             if (EXSUCCEED!=ndrx_svq_mqd_hash_add(mqd, &(mqd->stamp_time), mqd->stamp_seq, 
1722                 abs_timeout))
1723             {
1724                 err = EFAULT;
1725                 NDRX_LOG(log_error, "Failed to request timeout to ndrx_svq_mqd_hash_add()");
1726                 userlog("Failed to request timeout to ndrx_svq_mqd_hash_add()");
1727                 EXFAIL_OUT(ret);
1728             }
1729         }
1730     }
1731     else
1732     {
1733         NDRX_LOG(log_debug, "timeout not needed... mqd=%p", mqd);
1734     }
1735     
1736     /* Also we shall here check the queue? */
1737 
1738     /* Lock here before process... */
1739     NDRX_SPIN_LOCK_V((mqd->rcvlockb4));
1740 
1741     /* We need pre-read lock...? as it is flushing the queue
1742      * but we assume that it will still process the queue...! */
1743     
1744     /* lock queue  */
1745     MUTEX_LOCK_V(mqd->qlock);
1746 
1747     /* process incoming events... if any queued... 
1748      * if have any event, interrupt the waiting and return back to caller
1749      * Also check that event is relevant for us -> i.e timestamps matches...
1750      */
1751     
1752     /* Check the events matching the current time stamp, ignore
1753      * events sent not four our stamp
1754      * TODO: Test slow shutdown...! I.e. server process is busy doing something
1755      * and we enqueue a shutdown, will it actually shutdown?
1756      */
1757     while (NULL!=mqd->eventq &&
1758             NDRX_SVQ_EV_TOUT==mqd->eventq->ev && 
1759             !(NDRX_SVQ_TOUT_MATCH((mqd->eventq), (&cur_stamp))))
1760     {
1761         /* Remove any pending event, not relevant to our position */
1762         *ev = mqd->eventq;
1763         DL_DELETE(mqd->eventq, *ev);
1764         NDRX_FPFREE(*ev);
1765         *ev = NULL;
1766     }
1767     
1768     if (NULL!=mqd->eventq)
1769     {    
1770         *ev = mqd->eventq;
1771         DL_DELETE(mqd->eventq, *ev);
1772         MUTEX_UNLOCK_V(mqd->qlock);
1773         NDRX_SPIN_UNLOCK_V(mqd->rcvlockb4);
1774         
1775         NDRX_LOG(log_info, "Got event in q %p: %d", mqd, (*ev)->ev);
1776         EXFAIL_OUT(ret);
1777     }
1778     
1779     
1780     /* Chain the lockings, so that Q lock would wait until both
1781      *  are locked
1782      */
1783     /* set thread id.. */
1784     mqd->thread = pthread_self();
1785     M_signalled = EXFALSE;
1786     /* here is no interrupt, as pthread locks are imune to signals */
1787     NDRX_SPIN_LOCK_V((mqd->rcvlock));    
1788     /* unlock queue  */
1789     MUTEX_UNLOCK_V(mqd->qlock);
1790 
1791     /* So we will do following by tout thread.
1792      * Lock Q, put msgs inside unlock
1793      * if rcvlockb4 && rcvlock are locked, then send kill signals
1794      * to process...
1795      * send until both are unlocked or stamp is changed.
1796      */
1797     if (!M_signalled)
1798     {
1799 
1800         if (is_send)
1801         {
1802             ret=msgsnd (mqd->qid, ptr, NDRX_SVQ_INLEN(len), msgflg);
1803         }
1804         else
1805         {
1806             ret=msgrcv (mqd->qid, ptr, NDRX_SVQ_INLEN(len), 0, msgflg);
1807         }
1808 
1809         if (EXFAIL==ret)
1810         {
1811             err=errno;
1812         }
1813     }
1814     else
1815     {
1816         /* OK, we are signaled, lets fail here! can cause zero length msgs
1817          * to be received, thus needs to have status!
1818          */
1819         ret=EXFAIL;
1820         err=EINTR;
1821     }
1822 
1823     /* TODO: Replace these two below with spin locks
1824      * so that we are sure that we do not get any signals on them... */
1825     NDRX_SPIN_UNLOCK_V(mqd->rcvlock);
1826     NDRX_SPIN_UNLOCK_V(mqd->rcvlockb4);
1827         
1828     MUTEX_LOCK_V(mqd->barrier);
1829     MUTEX_UNLOCK_V(mqd->barrier);
1830 
1831     /* if have message return it first..  */
1832     if (EXFAIL!=ret)
1833     {
1834         /* pthread_mutex_unlock(&(mqd->qlock)); */
1835         
1836         /* OK, we got a message from queue */
1837         if (!is_send)
1838         {
1839             *maxlen = NDRX_SVQ_OUTLEN(ret);
1840         }
1841             
1842         ret=EXSUCCEED;
1843         goto out;
1844     }
1845     else
1846     {
1847         if (EINTR!=err)
1848         {
1849             int lev = log_error;
1850             
1851             if (ENOMSG==err)
1852             {
1853                 lev=log_debug;
1854             }
1855             NDRX_LOG(lev, "MQ op fail qid:%d len:%d msgflg: %d: %s", 
1856                 mqd->qid, len, msgflg, strerror(err));
1857             EXFAIL_OUT(ret);
1858         }
1859         else
1860         {
1861             NDRX_LOG(log_debug, "(EINTR) MQ op fail qid:%d len:%d msgflg: %d: %s", 
1862                 mqd->qid, len, msgflg, strerror(err));
1863         }
1864     }
1865     
1866     
1867     ret=EXSUCCEED;
1868     
1869     /* lock queue  */
1870     MUTEX_LOCK_V(mqd->qlock);
1871 
1872     /* Zap any expired timeouts... */
1873     while (NULL!=mqd->eventq &&
1874                 NDRX_SVQ_EV_TOUT==mqd->eventq->ev && 
1875                 !(NDRX_SVQ_TOUT_MATCH((mqd->eventq), (&cur_stamp))))
1876     {
1877         /* Remove any pending event, not relevant to our position */
1878         *ev = mqd->eventq;
1879         DL_DELETE(mqd->eventq, *ev);
1880         NDRX_FPFREE(*ev);
1881         *ev = NULL;
1882     }
1883 
1884     /* if have event queued, return it second */
1885     if (NULL!=mqd->eventq)
1886     {
1887         *ev = mqd->eventq;
1888         DL_DELETE(mqd->eventq, *ev);
1889         MUTEX_UNLOCK_V(mqd->qlock);
1890 
1891         NDRX_LOG(log_info, "Got event in q %p: %d", mqd, (*ev)->ev);
1892 
1893         /* failed to receive, got event! */
1894         EXFAIL_OUT(ret);
1895     }
1896 
1897     /* unlock queue   */
1898     MUTEX_UNLOCK_V(mqd->qlock);
1899     
1900 out:
1901     
1902     /* Remove queue from hash... no problem if it was already removed... */
1903     ndrx_svq_mqd_hash_del(mqd);
1904 
1905     /* seems if we get real signal in the thread
1906      * and there is no event waiting, then return fail
1907      * Bug #537
1908      */
1909     if (EXSUCCEED==ret && NULL==*ev && EINTR==err)
1910     {
1911         NDRX_LOG(log_error, "Interrupted by external signal, M_signalled=%d", M_signalled);
1912         ret=EXFAIL;
1913     }
1914 
1915     errno = err;
1916 
1917     return ret;
1918 }
1919 
1920 
1921 /* vim: set ts=4 sw=4 et smartindent: */