Back to home page

Enduro/X

 
 

    


0001 /**
0002  * @brief System V Queue polling
0003  *   Basically we will operate with special shared memory registry
0004  *   where will be mapping between string name -> queue id
0005  *   and vice versa queue id -> string
0006  *   the registry is protected by read/write System V semaphore.
0007  *   The shared mem by it self is based on Posix shared mem.
0008  *   For System V, there will be new parameter used NDRX_QUEUES_MAX
0009  *   that will device the limits of the shared memory for queues.
0010  *
0011  * @file sys_svq.c
0012  */
0013 /* -----------------------------------------------------------------------------
0014  * Enduro/X Middleware Platform for Distributed Transaction Processing
0015  * Copyright (C) 2009-2016, ATR Baltic, Ltd. All Rights Reserved.
0016  * Copyright (C) 2017-2023, Mavimax, Ltd. All Rights Reserved.
0017  * This software is released under one of the following licenses:
0018  * AGPL (with Java and Go exceptions) or Mavimax's license for commercial use.
0019  * See LICENSE file for full text.
0020  * -----------------------------------------------------------------------------
0021  * AGPL license:
0022  *
0023  * This program is free software; you can redistribute it and/or modify it under
0024  * the terms of the GNU Affero General Public License, version 3 as published
0025  * by the Free Software Foundation;
0026  *
0027  * This program is distributed in the hope that it will be useful, but WITHOUT ANY
0028  * WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR A
0029  * PARTICULAR PURPOSE. See the GNU Affero General Public License, version 3
0030  * for more details.
0031  *
0032  * You should have received a copy of the GNU Affero General Public License along 
0033  * with this program; if not, write to the Free Software Foundation, Inc.,
0034  * 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
0035  *
0036  * -----------------------------------------------------------------------------
0037  * A commercial use license is available from Mavimax, Ltd
0038  * contact@mavimax.com
0039  * -----------------------------------------------------------------------------
0040  */
0041 
0042 /*---------------------------Includes-----------------------------------*/
0043 
0044 #include <ndrx_config.h>
0045 
0046 #ifdef EX_OS_AIX
0047 /* This is for aix to active extended poll */
0048 #define _MSGQSUPPORT 1
0049 #endif
0050 
0051 #include <stdlib.h>
0052 #include <stdio.h>
0053 #include <fcntl.h>           /* For O_* constants */
0054 #include <sys/ipc.h>
0055 #include <sys/msg.h>
0056 #include <sys/time.h>
0057 
0058 #include <ndrstandard.h>
0059 
0060 #include <nstopwatch.h>
0061 #include <nstd_tls.h>
0062 #include <exhash.h>
0063 #include <ndebug.h>
0064 #include <sys_svq.h>
0065 
0066 
0067 #include "sys_unix.h"
0068 
0069 /*---------------------------Externs------------------------------------*/
0070 /*---------------------------Macros-------------------------------------*/
0071 
0072 #define VALIDATE_MQD if ( NULL==mqd || (mqd_t)EXFAIL==mqd)\
0073     {\
0074         NDRX_LOG(log_error, "Invalid queue descriptor %p", mqd);\
0075         errno = EINVAL;\
0076         EXFAIL_OUT(ret);\
0077     }
0078 
0079 /**
0080  * Set timeout config common for send and receive
0081  */
0082 #define NDRX_SVAPOLL_TOUT_SET     gettimeofday (&timeval, NULL);\
0083     tout = ((__abs_timeout->tv_sec - timeval.tv_sec)*1000 +\
0084         (__abs_timeout->tv_nsec/1000 - timeval.tv_usec)/1000);\
0085     ndrx_stopwatch_timer_set(&w, (int)tout);\
0086     wait_left = ndrx_stopwatch_get_delta(&w) * -1;\
0087 \
0088     if (wait_left<=0)\
0089     {\
0090         NDRX_LOG(log_error, "expired call: wait_left: %d tout: %ld qid: %d", wait_left, tout, mqd->qid);\
0091     }\
0092 
0093 /*---------------------------Enums--------------------------------------*/
0094 /*---------------------------Typedefs-----------------------------------*/
0095 /*---------------------------Globals------------------------------------*/
0096 /*---------------------------Statics------------------------------------*/
0097 /*---------------------------Prototypes---------------------------------*/
0098 
0099 /**
0100  * For SystemV:
0101  * Close queue. Basically we remove the dynamic data associated with queue
0102  * the queue id by it self continues to live on... until it is unlinked
0103  * 
0104  * For svapoll: just remove the allocated block.
0105  * @param mqd queue descriptor
0106  * @return EXSUCCEED/EXFAIL
0107  */
0108 expublic int ndrx_svq_close(mqd_t mqd)
0109 {
0110     NDRX_LOG(log_debug, "close %p mqd", mqd);
0111     
0112     if (NULL!=mqd && (mqd_t)EXFAIL!=mqd)
0113     {   
0114 #ifdef EX_USE_SYSVQ
0115         
0116 #if 0
0117         /* close the queue 
0118          * we will put the Q in hash locally so while it is in pipe
0119          * (in kernel space), the address sanitizer might see it as leaked
0120          * ptr. Thus hash will keep the local pointer.
0121          * This is really needed for sanitizer only...
0122          */
0123         mqd->self = (char *)mqd;
0124         ndrx_svq_delref_add(mqd);
0125         
0126         if (EXSUCCEED!=ndrx_svq_moncmd_close(mqd))
0127         {
0128             NDRX_LOG(log_error, "Failed to close queue %p", mqd);
0129             userlog("Failed to close queue %p", mqd);
0130         }
0131         
0132         /*
0133          * free will be done by backend thread..
0134         NDRX_FREE(mqd);
0135          */
0136 #endif
0137         /* remove from hashes... */
0138         ndrx_svq_mqd_close(mqd);
0139         NDRX_FPFREE(mqd);
0140 #endif
0141 
0142 #ifdef EX_USE_SVAPOLL
0143         NDRX_FPFREE(mqd);
0144 #endif
0145         
0146         return EXSUCCEED;
0147     }
0148     else
0149     {
0150         NDRX_LOG(log_error, "Invalid mqd %p!", mqd);
0151         errno = EBADF;
0152         return EXFAIL;
0153     }
0154 }
0155 
0156 /**
0157  * Get attributes of the queue
0158  * @param mqd queue descriptor / ptr to descr block
0159  * @param attr queue stats
0160  * @return EXSUCCEED/EXFAIL
0161  */
0162 expublic int ndrx_svq_getattr(mqd_t mqd, struct mq_attr *attr)
0163 {
0164     int ret = EXSUCCEED;
0165     int err = 0;
0166     struct msqid_ds buf;
0167     
0168     VALIDATE_MQD;
0169     
0170     if (NULL==attr)
0171     {
0172         NDRX_LOG(log_error, "Invalid attr is null", mqd);
0173         errno = EINVAL;
0174         EXFAIL_OUT(ret);
0175     }
0176     
0177     memcpy(attr, &(mqd->attr), sizeof(*attr));
0178     
0179     /* read the queue stats */
0180     if (EXSUCCEED!=msgctl(mqd->qid, IPC_STAT, &buf))
0181     {
0182         err = errno;
0183         NDRX_LOG(log_debug, "Failed to get queue qid %d stats: %s",
0184                 mqd->qid, strerror(err));
0185         userlog("Failed to get queue qid %d stats: %s",
0186                 mqd->qid, strerror(err));
0187     }
0188     
0189     attr->mq_curmsgs = (long)buf.msg_qnum;
0190 
0191 out:
0192     errno = err;
0193     return ret;
0194 }
0195 
0196 /**
0197  * Perform notification - not supported
0198  * @param mqd queue descriptor 
0199  * @param notification signal event descr
0200  * @return EXFAIL
0201  */
0202 expublic int ndrx_svq_notify(mqd_t mqd, const struct sigevent *notification)
0203 {
0204     NDRX_LOG(log_error, "mq_notify() not supported by System V queue emulation!");
0205     userlog("mq_notify() not supported by System V queue emulation!");
0206     return EXFAIL;
0207 }
0208 
0209 /**
0210  * Open queue this, will include following steps:
0211  * - build queue object
0212  * - register into hash list
0213  * - Generate next queue id, according shm
0214  * - if fails, something then queue must be removed from shm
0215  * @param pathname queue path
0216  * @param oflag queue flags
0217  * @param mode unix permissions mode
0218  * @param mq_attr attributes
0219  * @return queue descriptor or EXFAIL
0220  */
0221 expublic mqd_t ndrx_svq_open(const char *pathname, int oflag, mode_t mode, 
0222         struct mq_attr *attr)
0223 {
0224     /* Allocate queue object */
0225     mqd_t mq = (mqd_t)EXFAIL;
0226     int ret = EXSUCCEED;
0227     int errno_save;
0228     
0229     NDRX_LOG(log_debug, "enter");
0230     mq = NDRX_FPMALLOC(sizeof(struct ndrx_svq_info), 0);
0231     memset(mq, 0, sizeof(struct ndrx_svq_info));
0232     
0233     if (NULL==mq)
0234     {
0235         errno_save=errno;
0236         NDRX_LOG(log_error, "Failed to malloc %d bytes queue descriptor", 
0237                 (int)sizeof(struct ndrx_svq_info));
0238         userlog("%s: Failed to malloc %d bytes queue descriptor", 
0239                 __func__, (int)sizeof(struct ndrx_svq_info));
0240         mq = (mqd_t)EXFAIL;
0241         EXFAIL_OUT(ret);
0242     }
0243     
0244     /* so we have an options:
0245      * - if we create a Q, then alloc new ID
0246      * - if queue already exists SHM, then we can use that ID directly
0247      */
0248     if (EXFAIL==(mq->qid = ndrx_svqshm_get((char *)pathname, mode, oflag)))
0249     {
0250         errno_save=errno;
0251         EXFAIL_OUT(ret);
0252     }
0253     
0254     /* mq->thread = pthread_self(); - set only in timed functions */
0255     NDRX_STRCPY_SAFE(mq->qstr, pathname);
0256     mq->mode = mode;
0257     if (NULL!=attr)
0258     {
0259         memcpy(&(mq->attr), attr, sizeof (*attr));
0260         
0261         /* initial attribs does not include flags */
0262         if (oflag & O_NONBLOCK)
0263         {
0264             mq->attr.mq_flags|=O_NONBLOCK;
0265             NDRX_LOG(log_debug, "Opening in non blocked mode");
0266         }
0267     }
0268 #ifdef EX_USE_SYSVQ
0269     /* Init mutexes... */
0270     NDRX_SPIN_INIT_V(mq->rcvlock);
0271     NDRX_SPIN_INIT_V(mq->rcvlockb4);
0272     NDRX_SPIN_INIT_V(mq->stamplock);
0273     MUTEX_VAR_INIT(mq->barrier);
0274     MUTEX_VAR_INIT(mq->qlock);
0275 #endif
0276     
0277 out:
0278     
0279     if (EXSUCCEED!=ret)
0280     {
0281         if (NULL!=(mqd_t)EXFAIL)
0282         {
0283             NDRX_FPFREE((char *)mq);
0284             mq = (mqd_t)EXFAIL;
0285         }
0286     }
0287 
0288     NDRX_LOG(log_debug, "return %p/%ld", mq, (long)mq);
0289     
0290     errno=errno_save;
0291     return mq;
0292 }
0293 
0294 /**
0295  * Timed queue receive operation
0296  * @param mqd queue descriptor
0297  * @param ptr data ptr
0298  * @param maxlen buffer size
0299  * @param priop not used
0300  * @param __abs_timeout absolute time out according to mq_timedreceive(3)
0301  * @return data len received
0302  */
0303 
0304 expublic ssize_t ndrx_svq_timedreceive(mqd_t mqd, char *ptr, size_t maxlen, 
0305         unsigned int *priop, const struct timespec *__abs_timeout)
0306 {
0307     
0308 #ifdef EX_USE_SYSVQ
0309     ssize_t ret = maxlen;
0310     ndrx_svq_ev_t *ev = NULL;
0311     int err = 0;
0312     
0313     VALIDATE_MQD;
0314     
0315     /* set thread handler - for interrupts */
0316     mqd->thread = pthread_self();
0317     
0318     if (EXSUCCEED!=ndrx_svq_event_sndrcv( mqd, ptr, &ret, 
0319             (struct timespec *)__abs_timeout, &ev, EXFALSE, EXFALSE))
0320     {
0321         err = errno;
0322         if (NULL!=ev)
0323         {
0324             if (NDRX_SVQ_EV_TOUT==ev->ev)
0325             {
0326                 NDRX_LOG(log_info, "Timed out");
0327                 errno = ETIMEDOUT;
0328             }
0329             else
0330             {
0331                 NDRX_LOG(log_error, "Unexpected event: %d", ev->ev);
0332                 errno = EBADF;
0333             }
0334         }
0335         else
0336         {
0337             /* translate the error codes */
0338             if (ENOMSG==err)
0339             {
0340                 NDRX_LOG(log_debug, "msgrcv(qid=%d) failed: %s", mqd->qid, 
0341                     strerror(err));
0342                 errno = EAGAIN;
0343             }
0344             else
0345             {
0346                 NDRX_LOG(log_error, "msgrcv(qid=%d) failed: %s", mqd->qid, 
0347                     strerror(err));
0348             }
0349         }
0350         
0351         EXFAIL_OUT(ret);
0352     }
0353     
0354 out:
0355     
0356     if (NULL!=ev)
0357     {
0358         /* save the error, if any.. */
0359         err=errno;
0360         NDRX_FPFREE(ev);
0361         errno=err;
0362     }
0363     
0364     /* errno = err; errno is loaded */
0365     return ret;
0366 #endif
0367     
0368 #ifdef EX_USE_SVAPOLL
0369     /* in case of SVAPOLL
0370      * in loop while time left wait for even in poll
0371      * once we get something we try to receive,
0372      * if noting to receive, go to sleep (if time left)
0373      * if no time left, just return TPETIME.
0374      * In the same way if poll gives timeout, just return TPETIME.
0375      */
0376     
0377     int wait_left;
0378     long tout;
0379     ndrx_stopwatch_t w;
0380     int ret;
0381     int err;
0382     long *l;
0383     struct timeval timeval;
0384     
0385     VALIDATE_MQD;
0386     
0387     NDRX_LOG(log_debug, "receiving msg mqd=%p, ptr=%p, maxlen=%d flags: %ld qid: %d",
0388                 mqd, ptr, (int)maxlen, mqd->attr.mq_flags, mqd->qid);
0389     
0390     if (maxlen<sizeof(long))
0391     {
0392         NDRX_LOG(log_error, "Invalid message size, the minimum is %d but got %d", 
0393                 (int)sizeof(long), (int)maxlen);
0394         errno = EINVAL;
0395         EXFAIL_OUT(ret);
0396     }
0397     
0398     l = (long *)ptr;    
0399     *l = 1;
0400     
0401     ret=msgrcv(mqd->qid, ptr, NDRX_SVQ_INLEN(maxlen), 0, IPC_NOWAIT);
0402     
0403     /* if blocked mode is requested... */
0404     if (EXFAIL==ret)
0405     {
0406         if (ENOMSG!=errno || mqd->attr.mq_flags & O_NONBLOCK)
0407         {
0408             /* if no msg, then continue with bellow */
0409             err = errno;
0410             NDRX_LOG(log_error, "msgrcv(qid=%d) failed: %s", mqd->qid, 
0411                         strerror(err));
0412         
0413             /* translate to posix */
0414             if (ENOMSG==err)
0415             {
0416                 err = EAGAIN;
0417             }
0418         
0419             errno = err;
0420             goto out;
0421         }
0422         /* if got ENOMSG... so wait */
0423     }
0424     else 
0425     {
0426         /* got result */
0427         goto out;
0428     }
0429     
0430     NDRX_SVAPOLL_TOUT_SET;
0431 
0432     /* prepare for timed out */ 
0433     errno=ETIMEDOUT;
0434     ret=EXFAIL;
0435 
0436     /* wait for message...  */
0437     while (wait_left>0)
0438     {
0439         /* do poll on queue.. */
0440         struct ndrx_pollmsg msgs;
0441         unsigned long nfd = 1 << 16;
0442         
0443         msgs.msgid=mqd->qid;
0444         msgs.reqevents = POLLIN;
0445         msgs.rtnevents = 0;
0446         
0447         NDRX_LOG(log_debug, "wait_left: %d qid: %d", wait_left, mqd->qid);
0448         ret = poll((void *)&msgs, nfd, wait_left);
0449         err=errno;
0450         NDRX_LOG(log_debug, "poll ret: %d qid: %d wait_left: %d", ret, mqd->qid, wait_left);
0451         if (ret>0)
0452         {
0453             /* OK, can try to receive something */
0454             if (EXFAIL==(ret = msgrcv(mqd->qid, ptr, NDRX_SVQ_INLEN(maxlen), 0, IPC_NOWAIT)))
0455             {
0456                 err = errno;
0457                 /* translate the error codes */
0458                 if (ENOMSG==err)
0459                 {
0460                     NDRX_LOG(log_debug, "msgrcv(qid=%d) failed: %s", mqd->qid, 
0461                         strerror(err));
0462                     /* OK try again, some else downloaded msg.. */
0463                 }
0464                 else
0465                 {
0466                     NDRX_LOG(log_error, "msgrcv(qid=%d) failed: %s", mqd->qid, 
0467                         strerror(err));
0468                     errno = err;
0469                     /* terminate the process.. */
0470                     break;
0471                 }
0472             }
0473             else
0474             {
0475                 /* message is received -> terminate... */
0476                 break;
0477             }
0478         }
0479         else if (0==ret)
0480         {
0481             errno = ETIMEDOUT;
0482             ret=EXFAIL;
0483             break;
0484         }
0485         else
0486         {
0487             /* this is poll error */
0488             NDRX_LOG(log_error, "poll (qid=%d) failed (wait_left: %d): %s", mqd->qid,
0489                 wait_left, strerror(err));
0490             
0491             userlog("poll (qid=%d) failed (wait_left: %d): %s", mqd->qid,
0492                 wait_left, strerror(err));
0493             errno = err;
0494 
0495             /* terminate the receive */
0496             break;
0497         }
0498 
0499         wait_left = ndrx_stopwatch_get_delta(&w) * -1;
0500         /* prepare for timeout if we do not go second loop */
0501         errno=ETIMEDOUT;
0502         ret=EXFAIL;
0503     }
0504 
0505 out:    
0506     if (ret>=0)
0507     {
0508         ret=NDRX_SVQ_OUTLEN(ret);
0509     }
0510     
0511     return ret;
0512     
0513 #endif
0514     
0515 }
0516 
0517 
0518 /**
0519  * Sned message with timeout option
0520  * @param mqd message queue descriptor
0521  * @param ptr data ptr including mtype - long
0522  * @param len data len including mtype - long
0523  * @param prio message priority, not used
0524  * @param __abs_timeout time out..., from this we calculate the time diff
0525  *  for setting the delta time we shall spend in Q
0526  * @return EXSUCCEED/EXFAIL
0527  */
0528 expublic int ndrx_svq_timedsend(mqd_t mqd, const char *ptr, size_t len, 
0529         unsigned int prio, const struct timespec *__abs_timeout)
0530 {
0531     
0532 #ifdef EX_USE_SYSVQ
0533     ssize_t ret = len;
0534     ndrx_svq_ev_t *ev = NULL;
0535     int err = 0;
0536     long *l = (long *)ptr;
0537     
0538     *l = 1; /* set default mtype.. */
0539     /* set thread handler - for interrupts */
0540     
0541     /* Check for invalid descriptor... */
0542     VALIDATE_MQD;
0543     
0544     mqd->thread = pthread_self();
0545     
0546     if (EXSUCCEED!=ndrx_svq_event_sndrcv( mqd, (char *)ptr, &ret, 
0547             (struct timespec *)__abs_timeout, &ev, EXTRUE, EXFALSE))
0548     {
0549         err = errno;
0550         if (NULL!=ev)
0551         {
0552             if (NDRX_SVQ_EV_TOUT==ev->ev)
0553             {
0554                 NDRX_LOG(log_warn, "Timed out");
0555                 errno = ETIMEDOUT;
0556             }
0557             else
0558             {
0559                 NDRX_LOG(log_error, "Unexpected event: %d", ev->ev);
0560                 errno = EBADF;
0561             }
0562         }
0563         else
0564         {
0565             /* translate the error codes */
0566             if (ENOMSG==err)
0567             {
0568                 NDRX_LOG(log_debug, "msgsnd(qid=%d) failed: %s", mqd->qid, 
0569                     strerror(err));
0570                 errno = EAGAIN;
0571             }
0572             else
0573             {
0574                 NDRX_LOG(log_error, "msgsnd(qid=%d) failed: %s", mqd->qid, 
0575                     strerror(err));
0576             }
0577         }
0578         
0579         EXFAIL_OUT(ret);
0580     }
0581     
0582     ret = EXSUCCEED;
0583 out:
0584     
0585     if (NULL!=ev)
0586     {
0587         /* save the error if any */
0588         err=errno;
0589         NDRX_FPFREE(ev);
0590         errno=err;
0591     }
0592     
0593     /* errno is loaded */
0594     return ret;
0595     
0596 #endif
0597     
0598 #ifdef EX_USE_SVAPOLL
0599     
0600     /* Try to send, if queue full, wait on poll
0601      * if poll says ok, try to send,.. again if full, wait on poll
0602      * until is sent or process times out..
0603      */
0604     
0605     ndrx_stopwatch_t w;
0606     int wait_left;
0607     long tout;
0608     int ret;
0609     int err;
0610     long *l;
0611     struct timeval timeval;
0612     
0613     VALIDATE_MQD;
0614     
0615     if (len<sizeof(long))
0616     {
0617         NDRX_LOG(log_error, "Invalid message size, the minimum is %d but got %d", 
0618                 (int)sizeof(long), (int)len);
0619         errno = EINVAL;
0620         EXFAIL_OUT(ret);
0621     }
0622     
0623     l = (long *)ptr;    
0624     *l = 1;
0625     
0626     ret = msgsnd(mqd->qid, ptr, NDRX_SVQ_INLEN(len), IPC_NOWAIT);
0627     
0628     /* so if other error, or we get blocking condition when not requested */
0629     if (EXFAIL == ret)
0630     {
0631         if (EAGAIN!=errno || mqd->attr.mq_flags & O_NONBLOCK)
0632         {
0633             /* if no msg, then continue with bellow */
0634             err = errno;
0635             NDRX_LOG(log_error, "msgsnd(qid=%d) failed: %s", mqd->qid, 
0636                         strerror(err));
0637             errno = err;
0638             goto out;
0639         }
0640     }
0641     else 
0642     {
0643         /* got result */
0644         goto out;
0645     }
0646     
0647     NDRX_SVAPOLL_TOUT_SET;
0648 
0649     /* prepare for timeout ... */
0650     errno=ETIMEDOUT;
0651     ret=EXFAIL;
0652 
0653     /* wait for message...
0654      * firstly attempt to send, if NO MSG, then wait on POLL
0655      */
0656     while (wait_left>0)
0657     {
0658         /* do poll on queue.. */
0659         struct ndrx_pollmsg msgs;
0660         unsigned long nfd = 1 << 16;
0661         
0662         msgs.msgid=mqd->qid;
0663         msgs.reqevents = POLLOUT;
0664         msgs.rtnevents = 0;
0665         
0666         NDRX_LOG(log_debug, "wait_left: %d qid: %d", wait_left, mqd->qid);
0667         ret = poll((void *)&msgs, nfd, wait_left);
0668         NDRX_LOG(log_debug, "poll ret=%d", ret);
0669         
0670         if (ret>0)
0671         {
0672             /* OK, can try to receive something */
0673             if (EXFAIL==(ret = msgsnd(mqd->qid, ptr, NDRX_SVQ_INLEN(len), IPC_NOWAIT)))
0674             {
0675                 err=errno;
0676                 
0677                 /* translate the error codes */
0678                 if (EAGAIN==err)
0679                 {
0680                     /*
0681                     NDRX_LOG(log_debug, "msgsnd(qid=%d) failed: %s", mqd->qid, 
0682                         strerror(err));
0683                      */
0684                     
0685                     /* wait 1 ms, this could be the case that message is too big, but there is small space
0686                      * in queue. thus it poll gives OK, but we still cannot send
0687                      */
0688                     usleep(1000);
0689                     
0690                     /* OK try again, some else downloaded msg.. */
0691                 }
0692                 else
0693                 {
0694                     NDRX_LOG(log_error, "msgrcv(qid=%d) failed: %s", mqd->qid, 
0695                         strerror(err));
0696                     errno = err;
0697                     
0698                     /* termiante the process.. */
0699                     break;
0700                 }
0701             }
0702             else
0703             {
0704                 /* sent OK, terminate */
0705                 break;
0706             }
0707         }
0708         else if (0==ret)
0709         {
0710             errno = ETIMEDOUT;
0711             ret=EXFAIL;
0712             break;
0713         }
0714         else
0715         {
0716             err = errno;
0717 
0718             /* this is poll error */
0719             NDRX_LOG(log_error, "poll (qid=%d) failed (wait_left: %d): %s", mqd->qid,
0720                 wait_left, strerror(err));
0721 
0722             userlog("poll (qid=%d) failed (wait_left: %d): %s", mqd->qid,
0723                 wait_left, strerror(err));
0724             errno = err;
0725 
0726             /* terminate the receive */
0727             break;
0728         }
0729 
0730         wait_left = ndrx_stopwatch_get_delta(&w) * -1;
0731         /* prepare for timeout ... */
0732         errno=ETIMEDOUT;
0733         ret=EXFAIL;
0734     }
0735 
0736 out:    
0737     /* in case of error, errno is loaded */
0738     
0739     return ret;
0740     
0741 #endif
0742 }
0743 
0744 /**
0745  * Just send msg, no timeout control.
0746  * This are also not used by polling interfaces...
0747  * @param mqd message queue descriptor
0748  * @param ptr data ptr to send. Note that structures include first field as
0749  *  long, due to System V IPC requirements!
0750  * @param len the full message size includes first long field. The
0751  *  function will do the internal wrappings by it self
0752  * @param prio priority, not used.
0753  * @return EXSUCCEED/EXFAIL
0754  */
0755 expublic int ndrx_svq_send(mqd_t mqd, const char *ptr, size_t len, 
0756         unsigned int prio)
0757 {
0758     int ret = EXSUCCEED;
0759     long *l;
0760     int msgflg;
0761     
0762     NDRX_LOG(log_debug, "sending msg mqd=%p, qid=%d, ptr=%p, len=%d",
0763                 mqd, mqd->qid, ptr, (int)len);
0764     
0765     VALIDATE_MQD;
0766     
0767     if (len<sizeof(long))
0768     {
0769         NDRX_LOG(log_error, "Invalid message size, the minimum is %d but got %d", 
0770                 (int)sizeof(long), (int)len);
0771         errno = EINVAL;
0772         EXFAIL_OUT(ret);
0773     }
0774     
0775     l = (long *)ptr;    
0776     *l = 1;
0777     
0778     if (mqd->attr.mq_flags & O_NONBLOCK)
0779     {
0780         msgflg = IPC_NOWAIT;
0781     }
0782     else
0783     {
0784         msgflg = 0;
0785     }
0786     
0787     ret = msgsnd(mqd->qid, ptr, NDRX_SVQ_INLEN(len), msgflg);
0788     
0789     /* no logging here, as we need to keep errno */
0790     
0791 out:
0792     return ret;
0793 }
0794 
0795 /**
0796  * Receive message from queue, no timeout.
0797  * This are also not used by polling interfaces...
0798  * @param mqd message queue descriptor
0799  * @param ptr data ptr (this will include initial long field - message type)
0800  * @param maxlen max buffer size with out initial long
0801  * @param priop not used
0802  * @return EXSUCCEED/EXFAIL
0803  */
0804 expublic ssize_t ndrx_svq_receive(mqd_t mqd, char *ptr, size_t maxlen, 
0805         unsigned int *priop)
0806 {
0807     int ret = EXSUCCEED;
0808     long *l;
0809     int msgflg;
0810     
0811     VALIDATE_MQD;
0812     
0813     NDRX_LOG(log_debug, "receiving msg mqd=%p, ptr=%p, maxlen=%d flags: %ld qid: %d",
0814                 mqd, ptr, (int)maxlen, mqd->attr.mq_flags, mqd->qid);
0815     
0816     if (maxlen<sizeof(long))
0817     {
0818         NDRX_LOG(log_error, "Invalid message size, the minimum is %d but got %d", 
0819                 (int)sizeof(long), (int)maxlen);
0820         errno = EINVAL;
0821         EXFAIL_OUT(ret);
0822     }
0823     
0824     l = (long *)ptr;    
0825     *l = 1;
0826     
0827     if (mqd->attr.mq_flags & O_NONBLOCK)
0828     {
0829         msgflg = IPC_NOWAIT;
0830     }
0831     else
0832     {
0833         msgflg = 0;
0834     }
0835     
0836     if (EXFAIL==(ret = msgrcv(mqd->qid, ptr, NDRX_SVQ_INLEN(maxlen), 0, msgflg)))
0837     {
0838         int err = errno;
0839         
0840         /* translate the error codes */
0841         if (ENOMSG==err)
0842         {
0843             NDRX_LOG(log_debug, "msgrcv(qid=%d) failed: %s", mqd->qid, 
0844                 strerror(err));
0845             err = EAGAIN;
0846         }
0847         else
0848         {
0849             NDRX_LOG(log_error, "msgrcv(qid=%d) failed: %s", mqd->qid, 
0850                 strerror(err));
0851         }
0852         
0853         errno = err;
0854     }
0855     
0856     /* no logging here, as we need to keep errno */
0857     if (ret>=0)
0858     {
0859         ret=NDRX_SVQ_OUTLEN(ret);
0860     }
0861     
0862 out:
0863     return ret;
0864 }
0865 
0866 /**
0867  * set queue attributes
0868  * @param mqd queue descriptor
0869  * @param attr new queue stat
0870  * @param oattr old queue stat (returned if not NULL)
0871  * @return EXSUCCEED/EXFAIL
0872  */
0873 expublic int ndrx_svq_setattr(mqd_t mqd, const struct mq_attr *attr, 
0874         struct mq_attr *oattr)
0875 {
0876     int ret = EXSUCCEED;
0877     
0878     VALIDATE_MQD;
0879     
0880     if (NULL==attr)
0881     {
0882         NDRX_LOG(log_error, "Invalid attr is null", mqd);
0883         errno = EINVAL;
0884         EXFAIL_OUT(ret);
0885     }
0886     
0887     /* old ret old attribs */
0888     if (NULL!=oattr)
0889     {
0890         memcpy(oattr, &(mqd->attr), sizeof(*oattr));
0891     }
0892     
0893     memcpy(&(mqd->attr), attr, sizeof(*attr));
0894     
0895 out:
0896     return ret;
0897 }
0898 
0899 /**
0900  * Unlink queue should simply lookup the tables and remove the id
0901  * @param pathname queue string
0902  * @return EXSUCCEED/EXFAIL
0903  */
0904 expublic int ndrx_svq_unlink(const char *pathname)
0905 {
0906     /* for ndrxd service queue unlinks we shall use ndrx_svqshm_ctl directly */
0907     
0908     /* TODO: we might need a condition variable to be sent in command
0909      * so that back thread can update it once delete is fine...! */
0910     
0911     return ndrx_svqshm_ctl((char *)pathname, EXFAIL, IPC_RMID, EXFAIL, NULL);
0912     
0913 }
0914 
0915 /* vim: set ts=4 sw=4 et smartindent: */