0001
0002
0003
0004
0005
0006
0007
0008
0009
0010
0011
0012
0013
0014
0015
0016
0017
0018
0019
0020
0021
0022
0023
0024
0025
0026
0027
0028
0029
0030
0031
0032
0033
0034
0035
0036
0037
0038
0039
0040
0041
0042 #include <ndrx_config.h>
0043 #include <stdio.h>
0044 #include <stdlib.h>
0045 #include <time.h>
0046
0047 #include <unistd.h>
0048 #include <stdarg.h>
0049 #include <ctype.h>
0050 #include <memory.h>
0051 #include <errno.h>
0052 #include <signal.h>
0053 #include <limits.h>
0054 #include <pthread.h>
0055 #include <string.h>
0056 #include <poll.h>
0057 #include <fcntl.h>
0058
0059 #include <sys/time.h> /* purely for dbg_timer() */
0060 #include <sys/stat.h>
0061 #include <ndrstandard.h>
0062 #include <ndebug.h>
0063 #include <nstdutil.h>
0064 #include <limits.h>
0065 #include <sys_unix.h>
0066
0067 #include <exhash.h>
0068
0069 #include "nstd_tls.h"
0070
0071
0072
0073
0074
/**
 * When defined, queue notifications are requested as NOTIFY_SIG signals
 * (SIGEV_SIGNAL); otherwise the SIGEV_THREAD callback variant is compiled.
 */
#define EX_POLL_SIGNALLED

/** Find a hash element keyed by an mqd_t value (key length: sizeof(mqd_t)) */
#define EXHASH_FIND_MQD(head, findptr, out) \
    EXHASH_FIND(hh, head, findptr, sizeof(mqd_t), out)

/** Add a hash element keyed by an mqd_t field (key length: sizeof(mqd_t)) */
#define EXHASH_ADD_MQD(head, ptrfield, add) \
    EXHASH_ADD(hh, head, ptrfield, sizeof(mqd_t), add)

/** Upper bound for poll set identifiers handed out by ndrx_epoll_create() */
#define EX_POLL_SETS_MAX            1024

/** Common API prologue: ensure TLS is present and clear the last error */
#define EX_EPOLL_API_ENTRY \
    {\
        NSTD_TLS_ENTRY; \
        G_nstd_tls->M_last_err = 0; \
        G_nstd_tls->M_last_err_msg[0] = EXEOS;\
    }

/** Index of the read end in ndrx_epoll_set.wakeup_pipe */
#define READ                        0

/** Index of the write end in ndrx_epoll_set.wakeup_pipe */
#define WRITE                       1

/** Slot of the wakeup pipe inside ndrx_epoll_set.fdtab */
#define PIPE_POLL_IDX               0

/** Size of the scratch buffer used when formatting error messages */
#define ERROR_BUFFER                1024

