Back to home page

Enduro/X

 
 

    


0001 /**
0002  * @brief SystemV Queue AIX Poll
0003  *
0004  * @file sys_svapoll.c
0005  */
0006 /* -----------------------------------------------------------------------------
0007  * Enduro/X Middleware Platform for Distributed Transaction Processing
0008  * Copyright (C) 2009-2016, ATR Baltic, Ltd. All Rights Reserved.
0009  * Copyright (C) 2017-2023, Mavimax, Ltd. All Rights Reserved.
0010  * This software is released under one of the following licenses:
0011  * AGPL (with Java and Go exceptions) or Mavimax's license for commercial use.
0012  * See LICENSE file for full text.
0013  * -----------------------------------------------------------------------------
0014  * AGPL license:
0015  *
0016  * This program is free software; you can redistribute it and/or modify it under
0017  * the terms of the GNU Affero General Public License, version 3 as published
0018  * by the Free Software Foundation;
0019  *
0020  * This program is distributed in the hope that it will be useful, but WITHOUT ANY
0021  * WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR A
0022  * PARTICULAR PURPOSE. See the GNU Affero General Public License, version 3
0023  * for more details.
0024  *
0025  * You should have received a copy of the GNU Affero General Public License along 
0026  * with this program; if not, write to the Free Software Foundation, Inc.,
0027  * 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
0028  *
0029  * -----------------------------------------------------------------------------
0030  * A commercial use license is available from Mavimax, Ltd
0031  * contact@mavimax.com
0032  * -----------------------------------------------------------------------------
0033  */
0034 
0035 /*---------------------------Includes-----------------------------------*/
0036 
0037 #include <ndrx_config.h>
0038 
0039 #ifdef EX_OS_AIX
0040 /* This is for aix to active extended poll */
0041 #define _MSGQSUPPORT 1
0042 #endif
0043 
0044 #include <stdio.h>
0045 #include <stdlib.h>
0046 #include <time.h>
0047 
0048 #include <unistd.h>
0049 #include <stdarg.h>
0050 #include <ctype.h>
0051 #include <memory.h>
0052 #include <errno.h>
0053 #include <signal.h>
0054 #include <limits.h>
0055 #include <pthread.h>
0056 #include <string.h>
0057 #include <poll.h>
0058 #include <fcntl.h>
0059 #include <sys/select.h>
0060 
0061 #include <sys/time.h>                   /* purely for dbg_timer()       */
0062 #include <sys/stat.h>
0063 #include <ndrstandard.h>
0064 #include <ndebug.h>
0065 #include <nstdutil.h>
0066 #include <limits.h>
0067 #include <sys_unix.h>
0068 
0069 #include <exhash.h>
0070 
0071 #include "nstd_tls.h"
0072 
0073 /*---------------------------Externs------------------------------------*/
0074 /*---------------------------Macros-------------------------------------*/
0075 
/*
 * Hash helpers for the message queue descriptor map.
 * The key is the mqd_t value itself (sizeof(mqd_t) bytes), linked via "hh".
 */
#define EXHASH_FIND_MQD(head,findptr,out)                                          \
    EXHASH_FIND(hh,head,findptr,sizeof(mqd_t),out)

#define EXHASH_ADD_MQD(head,ptrfield,add)                                          \
    EXHASH_ADD(hh,head,ptrfield,sizeof(mqd_t),add)

/**
 * Find entry by System V Queue ID (int key, linked via "hh_qid")
 */
#define EXHASH_FIND_QID(head,findptr,out)                                          \
    EXHASH_FIND(hh_qid,head,findptr,sizeof(int),out)

/**
 * Add entry to the System V queue ID hash
 */
#define EXHASH_ADD_QID(head,ptrfield,add)                                          \
    EXHASH_ADD(hh_qid,head,ptrfield,sizeof(int),add)
/**
 * Remove entry from the System V queue ID hash
 */
#define EXHASH_DEL_QID(head,delptr)                                                \
    EXHASH_DELETE(hh_qid,head,delptr)

/** Highest poll set number handed out by ndrx_epoll_create() */
#define EX_POLL_SETS_MAX            1024

/** Common API entry: ensure TLS is available and reset the last error */
#define EX_EPOLL_API_ENTRY      {NSTD_TLS_ENTRY; \
            G_nstd_tls->M_last_err = 0; \
            G_nstd_tls->M_last_err_msg[0] = EXEOS;}

