Back to home page

Enduro/X

 
 

    


0001 /**
0002  * @brief Emulated message queue. Based on UNIX Network Programming
0003  *  Volume 2 Second Edition interprocess Communications by W. Richard Stevens
0004  *  book. This code is only used for MacOS, as there aren't any reasonable
0005  *  queues available.
0006  *
0007  * @file sys_emqueue.c
0008  */
0009 /* -----------------------------------------------------------------------------
0010  * Enduro/X Middleware Platform for Distributed Transaction Processing
0011  * Copyright (C) 2009-2016, ATR Baltic, Ltd. All Rights Reserved.
0012  * Copyright (C) 2017-2023, Mavimax, Ltd. All Rights Reserved.
0013  * This software is released under one of the following licenses:
0014  * AGPL (with Java and Go exceptions) or Mavimax's license for commercial use.
0015  * See LICENSE file for full text.
0016  * -----------------------------------------------------------------------------
0017  * AGPL license:
0018  *
0019  * This program is free software; you can redistribute it and/or modify it under
0020  * the terms of the GNU Affero General Public License, version 3 as published
0021  * by the Free Software Foundation;
0022  *
0023  * This program is distributed in the hope that it will be useful, but WITHOUT ANY
0024  * WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR A
0025  * PARTICULAR PURPOSE. See the GNU Affero General Public License, version 3
0026  * for more details.
0027  *
0028  * You should have received a copy of the GNU Affero General Public License along 
0029  * with this program; if not, write to the Free Software Foundation, Inc.,
0030  * 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
0031  *
0032  * -----------------------------------------------------------------------------
0033  * A commercial use license is available from Mavimax, Ltd
0034  * contact@mavimax.com
0035  * -----------------------------------------------------------------------------
0036  */
0037 
0038 /*---------------------------Includes-----------------------------------*/
0039 #include <stdlib.h>
0040 #include <stdio.h>
0041 #include <stdarg.h>
0042 #include <sys/types.h>
0043 #include <sys/stat.h>
0044 #include <fcntl.h>
0045 #include <errno.h>
0046 #include <string.h>
0047 #include <sys/time.h>
0048 
0049 #include <ndrstandard.h>
0050 #include "sys_emqueue.h"
0051 #include "sys_unix.h"
0052 #include "ndebug.h"
0053 
0054 #include <stddef.h>
0055 #include <errno.h>
0056 #include <fcntl.h>
0057 #include <unistd.h>
0058 #include <stdio.h>
0059 #include <sys/mman.h>
0060 #include <signal.h>
0061 #include <thlock.h>
0062 #include <limits.h>
0063 #include <exhash.h>
0064 #include <nstopwatch.h>
0065 #include <nstd_tls.h>
0066 
0067 /*---------------------------Externs------------------------------------*/
0068 /*---------------------------Macros-------------------------------------*/
0069 #define  LOCK_Q if ( (n = pthread_mutex_lock(&emqhdr->emqh_lock)) != 0)\
0070     {\
0071             NDRX_LOG(log_error, "EMQ: pthread_mutex_lock failed: %s", strerror(n));\
0072             userlog("EMQ: pthread_mutex_lock failed: %s", strerror(n));\
0073             errno = n;\
0074             return EXFAIL;\
0075     }
0076     
0077 #define MAX_TRIES   10
0078 /*---------------------------Enums--------------------------------------*/
0079 /*---------------------------Typedefs-----------------------------------*/
0080 struct qd_hash
0081 {
0082     void *qd;
0083     EX_hash_handle hh; /* makes this structure hashable        */
0084 };
0085 typedef struct qd_hash qd_hash_t;
0086 /*---------------------------Globals------------------------------------*/
0087 /*---------------------------Statics------------------------------------*/
0088 exprivate  struct mq_attr defattr = { 0, 128, 1024, 0 };
0089 exprivate MUTEX_LOCKDECL(M_lock);
0090 exprivate qd_hash_t *M_qd_hash = NULL;
0091 
0092 /*---------------------------Prototypes---------------------------------*/
0093 
0094 /*
0095  * For darwin https://www.endurox.org/issues/512
0096  * There is issue in case if PTHREAD_PROCESS_SHARED mutex & conditional variables are used
0097  * between different processes.
0098  * 
0099  * See https://github.com/apple/darwin-libpthread/blob/master/src/pthread_cond.c
0100  * 
0101  * Problem is that pthread_cond_wait() of Darwin internally compares some saved
0102  * mutex ptr in ocond variable.
0103  *
0104  * And thing is that mmap may return differ addresses (e.g. address randomization).
0105  * Thus cond and mutex for each process may have it's own pointer.
0106  * 
0107  * Thus it may not match with previously set cond->busy by other process, thus may get EINVAL.
0108  * According to http://openradar.appspot.com/19600706 only option is to access
0109  * internal variable 'busy' and reset it to NULL. Probably only working case here
0110  * is if process performs forks. But in our case processes which are using shared
0111  * mutex & conds are completely different processes.
0112  * 
0113  * Though there could be race conditions too, if several processes performs
0114  * _pthread_cond_wait(), one before call may set busy to NULL, while other
0115  * may already set it to mutex. Thus we will attempt some 100 times with
0116  * random sleeps in case of EINVAL, and only after 100 we give up.
0117  * 
0118  */
0119 #ifdef EX_OS_DARWIN
0120 
0121 /**
0122  * Wrapper for OSX limitation work around
0123  * Reset internals before call. Fight with race conditions
0124  */
0125 expublic int ndrx_pthread_cond_timedwait(pthread_cond_t *restrict cond, 
0126         pthread_mutex_t *restrict mutex, 
0127         const struct timespec *restrict abstime)
0128 {
0129     int attempt = 0;
0130     
0131     int ret;
0132     ndrx_osx_pthread_cond_t *p = (ndrx_osx_pthread_cond_t *)cond;
0133     do
0134     {
0135         if (attempt > 0)
0136         {
0137             /* max sleep 0.001 sec... */
0138             usleep(ndrx_rand() % 1000);
0139         }
0140         
0141         p->busy = NULL;
0142         ret = pthread_cond_timedwait(cond, mutex, abstime);
0143         
0144         attempt++;
0145         
0146     } while (EINVAL==ret && attempt < NDRX_OSX_COND_FIX_ATTEMPTS);
0147     
0148     return ret;
0149     
0150 }
0151 
0152 /**
0153  * Wrapper for OSX limitation work around
0154  * Reset internals before call. Fight with race conditions
0155  */
0156 expublic int ndrx_pthread_cond_wait(pthread_cond_t *restrict cond, 
0157         pthread_mutex_t *restrict mutex)
0158 {
0159     int attempt = 0;
0160     int ret;
0161     ndrx_osx_pthread_cond_t *p = (ndrx_osx_pthread_cond_t *)cond;
0162     do
0163     {
0164         if (attempt > 0)
0165         {
0166             /* max sleep 0.001 sec... */
0167             usleep(ndrx_rand() % 1000);
0168         }
0169         
0170         p->busy = NULL;
0171         ret = pthread_cond_wait(cond, mutex);
0172         
0173         attempt++;
0174         
0175     } while (EINVAL==ret && attempt < NDRX_OSX_COND_FIX_ATTEMPTS);
0176     
0177     return ret;
0178 }
0179 
0180 #else
0181 
0182 #define ndrx_pthread_cond_wait pthread_cond_wait
0183 #define ndrx_pthread_cond_timedwait pthread_cond_timedwait
0184 
0185 #endif
0186 
0187 
0188 /**
0189  * Add queue descriptor to hash
0190  * @param q
0191  * @return 
0192  */
0193 exprivate int qd_exhash_add(mqd_t q)
0194 {
0195     int ret = EXSUCCEED;
0196     qd_hash_t * el = NDRX_FPMALLOC(sizeof(qd_hash_t), 0);
0197     
0198     NDRX_LOG(log_dump, "Registering %p as mqd_t", q);
0199     if (NULL==el)
0200     {
0201         NDRX_LOG(log_error, "Failed to alloc: %s", strerror(errno));
0202         EXFAIL_OUT(ret);
0203     }
0204     
0205     el->qd  = (void *)q;
0206     
0207     MUTEX_LOCK_V(M_lock);
0208     
0209     EXHASH_ADD_PTR(M_qd_hash, qd, el);
0210     NDRX_LOG(log_dump, "added...");
0211     
0212     MUTEX_UNLOCK_V(M_lock);
0213     
0214 out:
0215 
0216     return ret;
0217 }
0218 
0219 /**
0220  * Check is queue registered
0221  * @param q
0222  * @return 
0223  */
0224 exprivate int qd_hash_chk(mqd_t qd)
0225 {
0226     qd_hash_t *ret = NULL;
0227     
0228     NDRX_LOG(log_dump, "checking qd %p", qd);
0229     
0230     MUTEX_LOCK_V(M_lock);
0231     
0232     EXHASH_FIND_PTR( M_qd_hash, ((void **)&qd), ret);
0233     
0234     MUTEX_UNLOCK_V(M_lock);
0235     
0236     if (NULL!=ret)
0237     {
0238         return EXTRUE;
0239     }
0240     else
0241     {
0242         return EXFALSE;
0243     }
0244 }
0245 
0246 /**
0247  * Check is 
0248  * @param q
0249  * @return 
0250  */
0251 exprivate void qd_hash_del(mqd_t qd)
0252 {
0253     qd_hash_t *ret = NULL;
0254     
0255     NDRX_LOG(log_dump, "Unregistering %p as mqd_t", qd);
0256     
0257     MUTEX_LOCK_V(M_lock);
0258     EXHASH_FIND_PTR( M_qd_hash, ((void **)&qd), ret);
0259     
0260     if (NULL!=ret)
0261     {
0262         EXHASH_DEL(M_qd_hash, ret);
0263         NDRX_FPFREE(ret);
0264     }
0265     
0266     MUTEX_UNLOCK_V(M_lock);
0267 }
0268 
0269 
0270 /**
0271  * Get he queue path
0272  * @param path
0273  * @param[out] bufout where to copy the output path string 
0274  * @param[out] bufoutsz output buffer size
0275  * @return ptr to output buffer
0276  */
0277 static char *get_path(const char *path, char *bufout, size_t bufoutsz)
0278 {
0279     static int first = 1;
0280     static char q_path[PATH_MAX]={EXEOS};
0281     
0282     if (first)
0283     {
0284         char *p;
0285         if (NULL!=(p=getenv(CONF_NDRX_QPATH)))
0286         {
0287             NDRX_STRCPY_SAFE(q_path, p);
0288         }
0289         
0290         first = 0;
0291     }
0292     
0293     NDRX_STRCPY_SAFE_DST(bufout, q_path, bufoutsz);
0294     NDRX_STRCAT_S(bufout, bufoutsz, path);
0295 
0296     return bufout;
0297 }
0298 
0299 /**
0300  * Close message queue
0301  * @param emqd queue dsc
0302  * @return  standard queue errors
0303  */
0304 expublic int emq_close(mqd_t emqd)
0305 {
0306     long            msgsize, filesize;
0307     struct emq_hdr  *emqhdr;
0308     struct mq_attr *attr;
0309     struct emq_info *emqinfo;
0310 
0311     emqinfo = emqd;
0312     
0313     /**
0314      * Check is queue registered
0315      */
0316     if (!qd_hash_chk((mqd_t) emqd))
0317     {
0318         NDRX_LOG(log_error, "Invalid queue descriptor: %p", emqd);
0319         errno = EBADF;
0320         return EXFAIL;
0321     }
0322     
0323     emqhdr = emqinfo->emqi_hdr;
0324     attr = &emqhdr->emqh_attr;
0325 
0326     if (emq_notify(emqd, NULL) != EXSUCCEED)
0327     {
0328         return EXFAIL;
0329     }
0330 
0331     msgsize = NDRX_EMQ_MSGSIZE(attr->mq_msgsize);
0332     filesize = sizeof(struct emq_hdr) + (attr->mq_maxmsg *
0333                       (sizeof(struct emq_msg_hdr) + msgsize));
0334 
0335     NDRX_LOG(log_dump, "Before munmap()");
0336     
0337     if (munmap(emqinfo->emqi_hdr, filesize) == -1)
0338     {
0339         return EXFAIL;
0340     }
0341     
0342     NDRX_LOG(log_dump, "After munmap()");
0343 
0344     qd_hash_del(emqd);
0345     NDRX_FPFREE(emqinfo);
0346 
0347     NDRX_LOG(log_dump, "into: emq_close ret 0");
0348     
0349     return EXSUCCEED;
0350 }
0351 
0352 /**
0353  * Read message attributes
0354  */
0355 expublic int emq_getattr(mqd_t emqd, struct mq_attr *emqstat)
0356 {
0357     int             n;
0358     struct emq_hdr  *emqhdr;
0359     struct mq_attr *attr;
0360     struct emq_info *emqinfo;
0361 
0362     NDRX_LOG(log_dump, "into: emq_getattr");
0363     
0364     /**
0365      * Check is queue registered
0366      */
0367     if (!qd_hash_chk((mqd_t) emqd))
0368     {
0369         NDRX_LOG(log_error, "Invalid queue descriptor: %p", emqd);
0370         errno = EBADF;
0371         return EXFAIL;
0372     }
0373     
0374     emqinfo = emqd;
0375     emqhdr = emqinfo->emqi_hdr;
0376     attr = &emqhdr->emqh_attr;
0377     
0378     LOCK_Q;
0379 
0380     /* read queue attributes: */
0381     emqstat->mq_flags = emqinfo->emqi_flags;
0382     emqstat->mq_maxmsg = attr->mq_maxmsg;
0383     emqstat->mq_msgsize = attr->mq_msgsize;
0384     emqstat->mq_curmsgs = attr->mq_curmsgs;
0385 
0386     MUTEX_UNLOCK_V(emqhdr->emqh_lock);
0387     NDRX_LOG(log_dump, "into: emq_getattr ret 0");
0388     return EXSUCCEED;
0389 }
0390 
0391 /**
0392  * Configure notification
0393  * @param emqd
0394  * @param notification
0395  * @return 
0396  */
0397 expublic int emq_notify(mqd_t emqd, const struct sigevent *notification)
0398 {
0399     int             n;
0400     pid_t           pid;
0401     struct emq_hdr  *emqhdr;
0402     struct emq_info *emqinfo;
0403     
0404     if (!qd_hash_chk((mqd_t) emqd))
0405     {
0406         NDRX_LOG(log_error, "Invalid queue descriptor: %p", emqd);
0407         errno = EBADF;
0408         return EXFAIL;
0409     }
0410 
0411     emqinfo = emqd;
0412     emqhdr = emqinfo->emqi_hdr;
0413     
0414     LOCK_Q;
0415 
0416     pid = getpid();
0417     if (notification == NULL)
0418     {
0419         if (emqhdr->emqh_pid == pid)
0420         {
0421             emqhdr->emqh_pid = 0;
0422         }
0423     } 
0424     else 
0425     {
0426         if (emqhdr->emqh_pid != 0)
0427         {
0428             if (kill(emqhdr->emqh_pid, 0) != -1 || errno != ESRCH)
0429             {
0430                 errno = EBUSY;
0431                 goto err;
0432             }
0433         }
0434         emqhdr->emqh_pid = pid;
0435         emqhdr->emqh_event = *notification;
0436     }
0437     MUTEX_UNLOCK_V(emqhdr->emqh_lock);
0438     return EXSUCCEED;
0439 
0440 err:
0441     MUTEX_UNLOCK_V(emqhdr->emqh_lock);
0442     return EXFAIL;
0443 
0444 }
0445 
0446 /**
0447  * Open queue
0448  * @param pathname
0449  * @param oflag
0450  * @param ...
0451  * @return 
0452  */
0453 expublic mqd_t emq_open(const char *pathname, int oflag, ...)
0454 {
0455     int                  i, fd, nonblock, created, save_errno;
0456     long                 msgsize, filesize, index;
0457     va_list              ap;
0458     mode_t               mode;
0459     char                *mptr;
0460     struct emq_msg_hdr      *msghdr;
0461     struct mq_attr      *attr;
0462     struct emq_info      *emqinfo;
0463     struct stat          statbuff;
0464     struct emq_hdr       *emqhdr;
0465     char emq_x[PATH_MAX+1];
0466     pthread_mutexattr_t  mattr;
0467     pthread_condattr_t   cattr;
0468     mptr = (char *) MAP_FAILED;
0469 
0470     created = EXFALSE;
0471     nonblock = oflag & O_NONBLOCK;
0472     oflag &= ~O_NONBLOCK;
0473     emqinfo = NULL;
0474     NDRX_LOG(log_dump, "into: emq_open");
0475 
0476 
0477 again:
0478     if (oflag & O_CREAT)
0479     {
0480         va_start(ap, oflag);
0481 
0482         mode = va_arg(ap, int) & ~S_IXUSR;
0483         attr = va_arg(ap, struct mq_attr *);
0484         va_end(ap);
0485 
0486         /* Exclusive open, as only one instance shall perform init  */
0487         fd = open(get_path(pathname, emq_x, sizeof(emq_x)), 
0488                 oflag | O_EXCL | O_RDWR, mode | S_IXUSR);
0489         
0490         if (fd < 0)
0491         {
0492             if (errno == EEXIST && (oflag & O_EXCL) == 0)
0493             {
0494                 goto exists;
0495             }
0496             else
0497             {
0498                 return((mqd_t) EXFAIL);
0499             }
0500         }
0501         
0502         created = EXTRUE;
0503         
0504         if (attr == NULL)
0505         {
0506             attr = &defattr;
0507         }
0508         else 
0509         {
0510             if (attr->mq_maxmsg <= 0 || attr->mq_msgsize <= 0) 
0511             {
0512                 errno = EINVAL;
0513                 goto err;
0514             }
0515         }
0516         /* calculate and set the file size */
0517         msgsize = NDRX_EMQ_MSGSIZE(attr->mq_msgsize);
0518         
0519         filesize = sizeof(struct emq_hdr) + (attr->mq_maxmsg *
0520                            (sizeof(struct emq_msg_hdr) + msgsize));
0521         
0522         if (EXFAIL == lseek(fd, filesize - 1, SEEK_SET))
0523         {
0524             goto err;
0525         }
0526         
0527         if (EXFAIL == write(fd, "", 1))
0528         {
0529             goto err;
0530         }
0531 
0532         mptr = mmap(NULL, filesize, PROT_READ | PROT_WRITE,
0533                                     MAP_SHARED, fd, 0);
0534         if (mptr == MAP_FAILED)
0535         {
0536             goto err;
0537         }
0538 
0539         /* Queue info block allocation */
0540         if ( (emqinfo = NDRX_FPMALLOC(sizeof(struct emq_info), 0)) == NULL)
0541         {
0542             goto err;
0543         }
0544         
0545         emqinfo->emqi_hdr = emqhdr = (struct emq_hdr *) mptr;
0546         emqinfo->emqi_flags = nonblock;
0547 
0548         emqhdr->emqh_attr.mq_flags = 0;
0549         emqhdr->emqh_attr.mq_maxmsg = attr->mq_maxmsg;
0550         emqhdr->emqh_attr.mq_msgsize = attr->mq_msgsize;
0551         emqhdr->emqh_attr.mq_curmsgs = 0;
0552         emqhdr->emqh_nwait = 0;
0553         emqhdr->emqh_pid = 0;
0554         emqhdr->emqh_head = 0;
0555         index = sizeof(struct emq_hdr);
0556         emqhdr->emqh_free = index;
0557         
0558         for (i = 0; i < attr->mq_maxmsg - 1; i++)
0559         {
0560             msghdr = (struct emq_msg_hdr *) &mptr[index];
0561             index += sizeof(struct emq_msg_hdr) + msgsize;
0562             msghdr->msg_next = index;
0563         }
0564         
0565         msghdr = (struct emq_msg_hdr *) &mptr[index];
0566         /* this means, we have no next */
0567         msghdr->msg_next = 0;
0568 
0569         if ( (i = pthread_mutexattr_init(&mattr)) != 0)
0570         {
0571             goto pthreaderr;
0572         }
0573 
0574         if ((i=pthread_mutexattr_setpshared(&mattr, PTHREAD_PROCESS_SHARED)) < 0)
0575         {
0576             NDRX_LOG(log_error, "Failed to set attribute PTHREAD_PROCESS_SHARED: %s", strerror(i));
0577             userlog("Failed to set attribute PTHREAD_PROCESS_SHARED: %s", strerror(i));
0578             goto pthreaderr;
0579         }
0580 
0581 #if defined(NDRX_MUTEX_DEBUG) || defined(EX_OS_DARWIN)
0582         if((i = pthread_mutexattr_settype(&mattr, PTHREAD_MUTEX_ERRORCHECK)) < 0)
0583         {
0584             NDRX_LOG(log_error, "Failed to set attribute ERRORCHECK: %s", strerror(i));
0585             userlog("Failed to set attribute ERRORCHECK: %s", strerror(i));
0586             goto pthreaderr;
0587         }
0588 #endif
0589 
0590         i = pthread_mutex_init(&emqhdr->emqh_lock, &mattr);
0591 
0592         pthread_mutexattr_destroy(&mattr);
0593         if (i != 0)
0594         {
0595             NDRX_LOG(log_error, "Failed to pthread_mutex_init: %s", strerror(i));
0596             userlog("Failed to pthread_mutex_init: %s", strerror(i));
0597             goto pthreaderr;
0598         }
0599 
0600         if ( (i = pthread_condattr_init(&cattr)) != 0)
0601         {
0602             goto pthreaderr;
0603         }
0604         
0605         pthread_condattr_setpshared(&cattr, PTHREAD_PROCESS_SHARED);
0606         i = pthread_cond_init(&emqhdr->emqh_wait, &cattr);
0607         pthread_condattr_destroy(&cattr);
0608         
0609         if (i != 0)
0610         {
0611             goto pthreaderr;
0612         }
0613 
0614         if (EXFAIL==fchmod(fd, mode))
0615         {
0616             goto err;
0617         }
0618         
0619         close(fd);
0620         
0621         if (EXSUCCEED!=qd_exhash_add((mqd_t) emqinfo))
0622         {
0623             NDRX_LOG(log_error, "Failed to add mqd_t to hash!");
0624             errno = ENOMEM;
0625         }
0626         NDRX_LOG(log_dump, "into: emq_open ret ok");
0627         return((mqd_t) emqinfo);
0628     }
0629 exists:
0630     
0631     /* open the file then memory map */
0632     if ( (fd = open(get_path(pathname, emq_x, sizeof(emq_x)), O_RDWR)) < 0)
0633     {
0634         if (errno == ENOENT && (oflag & O_CREAT))
0635         {
0636             goto again;
0637         }
0638         
0639         goto err;
0640     }
0641 
0642     /* make certain initialization is complete */
0643     for (i = 0; i < MAX_TRIES; i++)
0644     {
0645         if (EXFAIL == stat(get_path(pathname, emq_x, sizeof(emq_x)), &statbuff))
0646         {
0647             if (errno == ENOENT && (oflag & O_CREAT))
0648             {
0649                 close(fd);
0650                 goto again;
0651             }
0652             goto err;
0653         }
0654         
0655         if ((statbuff.st_mode & S_IXUSR) == 0)
0656         {
0657             break;
0658         }
0659         
0660         sleep(1);
0661     }
0662 
0663     if (i == MAX_TRIES)
0664     {
0665         errno = ETIMEDOUT;
0666         goto err;
0667     }
0668 
0669     filesize = statbuff.st_size;
0670     
0671     mptr = mmap(NULL, filesize, PROT_READ | PROT_WRITE, MAP_SHARED, fd, 0);
0672     
0673     if (mptr == MAP_FAILED)
0674     {
0675         goto err;
0676     }
0677     
0678     close(fd);
0679 
0680     /* queue info block per process */
0681     if ( (emqinfo = NDRX_FPMALLOC(sizeof(struct emq_info), 0)) == NULL)
0682     {
0683         goto err;
0684     }
0685     emqinfo->emqi_hdr = (struct emq_hdr *) mptr;
0686     emqinfo->emqi_flags = nonblock;
0687     
0688     if (EXSUCCEED!=qd_exhash_add((mqd_t) emqinfo))
0689     {
0690         NDRX_LOG(log_error, "Failed to add mqd_t to hash!");
0691         errno = ENOMEM;
0692     }
0693     
0694     NDRX_LOG(log_dump, "into: emq_open ret ok");
0695     return((mqd_t) emqinfo);
0696 pthreaderr:
0697     errno = i;
0698 err:
0699 
0700     save_errno = errno;
0701 
0702     if (created)
0703     {
0704         unlink(get_path(pathname, emq_x, sizeof(emq_x)));
0705     }
0706 
0707     if (mptr != MAP_FAILED)
0708     {
0709         munmap(mptr, filesize);
0710     }
0711 
0712     if (emqinfo != NULL)
0713     {
0714         NDRX_FPFREE(emqinfo);
0715     }
0716 
0717     close(fd);
0718     
0719     NDRX_LOG(log_dump, "into: emq_open ret -1");
0720     
0721     errno = save_errno;
0722     return((mqd_t) -1);
0723 }
0724 
0725 /**
0726  * Send timed message.
0727  * Basically we operate on locked queue and unlock at the end (or error)
0728  * if not lock is received, direct error is returned.
0729  * @param emqd q descr
0730  * @param ptr msg ptr
0731  * @param maxlen max len
0732  * @param priop priority (not used)
0733  * @param __abs_timeout timeout
0734  * @return bytes received or error
0735  */
0736 expublic ssize_t emq_timedreceive(mqd_t emqd, char *ptr, size_t maxlen, unsigned int *priop,
0737         const struct timespec *__abs_timeout)
0738 {
0739     int             n;
0740     long            index;
0741     char           *mptr;
0742     ssize_t         len;
0743     struct emq_hdr  *emqhdr;
0744     struct mq_attr *attr;
0745     struct emq_msg_hdr *msghdr;
0746     struct emq_info *emqinfo;
0747 
0748     NDRX_LOG(log_dump, "into: emq_timedreceive");
0749     
0750     if (!qd_hash_chk((mqd_t) emqd))
0751     {
0752         NDRX_LOG(log_error, "Invalid queue descriptor: %p", emqd);
0753         errno = EBADF;
0754         return EXFAIL;
0755     }
0756 
0757     emqinfo = emqd;
0758 
0759     emqhdr = emqinfo->emqi_hdr;
0760     mptr = (char *) emqhdr;
0761     attr = &emqhdr->emqh_attr;
0762     
0763     LOCK_Q;
0764 
0765     if (maxlen < (size_t)attr->mq_msgsize)
0766     {
0767         errno = EMSGSIZE;
0768         goto err;
0769     }
0770     
0771     if (attr->mq_curmsgs == 0)
0772     {
0773         
0774         if (emqinfo->emqi_flags & O_NONBLOCK)
0775         {
0776             errno = EAGAIN;
0777             goto err;
0778         }
0779         
0780         /* Wait for messages */
0781         emqhdr->emqh_nwait++;
0782         while (attr->mq_curmsgs == 0)
0783         {
0784             NDRX_LOG(log_dump, "queue empty on %p", emqd);
0785             
0786             if (NULL==__abs_timeout)
0787             {
0788                 if (EXSUCCEED!=(n=ndrx_pthread_cond_wait(&emqhdr->emqh_wait, 
0789                         &emqhdr->emqh_lock)))
0790                 {
0791                     /* have lock as stated by pthread_cond_wait !*/
0792                     NDRX_LOG(log_error, "%s: pthread_cond_wait failed %d: %s", 
0793                         __func__, n, strerror(n));
0794                     userlog("%s: pthread_cond_wait failed %d: %s", 
0795                         __func__, n, strerror(n));
0796                     errno = n;
0797 
0798                     emqhdr->emqh_nwait--;
0799                     goto err;
0800                 }
0801             }
0802             else
0803             {
0804                 /* wait some time...  */
0805                 NDRX_LOG(log_dump, "timed wait...");
0806                 
0807                 /* On osx this can do early returns...
0808                  * Also... we might get time-outs due to broadcasts...
0809                  * then we need to reduce the waiting time...
0810                  */
0811                 if (EXSUCCEED!=(n=ndrx_pthread_cond_timedwait(&emqhdr->emqh_wait, 
0812                         &emqhdr->emqh_lock, __abs_timeout)))
0813                 { 
0814                     
0815                     if (n!=ETIMEDOUT)
0816                     {
0817                         /* have lock as per pthread_cond_timedwait spec */
0818                         NDRX_LOG(log_error, "%s: ndrx_pthread_cond_timedwait failed %d: %s", 
0819                             __func__, n, strerror(n));
0820                         userlog("%s: ndrx_pthread_cond_timedwait failed %d: %s", 
0821                             __func__, n, strerror(n));
0822                     }
0823                     else
0824                     {
0825                         NDRX_LOG(log_dump, "ETIMEDOUT: attr->mq_curmsgs = %ld", attr->mq_curmsgs);
0826                     }
0827 
0828                     errno = n;
0829                     emqhdr->emqh_nwait--;
0830                     goto err;
0831                 }
0832             }
0833         }
0834         emqhdr->emqh_nwait--;
0835     }
0836 
0837     if ( (index = emqhdr->emqh_head) == 0)
0838     {
0839         NDRX_LOG(log_error, "emq_timedreceive: curmsgs = %ld; head = 0", attr->mq_curmsgs);
0840         abort();
0841     }
0842 
0843     msghdr = (struct emq_msg_hdr *) &mptr[index];
0844     emqhdr->emqh_head = msghdr->msg_next;
0845     len = msghdr->msg_len;
0846     memcpy(ptr, msghdr + 1, len);
0847     
0848     if (priop != NULL)
0849     {
0850         *priop = msghdr->msg_prio;
0851     }
0852 
0853     /* just-read message goes to front of free list */
0854     msghdr->msg_next = emqhdr->emqh_free;
0855     emqhdr->emqh_free = index;
0856 
0857     /* if configuration of queues are changed, then wake up any one who 
0858      * is waiting, if not none waiting - no problem
0859      */
0860     pthread_cond_signal(&emqhdr->emqh_wait);
0861     
0862     attr->mq_curmsgs--;
0863 
0864     MUTEX_UNLOCK_V(emqhdr->emqh_lock);
0865     
0866     NDRX_LOG(log_dump, "emq_timedreceive - got something len=%d stats: %ld wait: %ld",
0867             len, attr->mq_curmsgs, emqhdr->emqh_nwait);
0868     return(len);
0869 
0870 err:
0871     
0872     MUTEX_UNLOCK_V(emqhdr->emqh_lock);
0873     n = errno;
0874     NDRX_LOG(log_dump, "emq_timedreceive - failed: %s stats: %ld wait: %ld",
0875             strerror(errno), attr->mq_curmsgs, emqhdr->emqh_nwait);
0876     errno = n;
0877     
0878     return EXFAIL;
0879 }
0880 
0881 /**
0882  * Send message on queue.
0883  * @param emqd q descr
0884  * @param ptr msg ptr
0885  * @param len msg len
0886  * @param prio not used
0887  * @param __abs_timeout timeout
0888  * @return EXSUCCEED or EXFAIL + errno set.
0889  */
0890 expublic int emq_timedsend(mqd_t emqd, const char *ptr, size_t len, unsigned int prio, 
0891         const struct timespec *__abs_timeout)
0892 {
0893     int              n;
0894     long             index, freeindex;
0895     char            *mptr;
0896     struct sigevent *sigev;
0897     struct emq_hdr   *emqhdr;
0898     struct mq_attr  *attr;
0899     struct emq_msg_hdr  *msghdr, *nmsghdr, *pmsghdr;
0900     struct emq_info  *emqinfo;
0901 
0902     NDRX_LOG(log_dump, "into: emq_timedsend");
0903 
0904     if (!qd_hash_chk((mqd_t) emqd))
0905     {
0906         NDRX_LOG(log_error, "Invalid queue descriptor: %p", emqd);
0907         errno = EBADF;
0908         return EXFAIL;
0909     }
0910     
0911     emqinfo = emqd;
0912     emqhdr = emqinfo->emqi_hdr;
0913     mptr = (char *) emqhdr;
0914     attr = &emqhdr->emqh_attr;
0915     
0916     LOCK_Q;
0917 
0918     if (len > (size_t)attr->mq_msgsize)
0919     {
0920         errno = EMSGSIZE;
0921         goto err;
0922     }
0923 
0924     if (attr->mq_curmsgs >= attr->mq_maxmsg)
0925     {
0926         /* queue is full */
0927         if (emqinfo->emqi_flags & O_NONBLOCK)
0928         {
0929             errno = EAGAIN;
0930             goto err;
0931         }
0932         /* wait for room for one message on the queue */
0933         while (attr->mq_curmsgs >= attr->mq_maxmsg)
0934         {
0935             NDRX_LOG(log_dump, "waiting on q %p", emqd);
0936             
0937             if (NULL==__abs_timeout)
0938             {
0939                 if (EXSUCCEED!=(n=ndrx_pthread_cond_wait(&emqhdr->emqh_wait, 
0940                         &emqhdr->emqh_lock)))
0941                 {
0942                     NDRX_LOG(log_error, "%s: pthread_cond_wait failed %d: %s", 
0943                             __func__, n, strerror(n));
0944                     userlog("%s: pthread_cond_wait failed %d: %s", 
0945                             __func__, n, strerror(n));
0946                     
0947                     /* no lock */
0948                     errno = n;
0949                     return -1;
0950                 }
0951             }
0952             else
0953             {
0954                 /* wait some time...  */
0955                 NDRX_LOG(log_dump, "timed wait...");
0956                 /* On osx this can do early returns...*/
0957                 if (EXSUCCEED!=(n=ndrx_pthread_cond_timedwait(&emqhdr->emqh_wait, 
0958                         &emqhdr->emqh_lock, __abs_timeout)))
0959                 {
0960                     if (n!=ETIMEDOUT)
0961                     {
0962                         NDRX_LOG(log_error, "%s: ndrx_pthread_cond_timedwait failed %d: %s", 
0963                             __func__, n, strerror(n));
0964                         userlog("%s: ndrx_pthread_cond_timedwait failed %d: %s", 
0965                                 __func__, n, strerror(n));
0966                         errno = n;
0967                         return -1;
0968                     }
0969                     NDRX_LOG(log_error, "ETIMEDOUT: attr->mq_curmsgs = %ld", attr->mq_curmsgs);
0970                     
0971                     /* we have lock... */
0972                     errno = n;
0973                     goto err;
0974                 }
0975             }
0976             NDRX_LOG(log_info, "%p - accessed ok", emqd);
0977         }
0978     }
0979     /* nmsghdr will point to new message */
0980     if ( (freeindex = emqhdr->emqh_free) == 0)
0981     {
0982         userlog("emq_send: curmsgs = %ld; free = 0", attr->mq_curmsgs);
0983     }
0984 
0985     nmsghdr = (struct emq_msg_hdr *) &mptr[freeindex];
0986     nmsghdr->msg_prio = prio;
0987     nmsghdr->msg_len = len;
0988     memcpy(nmsghdr + 1, ptr, len);
0989     emqhdr->emqh_free = nmsghdr->msg_next;
0990 
0991     /* Search the places for message */
0992     index = emqhdr->emqh_head;
0993     pmsghdr = (struct emq_msg_hdr *) &(emqhdr->emqh_head);
0994     
0995     while (index != 0)
0996     {
0997         msghdr = (struct emq_msg_hdr *) &mptr[index];
0998         
0999         if (prio > msghdr->msg_prio)
1000         {
1001             nmsghdr->msg_next = index;
1002             pmsghdr->msg_next = freeindex;
1003             break;
1004         }
1005         index = msghdr->msg_next;
1006         pmsghdr = msghdr;
1007     }
1008     
1009     if (index == 0)
1010     {
1011         pmsghdr->msg_next = freeindex;
1012         nmsghdr->msg_next = 0;
1013     }
1014     
1015     /* sent notification if we queue is empty */
1016     if (attr->mq_curmsgs == 0)
1017     {
1018         if (emqhdr->emqh_pid != 0 && emqhdr->emqh_nwait == 0)
1019         {
1020             sigev = &emqhdr->emqh_event;
1021             
1022             if (sigev->sigev_notify == SIGEV_SIGNAL)
1023             {
1024                 kill(emqhdr->emqh_pid, sigev->sigev_signo);
1025             }
1026             emqhdr->emqh_pid = 0;
1027         }
1028     }
1029 
1030     /* if configuration of queues are changed, then wake up any one who 
1031      * is waiting, if not none waiting - no problem
1032      */
1033     pthread_cond_signal(&emqhdr->emqh_wait);
1034     attr->mq_curmsgs++;
1035     
1036     MUTEX_UNLOCK_V(emqhdr->emqh_lock);
1037     NDRX_LOG(log_dump, "into: emq_timedsend - return 0 stats: %ld wait: %ld",
1038             attr->mq_curmsgs, emqhdr->emqh_nwait);
1039     return EXSUCCEED;
1040 
1041 err:
1042     MUTEX_UNLOCK_V(emqhdr->emqh_lock);
1043 
1044     n = errno;
1045     NDRX_LOG(log_dump, "into: emq_timedsend - return -1: %s stats: %ld wait: %ld",
1046             strerror(n), attr->mq_curmsgs, emqhdr->emqh_nwait);
1047     errno = n;
1048     return EXFAIL;
1049 }
1050 
1051 /**
1052  * Send Posix q msg, no tout
1053  * @param emqd
1054  * @param ptr
1055  * @param len
1056  * @param prio
1057  * @return 
1058  */
1059 expublic int emq_send(mqd_t emqd, const char *ptr, size_t len, unsigned int prio)
1060 {
1061     return emq_timedsend(emqd, ptr, len, prio, NULL);
1062 }
1063 
1064 /**
1065  * Receive Posix q msg, no tout
1066  * @param emqd
1067  * @param ptr
1068  * @param maxlen
1069  * @param priop
1070  * @return 
1071  */
1072 expublic ssize_t emq_receive(mqd_t emqd, char *ptr, size_t maxlen, unsigned int *priop)
1073 {
1074     return emq_timedreceive(emqd, ptr, maxlen, priop, NULL);
1075 }
1076 
1077 /**
1078  * Set queue attr
1079  * @param emqd
1080  * @param emqstat
1081  * @param oemqstat
1082  * @return 
1083  */
1084 expublic int emq_setattr(mqd_t emqd, const struct mq_attr *emqstat, struct mq_attr *oemqstat)
1085 {
1086     int             n;
1087     struct emq_hdr  *emqhdr;
1088     struct mq_attr *attr;
1089     struct emq_info *emqinfo;
1090 
1091     NDRX_LOG(log_dump, "into: emq_setattr");
1092 
1093     if (!qd_hash_chk((mqd_t) emqd))
1094     {
1095         NDRX_LOG(log_error, "Invalid queue descriptor: %p", emqd);
1096         errno = EBADF;
1097         return EXFAIL;
1098     }
1099     
1100     emqinfo = emqd;
1101     emqhdr = emqinfo->emqi_hdr;
1102     attr = &emqhdr->emqh_attr;
1103     
1104     LOCK_Q;
1105 
1106     if (oemqstat != NULL)
1107     {
1108         oemqstat->mq_flags = emqinfo->emqi_flags;
1109         oemqstat->mq_maxmsg = attr->mq_maxmsg;
1110         oemqstat->mq_msgsize = attr->mq_msgsize;
1111         oemqstat->mq_curmsgs = attr->mq_curmsgs;
1112     }
1113 
1114     if (emqstat->mq_flags & O_NONBLOCK)
1115     {
1116         emqinfo->emqi_flags |= O_NONBLOCK;
1117     }
1118     else
1119     {
1120         emqinfo->emqi_flags &= ~O_NONBLOCK;
1121     }
1122 
1123     MUTEX_UNLOCK_V(emqhdr->emqh_lock);
1124     NDRX_LOG(log_dump, "into: emq_setattr - return 0");
1125     return EXSUCCEED;
1126 }
1127 
1128 /**
1129  * Delete Q
1130  * @param pathname
1131  * @return 
1132  */
1133 expublic int emq_unlink(const char *pathname)
1134 {
1135     char emq_x[PATH_MAX+1];
1136     NDRX_LOG(log_dump, "into: emq_unlink");
1137     
1138     if (unlink(get_path(pathname, emq_x, sizeof(emq_x))) == -1)
1139     {
1140         NDRX_LOG(log_dump, "into: emq_unlink ret -1");
1141         return EXFAIL;
1142     }
1143     NDRX_LOG(log_dump, "into: emq_unlink ret 0");
1144     return EXSUCCEED;
1145 }
1146 
1147 
1148 /* vim: set ts=4 sw=4 et smartindent: */