/** Signal number used for message queue notifications */
#define NOTIFY_SIG                  SIGUSR2
0106
0107
0108
0109
0110
0111
0112
0113 struct ndrx_epoll_fds
0114 {
0115 int fd;
0116 EX_hash_handle hh;
0117 };
0118 typedef struct ndrx_epoll_fds ndrx_epoll_fds_t;
0119
0120
0121
0122
0123 struct ndrx_epoll_mqds
0124 {
0125 mqd_t mqd;
0126 struct sigevent sev;
0127 EX_hash_handle hh;
0128 };
0129 typedef struct ndrx_epoll_mqds ndrx_epoll_mqds_t;
0130
0131
0132
0133
0134 struct ndrx_epoll_set
0135 {
0136 int fd;
0137
0138 ndrx_epoll_fds_t *fds;
0139 ndrx_epoll_mqds_t *mqds;
0140
0141
0142 int nrfds;
0143 struct pollfd *fdtab;
0144
0145 int nrfmqds;
0146
0147
0148 int wakeup_pipe[2];
0149
0150 EX_hash_handle hh;
0151 };
0152 typedef struct ndrx_epoll_set ndrx_epoll_set_t;
0153
0154 typedef struct ndrx_pipe_mqd_hash ndrx_pipe_mqd_hash_t;
0155
0156 exprivate ndrx_epoll_set_t *M_psets = NULL;
0157 exprivate ndrx_pipe_mqd_hash_t *M_pipe_h = NULL;
0158
0159 exprivate int M_shutdown = EXFALSE;
0160
0161 exprivate MUTEX_LOCKDECL(M_psets_lock);
0162
0163
0164
0165
0166
0167
0168
0169
0170
0171
0172
0173 exprivate pthread_t M_signal_thread;
0174 exprivate int M_signal_first = EXTRUE;
0175
0176
0177 exprivate ndrx_epoll_mqds_t* mqd_find(ndrx_epoll_set_t *pset, mqd_t mqd);
0178 exprivate int signal_install_notifications_all(ndrx_epoll_set_t *s);
0179 exprivate void slipSigHandler (int sig);
0180 exprivate int signal_handle_event(void);
0181
0182
0183
0184
0185
0186 exprivate void cleanup_handler(void *arg)
0187 {
0188 ndrx_nstd_tls_free(G_nstd_tls);
0189 }
0190
0191
0192 exprivate void *sigthread_enter(void *arg)
0193 {
0194 NDRX_LOG(log_error, "***********SIGNAL THREAD START***********");
0195 signal_handle_event();
0196 NDRX_LOG(log_error, "***********SIGNAL THREAD EXIT***********");
0197 return NULL;
0198 }
0199
0200
0201 exprivate void slipSigHandler (int sig)
0202 {
0203
0204
0205
0206
0207
0208
0209
0210
0211
0212
0213
0214 pthread_t thread;
0215 pthread_attr_t pthread_custom_attr;
0216
0217 pthread_attr_init(&pthread_custom_attr);
0218
0219 pthread_attr_setdetachstate(&pthread_custom_attr, PTHREAD_CREATE_DETACHED);
0220
0221 ndrx_platf_stack_set(&pthread_custom_attr);
0222
0223
0224
0225 pthread_create(&thread, &pthread_custom_attr, sigthread_enter, NULL);
0226 }
0227
0228
0229
0230
0231
0232
0233 expublic char * ndrx_epoll_mode(void)
0234 {
0235 #ifdef EX_USE_EMQ
0236 static char *mode = "emq";
0237 #else
0238 static char *mode = "poll";
0239 #endif
0240
0241 return mode;
0242 }
0243
0244
0245
0246
0247
0248 exprivate int signal_handle_event(void)
0249 {
0250 int ret = EXSUCCEED;
0251
0252 ndrx_epoll_set_t *s, *stmp;
0253 ndrx_epoll_mqds_t* m, *mtmp;
0254
0255
0256
0257 MUTEX_LOCK_V(M_psets_lock);
0258
0259 EXHASH_ITER(hh, M_psets, s, stmp)
0260 {
0261 EXHASH_ITER(hh, s->mqds, m, mtmp)
0262 {
0263 struct mq_attr att;
0264
0265
0266 if (EXSUCCEED!= ndrx_mq_getattr(m->mqd, &att))
0267 {
0268 NDRX_LOG(log_warn, "Failed to get attribs of Q: %d (%s)",
0269 m->mqd, strerror(errno));
0270 MUTEX_UNLOCK_V(M_psets_lock);
0271 }
0272
0273 if (att.mq_curmsgs > 0)
0274 {
0275 if (EXFAIL==write (s->wakeup_pipe[WRITE], (char *)&m->mqd,
0276 sizeof(m->mqd)))
0277 {
0278 NDRX_LOG(log_error, "Error ! write fail: %s", strerror(errno));
0279 }
0280 }
0281 }
0282
0283
0284 if (EXSUCCEED!=signal_install_notifications_all(s))
0285 {
0286 NDRX_LOG(log_warn, "Failed to install notifs for set: %d", s->fd);
0287 }
0288 }
0289
0290 MUTEX_UNLOCK_V(M_psets_lock);
0291
0292 out:
0293 return ret;
0294 }
0295
0296
0297
0298
0299
0300
0301 exprivate void * signal_process(void *arg)
0302 {
0303 sigset_t blockMask;
0304 char *fn = "signal_process";
0305 int ret = EXSUCCEED;
0306 int sig;
0307
0308 pthread_setcancelstate(PTHREAD_CANCEL_DISABLE, NULL);
0309
0310
0311 pthread_cleanup_push(cleanup_handler, NULL);
0312
0313 NDRX_LOG(log_debug, "%s - enter", fn);
0314
0315
0316 sigemptyset(&blockMask);
0317 sigaddset(&blockMask, NOTIFY_SIG);
0318
0319 while (!M_shutdown)
0320 {
0321 NDRX_LOG(6, "%s - before sigwait()", fn);
0322
0323 pthread_setcancelstate(PTHREAD_CANCEL_ENABLE, NULL);
0324
0325 ret=sigwait(&blockMask, &sig);
0326 pthread_setcancelstate(PTHREAD_CANCEL_DISABLE, NULL);
0327
0328 if (EXSUCCEED!=ret)
0329 {
0330 NDRX_LOG(log_warn, "sigwait failed:(%s)", strerror(errno));
0331 }
0332
0333 NDRX_LOG(6, "%s - after sigwait()", fn);
0334
0335 signal_handle_event();
0336 }
0337
0338 pthread_cleanup_pop(0);
0339
0340 out:
0341 return NULL;
0342 }
0343
0344
0345
0346
0347 exprivate int signal_install_notifications_all(ndrx_epoll_set_t *s)
0348 {
0349 int ret = EXSUCCEED;
0350 ndrx_epoll_mqds_t* m, *mtmp;
0351
0352 EXHASH_ITER(hh, s->mqds, m, mtmp)
0353 {
0354 if (EXFAIL==ndrx_mq_notify(m->mqd, &(m->sev)))
0355 {
0356 int err = errno;
0357 if (EBUSY!=err)
0358 {
0359 NDRX_LOG(log_warn, "ndrx_mq_notify failed: %d (%s) - nothing to do",
0360 m->mqd, strerror(err));
0361 }
0362 }
0363 }
0364 out:
0365 return ret;
0366 }
0367
0368
0369
0370
0371
0372 expublic int ndrx_epoll_sys_init(void)
0373 {
0374 sigset_t blockMask;
0375 struct sigaction sa;
0376 pthread_attr_t pthread_custom_attr;
0377 pthread_attr_t pthread_custom_attr_dog;
0378 char *fn = "ndrx_epoll_sys_init";
0379
0380 NDRX_LOG(log_debug, "%s - enter", fn);
0381 if (!M_signal_first)
0382 {
0383 NDRX_LOG(log_warn, "Already init done for poll()");
0384 return EXSUCCEED;
0385 }
0386
0387 sa.sa_handler = slipSigHandler;
0388 sigemptyset (&sa.sa_mask);
0389 sa.sa_flags = SA_RESTART;
0390 sigaction (NOTIFY_SIG, &sa, 0);
0391
0392
0393
0394
0395 M_shutdown=EXFALSE;
0396 sigemptyset(&blockMask);
0397 sigaddset(&blockMask, NOTIFY_SIG);
0398
0399 if (pthread_sigmask(SIG_BLOCK, &blockMask, NULL) == -1)
0400 {
0401 NDRX_LOG(log_always, "%s: sigprocmask failed: %s", fn, strerror(errno));
0402 }
0403
0404
0405 pthread_attr_init(&pthread_custom_attr);
0406 pthread_attr_init(&pthread_custom_attr_dog);
0407
0408
0409 ndrx_platf_stack_set(&pthread_custom_attr);
0410 pthread_create(&M_signal_thread, &pthread_custom_attr,
0411 signal_process, NULL);
0412 M_signal_first = EXFALSE;
0413
0414
0415 return EXSUCCEED;
0416 }
0417
0418
0419
0420
0421
0422 expublic void ndrx_epoll_sys_uninit(void)
0423 {
0424 char *fn = "ndrx_epoll_sys_uninit";
0425
0426 NDRX_LOG(log_debug, "%s - enter", fn);
0427
0428
0429 NDRX_LOG(log_debug, "About to cancel signal thread");
0430
0431
0432
0433
0434 M_shutdown=EXTRUE;
0435 if (EXSUCCEED!=pthread_cancel(M_signal_thread))
0436 {
0437 NDRX_LOG(log_error, "Failed to kill poll signal thread: %s", strerror(errno));
0438 }
0439 else
0440 {
0441 void * res = EXSUCCEED;
0442 if (EXSUCCEED!=pthread_join(M_signal_thread, &res))
0443 {
0444 NDRX_LOG(log_error, "Failed to join pthread_join() signal thread: %s",
0445 strerror(errno));
0446 }
0447
0448 if (res == PTHREAD_CANCELED)
0449 {
0450 NDRX_LOG(log_info, "Signal thread canceled ok!");
0451 }
0452 else
0453 {
0454 NDRX_LOG(log_info, "Signal thread failed to cancel "
0455 "(should not happen!!)");
0456 }
0457 }
0458
0459 NDRX_LOG(log_debug, "finished ok");
0460 }
0461
0462
0463
0464
0465
0466
0467
0468 exprivate void ndrx_ndrx_mq_notify_func(union sigval sv)
0469 {
0470 mqd_t mqdes = *((mqd_t *) sv.sival_ptr);
0471
0472 ndrx_epoll_set_t *s, *stmp;
0473 ndrx_epoll_mqds_t* mqd_h;
0474
0475 NDRX_LOG(log_debug, "ndrx_ndrx_mq_notify_func() called mqd %d", mqdes);
0476
0477 MUTEX_LOCK_V(M_psets_lock);
0478
0479 EXHASH_ITER(hh, M_psets, s, stmp)
0480 {
0481 if (NULL!=(mqd_h = mqd_find(s, mqdes)))
0482 {
0483 break;
0484 }
0485 }
0486
0487 if (NULL==mqd_h)
0488 {
0489 NDRX_LOG(log_error, "ndrx_epoll set not found for mqd %d", mqdes);
0490 goto out;
0491 }
0492
0493 NDRX_LOG(log_debug, "ndrx_epoll set found piping event...");
0494
0495 if (-1==write (s->wakeup_pipe[WRITE], (char *)&mqdes, sizeof(mqdes)))
0496 {
0497 NDRX_LOG(log_error, "Error ! write fail: %s", strerror(errno));
0498 }
0499
0500
0501
0502
0503 if (EXFAIL==ndrx_mq_notify(mqdes, &mqd_h->sev))
0504 {
0505 NDRX_LOG(log_error, "Failed to register notification for mqd %d (%s)!!!",
0506 mqdes, strerror(errno));
0507 }
0508
0509 out:
0510 MUTEX_UNLOCK_V(M_psets_lock);
0511 }
0512
0513
0514
0515
0516
0517 exprivate void ndrx_epoll_set_err(int error_code, const char *fmt, ...)
0518 {
0519 char msg[ERROR_BUFFER+1] = {EXEOS};
0520 va_list ap;
0521
0522 NSTD_TLS_ENTRY;
0523
0524 va_start(ap, fmt);
0525 (void) vsnprintf(msg, sizeof(msg), fmt, ap);
0526 va_end(ap);
0527
0528 NDRX_STRCPY_SAFE(G_nstd_tls->M_last_err_msg, msg);
0529 G_nstd_tls->M_last_err = error_code;
0530
0531 NDRX_LOG(log_warn, "ndrx_epoll_set_err: %d (%s) (%s)",
0532 error_code, strerror(G_nstd_tls->M_last_err),
0533 G_nstd_tls->M_last_err_msg);
0534
0535 }
0536
0537
0538
0539
0540 exprivate ndrx_epoll_fds_t* fd_find(ndrx_epoll_set_t *pset, int fd)
0541 {
0542 ndrx_epoll_fds_t*ret = NULL;
0543
0544 EXHASH_FIND_INT( pset->fds, &fd, ret);
0545
0546 return ret;
0547 }
0548
0549
0550
0551
0552 exprivate ndrx_epoll_mqds_t* mqd_find(ndrx_epoll_set_t *pset, mqd_t mqd)
0553 {
0554 ndrx_epoll_mqds_t*ret = NULL;
0555
0556 EXHASH_FIND_MQD( pset->mqds, &mqd, ret);
0557
0558 return ret;
0559 }
0560
0561
0562
0563
0564 exprivate ndrx_epoll_set_t* pset_find(int epfd)
0565 {
0566 ndrx_epoll_set_t *ret = NULL;
0567
0568 EXHASH_FIND_INT( M_psets, &epfd, ret);
0569
0570 return ret;
0571 }
0572
0573
0574
0575
0576
0577
0578
0579
0580
0581 expublic int ndrx_epoll_ctl(int epfd, int op, int fd, struct ndrx_epoll_event *event)
0582 {
0583 int ret = EXSUCCEED;
0584 ndrx_epoll_set_t* set = NULL;
0585 ndrx_epoll_fds_t * tmp = NULL;
0586 char *fn = "ndrx_epoll_ctl";
0587 EX_EPOLL_API_ENTRY;
0588
0589 MUTEX_LOCK_V(M_psets_lock);
0590
0591 if (NULL==(set = pset_find(epfd)))
0592 {
0593 NDRX_LOG(log_error, "ndrx_epoll set %d not found", epfd);
0594 ndrx_epoll_set_err(ENOSYS, "ndrx_epoll set %d not found", epfd);
0595 EXFAIL_OUT(ret);
0596 }
0597
0598 if (EX_EPOLL_CTL_ADD == op)
0599 {
0600 NDRX_LOG(log_info, "%s: Add operation on ndrx_epoll set %d, fd %d", fn, epfd, fd);
0601
0602
0603 if (NULL!=fd_find(set, fd))
0604 {
0605
0606 ndrx_epoll_set_err(EINVAL, "fd %d already exists in ndrx_epoll set (epfd %d)",
0607 fd, set->fd);
0608 NDRX_LOG(log_error, "fd %d already exists in ndrx_epoll set (epfd %d)",
0609 fd, set->fd);
0610 EXFAIL_OUT(ret);
0611 }
0612
0613 if (NULL==(tmp = NDRX_CALLOC(1, sizeof(*tmp))))
0614 {
0615 ndrx_epoll_set_err(errno, "Failed to alloc FD hash entry");
0616 NDRX_LOG(log_error, "Failed to alloc FD hash entry");
0617 EXFAIL_OUT(ret);
0618 }
0619
0620 tmp->fd = fd;
0621 EXHASH_ADD_INT(set->fds, fd, tmp);
0622
0623
0624 set->nrfds++;
0625
0626 NDRX_LOG(log_info, "set nrfds incremented to %d", set->nrfds);
0627
0628 if (NULL==(set->fdtab=NDRX_REALLOC(set->fdtab, sizeof(struct pollfd)*set->nrfds)))
0629 {
0630 ndrx_epoll_set_err(errno, "Failed to realloc %d/%d",
0631 set->nrfds, sizeof(struct pollfd)*set->nrfds);
0632 NDRX_LOG(log_error, "Failed to realloc %d/%d",
0633 set->nrfds, sizeof(struct pollfd)*set->nrfds);
0634 EXFAIL_OUT(ret);
0635 }
0636
0637 set->fdtab[set->nrfds-1].fd = fd;
0638 set->fdtab[set->nrfds-1].events = event->events;
0639 }
0640 else if (EX_EPOLL_CTL_DEL == op)
0641 {
0642 int i;
0643 NDRX_LOG(log_info, "%s: Delete operation on ndrx_epoll set %d, fd %d",
0644 fn, epfd, fd);
0645
0646
0647 if (NULL==(tmp=fd_find(set, fd)))
0648 {
0649 ndrx_epoll_set_err(EINVAL, "fd %d not found in ndrx_epoll set (epfd %d)",
0650 fd, set->fd);
0651 NDRX_LOG(log_error, "fd %d not found in ndrx_epoll set (epfd %d)",
0652 fd, set->fd);
0653 EXFAIL_OUT(ret);
0654 }
0655
0656
0657
0658 EXHASH_DEL(set->fds, tmp);
0659 NDRX_FREE((char *)tmp);
0660
0661 for (i = 0; i < set->nrfds; i++)
0662 {
0663 if (set->fdtab[i].fd == fd)
0664 {
0665
0666 if (i!=set->nrfds-1 && set->nrfds>1)
0667 {
0668
0669
0670
0671
0672 memmove(&set->fdtab[i], &set->fdtab[i+1],
0673 sizeof(struct pollfd)*(set->nrfds-i-1));
0674 }
0675
0676 set->nrfds--;
0677
0678 NDRX_LOG(log_info, "set nrfds decremented to %d fdtab=%p",
0679 set->nrfds, set->fdtab);
0680
0681 if (0==set->nrfds)
0682 {
0683 NDRX_LOG(log_warn, "set->nrfds == 0, => free");
0684 NDRX_FREE((char *)set->fdtab);
0685 }
0686 else if (NULL==(set->fdtab=NDRX_REALLOC(set->fdtab,
0687 sizeof(struct pollfd)*set->nrfds)))
0688 {
0689 ndrx_epoll_set_err(errno, "Failed to realloc %d/%d",
0690 set->nrfds, sizeof(struct pollfd)*set->nrfds);
0691
0692 NDRX_LOG(log_error, "Failed to realloc %d/%d",
0693 set->nrfds, sizeof(struct pollfd)*set->nrfds);
0694
0695 EXFAIL_OUT(ret);
0696 }
0697 }
0698 }
0699 }
0700 else
0701 {
0702 ndrx_epoll_set_err(EINVAL, "Invalid operation %d", op);
0703 NDRX_LOG(log_error, "Invalid operation %d", op);
0704
0705 EXFAIL_OUT(ret);
0706 }
0707
0708 out:
0709
0710 MUTEX_UNLOCK_V(M_psets_lock);
0711
0712 NDRX_LOG(log_info, "%s return %d", fn, ret);
0713
0714 return ret;
0715 }
0716
0717
0718
0719
0720
0721
0722
0723
0724
0725 expublic int ndrx_epoll_ctl_mq(int epfd, int op, mqd_t mqd, struct ndrx_epoll_event *event)
0726 {
0727 int ret = EXSUCCEED;
0728 ndrx_epoll_set_t* set = NULL;
0729 ndrx_epoll_mqds_t * tmp = NULL;
0730 char *fn = "ndrx_epoll_ctl_mq";
0731
0732 EX_EPOLL_API_ENTRY;
0733
0734 MUTEX_LOCK_V(M_psets_lock);
0735
0736 if (NULL==(set = pset_find(epfd)))
0737 {
0738 ndrx_epoll_set_err(ENOSYS, "ndrx_epoll set %d not found", epfd);
0739 NDRX_LOG(log_error, "ndrx_epoll set %d not found", epfd);
0740
0741 EXFAIL_OUT(ret);
0742 }
0743
0744 if (EX_EPOLL_CTL_ADD == op)
0745 {
0746 NDRX_LOG(log_info, "%s: Add operation on ndrx_epoll set %d, fd %d", fn, epfd, mqd);
0747
0748
0749 if (NULL!=mqd_find(set, mqd))
0750 {
0751 ndrx_epoll_set_err(EINVAL, "fd %d already exists in ndrx_epoll set (epfd %d)",
0752 mqd, set->fd);
0753 NDRX_LOG(log_error, "fd %d already exists in ndrx_epoll set (epfd %d)",
0754 mqd, set->fd);
0755 EXFAIL_OUT(ret);
0756 }
0757
0758 if (NULL==(tmp = NDRX_CALLOC(1, sizeof(*tmp))))
0759 {
0760 ndrx_epoll_set_err(errno, "Failed to alloc FD hash entry");
0761 NDRX_LOG(log_error, "Failed to alloc FD hash entry");
0762
0763 EXFAIL_OUT(ret);
0764 }
0765
0766 tmp->mqd = mqd;
0767 EXHASH_ADD_MQD(set->mqds, mqd, tmp);
0768
0769
0770 set->nrfmqds++;
0771
0772 NDRX_LOG(log_info, "set nrfmqds incremented to %d", set->nrfmqds);
0773
0774
0775
0776
0777 #ifdef EX_POLL_SIGNALLED
0778 tmp->sev.sigev_notify = SIGEV_SIGNAL;
0779 tmp->sev.sigev_signo = NOTIFY_SIG;
0780 #else
0781
0782 tmp->sev.sigev_notify = SIGEV_THREAD;
0783 tmp->sev.sigev_notify_function = ndrx_ndrx_mq_notify_func;
0784 tmp->sev.sigev_notify_attributes = NULL;
0785 tmp->sev.sigev_value.sival_ptr = &tmp->mqd;
0786
0787 if (EXFAIL==ndrx_mq_notify(mqd, &tmp->sev))
0788 {
0789 ndrx_epoll_set_err(errno, "Failed to register notification for mqd %d", mqd);
0790 EXFAIL_OUT(ret);
0791 }
0792
0793 #endif
0794 }
0795 else if (EX_EPOLL_CTL_DEL == op)
0796 {
0797 NDRX_LOG(log_info, "%s: Delete operation on ndrx_epoll set %d, fd %d", fn, epfd, mqd);
0798
0799
0800 if (NULL==(tmp=mqd_find(set, mqd)))
0801 {
0802 ndrx_epoll_set_err(EINVAL, "fd %d not found in ndrx_epoll set (epfd %d)",
0803 mqd, set->fd);
0804
0805 NDRX_LOG(log_error, "fd %d not found in ndrx_epoll set (epfd %d)",
0806 mqd, set->fd);
0807
0808 EXFAIL_OUT(ret);
0809 }
0810
0811
0812 EXHASH_DEL(set->mqds, tmp);
0813 NDRX_FREE((char *)tmp);
0814
0815
0816 set->nrfmqds--;
0817
0818 NDRX_LOG(log_info, "set nrfmqds decrement to %d", set->nrfmqds);
0819 }
0820 else
0821 {
0822 ndrx_epoll_set_err(EINVAL, "Invalid operation %d", op);
0823
0824 NDRX_LOG(log_error, "Invalid operation %d", op);
0825
0826 EXFAIL_OUT(ret);
0827 }
0828
0829 out:
0830
0831 MUTEX_UNLOCK_V(M_psets_lock);
0832
0833 NDRX_LOG(log_info, "%s return %d", fn, ret);
0834
0835 return ret;
0836 }
0837
0838
0839
0840
0841
0842
0843 expublic int ndrx_epoll_create(int size)
0844 {
0845 int ret = EXSUCCEED;
0846 int i = 1;
0847 ndrx_epoll_set_t *set;
0848
0849 EX_EPOLL_API_ENTRY;
0850
0851 MUTEX_LOCK_V(M_psets_lock);
0852 while (NULL!=(set=pset_find(i)) && i < EX_POLL_SETS_MAX)
0853 {
0854 i++;
0855 }
0856 MUTEX_UNLOCK_V(M_psets_lock);
0857
0858
0859 if (NULL!=set)
0860 {
0861 ndrx_epoll_set_err(EMFILE, "Max ndrx_epoll_sets_reached");
0862 NDRX_LOG(log_error, "Max ndrx_epoll_sets_reached");
0863
0864
0865 set = NULL;
0866 EXFAIL_OUT(ret);
0867 }
0868
0869 NDRX_LOG(log_info, "Creating ndrx_epoll set: %d", i);
0870
0871 if (NULL==(set = (ndrx_epoll_set_t *)NDRX_CALLOC(1, sizeof(*set))))
0872 {
0873 ndrx_epoll_set_err(errno, "Failed to alloc: %d bytes", sizeof(*set));
0874
0875 NDRX_LOG(log_error, "Failed to alloc: %d bytes", sizeof(*set));
0876
0877 EXFAIL_OUT(ret);
0878 }
0879
0880
0881 if (pipe(set->wakeup_pipe) == -1)
0882 {
0883 ndrx_epoll_set_err(errno, "pipe failed");
0884 NDRX_LOG(log_error, "pipe failed");
0885 EXFAIL_OUT(ret);
0886 }
0887
0888 if (EXFAIL==fcntl(set->wakeup_pipe[READ], F_SETFL,
0889 fcntl(set->wakeup_pipe[READ], F_GETFL) | O_NONBLOCK))
0890 {
0891 ndrx_epoll_set_err(errno, "fcntl READ pipe set O_NONBLOCK failed");
0892 NDRX_LOG(log_error, "fcntl READ pipe set O_NONBLOCK failed");
0893 EXFAIL_OUT(ret);
0894 }
0895
0896 if (EXFAIL==fcntl(set->wakeup_pipe[WRITE], F_SETFL,
0897 fcntl(set->wakeup_pipe[WRITE], F_GETFL) | O_NONBLOCK))
0898 {
0899 ndrx_epoll_set_err(errno, "fcntl WRITE pipe set O_NONBLOCK failed");
0900 NDRX_LOG(log_error, "fcntl WRITE pipe set O_NONBLOCK failed");
0901 EXFAIL_OUT(ret);
0902 }
0903
0904 set->nrfds = 1;
0905 set->fd = i;
0906 if (NULL==(set->fdtab = NDRX_CALLOC(set->nrfds, sizeof(struct pollfd))))
0907 {
0908 ndrx_epoll_set_err(errno, "calloc for pollfd failed");
0909 NDRX_LOG(log_error, "calloc for pollfd failed");
0910 EXFAIL_OUT(ret);
0911 }
0912
0913
0914 set->fdtab[PIPE_POLL_IDX].fd = set->wakeup_pipe[READ];
0915 set->fdtab[PIPE_POLL_IDX].events = POLLIN;
0916
0917
0918 MUTEX_LOCK_V(M_psets_lock);
0919 EXHASH_ADD_INT(M_psets, fd, set);
0920 MUTEX_UNLOCK_V(M_psets_lock);
0921
0922 NDRX_LOG(log_info, "ndrx_epoll_create succeed, fd=%d", i);
0923
0924 out:
0925
0926 if (EXSUCCEED!=ret)
0927 {
0928 if (NULL!=set && set->wakeup_pipe[READ])
0929 {
0930 close(set->wakeup_pipe[READ]);
0931 }
0932
0933 if (NULL!=set && set->wakeup_pipe[WRITE])
0934 {
0935 close(set->wakeup_pipe[WRITE]);
0936 }
0937
0938 if (NULL!=set)
0939 {
0940 NDRX_FREE((char *)set);
0941 }
0942
0943 return EXFAIL;
0944
0945 }
0946
0947 return i;
0948 }
0949
0950
0951
0952
0953 expublic int ndrx_epoll_close(int epfd)
0954 {
0955 int ret = EXSUCCEED;
0956 ndrx_epoll_set_t* set = NULL;
0957
0958 ndrx_epoll_fds_t* f, *ftmp;
0959 ndrx_epoll_mqds_t* m, *mtmp;
0960
0961 NDRX_LOG(log_debug, "ndrx_epoll_close(%d) enter", epfd);
0962
0963 MUTEX_LOCK_V(M_psets_lock);
0964
0965 NDRX_LOG(log_debug, "ndrx_epoll_close(%d) enter (after lock", epfd);
0966
0967 if (NULL==(set = pset_find(epfd)))
0968 {
0969 MUTEX_UNLOCK_V(M_psets_lock);
0970
0971 ndrx_epoll_set_err(EINVAL, "ndrx_epoll set %d not found", epfd);
0972 NDRX_LOG(log_error, "ndrx_epoll set %d not found", epfd);
0973
0974
0975 EXFAIL_OUT(ret);
0976 }
0977 MUTEX_UNLOCK_V(M_psets_lock);
0978
0979 if (set->wakeup_pipe[READ])
0980 {
0981 close(set->wakeup_pipe[READ]);
0982 }
0983
0984 if (set->wakeup_pipe[WRITE])
0985 {
0986 close(set->wakeup_pipe[WRITE]);
0987 }
0988
0989
0990 EXHASH_ITER(hh, set->fds, f, ftmp)
0991 {
0992
0993 ndrx_epoll_ctl(set->fd, EX_EPOLL_CTL_DEL, f->fd, NULL);
0994 }
0995
0996
0997 EXHASH_ITER(hh, set->mqds, m, mtmp)
0998 {
0999 ndrx_epoll_ctl_mq(set->fd, EX_EPOLL_CTL_DEL, m->mqd, NULL);
1000 }
1001
1002 MUTEX_LOCK_V(M_psets_lock);
1003
1004 if (NULL!=set->fdtab)
1005 {
1006 NDRX_FREE(set->fdtab);
1007 }
1008
1009 EXHASH_DEL(M_psets, set);
1010 NDRX_FREE(set);
1011 MUTEX_UNLOCK_V(M_psets_lock);
1012
1013 out:
1014 return EXFAIL;
1015 }
1016
1017
1018
1019
1020
1021
1022
1023
1024
1025
1026
1027
1028
1029
1030
1031 expublic int ndrx_epoll_wait(int epfd, struct ndrx_epoll_event *events,
1032 int maxevents, int timeout, char **buf, int *buf_len)
1033 {
1034 int ret = EXSUCCEED;
1035 int numevents = 0;
1036 ndrx_epoll_set_t* set = NULL;
1037 char *fn = "ndrx_epoll_wait";
1038 int i;
1039 int retpoll;
1040 int try;
1041 ndrx_epoll_mqds_t* m, *mtmp;
1042
1043 EX_EPOLL_API_ENTRY;
1044
1045
1046 *buf_len = EXFAIL;
1047
1048 MUTEX_LOCK_V(M_psets_lock);
1049
1050 if (NULL==(set = pset_find(epfd)))
1051 {
1052 MUTEX_UNLOCK_V(M_psets_lock);
1053
1054 ndrx_epoll_set_err(EINVAL, "ndrx_epoll set %d not found", epfd);
1055
1056 NDRX_LOG(log_error, "ndrx_epoll set %d not found", epfd);
1057
1058 EXFAIL_OUT(ret);
1059 }
1060
1061 MUTEX_UNLOCK_V(M_psets_lock);
1062
1063
1064
1065
1066
1067
1068
1069
1070
1071
1072
1073 NDRX_LOG(log_debug, "Checking early messages...");
1074
1075 for (try = 0; try<2; try++)
1076 {
1077 MUTEX_LOCK_V(M_psets_lock);
1078 EXHASH_ITER(hh, set->mqds, m, mtmp)
1079 {
1080 struct mq_attr att;
1081
1082
1083 if (EXSUCCEED!= ndrx_mq_getattr(m->mqd, &att))
1084 {
1085 ndrx_epoll_set_err(errno, "Failed to get attribs of Q: %d", m->mqd);
1086 NDRX_LOG(log_warn, "Failed to get attribs of Q: %d", m->mqd);
1087
1088 MUTEX_UNLOCK_V(M_psets_lock);
1089 EXFAIL_OUT(ret);
1090 }
1091
1092 if (att.mq_curmsgs > 0)
1093 {
1094 if (numevents < maxevents )
1095 {
1096 numevents++;
1097
1098 NDRX_LOG(log_info, "Got mqdes %d for pipe", m->mqd);
1099
1100 events[numevents-1].data.mqd = m->mqd;
1101
1102 events[numevents-1].events = set->fdtab[PIPE_POLL_IDX].revents;
1103 events[numevents-1].is_mqd = EXTRUE;
1104 }
1105 else
1106 {
1107 break;
1108 }
1109 }
1110 }
1111 MUTEX_UNLOCK_V(M_psets_lock);
1112
1113 if (numevents)
1114 {
1115 NDRX_LOG(log_info, "Found %d events before polling "
1116 "(messages in Q) returning...", numevents);
1117 goto out;
1118 }
1119
1120 #ifdef EX_POLL_SIGNALLED
1121
1122 if (0==try)
1123 {
1124
1125
1126
1127
1128
1129
1130
1131
1132
1133
1134
1135
1136
1137
1138 MUTEX_LOCK_V(M_psets_lock);
1139
1140
1141
1142
1143
1144 if (EXSUCCEED!=signal_install_notifications_all(set))
1145 {
1146 ndrx_epoll_set_err(EXEOS, "Failed to install notifs!");
1147 NDRX_LOG(log_error, "Failed to install notifs!");
1148 MUTEX_UNLOCK_V(M_psets_lock);
1149 EXFAIL_OUT(ret);
1150 }
1151
1152 MUTEX_UNLOCK_V(M_psets_lock);
1153 }
1154 #endif
1155 }
1156
1157
1158
1159 for (i=0; i<set->nrfds; i++)
1160 {
1161 set->fdtab[i].revents = 0;
1162 NDRX_LOG(log_debug, "poll i=%d, fd=%d, events=%d",
1163 i, set->fdtab[i].fd, set->fdtab[i].events);
1164 }
1165
1166 NDRX_LOG(log_debug, "%s: epfd=%d, events=%p, maxevents=%d, timeout=%d - "
1167 "about to poll(nrfds=%d)",
1168 fn, epfd, events, maxevents, timeout, set->nrfds);
1169
1170
1171 retpoll = poll( set->fdtab, set->nrfds, timeout);
1172
1173 for (i=0; i<set->nrfds; i++)
1174 {
1175 if (set->fdtab[i].revents)
1176 {
1177 NDRX_LOG(log_debug, "%d revents=%d", i, set->fdtab[i].revents);
1178 if (PIPE_POLL_IDX==i)
1179 {
1180
1181 int err;
1182 mqd_t mqdes = 0;
1183 while (numevents < maxevents &&
1184 EXFAIL!=(ret=read(set->wakeup_pipe[READ], (char *)&mqdes,
1185 sizeof(mqdes))))
1186 {
1187 struct mq_attr att;
1188
1189
1190 if (EXSUCCEED!= ndrx_mq_getattr(mqdes, &att))
1191 {
1192
1193 NDRX_LOG(log_warn, "Failed to get attribs of Q: %d",
1194 mqdes);
1195
1196 }
1197 else if (att.mq_curmsgs > 0)
1198 {
1199 numevents++;
1200
1201 NDRX_LOG(log_info, "Got mqdes %d for pipe", mqdes);
1202
1203 events[numevents-1].data.mqd = mqdes;
1204 events[numevents-1].events = set->fdtab[i].revents;
1205 events[numevents-1].is_mqd = EXTRUE;
1206 }
1207
1208 }
1209
1210 err = errno;
1211
1212 if (EXFAIL==ret)
1213 {
1214 if (EAGAIN == err || EWOULDBLOCK == err)
1215 {
1216 ret = EXSUCCEED;
1217 }
1218 else
1219 {
1220 ndrx_epoll_set_err(err, "Failed to read notify unnamed pipe!");
1221 NDRX_LOG(log_error, "Failed to read notify unnamed pipe!");
1222 EXFAIL_OUT(ret);
1223 }
1224 }
1225 else if (ret!=sizeof(mqdes))
1226 {
1227 ndrx_epoll_set_err(EXEOS, "Error ! Expected piped read size %d but got %d!",
1228 ret, sizeof(mqdes));
1229 NDRX_LOG(log_error, "Error ! Expected piped read size %d but got %d!",
1230 ret, sizeof(mqdes));
1231 EXFAIL_OUT(ret);
1232 }
1233 else
1234 {
1235 ret = EXSUCCEED;
1236 }
1237
1238 }
1239 else if (numevents < maxevents)
1240 {
1241 numevents++;
1242
1243 NDRX_LOG(log_debug, "return normal fd");
1244
1245 events[numevents-1].data.fd =set->fdtab[i].fd;
1246 events[numevents-1].events = set->fdtab[i].revents;
1247 events[numevents-1].is_mqd = EXFALSE;
1248 }
1249 else
1250 {
1251 NDRX_LOG(log_warn, "Return struct full (reached %d)", maxevents);
1252 }
1253 }
1254 }
1255
1256 out:
1257
1258 NDRX_LOG(log_info, "%s ret=%d numevents=%d", fn, ret, numevents);
1259
1260 if (EXSUCCEED==ret)
1261 {
1262 return numevents;
1263 }
1264 else
1265 {
1266 return EXFAIL;
1267 }
1268 }
1269
1270
1271
1272
1273
1274 expublic int ndrx_epoll_errno(void)
1275 {
1276 NSTD_TLS_ENTRY;
1277 return G_nstd_tls->M_last_err;
1278 }
1279
1280
1281
1282
1283
1284
1285 expublic char * ndrx_poll_strerror(int err)
1286 {
1287 NSTD_TLS_ENTRY;
1288
1289 sprintf(G_nstd_tls->poll_strerr, "%s (last error: %s)",
1290 strerror(err), G_nstd_tls->M_last_err_msg);
1291
1292 return G_nstd_tls->poll_strerr;
1293 }
1294
1295
1296
1297
1298
1299
1300
1301
1302
1303 expublic int ndrx_epoll_service_translate(char *send_q, char *q_prefix,
1304 char *svc, int resid)
1305 {
1306
1307 sprintf(send_q, NDRX_SVC_QFMT_SRVID, q_prefix,
1308 svc, resid);
1309
1310 return EXSUCCEED;
1311 }
1312
1313
1314
1315
1316
1317
1318
1319
1320 expublic mqd_t ndrx_epoll_service_add(char *svcnm, int idx, mqd_t mq_exits)
1321 {
1322 return mq_exits;
1323 }
1324
1325
1326
1327
1328
1329 expublic int ndrx_epoll_shmdetach(void)
1330 {
1331 return EXSUCCEED;
1332 }
1333
1334
1335
1336
1337
1338
1339 expublic int ndrx_epoll_shallopenq(int idx)
1340 {
1341 return EXTRUE;
1342 }
1343
1344
1345
1346
1347 expublic void ndrx_epoll_mainq_set(char *qstr)
1348 {
1349 return;
1350 }
1351
1352
1353
1354
1355
1356
1357 expublic int ndrx_epoll_down(int force)
1358 {
1359 return EXSUCCEED;
1360 }
1361
1362
1363