Back to home page

Enduro/X

 
 

    


0001 /**
0002  * @brief System V Queue polling
0003  *   here we will work with assumption that there is only one poller sub-system
0004  *   per process.
0005  *
0006  * @file sys_svqpoll.c
0007  */
0008 /* -----------------------------------------------------------------------------
0009  * Enduro/X Middleware Platform for Distributed Transaction Processing
0010  * Copyright (C) 2009-2016, ATR Baltic, Ltd. All Rights Reserved.
0011  * Copyright (C) 2017-2023, Mavimax, Ltd. All Rights Reserved.
0012  * This software is released under one of the following licenses:
0013  * AGPL (with Java and Go exceptions) or Mavimax's license for commercial use.
0014  * See LICENSE file for full text.
0015  * -----------------------------------------------------------------------------
0016  * AGPL license:
0017  *
0018  * This program is free software; you can redistribute it and/or modify it under
0019  * the terms of the GNU Affero General Public License, version 3 as published
0020  * by the Free Software Foundation;
0021  *
0022  * This program is distributed in the hope that it will be useful, but WITHOUT ANY
0023  * WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR A
0024  * PARTICULAR PURPOSE. See the GNU Affero General Public License, version 3
0025  * for more details.
0026  *
0027  * You should have received a copy of the GNU Affero General Public License along 
0028  * with this program; if not, write to the Free Software Foundation, Inc.,
0029  * 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
0030  *
0031  * -----------------------------------------------------------------------------
0032  * A commercial use license is available from Mavimax, Ltd
0033  * contact@mavimax.com
0034  * -----------------------------------------------------------------------------
0035  */
0036 
0037 /*---------------------------Includes-----------------------------------*/
0038 #include <stdio.h>
0039 #include <stdlib.h>
0040 #include <time.h>
0041 
0042 #include <unistd.h>
0043 #include <stdarg.h>
0044 #include <ctype.h>
0045 #include <memory.h>
0046 #include <errno.h>
0047 #include <signal.h>
0048 #include <limits.h>
0049 #include <pthread.h>
0050 #include <string.h>
0051 
0052 #include <ndrstandard.h>
0053 #include <ndebug.h>
0054 #include <nstdutil.h>
0055 #include <limits.h>
0056 
0057 #include <sys_unix.h>
0058 #include <sys_mqueue.h>
0059 #include <sys_svq.h>
0060 #include <fcntl.h>
0061 
0062 #include <atmi.h>
0063 #include <atmi_int.h>
0064 
0065 /*---------------------------Externs------------------------------------*/
0066 /*---------------------------Macros-------------------------------------*/
0067 /*---------------------------Enums--------------------------------------*/
0068 /*---------------------------Typedefs-----------------------------------*/
0069 
0070 /**
0071  * Hash of queues and their timeouts assigned and stamps...
0072  */
0073 typedef struct
0074 {
0075     char svcnm[MAXTIDENT+1];     /**< Service name                       */
0076     mqd_t mqd;                  /**< Queue handler hashed               */
0077     int idx;                    /**< either fake service or admin Q     */
0078     EX_hash_handle hh;          /**< make hashable                      */
0079 } ndrx_svq_pollsvc_t;
0080 
0081 /*---------------------------Globals------------------------------------*/
0082 
0083 /**
0084  * main polling queue. All the service requests are made to this queue.
0085  */
0086 exprivate char M_mainqstr[NDRX_MAX_Q_SIZE+1] = "";
0087 
0088 /**
0089  * Main queue object used for polling
0090  */
0091 exprivate mqd_t M_mainq = NULL;
0092 
0093 /**
0094  * Service mapping against the fake queue descriptors
0095  */
0096 exprivate ndrx_svq_pollsvc_t * M_svcmap = NULL;
0097 
0098 /**
0099  * This is used for special cases such as bridges
0100  */
0101 exprivate int M_accept_any = EXFALSE;
0102 
0103 /**
0104  * Number of file descriptors monitored.
0105  */
0106 exprivate int M_nrfds = 0;
0107 
0108 /**
0109  * Protect M_nrfds
0110  */
0111 NDRX_SPIN_LOCKDECL(M_nrfds_lock);
0112 
0113 /*---------------------------Statics------------------------------------*/
0114 /*---------------------------Prototypes---------------------------------*/
0115 
0116 /**
0117  * Remove all resources used by polling sub-system or queuing
0118  * @return EXSUCCEED/EXFAIL
0119  */
0120 expublic int ndrx_epoll_down(int force)
0121 {
0122     int ret = EXSUCCEED;
0123     
0124     ret=ndrx_svqshm_down(force);
0125     
0126 out:
0127     return ret;
0128 }
0129 
0130 /**
0131  * Detach from shared memory resources (used by both epoll & simple queues)
0132  * @return EXUSCCEED
0133  */
0134 expublic int ndrx_epoll_shmdetach(void)
0135 {
0136     /* terminate aux thread */
0137     ndrx_svq_event_exit(EXTRUE);
0138     
0139     return EXSUCCEED;
0140 }
0141 
0142 /**
0143  * Get Main queue / basically RQADDR!
0144  * @return main queue of current service poller
0145  */
0146 expublic mqd_t ndrx_svq_mainq_get(void)
0147 {
0148     return M_mainq;
0149 }
0150 
0151 /**
0152  * Return main poller resource id, installed into SHM
0153  * @return 
0154  */
0155 expublic int ndrx_epoll_resid_get(void)
0156 {
0157     return M_mainq->qid;
0158 }
0159 
0160 /**
0161  * Translate the service name to queue
0162  * @param[out] send_q output service queue
0163  * @param[in] q_prefix queue prefix
0164  * @param[in] svc service name
0165  * @param[in] resid resource id (in this case it is 
0166  * @return EXSUCCEED/EXFAIL
0167  */
0168 expublic int ndrx_epoll_service_translate(char *send_q, char *q_prefix, 
0169         char *svc, int resid)
0170 {
0171     return ndrx_svqshm_get_qid(resid, send_q, NDRX_MAX_Q_SIZE+1);
0172 }
0173 
0174 /**
0175  * Set main polling queue. The queue is open once the
0176  * @param qstr full queue string for the main polling interface
0177  */
0178 expublic void ndrx_epoll_mainq_set(char *qstr)
0179 {
0180     NDRX_STRCPY_SAFE(M_mainqstr, qstr);
0181     NDRX_LOG(log_debug, "Setting System V main dispatching queue: [%s]", qstr);
0182 }
0183 
0184 /**
0185  * This test whether we need to open service queue 
0186  * @param idx advertise service index
0187  * @return EXTRUE - open q, EXFALSE - do not open Q
0188  */
0189 expublic int ndrx_epoll_shallopenq(int idx)
0190 {
0191     if (ATMI_SRV_ADMIN_Q==idx || ATMI_SRV_REPLY_Q==idx)
0192     {
0193         return EXTRUE;
0194     }
0195         
0196     return EXFALSE;
0197 }
0198 
0199 /**
0200  * Get service definition from service name
0201  * @param svcnm service name to lookup
0202  * @return NULL - not found or ptr to service definition
0203  */
0204 exprivate ndrx_svq_pollsvc_t * ndrx_epoll_getsvc(char *svcnm)
0205 {
0206     ndrx_svq_pollsvc_t *ret = NULL;
0207     ndrx_svq_pollsvc_t *elt = NULL;
0208     
0209     EXHASH_FIND_STR(M_svcmap, svcnm, ret);
0210     
0211     if (NULL==ret && M_accept_any)
0212     {
0213         EXHASH_ITER(hh, M_svcmap, ret, elt)
0214         {
0215             if (ret->idx > ATMI_SRV_REPLY_Q)
0216             {
0217                 NDRX_LOG(log_debug, "Accepting any msg svcnm [%s] mapped to [%s]",
0218                         svcnm, ret->svcnm);
0219                 goto out;
0220             }
0221         }
0222         ret = NULL;
0223     }
0224     
0225     if (NULL==ret)
0226     {
0227         NDRX_LOG(log_error, "Failed to find queue definition for [%s] service",
0228                 svcnm);
0229     }
0230     
0231 out:
0232     return ret;
0233 }
0234 
0235 /**
0236  * Get first non admin queue
0237  * @return NULL - not found, or queue found
0238  */
0239 exprivate ndrx_svq_pollsvc_t * ndrx_epoll_getfirstna(void)
0240 {
0241     ndrx_svq_pollsvc_t *el = NULL, *elt;
0242     
0243     EXHASH_ITER(hh, M_svcmap, el, elt)
0244     {
0245         if (ATMI_SRV_ADMIN_Q!=el->idx)
0246         {
0247             break;
0248         }
0249     }
0250     
0251 out:
0252     return el;
0253 }
0254 
0255 /**
0256  * Register service queue with poller interface.
0257  * add logical names for admin and reply services
0258  * returns the used queue id back.
0259  * @param svcnm service name
0260  * @param idx queue index (admin 0 /reply 1/service > 1)
0261  * @param mq_exists Existing queue defintion (for admin queues)
0262  * @return logical/real queue descriptor or NULL in case of error
0263  */
0264 expublic mqd_t ndrx_epoll_service_add(char *svcnm, int idx, mqd_t mq_exits)
0265 {
0266     int ret = EXSUCCEED;
0267     mqd_t mq = NULL;
0268     ndrx_svq_pollsvc_t *el = NULL;
0269     int err;
0270     char *adminsvc = NDRX_SVC_ADMIN;
0271     char *replysvc = NDRX_SVC_REPLY;
0272     
0273     if (NULL==(el=NDRX_MALLOC(sizeof(*el))))
0274     {
0275         err = errno;
0276         NDRX_LOG(log_error, "Failed to malloc %d bytes: %s", 
0277                 sizeof(*el), strerror(err));
0278         userlog("Failed to malloc %d bytes: %s", sizeof(*el),
0279                 strerror(err));
0280         EXFAIL_OUT(ret);
0281     }
0282     
0283     /* allocate pointer of 1 byte */
0284     /* register the queue in the hash */
0285     if (ATMI_SRV_ADMIN_Q==idx)
0286     {
0287         svcnm = adminsvc;
0288         mq = mq_exits;
0289         
0290         /*
0291          * At this point we shall create a server command monitoring
0292          * thread!!!
0293          */
0294         NDRX_LOG(log_debug, "About to init admin thread...");
0295         if (EXSUCCEED!=ndrx_svqadmin_init(mq))
0296         {
0297             NDRX_LOG(log_error, "Failed to init admin queue");
0298             userlog("Failed to init admin queue");
0299             EXFAIL_OUT(ret);
0300         }
0301         
0302     }
0303     else if (ATMI_SRV_REPLY_Q==idx)
0304     {
0305         svcnm = replysvc;
0306         mq = mq_exits;
0307     }
0308     else if ((mqd_t)EXFAIL!=mq_exits)
0309     {
0310         mq = mq_exits;
0311     }
0312     else if (NULL==(mq=NDRX_MALLOC(1)))
0313     {
0314         err = errno;
0315         NDRX_LOG(log_error, "Failed to malloc 1 byte: %s", strerror(err));
0316         userlog("Failed to malloc 1 byte: %s", strerror(err));
0317         EXFAIL_OUT(ret);
0318     }
0319     
0320     if (0==strncmp(svcnm, NDRX_SVC_BRIDGE, NDRX_SVC_BRIDGE_STATLEN))
0321     {
0322         NDRX_LOG(log_info, "Accepting any msg");
0323         M_accept_any = EXTRUE;
0324     }
0325     
0326     el->mqd = mq;
0327     el->idx = idx;
0328     
0329     NDRX_STRCPY_SAFE(el->svcnm, svcnm);
0330     
0331     /* register service name into cache... */
0332     EXHASH_ADD_STR( M_svcmap, svcnm, el );
0333     
0334     NDRX_LOG(log_debug, "[%s] mapped to %p idx %d", el->svcnm, el->mqd, el->idx);
0335     
0336 out:
0337     
0338     if (EXSUCCEED!=ret && NULL!=mq)
0339     {
0340         NDRX_FREE((char *)mq);
0341         mq=NULL;
0342     }
0343 
0344     if (EXSUCCEED!=ret && NULL!=el)
0345     {
0346         NDRX_FREE((char *)el);
0347     }
0348 
0349     errno=err;
0350     return mq;
0351 }
0352 
0353 /**
0354  * Ini System V polling thread
0355  * @return EXSUCCEED/EXFAIL, errno set.
0356  */
0357 expublic int ndrx_epoll_sys_init(void)
0358 {
0359     int ret = EXSUCCEED;
0360     
0361     NDRX_SPIN_INIT_V(M_nrfds_lock);
0362     
0363     /* boot the Auxiliary thread */
0364     if (EXSUCCEED!=ndrx_svqshm_init(EXFALSE))
0365     {
0366         NDRX_LOG(log_error, "Failed to init System V Aux thread/SHM");
0367         EXFAIL_OUT(ret);
0368     }
0369     
0370 out:
0371     return ret;
0372 }
0373 
0374 /**
0375  * Nothing to un-init for epoll()
0376  */
0377 expublic void ndrx_epoll_sys_uninit(void)
0378 {
0379     return;
0380 }
0381 
0382 /**
0383  * Return the compiled poll mode
0384  * @return 
0385  */
0386 expublic char * ndrx_epoll_mode(void)
0387 {
0388     static char *mode = "SystemV";
0389     
0390     return mode;
0391 }
0392 
0393 /**
0394  * Wrapper for epoll_ctl, for standard file descriptors
0395  * @param epfd do not care about epfd, we use only one poler
0396  * @param op operation EX_EPOLL_CTL_ADD or EX_EPOLL_CTL_DEL
0397  * @param fd file descriptor to monitor
0398  * @param event not used
0399  * @return EXSUCCEED/EXFAIL
0400  */
0401 expublic int ndrx_epoll_ctl(int epfd, int op, int fd, 
0402         struct ndrx_epoll_event *event)
0403 {
0404     int ret = EXSUCCEED;
0405     int err = 0;
0406     
0407     /* Add or remove FD from the Aux thread */
0408     
0409     switch (op)
0410     {
0411         case EX_EPOLL_CTL_ADD:
0412             if (EXSUCCEED!=(ret = ndrx_svq_moncmd_addfd(M_mainq, fd, event->events)))
0413             {
0414                 err = errno;
0415                 NDRX_LOG(log_error, "Failed to add fd %d to mqd %p for polling: %s",
0416                         fd, M_mainq);
0417             }
0418             /* hmmm add spinlock */
0419             NDRX_SPIN_LOCK_V(M_nrfds_lock);
0420             M_nrfds++;
0421             NDRX_SPIN_UNLOCK_V(M_nrfds_lock);
0422             break;
0423         case EX_EPOLL_CTL_DEL:
0424             if (EXSUCCEED!=(ret = ndrx_svq_moncmd_rmfd(fd)))
0425             {
0426                 err = errno;
0427                 NDRX_LOG(log_error, "Failed to remove fd %d to mqd %p for polling: %s",
0428                         fd, M_mainq);
0429             }
0430             /* add spinlock */
0431             NDRX_SPIN_LOCK_V(M_nrfds_lock);
0432             M_nrfds--;
0433             NDRX_SPIN_UNLOCK_V(M_nrfds_lock);
0434             break;    
0435         default:
0436             NDRX_LOG(log_warn, "Unsupported operation: %d", op);
0437             errno=EINVAL;
0438             EXFAIL_OUT(ret);
0439             break;
0440     }
0441 out:
0442     errno = err;
0443     return ret;
0444 }
0445 
0446 /**
0447  * epoll_ctl for Posix queue descriptors
0448  * @param epfd not used
0449  * @param op operation EX_EPOLL_CTL_DEL
0450  * @param fd queue descriptor
0451  * @param event not used...
0452  * @return EXSUCCEED/EXFAIL
0453  */
0454 expublic int ndrx_epoll_ctl_mq(int epfd, int op, mqd_t fd, 
0455         struct ndrx_epoll_event *event)
0456 {
0457     int ret = EXSUCCEED;
0458     ndrx_svq_pollsvc_t *el, *elt;
0459     
0460     /* just remove service from dispatching... 
0461      * for adding it is done by service queue open function
0462      * ndrx_epoll_service_add()
0463      */
0464     NDRX_LOG(log_debug, "Op %d on mqd=%p from poller", op, fd);
0465     if (EX_EPOLL_CTL_DEL==op)
0466     {
0467         EXHASH_ITER(hh, M_svcmap, el, elt)
0468         {
0469             if (el->mqd==fd)
0470             {
0471                 /* if this is admin queue, then terminate admin thread! */
0472                 if (ATMI_SRV_ADMIN_Q==el->idx)
0473                 {
0474                     if (EXSUCCEED!=ndrx_svqadmin_deinit())
0475                     {
0476                         NDRX_LOG(log_error, "Failed to terminate ADMIN thread!");
0477                         EXFAIL_OUT(ret);
0478                     }
0479                 }
0480                 
0481                 EXHASH_DEL(M_svcmap, el);
0482                 /* if this this is service Q (it is virtual svc q) */
0483                 if (!ndrx_epoll_shallopenq(el->idx))
0484                 {
0485                     NDRX_LOG(log_info, "Free up virtual mqd %p", el->mqd);
0486                     NDRX_FREE((char *)el->mqd);
0487                 }
0488                 NDRX_FREE(el);
0489             }
0490         }
0491     }
0492     
0493 out:
0494     return ret;
0495 }
0496 
0497 /**
0498  * Wrapper for epoll_create
0499  * @param size
0500  * @return Fake FD(1) or EXFAIL
0501  */
0502 expublic int ndrx_epoll_create(int size)
0503 {
0504     int ret = 1;
0505     int err;
0506     struct mq_attr attr;
0507     
0508     memset(&attr, 0, sizeof(attr));
0509     
0510     /* at this point we will open the M_mainq 
0511      * by using th rq addr provided..
0512      */
0513     M_mainq = ndrx_mq_open(M_mainqstr, O_CREAT, 0660,  &attr);
0514     
0515     if ((mqd_t)EXFAIL==M_mainq)
0516     {
0517         err = errno;
0518         NDRX_LOG(log_error, "Failed to open main System V poller queue!");
0519         EXFAIL_OUT(ret);
0520     }
0521     
0522     /* Set RQADDR flag for the queue */    
0523     if (EXSUCCEED!=ndrx_svqshm_ctl(NULL, M_mainq->qid, IPC_SET, 
0524             NDRX_SVQ_MAP_RQADDR, NULL))
0525     {
0526         NDRX_LOG(log_error, "Failed to mark qid %d as request address type", 
0527                 M_mainq->qid);
0528         EXFAIL_OUT(ret);
0529     }
0530     
0531     
0532 out:
0533     return ret;
0534 }
0535 
0536 /**
0537  * Close Epoll set.
0538  */
0539 expublic int ndrx_epoll_close(int fd)
0540 {
0541     ndrx_svq_pollsvc_t *el, *elt;
0542     /* Close main poller Q erase mapping hashes 
0543     ndrx_mq_close(M_mainq);
0544      * THIS WILL BE DONE BY atexit()...
0545      * */
0546     
0547     EXHASH_ITER(hh, M_svcmap, el, elt)
0548     {
0549         EXHASH_DEL(M_svcmap, el);
0550         NDRX_FREE(el);
0551     }
0552     
0553     return EXSUCCEED;
0554 }
0555 
0556 /**
0557  * Wrapper for epoll_wait
0558  * Needs to provide back the identifier that we got msg for admin Q
0559  * or for admin we could use special service name like @ADMIN so that
0560  * we can find it in standard hash list? but the MQD needs to be kind of special
0561  * one so that we do not remove it by our selves at the fake
0562  * @param epfd not used, poll set
0563  * @param events events return events struct 
0564  * @param maxevents max number of events can be loaded
0565  * @param timeout timeout in milliseconds
0566  * @param buf double ptr to preloaded message
0567  * @param buf_len preloaded buffer size on input max size, on output actual msgs sz
0568  * @return 0 - timeout, -1 - FAIL, 1 - have one event
0569  */
0570 expublic int ndrx_epoll_wait(int epfd, struct ndrx_epoll_event *events, 
0571         int maxevents, int timeout, char **buf, int *buf_len)
0572 {
0573     int ret = EXSUCCEED;
0574     ssize_t rcvlen = *buf_len;
0575     ndrx_svq_ev_t *ev = NULL;
0576     int err = 0;
0577     int gottout = EXFALSE;
0578     ndrx_svq_pollsvc_t *svc;
0579     tp_command_call_t *call;
0580     tp_command_generic_t *gen_command;
0581     struct timespec tm;
0582     
0583     /* set thread handler - for interrupts */
0584     M_mainq->thread = pthread_self();
0585     
0586     /* as we do not have non timed receive available for  */
0587     
0588     if (-1==timeout)
0589     {
0590         /* no timeout required */
0591         tm.tv_sec = 0;
0592         tm.tv_nsec = 0;
0593     }
0594     else
0595     {
0596         clock_gettime(CLOCK_REALTIME, &tm);
0597         tm.tv_sec += (timeout / 1000);  /* Set timeout, passed in msec, uses as sec */
0598     }
0599     
0600     if (EXFAIL==ndrx_svq_event_sndrcv( M_mainq, *buf, &rcvlen, 
0601             &tm, &ev, EXFALSE, M_nrfds))
0602     {
0603         err = errno;
0604         if (NULL!=ev)
0605         {
0606             switch (ev->ev)
0607             {
0608                 case NDRX_SVQ_EV_TOUT:
0609                     NDRX_LOG(log_debug, "Timed out");
0610                     gottout = EXTRUE;
0611                     break;
0612                 case NDRX_SVQ_EV_DATA:
0613                     
0614                     /* Admin thread sends something to us... */
0615                     NDRX_LOG(log_info, "Admin queue sends us something "
0616                             "bytes %d", (int)ev->datalen);
0617                     
0618                     events[0].is_mqd = EXTRUE;
0619                     
0620                     /* Copy off the data - todo think about zero copy...*/
0621                     if (*buf_len < ev->datalen)
0622                     {
0623                         NDRX_LOG(log_error, "Receive from FD %d bytes, but max buffer %d",
0624                                 ev->datalen, *buf_len);
0625                         userlog("Receive from FD %d bytes, but max buffer %d",
0626                                 ev->datalen, *buf_len);
0627 
0628                         err=EMSGSIZE;
0629                         EXFAIL_OUT(ret);
0630                     }
0631 
0632                     *buf_len = ev->datalen;
0633                     
0634                     /* free up the buffer... of parent.. */
0635                     NDRX_SYSBUF_FREE(*buf);
0636                     
0637                     *buf = ev->data;
0638                     ev->data = NULL;
0639                     /* memcpy(buf, ev->data, *buf_len); - avoid copy ... */
0640                     
0641                     /* free up the event block? already done at exit... */
0642                     
0643                     /* Lookup admin Queue ID */
0644                     if (NULL==(svc=ndrx_epoll_getsvc(NDRX_SVC_ADMIN)))
0645                     {
0646                         err=EFAULT;
0647                         NDRX_LOG(log_error, "Missing admin [%s] queue def!",
0648                                 NDRX_SVC_ADMIN);
0649                         EXFAIL_OUT(ret);
0650                     }
0651                     
0652                     events[0].data.mqd = svc->mqd;
0653                     break;
0654                 case NDRX_SVQ_EV_FD:
0655                     NDRX_LOG(log_info, "File descriptor %d sends us something "
0656                             "revents=%d bytes %d", ev->fd, (int)ev->revents, 
0657                             (int)ev->datalen);
0658                     events[0].is_mqd = EXFALSE;
0659                     events[0].events = ev->revents;
0660                     events[0].data.fd = ev->fd;
0661 
0662                     /* Copy off the data - todo think about zero copy...*/
0663                     if (*buf_len < ev->datalen)
0664                     {
0665                         NDRX_LOG(log_error, "Receive from FD %d bytes, but max buffer %d",
0666                                 ev->datalen, *buf_len);
0667                         userlog("Receive from FD %d bytes, but max buffer %d",
0668                                 ev->datalen, *buf_len);
0669 
0670                         err=EMSGSIZE;
0671                         EXFAIL_OUT(ret);
0672                     }
0673 
0674                     if (ev->datalen > 0)
0675                     {
0676                         *buf_len = ev->datalen;
0677                         /* memcpy(buf, ev->data, *buf_len); - avoid copy */
0678                         NDRX_SYSBUF_FREE(*buf);
0679                         *buf = ev->data;
0680                         ev->data=NULL;
0681                     }
0682                     break;
0683                     
0684                 default:
0685                     NDRX_LOG(log_error, "Unexpected event: %d", ev->ev);
0686                     err = EBADF;
0687                     break;
0688             }
0689         }
0690         else
0691         {
0692             /* translate the error codes */
0693             if (ENOMSG==err)
0694             {
0695                 NDRX_LOG(log_debug, "msgrcv(qid=%d) failed: %s", M_mainq->qid, 
0696                     strerror(err));
0697                 err = EAGAIN;
0698             }
0699             else
0700             {
0701                 NDRX_LOG(log_error, "msgrcv(qid=%d) failed: %s", M_mainq->qid, 
0702                     strerror(err));
0703             }
0704             EXFAIL_OUT(ret);
0705         }
0706     }
0707     else
0708     {
0709         gen_command = (tp_command_generic_t *)*buf;
0710         
0711         /* we got a message! */
0712         NDRX_LOG(log_debug, "Got message from main SysV queue %d "
0713                 "bytes gencommand: %hd", rcvlen, gen_command->command_id);
0714         
0715         switch (gen_command->command_id)
0716         {
0717             case ATMI_COMMAND_CONNECT:
0718             case ATMI_COMMAND_TPCALL:
0719             case ATMI_COMMAND_CONNRPLY:
0720             case ATMI_COMMAND_TPREPLY:
0721                 
0722                 call = (tp_command_call_t *)*buf;
0723                 
0724                 NDRX_LOG(log_debug, "Lookup service: [%s]", call->name);
0725                 if (NULL==(svc=ndrx_epoll_getsvc(call->name)))
0726                 {
0727                     err=EAGAIN;
0728                     NDRX_DUMP(log_error, "!!! Missing queue def - dumpg", *buf, rcvlen);
0729                     NDRX_LOG(log_error, "!!! Missing queue def for [%s] data "
0730                             "len %d- dropping "
0731                             "msg - is all servers on RQADDR serving all services?",
0732                             call->name, rcvlen);
0733                     userlog("!!! Missing queue def for [%s] "
0734                             "data len %d - dropping msg - is "
0735                             "all servers on RQADDR serving all services?",
0736                             call->name, rcvlen);
0737                     EXFAIL_OUT(ret);
0738                 }
0739                 events[0].data.mqd = svc->mqd;
0740                 break;
0741             default:
0742                 /* any non admin Q will be fine here!*/
0743                 if (NULL==(svc=ndrx_epoll_getfirstna()))
0744                 {
0745                     err=EFAULT;
0746                     NDRX_LOG(log_error, "Cannot find any non admin Q for event");
0747                     userlog("Cannot find any non admin Q for event");
0748                     EXFAIL_OUT(ret);
0749                 }
0750                 
0751                 events[0].data.mqd = svc->mqd;
0752                 
0753                 break;
0754         }
0755         
0756         events[0].is_mqd = EXTRUE;
0757         
0758         NDRX_LOG(log_debug, "event mapped to %p (mqd subst)",
0759                 events[0].data.mqd);
0760         *buf_len = rcvlen;
0761     }
0762     
0763 out:
0764             
0765     if (NULL!=ev)
0766     {
0767         if (NULL!=ev->data)
0768         {
0769             NDRX_SYSBUF_FREE(ev->data);
0770         }
0771         
0772         NDRX_FPFREE(ev);
0773     }
0774     
0775     if (EXSUCCEED==ret)
0776     {
0777         if (gottout)
0778         {
0779             errno = EAGAIN;
0780             return 0; /* got timeout */
0781         }
0782         else
0783         {
0784             return 1;   /* received something useful */
0785         }
0786     }
0787     else
0788     {
0789         errno = err;
0790         return EXFAIL;
0791     }
0792 }
0793 
0794 /**
0795  * Return errno for ndrx_poll() operation
0796  * @return 
0797  */
0798 expublic int ndrx_epoll_errno(void)
0799 {
0800     return errno;
0801 }
0802 
0803 /**
0804  * Wrapper for strerror
0805  * @param err
0806  * @return 
0807  */
0808 expublic char * ndrx_poll_strerror(int err)
0809 {
0810     return strerror(err);
0811 }
0812 
0813 /* vim: set ts=4 sw=4 et smartindent: */