/** Size of the temporary buffer used for formatting error messages */
#define ERROR_BUFFER            1024


/**
 * Get pointer to the i-th file descriptor slot of the poll table.
 * Arguments are parenthesized so that expressions (e.g. "n + 1") may be
 * passed safely.
 */
#define NDRX_PFD_GET(set, i) ((struct ndrx_pollfd *)((char *)(set)->polltab + \
    (i)*sizeof(struct ndrx_pollfd)))

/**
 * Get pointer to the i-th message queue slot of the poll table.
 * Queue slots are stored right after the (set)->nrfds file descriptor slots.
 */
#define NDRX_PMQ_GET(set, i) ((struct ndrx_pollmsg *)((char *)(set)->polltab + \
    sizeof(struct ndrx_pollfd)*(set)->nrfds + (i)*sizeof(struct ndrx_pollmsg)))
0117 
0118 
0119 
0120 
0121 /*
0122 #define NDRX_POLLEXCL_DEBUG
0123 */
0124 
0125 
0126 /*---------------------------Enums--------------------------------------*/
0127 /*---------------------------Typedefs-----------------------------------*/
0128 
0129 /**
0130  * Hash list of File descriptors we monitor..
0131  */
0132 struct ndrx_epoll_fds
0133 {
0134     int fd;
0135     struct ndrx_epoll_event event;  /**< monitored events               */
0136     EX_hash_handle hh;              /**< makes this structure hashable  */
0137 };
0138 typedef struct ndrx_epoll_fds ndrx_epoll_fds_t;
0139 
0140 /**
0141  * hash list of queue descriptor
0142  */
0143 struct ndrx_epoll_mqds
0144 {
0145     mqd_t mqd;  /**< Enduro/X internal queue handler                    */
0146     int qid;    /**< System V queue id                                  */
0147     struct ndrx_epoll_event event;  /**< monitored events               */
0148     EX_hash_handle hh;              /**< makes this structure hashable  */
0149     EX_hash_handle hh_qid;          /**< QID Hash for qid->mqd map      */
0150 };
0151 typedef struct ndrx_epoll_mqds ndrx_epoll_mqds_t;
0152 
0153 /**
0154  * Our internal 'epoll' set
0155  */
0156 struct ndrx_epoll_set
0157 {
0158     int fd;
0159     
0160     ndrx_epoll_fds_t *fds;  /**< hash of file descriptors for monitoring    */
0161     ndrx_epoll_mqds_t *mqds;/**< hash of message queues for monitoring      */
0162     ndrx_epoll_mqds_t *mqds_qid;/**< hash of message queues for monitoring  */
0163     
0164     int nrfds;              /**< number of FDs in poll                      */
0165     int nrfmqds;            /** Number of queues polled                     */
0166     void *polltab;           /**< poll() structure, pre-allocated           */
0167     
0168     EX_hash_handle hh;      /**< makes this structure hashable              */
0169 };
0170 typedef struct ndrx_epoll_set ndrx_epoll_set_t;
0171 
0172 exprivate ndrx_epoll_set_t *M_psets = NULL; /* poll sets  */
0173 exprivate MUTEX_LOCKDECL(M_psets_lock);
0174 /*---------------------------Globals------------------------------------*/
0175 /*---------------------------Statics------------------------------------*/
0176 /*---------------------------Prototypes---------------------------------*/
0177 exprivate ndrx_epoll_mqds_t* mqd_find(ndrx_epoll_set_t *pset, mqd_t mqd);
0178 
0179 /**
0180  * Return the compiled poll mode
0181  * @return 
0182  */
0183 expublic char * ndrx_epoll_mode(void)
0184 {
0185     static char *mode = "svapoll";
0186     return mode;
0187 }
0188 
0189 /**
0190  * Nothing to do
0191  * @return EXSUCCEED
0192  */
0193 expublic int ndrx_epoll_sys_init(void)
0194 {    
0195     return EXSUCCEED;
0196 }
0197 
0198 /**
0199  * Un-initialize polling lib
0200  * @return
0201  */
0202 expublic void ndrx_epoll_sys_uninit(void)
0203 {
0204     /* nothing todo */
0205 }
0206 
0207 /**
0208  * We basically will use unix error codes
0209  */
0210 exprivate void ndrx_epoll_set_err(int error_code, const char *fmt, ...)
0211 {
0212     char msg[ERROR_BUFFER+1] = {EXEOS};
0213     va_list ap;
0214     
0215     NSTD_TLS_ENTRY;
0216 
0217     va_start(ap, fmt);
0218     (void) vsnprintf(msg, sizeof(msg), fmt, ap);
0219     va_end(ap);
0220 
0221     NDRX_STRCPY_SAFE(G_nstd_tls->M_last_err_msg, msg);
0222     G_nstd_tls->M_last_err = error_code;
0223 
0224     NDRX_LOG(log_warn, "ndrx_epoll_set_err: %d (%s) (%s)",
0225                     error_code, strerror(G_nstd_tls->M_last_err), 
0226                     G_nstd_tls->M_last_err_msg);
0227     
0228 }
0229 
0230 /**
0231  * Find FD in hash
0232  */
0233 exprivate ndrx_epoll_fds_t* fd_find(ndrx_epoll_set_t *pset, int fd)
0234 {
0235     ndrx_epoll_fds_t*ret = NULL;
0236     
0237     EXHASH_FIND_INT( pset->fds, &fd, ret);
0238     
0239     return ret;
0240 }
0241 
0242 /**
0243  * FInd q descriptor in hash
0244  */
0245 exprivate ndrx_epoll_mqds_t* mqd_find(ndrx_epoll_set_t *pset, mqd_t mqd)
0246 {
0247     ndrx_epoll_mqds_t*ret = NULL;
0248     
0249     EXHASH_FIND_MQD( pset->mqds, &mqd, ret);
0250     
0251     return ret;
0252 }
0253 
0254 /**
0255  * Find polling set
0256  * @param epfd epoll set
0257  * @param dolock if set to !=0 f
0258  */
0259 exprivate ndrx_epoll_set_t* pset_find(int epfd, int dolock)
0260 {
0261     ndrx_epoll_set_t *ret = NULL;
0262     
0263     if (dolock)
0264     {
0265         MUTEX_LOCK_V(M_psets_lock);
0266     }
0267     
0268     EXHASH_FIND_INT( M_psets, &epfd, ret);
0269     
0270     if (dolock)
0271     {
0272         MUTEX_UNLOCK_V(M_psets_lock);
0273     }
0274     
0275     return ret;
0276 }
0277 
0278 /**
0279  * This synchronizes file descriptors and queues to polltab
0280  * @param set to update
0281  */
0282 expublic int ndrx_polltab_sync(ndrx_epoll_set_t* set)
0283 {
0284     int ret = EXSUCCEED;
0285     size_t sz;
0286     int i;
0287     ndrx_epoll_fds_t *fel, *felt;
0288     ndrx_epoll_mqds_t *mel, *melt;
0289     
0290     /* free up previous image */
0291     if (NULL!=set->polltab)
0292     {
0293         NDRX_FREE(set->polltab);
0294     }
0295     
0296     if (set->nrfds + set->nrfmqds == 0)
0297     {
0298         /* nothing to do */
0299         goto out;
0300     }
0301     
0302     sz = sizeof(struct ndrx_pollfd) * set->nrfds + sizeof(struct ndrx_pollmsg) * set->nrfmqds;
0303     
0304     if (NULL==(set->polltab = NDRX_MALLOC(sz)))
0305     {
0306         int err = errno;
0307         ndrx_epoll_set_err(errno, "Failed to alloc new poll tab: %zu bytes: %s ", 
0308                 sz, strerror(err));
0309         NDRX_LOG(log_error, "Failed to alloc new poll tab: %zu bytes: %s ", 
0310                 sz, strerror(err));
0311         EXFAIL_OUT(ret);
0312     }
0313     
0314     /* update records... first comes the fds */
0315     i=0;
0316     EXHASH_ITER(hh, set->fds, fel, felt)
0317     {
0318         struct ndrx_pollfd * pfd = NDRX_PFD_GET(set, i);
0319         
0320         pfd->events=fel->event.events;
0321         pfd->fd=fel->fd;
0322         
0323         i++;
0324     }
0325     
0326     i=0;
0327     EXHASH_ITER(hh, set->mqds, mel, melt)
0328     {
0329         struct ndrx_pollmsg * mfd = NDRX_PMQ_GET(set, i);
0330         
0331         mfd->reqevents=mel->event.events;
0332         mfd->msgid=mel->mqd->qid;
0333         i++;
0334     }
0335     
0336 out:
0337     return ret;
0338 }
0339 
0340 /**
0341  * Poll ctl for file descriptors
0342  * @param epfd poll set
0343  * @param op operation EX_EPOLL_CTL_ADD or EX_EPOLL_CTL_DEL
0344  * @param fd file descriptor
0345  * @param event events to use
0346  * @return EXSUCCED/EXFAIL (error set)
0347  */
0348 expublic int ndrx_epoll_ctl(int epfd, int op, int fd, struct ndrx_epoll_event *event)
0349 {
0350     int ret = EXSUCCEED;
0351     ndrx_epoll_set_t* set = NULL;
0352     ndrx_epoll_fds_t * tmp = NULL;
0353             
0354     EX_EPOLL_API_ENTRY;
0355     
0356     if (NULL==(set = pset_find(epfd, EXTRUE)))
0357     {
0358         NDRX_LOG(log_error, "ndrx_epoll set %d not found", epfd);
0359         ndrx_epoll_set_err(ENOSYS, "ndrx_epoll set %d not found", epfd);
0360         EXFAIL_OUT(ret);
0361     }
0362     
0363     if (EX_EPOLL_CTL_ADD == op)
0364     {
0365         NDRX_LOG(log_info, "%s: Add operation on ndrx_epoll set %d, fd %d", 
0366                 __func__, epfd, fd);
0367         
0368         /* test & add to FD hash */
0369         if (NULL!=fd_find(set, fd))
0370         {
0371             ndrx_epoll_set_err(EINVAL, "fd %d already exists in ndrx_epoll set (epfd %d)", 
0372                     fd, set->fd);
0373             NDRX_LOG(log_error, "fd %d already exists in ndrx_epoll set (epfd %d)", 
0374                  fd, set->fd);
0375             EXFAIL_OUT(ret);
0376         }
0377         
0378         if (NULL==(tmp = NDRX_CALLOC(1, sizeof(*tmp))))
0379         {
0380             ndrx_epoll_set_err(errno, "Failed to alloc FD hash entry");
0381             NDRX_LOG(log_error, "Failed to alloc FD hash entry");
0382             EXFAIL_OUT(ret);
0383         }
0384         
0385         tmp->fd = fd;
0386         tmp->event = *event;
0387         EXHASH_ADD_INT(set->fds, fd, tmp);
0388         
0389         /* resize/realloc events list, add fd */
0390         set->nrfds++;
0391         
0392         NDRX_LOG(log_info, "set nrfds incremented to %d", set->nrfds);
0393         
0394         /* Rebuild mem block.. */
0395         if (EXSUCCEED!=ndrx_polltab_sync(set))
0396         {
0397             EXFAIL_OUT(ret);
0398         }
0399     }
0400     else if (EX_EPOLL_CTL_DEL == op)
0401     {
0402         NDRX_LOG(log_info, "%s: Delete operation on ndrx_epoll set %d, fd %d", 
0403                 __func__, epfd, fd);
0404         
0405         /* test & add to FD hash */
0406         if (NULL==(tmp=fd_find(set, fd)))
0407         {
0408             ndrx_epoll_set_err(EINVAL, "fd %d not found in ndrx_epoll set (epfd %d)", 
0409                     fd, set->fd);
0410             NDRX_LOG(log_error, "fd %d not found in ndrx_epoll set (epfd %d)", 
0411                     fd, set->fd);
0412             EXFAIL_OUT(ret);
0413         }
0414         
0415         /* Remove fd from set->restab & from hash */
0416         
0417         EXHASH_DEL(set->fds, tmp);
0418         NDRX_FREE((char *)tmp);
0419         
0420         set->nrfds--;
0421         
0422         /* Rebuild mem block.. */
0423         if (EXSUCCEED!=ndrx_polltab_sync(set))
0424         {
0425             EXFAIL_OUT(ret);
0426         }
0427         
0428     }
0429     else
0430     {
0431         ndrx_epoll_set_err(EINVAL, "Invalid operation %d", op);
0432         NDRX_LOG(log_error, "Invalid operation %d", op);
0433         
0434         EXFAIL_OUT(ret);
0435     }
0436 
0437 out:
0438 
0439     NDRX_LOG(log_info, "return %d", ret);
0440 
0441     return ret;
0442 }
0443 
0444 /**
0445  * epoll_ctl for Posix queue descriptors
0446  * @param epfd poll set
0447  * @param op operation
0448  * @param mqd queue descriptor
0449  * @param event poll events to monitor
0450  * @return EXSUCCEED/EXFAIL
0451  */
0452 expublic int ndrx_epoll_ctl_mq(int epfd, int op, mqd_t mqd, struct ndrx_epoll_event *event)
0453 {
0454     int ret = EXSUCCEED;
0455     ndrx_epoll_set_t* set = NULL;
0456     ndrx_epoll_mqds_t * tmp = NULL;
0457     static int first = EXTRUE;
0458     static int use_excl = EXFALSE;
0459     EX_EPOLL_API_ENTRY;
0460     
0461     if (NULL==(set = pset_find(epfd, EXTRUE)))
0462     {
0463         ndrx_epoll_set_err(ENOENT, "ndrx_epoll set %d not found", epfd);
0464         NDRX_LOG(log_error, "ndrx_epoll set %d not found", epfd);
0465         
0466         EXFAIL_OUT(ret);
0467     }
0468     
0469     /* no problem with MT...
0470      * several settings to 1 at the same time, result would be the same anyway
0471      */
0472     if (first)
0473     {
0474         if (NULL==getenv(CONF_NDRX_NOPOLLEXCL))
0475         {
0476             use_excl=EXTRUE;
0477         }
0478         first=EXFALSE;
0479     }
0480     
0481     if (EX_EPOLL_CTL_ADD == op)
0482     {
0483         NDRX_LOG(log_info, "%s: Add operation on ndrx_epoll set %d, fd %d", 
0484                 __func__, epfd, mqd);
0485         
0486         /* test & add to FD hash */
0487         if (NULL!=mqd_find(set, mqd))
0488         {
0489             ndrx_epoll_set_err(EINVAL, "fd %d already exists in ndrx_epoll set (epfd %d)", 
0490                     mqd, set->fd);
0491             NDRX_LOG(log_error, "fd %d already exists in ndrx_epoll set (epfd %d)", 
0492                     mqd, set->fd);
0493             EXFAIL_OUT(ret);
0494         }
0495         
0496         if (NULL==(tmp = NDRX_CALLOC(1, sizeof(*tmp))))
0497         {
0498             ndrx_epoll_set_err(errno, "Failed to alloc FD hash entry");
0499             NDRX_LOG(log_error, "Failed to alloc FD hash entry");
0500             
0501             EXFAIL_OUT(ret);
0502         }
0503         
0504         tmp->mqd = mqd;
0505         tmp->qid = mqd->qid;
0506         tmp->event = *event;
0507         
0508 #ifdef POLLEXCL
0509         /* avoid thundering herd issue, new feature in AIX 7.3 */
0510         if (use_excl)
0511         {
0512             tmp->event.events|=POLLEXCL;
0513         }
0514 #endif
0515         
0516         EXHASH_ADD_MQD(set->mqds, mqd, tmp);
0517         
0518         /* add QID */
0519         EXHASH_ADD_QID(set->mqds_qid, qid, tmp);
0520         
0521         /* resize/realloc events list, add fd */
0522         set->nrfmqds++;
0523         
0524         NDRX_LOG(log_info, "set nrfmqds incremented to %d", set->nrfmqds);
0525         
0526         /* Rebuild mem block.. */
0527         if (EXSUCCEED!=ndrx_polltab_sync(set))
0528         {
0529             EXFAIL_OUT(ret);
0530         }
0531 
0532     }
0533     else if (EX_EPOLL_CTL_DEL == op)
0534     {
0535         NDRX_LOG(log_info, "%s: Delete operation on ndrx_epoll set %d, fd %d", 
0536                 __func__, epfd, mqd);
0537         
0538         /* test & add to FD hash */
0539         if (NULL==(tmp=mqd_find(set, mqd)))
0540         {
0541             ndrx_epoll_set_err(EINVAL, "fd %d not found in ndrx_epoll set (epfd %d)", 
0542                     mqd, set->fd);
0543             
0544             NDRX_LOG(log_error, "fd %d not found in ndrx_epoll set (epfd %d)", 
0545                     mqd, set->fd);
0546             
0547             EXFAIL_OUT(ret);
0548         }
0549         
0550         /* Remove fd from set->restab & from hash */
0551         EXHASH_DEL(set->mqds, tmp);
0552         EXHASH_DEL_QID(set->mqds_qid, tmp);
0553         NDRX_FREE((char *)tmp);
0554         
0555         /* resize/realloc events list, add fd */
0556         set->nrfmqds--;
0557         
0558         NDRX_LOG(log_info, "set nrfmqds decrement to %d", set->nrfmqds);
0559         
0560         /* Rebuild mem block.. */
0561         if (EXSUCCEED!=ndrx_polltab_sync(set))
0562         {
0563             EXFAIL_OUT(ret);
0564         }
0565         
0566     }
0567     else
0568     {
0569         ndrx_epoll_set_err(EINVAL, "Invalid operation %d", op);
0570         
0571         NDRX_LOG(log_error, "Invalid operation %d", op);
0572         
0573         EXFAIL_OUT(ret);
0574     }
0575 
0576 out:
0577 
0578     
0579 
0580     NDRX_LOG(log_info, "return %d", ret);
0581 
0582     return ret;
0583 }
0584 
0585 /**
0586  * Wrapper for epoll_create
0587  * @param size
0588  * @return 
0589  */
0590 expublic int ndrx_epoll_create(int size)
0591 {
0592     int ret = EXSUCCEED;
0593     int i = 1;
0594     ndrx_epoll_set_t *set;
0595     
0596     EX_EPOLL_API_ENTRY;
0597     
0598     MUTEX_LOCK_V(M_psets_lock);
0599     
0600     while (NULL!=(set=pset_find(i, EXFALSE)) && i < EX_POLL_SETS_MAX)
0601     {
0602         i++;
0603     }
0604     
0605     /* we must have free set */
0606     if (NULL!=set)
0607     {
0608         ndrx_epoll_set_err(EMFILE, "Max ndrx_epoll_sets_reached");
0609         NDRX_LOG(log_error, "Max ndrx_epoll_sets_reached");
0610                 
0611         
0612         set = NULL;
0613         EXFAIL_OUT(ret);
0614     }
0615     
0616     NDRX_LOG(log_info, "Creating ndrx_epoll set: %d", i);
0617     
0618     if (NULL==(set = (ndrx_epoll_set_t *)NDRX_CALLOC(1, sizeof(*set))))
0619     {
0620         ndrx_epoll_set_err(errno, "Failed to alloc: %d bytes", sizeof(*set));
0621         
0622         NDRX_LOG(log_error, "Failed to alloc: %d bytes", sizeof(*set));
0623 
0624         EXFAIL_OUT(ret);
0625     }
0626     
0627     /* add finally to hash */
0628     set->fd = i; /* assign the FD num */
0629     EXHASH_ADD_INT(M_psets, fd, set);
0630     NDRX_LOG(log_info, "ndrx_epoll_create succeed, fd=%d", i);
0631     
0632 out:
0633     
0634     MUTEX_UNLOCK_V(M_psets_lock); /*  <<< release the lock */
0635 
0636     if (EXSUCCEED!=ret)
0637     {
0638         if (NULL!=set)
0639         {
0640             NDRX_FREE((char *)set);
0641         }
0642         
0643         return EXFAIL;
0644         
0645     }
0646 
0647     return i;
0648 }
0649 
0650 /**
0651  * Close the epoll set
0652  */
0653 expublic int ndrx_epoll_close(int epfd)
0654 {
0655     int ret = EXSUCCEED;
0656     ndrx_epoll_set_t* set = NULL;
0657     
0658     ndrx_epoll_fds_t* f, *ftmp;
0659     ndrx_epoll_mqds_t* m, *mtmp;
0660     
0661     NDRX_LOG(log_debug, "ndrx_epoll_close(%d) enter", epfd);
0662     
0663     if (NULL==(set = pset_find(epfd, EXTRUE)))
0664     {
0665         MUTEX_UNLOCK_V(M_psets_lock); /*  <<< release the lock */
0666         
0667         ndrx_epoll_set_err(EINVAL, "ndrx_epoll set %d not found", epfd);
0668         NDRX_LOG(log_error, "ndrx_epoll set %d not found", epfd);
0669         
0670         
0671         EXFAIL_OUT(ret);
0672     }
0673     
0674     /* Kill file descriptor hash */
0675     EXHASH_ITER(hh, set->fds, f, ftmp)
0676     {
0677         ndrx_epoll_ctl(set->fd, EX_EPOLL_CTL_DEL, f->fd, NULL);
0678     }
0679     
0680     /* Kill message queue descriptor hash */
0681     EXHASH_ITER(hh, set->mqds, m, mtmp)
0682     {
0683         ndrx_epoll_ctl_mq(set->fd, EX_EPOLL_CTL_DEL, m->mqd, NULL);
0684     }
0685     
0686     if (NULL!=set->polltab)
0687     {
0688         NDRX_FREE(set->polltab);
0689     }
0690     
0691     MUTEX_LOCK_V(M_psets_lock);
0692     EXHASH_DEL(M_psets, set);
0693     NDRX_FREE(set);
0694     MUTEX_UNLOCK_V(M_psets_lock);
0695     
0696 out:
0697     return EXFAIL;
0698 }
0699 
0700 /**
0701  * Wait for event 
0702  * @param epfd file descriptor
0703  * @param events where to load the events
0704  * @param maxevents number events
0705  * @param timeout how long to wait
0706  * @return EXUSCCEE
0707  */
0708 expublic int ndrx_epoll_wait(int epfd, struct ndrx_epoll_event *events, 
0709         int maxevents, int timeout, char **buf, int *buf_len)
0710 {
0711     int ret = EXSUCCEED;
0712     int numevents = 0;
0713     ndrx_epoll_set_t* set = NULL;
0714     char *fn = "ndrx_epoll_wait";
0715     int i, err=0;
0716     int retpoll;
0717     struct ndrx_pollfd *pfd;
0718     struct ndrx_pollmsg *pmq;
0719     unsigned long nfdmsgs;
0720     int debug_nr=0;
0721     
0722     EX_EPOLL_API_ENTRY;
0723     
0724     /* not returning... */
0725     *buf_len = EXFAIL;
0726     
0727     if (NULL==(set = pset_find(epfd, EXTRUE)))
0728     {
0729         ndrx_epoll_set_err(EINVAL, "ndrx_epoll set %d not found", epfd);
0730         NDRX_LOG(log_error, "ndrx_epoll set %d not found", epfd);
0731         EXFAIL_OUT(ret);
0732     }
0733     
0734     nfdmsgs=(set->nrfmqds<<16)|(set->nrfds);
0735     NDRX_LOG(log_debug, "%s: epfd=%d, events=%p, maxevents=%d, timeout=%d - "
0736                     "about to poll(nrfds=%d ndrmqds=%d) polltab=%p nfdmsgs=%lu",
0737                     fn, epfd, events, maxevents, timeout, set->nrfds, set->nrfmqds,
0738                     set->polltab, nfdmsgs);
0739     
0740     for (i=0; i<set->nrfds; i++)
0741     {
0742         NDRX_PFD_GET(set, i)->revents = 0;
0743     }
0744     
0745     for (i=0; i<set->nrfmqds; i++)
0746     {
0747         NDRX_PMQ_GET(set, i)->rtnevents = 0;
0748     }
0749 
0750     /* run the poll finally... */
0751     retpoll = poll( set->polltab, nfdmsgs, timeout);
0752     err=errno;
0753 
0754     if (retpoll<0)
0755     {
0756          NDRX_LOG(log_error, "Poll failure: %s", strerror(err));
0757          goto out;
0758     }
0759     else if (0==retpoll)
0760     {
0761          goto out;
0762     }
0763     
0764     /* return file events.. */
0765     for (i=0; NFDS(retpoll) > 0 && i < set->nrfds && numevents < maxevents; i++)
0766     {
0767         pfd = NDRX_PFD_GET(set, i);
0768         if (pfd->revents)
0769         {
0770             /* fill up the event block */
0771             NDRX_LOG(log_debug, "event no: %d revents: %d fd: %d", 
0772                     numevents, (int)pfd->revents, pfd->fd);
0773             events[numevents].data.fd = pfd->fd;
0774             events[numevents].events = pfd->revents;
0775             events[numevents].is_mqd = EXFALSE;
0776             numevents++;
0777         }
0778     }
0779 
0780 #ifdef NDRX_POLLEXCL_DEBUG
0781     for (i=0; NMSGS(retpoll) > 0 && i < set->nrfmqds; i++)
0782     {
0783         pmq = NDRX_PMQ_GET(set, i);
0784         if (pmq->rtnevents & POLLIN)
0785         {
0786             debug_nr++;
0787         }
0788     }
0789     userlog("debug_nr=%d", debug_nr);
0790 #endif
0791     
0792     for (i=0; NMSGS(retpoll) > 0 && i < set->nrfmqds && numevents < maxevents; i++)
0793     {
0794         pmq = NDRX_PMQ_GET(set, i);
0795         if (pmq->rtnevents)
0796         {
0797             ndrx_epoll_mqds_t *tmqd = NULL;
0798             EXHASH_FIND_QID( set->mqds_qid, &(pmq->msgid), tmqd);
0799             
0800             NDRX_LOG(log_debug, "event no: %d revents: %d mqd: %p (qid %d)", 
0801                     numevents, (int)pmq->rtnevents, tmqd->mqd, pmq->msgid);
0802 
0803             events[numevents].data.mqd = tmqd->mqd;
0804             events[numevents].events = pmq->rtnevents;
0805             events[numevents].is_mqd = EXTRUE;
0806             numevents++;
0807         }
0808     }
0809     
0810 out:
0811 
0812     NDRX_LOG(log_info, "%s ret=%d numevents=%d", fn, ret, numevents);
0813     ndrx_poll_strerror(err);
0814 
0815     if (EXSUCCEED==ret)
0816     {
0817         return numevents;
0818     }
0819     else
0820     {
0821         return EXFAIL;
0822     }
0823 }
0824 
0825 /**
0826  * Return errno for ndrx_poll() operation
0827  * @return 
0828  */
0829 expublic int ndrx_epoll_errno(void)
0830 {
0831     NSTD_TLS_ENTRY;
0832     return G_nstd_tls->M_last_err;
0833 }
0834 
0835 /**
0836  * Wrapper for strerror
0837  * @param err
0838  * @return 
0839  */
0840 expublic char * ndrx_poll_strerror(int err)
0841 {
0842     NSTD_TLS_ENTRY;
0843     
0844     snprintf(G_nstd_tls->poll_strerr, ERROR_BUFFER_POLL, "%s (last error: %s)", 
0845     strerror(err), G_nstd_tls->M_last_err_msg);
0846     
0847     return G_nstd_tls->poll_strerr;
0848 }
0849 
0850 /**
0851  * Translate the service name to queue
0852  * @param[out] send_q output service queue
0853  * @param[in] q_prefix queue prefix
0854  * @param[in] svc service name
0855  * @param[in] resid resource id (in this case it is 
0856  * @return EXSUCCEED/EXFAIL
0857  */
0858 expublic int ndrx_epoll_service_translate(char *send_q, char *q_prefix, 
0859         char *svc, int resid)
0860 {
0861     /* lookup service in SHM! from resid/qid -> queue */
0862     snprintf(send_q, NDRX_MAX_Q_SIZE+1, NDRX_SVC_QFMT_SRVID, q_prefix, 
0863                 svc, resid);
0864     
0865     return EXSUCCEED;
0866 }
0867 
0868 /**
0869  * Not used by poll
0870  * @param svcnm
0871  * @param idx
0872  * @param mq_exits
0873  * @return 
0874  */
0875 expublic mqd_t ndrx_epoll_service_add(char *svcnm, int idx, mqd_t mq_exits)
0876 {
0877     return mq_exits;
0878 }
0879 
0880 /**
0881  * Not used by poll
0882  * @return 
0883  */
0884 expublic int ndrx_epoll_shmdetach(void)
0885 {
0886     /* detach resources */
0887     ndrx_svqshm_detach();
0888     return EXSUCCEED;
0889 }
0890 
0891 /**
0892  * not used by poll
0893  * @param idx
0894  * @return 
0895  */
0896 expublic int ndrx_epoll_shallopenq(int idx)
0897 {
0898     return EXTRUE;
0899 }
0900 /**
0901  * Not used by poll
0902  * @param qstr
0903  */
0904 expublic void ndrx_epoll_mainq_set(char *qstr)
0905 {
0906     return;
0907 }
0908 
0909 /**
0910  * Not used by poll
0911  * @param force
0912  * @return 
0913  */
0914 expublic int ndrx_epoll_down(int force)
0915 {
0916     return ndrx_svqshm_down(force);
0917 }
0918 
0919 /* vim: set ts=4 sw=4 et smartindent: */