0001
0002
0003
0004
0005
0006
0007
0008
0009
0010
0011
0012
0013
0014
0015
0016
0017
0018
0019
0020
0021
0022
0023
0024
0025
0026
0027
0028
0029
0030
0031
0032
0033
0034
0035
0036 #include <stdio.h>
0037 #include <stdlib.h>
0038 #include <time.h>
0039
0040 #include <unistd.h>
0041 #include <stdarg.h>
0042 #include <ctype.h>
0043 #include <memory.h>
0044 #include <errno.h>
0045 #include <signal.h>
0046 #include <limits.h>
0047 #include <pthread.h>
0048 #include <string.h>
0049 #include <fcntl.h>
0050 #include <sys/time.h>
0051 #include <sys/msg.h>
0052 #include <pthread.h>
0053 #include <ndrxdiag.h>
0054 #include <ndrstandard.h>
0055 #include <ndebug.h>
0056 #include <nstdutil.h>
0057 #include <limits.h>
0058
0059 #include <sys_unix.h>
0060
0061 #include <utlist.h>
0062 #include <sys_svq.h>
0063
0064
0065
0066
0067 #define READ 0
0068
0069 #define WRITE 1
0070 #define PIPE_POLL_IDX 0
0071
0072 #define DFLT_TOUT 60
0073
0074
0075
0076
0077 typedef struct ndrx_svq_mqd_hash ndrx_svq_mqd_hash_t;
0078
0079
0080
0081 struct ndrx_svq_mqd_hash
0082 {
0083 void *mqd;
0084
0085 ndrx_stopwatch_t stamp_time;
0086 unsigned long stamp_seq;
0087
0088 struct timespec abs_timeout;
0089
0090 EX_hash_handle hh;
0091 };
0092
0093 typedef struct ndrx_svq_fd_hash ndrx_svq_fd_hash_t;
0094
0095
0096
0097 struct ndrx_svq_fd_hash
0098 {
0099 int fd;
0100 mqd_t mqd;
0101
0102 EX_hash_handle hh;
0103 };
0104
0105
0106
0107
0108 typedef struct
0109 {
0110 int nrfds;
0111 struct pollfd *fdtab;
0112 struct pollfd *fdtabmo;
0113
0114 int evpipe[2];
0115 pthread_t evthread;
0116
0117
0118
0119
0120
0121 ndrx_svq_mqd_hash_t *mqdhash;
0122
0123 ndrx_svq_fd_hash_t *fdhash;
0124
0125 } ndrx_svq_evmon_t;
0126
0127
0128
0129
0130
0131 exprivate ndrx_svq_evmon_t M_mon = {.evpipe[0]=EXFAIL,
0132 .evpipe[1]=EXFAIL,
0133 .fdtab = NULL,
0134 .fdtabmo = NULL};
0135 exprivate int M_shutdown = EXFALSE;
0136 exprivate int volatile M_alive = EXFALSE;
0137 exprivate int volatile __thread M_signalled = EXFALSE;
0138
0139 exprivate MUTEX_LOCKDECL(M_mon_lock_mq);
0140 exprivate MUTEX_LOCKDECL(M_mon_lock_fd);
0141
0142 exprivate int M_scanunit = CONF_NDRX_SCANUNIT_DFLT;
0143 exprivate int M_scanunit_was_init = EXFALSE;
0144
0145
0146
0147
0148
0149
0150
0151
0152
0153
0154
0155
0156
0157
0158
0159
0160
0161
0162
0163
0164
0165 exprivate int ndrx_svq_fd_hash_addpoll(int fd, uint32_t events)
0166 {
0167 int ret = EXSUCCEED;
0168
0169
0170 M_mon.nrfds++;
0171
0172 NDRX_LOG(log_info, "set nrfds incremented to %d, events: %d",
0173 M_mon.nrfds, (int)events);
0174
0175 if (NULL==(M_mon.fdtab=NDRX_REALLOC(M_mon.fdtab, sizeof(struct pollfd)*M_mon.nrfds)))
0176 {
0177 NDRX_LOG(log_error, "Failed to realloc %d/%d",
0178 M_mon.nrfds, sizeof(struct pollfd)*M_mon.nrfds);
0179 EXFAIL_OUT(ret);
0180 }
0181
0182 if (NULL==(M_mon.fdtabmo=NDRX_REALLOC(M_mon.fdtabmo, sizeof(struct pollfd)*M_mon.nrfds)))
0183 {
0184 NDRX_LOG(log_error, "Failed to realloc (mo) %d/%d",
0185 M_mon.nrfds, sizeof(struct pollfd)*M_mon.nrfds);
0186 EXFAIL_OUT(ret);
0187 }
0188
0189 M_mon.fdtab[M_mon.nrfds-1].fd = fd;
0190 M_mon.fdtab[M_mon.nrfds-1].events = events;
0191 M_mon.fdtab[M_mon.nrfds-1].revents = 0;
0192
0193 out:
0194 return ret;
0195 }
0196
0197
0198
0199
0200
0201
0202 exprivate int ndrx_svq_fd_hash_delpoll(int fd)
0203 {
0204 int ret = EXSUCCEED;
0205 int i;
0206
0207 for (i = 0; i < M_mon.nrfds; i++)
0208 {
0209 if (M_mon.fdtab[i].fd == fd)
0210 {
0211
0212 if (i!=M_mon.nrfds-1 && M_mon.nrfds>1)
0213 {
0214 memmove(&M_mon.fdtab[i], &M_mon.fdtab[i+1],
0215 sizeof(struct pollfd)*(M_mon.nrfds-i-1));
0216 }
0217
0218 M_mon.nrfds--;
0219
0220 NDRX_LOG(log_info, "set nrfds decremented to %d fdtab=%p",
0221 M_mon.nrfds, M_mon.fdtab);
0222
0223 if (0==M_mon.nrfds)
0224 {
0225 NDRX_LOG(log_warn, "set->nrfds == 0, => free");
0226 NDRX_FREE((char *)M_mon.fdtab);
0227 NDRX_FREE((char *)M_mon.fdtabmo);
0228 }
0229 else if (NULL==(M_mon.fdtab=NDRX_REALLOC(M_mon.fdtab,
0230 sizeof(struct pollfd)*M_mon.nrfds)))
0231 {
0232 int err = errno;
0233 userlog("Failed to realloc %d/%d: %s",
0234 M_mon.nrfds, sizeof(struct pollfd)*M_mon.nrfds,
0235 strerror(err));
0236
0237 NDRX_LOG(log_error, "Failed to realloc %d/%d: %s",
0238 M_mon.nrfds, sizeof(struct pollfd)*M_mon.nrfds,
0239 strerror(err));
0240
0241 EXFAIL_OUT(ret);
0242 }
0243 else if (NULL==(M_mon.fdtabmo=NDRX_REALLOC(M_mon.fdtabmo,
0244 sizeof(struct pollfd)*M_mon.nrfds)))
0245 {
0246 int err = errno;
0247 userlog("Failed to realloc mo %d/%d: %s",
0248 M_mon.nrfds, sizeof(struct pollfd)*M_mon.nrfds,
0249 strerror(err));
0250
0251 NDRX_LOG(log_error, "Failed to realloc mo %d/%d: %s",
0252 M_mon.nrfds, sizeof(struct pollfd)*M_mon.nrfds,
0253 strerror(err));
0254
0255 EXFAIL_OUT(ret);
0256 }
0257 break;
0258 }
0259 }
0260
0261 out:
0262
0263 return ret;
0264 }
0265
0266
0267
0268
0269
0270
0271 exprivate ndrx_svq_fd_hash_t * ndrx_svq_fd_hash_find(int fd)
0272 {
0273 ndrx_svq_fd_hash_t *ret = NULL;
0274
0275 EXHASH_FIND_INT( (M_mon.fdhash), &fd, ret);
0276
0277 NDRX_LOG(log_dump, "checking fd %d -> %p", fd, ret);
0278
0279 return ret;
0280 }
0281
0282
0283
0284
0285
0286
0287
0288
0289 exprivate int ndrx_svq_fd_hash_add(int fd, mqd_t mqd, uint32_t events)
0290 {
0291 int ret = EXSUCCEED;
0292 ndrx_svq_fd_hash_t * el;
0293
0294 MUTEX_LOCK_V(M_mon_lock_fd);
0295
0296 el = ndrx_svq_fd_hash_find(fd);
0297
0298 if (NULL==el)
0299 {
0300 el = NDRX_FPMALLOC(sizeof(ndrx_svq_fd_hash_t), 0);
0301
0302 NDRX_LOG(log_dump, "Registering %d as int", fd);
0303
0304 if (NULL==el)
0305 {
0306 int err = errno;
0307
0308 NDRX_LOG(log_error, "Failed to alloc: %s", strerror(err));
0309 userlog("Failed to alloc: %s", strerror(err));
0310
0311 EXFAIL_OUT(ret);
0312 }
0313
0314 el->fd = fd;
0315 el->mqd = mqd;
0316
0317 ndrx_svq_fd_hash_addpoll(fd, events);
0318 EXHASH_ADD_INT((M_mon.fdhash), fd, el);
0319 }
0320 else
0321 {
0322
0323 el->mqd = mqd;
0324 }
0325
0326 out:
0327 MUTEX_UNLOCK_V(M_mon_lock_fd);
0328 return ret;
0329 }
0330
0331
0332
0333
0334
0335 expublic int ndrx_svq_fd_nrof(void)
0336 {
0337 return M_mon.nrfds;
0338 }
0339
0340
0341
0342
0343
0344 exprivate int ndrx_svq_fd_hash_del(int fd)
0345 {
0346 int ret = EXSUCCEED;
0347 ndrx_svq_fd_hash_t *el;
0348
0349 MUTEX_LOCK_V(M_mon_lock_fd);
0350
0351
0352 if (EXSUCCEED!=ndrx_svq_fd_hash_delpoll(fd))
0353 {
0354 EXFAIL_OUT(ret);
0355 }
0356
0357
0358 EXHASH_FIND_INT( (M_mon.fdhash), &fd, el);
0359
0360 if (NULL!=el)
0361 {
0362 EXHASH_DEL((M_mon.fdhash), el);
0363 NDRX_FPFREE(el);
0364 }
0365
0366 out:
0367 MUTEX_UNLOCK_V(M_mon_lock_fd);
0368 return ret;
0369 }
0370
0371
0372
0373
0374
0375
0376 exprivate int ndrx_svq_fd_hash_delbymqd(mqd_t mqd)
0377 {
0378 int ret = EXSUCCEED;
0379 ndrx_svq_fd_hash_t *e=NULL, *et=NULL;
0380
0381 MUTEX_LOCK_V(M_mon_lock_fd);
0382
0383 EXHASH_ITER(hh, M_mon.fdhash, e, et)
0384 {
0385 if (e->mqd == mqd)
0386 {
0387
0388 if (EXSUCCEED!=ndrx_svq_fd_hash_delpoll(e->fd))
0389 {
0390 EXFAIL_OUT(ret);
0391 }
0392
0393
0394 EXHASH_DEL(M_mon.fdhash, e);
0395 NDRX_FPFREE(e);
0396 break;
0397 }
0398 }
0399
0400 out:
0401 MUTEX_UNLOCK_V(M_mon_lock_fd);
0402 return ret;
0403 }
0404
0405
0406
0407
0408
0409
0410 exprivate ndrx_svq_mqd_hash_t * ndrx_svq_mqd_hash_find(mqd_t mqd)
0411 {
0412 ndrx_svq_mqd_hash_t *ret = NULL;
0413
0414 EXHASH_FIND_PTR( (M_mon.mqdhash), ((void **)&mqd), ret);
0415
0416 NDRX_LOG(log_dump, "checking mqd %p -> %p", mqd, ret);
0417
0418 return ret;
0419 }
0420
0421
0422
0423
0424
0425
0426
0427
0428 expublic int ndrx_svq_mqd_hash_add(mqd_t mqd, ndrx_stopwatch_t *stamp_time,
0429 unsigned long stamp_seq, struct timespec *abs_timeout)
0430 {
0431 int ret = EXSUCCEED;
0432 ndrx_svq_mqd_hash_t * el;
0433
0434 MUTEX_LOCK_V(M_mon_lock_mq);
0435 el = ndrx_svq_mqd_hash_find(mqd);
0436
0437 if (NULL==el)
0438 {
0439 el = NDRX_FPMALLOC(sizeof(ndrx_svq_mqd_hash_t), 0);
0440
0441 NDRX_LOG(log_dump, "Registering %p as mqd_t", mqd);
0442
0443 if (NULL==el)
0444 {
0445 int err = errno;
0446
0447 NDRX_LOG(log_error, "Failed to alloc: %s", strerror(err));
0448 userlog("Failed to alloc: %s", strerror(err));
0449
0450 EXFAIL_OUT(ret);
0451 }
0452
0453 el->mqd = (void *)mqd;
0454 el->stamp_seq = stamp_seq;
0455 el->stamp_time = *stamp_time;
0456 el->abs_timeout = *abs_timeout;
0457 EXHASH_ADD_PTR((M_mon.mqdhash), mqd, el);
0458 }
0459 else
0460 {
0461
0462 el->stamp_seq = stamp_seq;
0463 el->stamp_time = *stamp_time;
0464 el->abs_timeout = *abs_timeout;
0465 }
0466
0467 out:
0468
0469 MUTEX_UNLOCK_V(M_mon_lock_mq);
0470
0471 return ret;
0472 }
0473
0474
0475
0476
0477
0478 expublic void ndrx_svq_mqd_hash_del(mqd_t mqd)
0479 {
0480 ndrx_svq_mqd_hash_t *ret = NULL;
0481
0482
0483
0484 NDRX_LOG(log_debug, "Closing SV descr queue %p qstr:[%s] qid:%d",
0485 mqd, mqd->qstr, mqd->qid);
0486
0487
0488
0489
0490
0491
0492
0493
0494
0495
0496
0497
0498
0499 MUTEX_LOCK_V(M_mon_lock_mq);
0500
0501 EXHASH_FIND_PTR( (M_mon.mqdhash), ((void **)&mqd), ret);
0502
0503 if (NULL!=ret)
0504 {
0505 EXHASH_DEL((M_mon.mqdhash), ret);
0506 NDRX_FPFREE(ret);
0507 }
0508 MUTEX_UNLOCK_V(M_mon_lock_mq);
0509
0510 }
0511
0512
0513
0514
0515
0516 expublic int ndrx_svq_mqd_close(mqd_t mqd)
0517 {
0518 int ret = EXSUCCEED;
0519 ndrx_svq_ev_t *elt, *tmp;
0520
0521
0522 if (EXSUCCEED!=ndrx_svq_fd_hash_delbymqd(mqd))
0523 {
0524 EXFAIL_OUT(ret);
0525 }
0526
0527 ndrx_svq_mqd_hash_del(mqd);
0528
0529
0530 MUTEX_LOCK_V(mqd->qlock);
0531
0532 DL_FOREACH_SAFE(mqd->eventq,elt,tmp)
0533 {
0534 DL_DELETE(mqd->eventq, elt);
0535 NDRX_FPFREE(elt);
0536 }
0537 MUTEX_UNLOCK_V(mqd->qlock);
0538
0539 NDRX_SPIN_DESTROY_V(mqd->rcvlock);
0540 NDRX_SPIN_DESTROY_V(mqd->rcvlockb4);
0541
0542
0543
0544
0545
0546
0547 out:
0548 return ret;
0549 }
0550
0551
0552
0553
0554
0555 exprivate int ndrx_svq_mqd_hash_dispatch(void)
0556 {
0557 int ret = EXSUCCEED;
0558 ndrx_svq_mqd_hash_t * r, *rt;
0559 long delta;
0560 struct timespec abs_timeout;
0561 struct timeval timeval;
0562 ndrx_svq_ev_t * ev = NULL;
0563 int err;
0564
0565
0566 MUTEX_LOCK_V(M_mon_lock_mq);
0567
0568 EXHASH_ITER(hh, (M_mon.mqdhash), r, rt)
0569 {
0570
0571 gettimeofday (&timeval, NULL);
0572 abs_timeout.tv_sec = timeval.tv_sec;
0573 abs_timeout.tv_nsec = timeval.tv_usec*1000;
0574
0575 delta = ndrx_timespec_get_delta(&(r->abs_timeout), &abs_timeout);
0576
0577 if (delta <= 0)
0578 {
0579 int wait_matched;
0580 NDRX_SPIN_LOCK_V(( ((mqd_t)r->mqd)->stamplock));
0581
0582 if (NDRX_SVQ_TOUT_MATCH((r), ((mqd_t)r->mqd)))
0583 {
0584 wait_matched = EXTRUE;
0585 }
0586 else
0587 {
0588 wait_matched = EXFALSE;
0589 }
0590 NDRX_SPIN_UNLOCK_V((((mqd_t)r->mqd)->stamplock));
0591
0592 NDRX_LOG(log_debug, "Timeout condition: mqd %p time spent: %ld "
0593 "matched: %d seq: %lu",
0594 r->mqd, delta, wait_matched, r->stamp_seq);
0595
0596
0597
0598
0599
0600
0601
0602 if (wait_matched)
0603 {
0604 if (NULL==(ev = NDRX_FPMALLOC(sizeof(ndrx_svq_ev_t), 0)))
0605 {
0606 err = errno;
0607 NDRX_LOG(log_error, "Failed to allocate ndrx_svq_ev_t: %s",
0608 strerror(err));
0609 userlog("Failed to allocate ndrx_svq_ev_t: %s",
0610 strerror(err));
0611 EXFAIL_OUT(ret);
0612 }
0613
0614 ev->ev = NDRX_SVQ_EV_TOUT;
0615 ev->data = NULL;
0616 ev->stamp_seq = r->stamp_seq;
0617 ev->stamp_time = r->stamp_time;
0618 ev->prev = NULL;
0619 ev->next = NULL;
0620 ret=ndrx_svq_mqd_put_event(r->mqd, &ev);
0621
0622 if (NULL!=ev)
0623 {
0624 NDRX_FPFREE(ev);
0625 }
0626
0627 if (EXSUCCEED!=ret)
0628 {
0629 NDRX_LOG(log_error, "Failed to put event for %p typ %d",
0630 r->mqd, NDRX_SVQ_EV_TOUT);
0631 EXFAIL_OUT(ret);
0632 }
0633 }
0634 else
0635 {
0636 NDRX_LOG(log_debug, "Wait not matched -> do not put event...");
0637 }
0638
0639
0640 EXHASH_DEL((M_mon.mqdhash), r);
0641 NDRX_FPFREE(r);
0642
0643 }
0644 }
0645 out:
0646
0647 MUTEX_UNLOCK_V(M_mon_lock_mq);
0648
0649 return ret;
0650 }
0651
0652
0653
0654
0655
0656
0657
0658
0659
0660
0661
0662 expublic int ndrx_svq_mqd_put_event(mqd_t mqd, ndrx_svq_ev_t **ev)
0663 {
0664 int ret = EXSUCCEED;
0665 int l2, l1;
0666
0667
0668
0669
0670
0671 MUTEX_LOCK_V(mqd->barrier);
0672
0673
0674 MUTEX_LOCK_V(mqd->qlock);
0675 DL_APPEND(mqd->eventq, (*ev));
0676 MUTEX_UNLOCK_V(mqd->qlock);
0677 *ev=NULL;
0678
0679 l1=NDRX_SPIN_TRYLOCK_V(mqd->rcvlockb4);
0680 l2=NDRX_SPIN_TRYLOCK_V(mqd->rcvlock);
0681
0682 if (0==l1)
0683 {
0684
0685
0686
0687
0688
0689 NDRX_SPIN_UNLOCK_V(mqd->rcvlockb4);
0690
0691 if (0==l2)
0692 {
0693 NDRX_SPIN_UNLOCK_V(mqd->rcvlock);
0694 }
0695 }
0696 else
0697 {
0698
0699
0700
0701
0702
0703
0704
0705
0706
0707
0708
0709
0710 if (0==l2)
0711 {
0712 NDRX_SPIN_UNLOCK_V(mqd->rcvlock);
0713 }
0714
0715
0716
0717
0718
0719 MUTEX_LOCK_V(mqd->qlock);
0720 MUTEX_UNLOCK_V(mqd->qlock);
0721
0722
0723
0724
0725 while (1)
0726 {
0727
0728 l1=NDRX_SPIN_TRYLOCK_V(mqd->rcvlockb4);
0729 l2=NDRX_SPIN_TRYLOCK_V(mqd->rcvlock);
0730
0731
0732
0733 if (0==l1 && 0==l2)
0734 {
0735
0736
0737 NDRX_SPIN_UNLOCK_V(mqd->rcvlockb4);
0738 NDRX_SPIN_UNLOCK_V(mqd->rcvlock);
0739
0740 break;
0741 }
0742 else if (0!=l1 && 0!=l2)
0743 {
0744
0745
0746
0747
0748 if (0!=pthread_kill(mqd->thread, NDRX_SVQ_SIG))
0749 {
0750 int err = errno;
0751 NDRX_LOG(log_error, "pthread_kill(%d) failed: %s",
0752 NDRX_SVQ_SIG, strerror(err));
0753 userlog("pthread_kill(%d) failed: %s",
0754 NDRX_SVQ_SIG, strerror(err));
0755 EXFAIL_OUT(ret);
0756 }
0757 break;
0758
0759
0760
0761
0762 }
0763
0764 if (0==l1)
0765 {
0766 NDRX_SPIN_UNLOCK_V(mqd->rcvlockb4);
0767 }
0768
0769 if (0==l2)
0770 {
0771 NDRX_SPIN_UNLOCK_V(mqd->rcvlock);
0772 }
0773
0774
0775 sched_yield();
0776
0777 }
0778 }
0779
0780 out:
0781 MUTEX_UNLOCK_V(mqd->barrier);
0782
0783 return ret;
0784 }
0785
0786
0787
0788
0789
0790
0791
0792
0793
0794 exprivate void ndrx_svq_signal_action(int sig)
0795 {
0796
0797
0798
0799
0800 M_signalled = sig;
0801 return;
0802 }
0803
0804
0805
0806
0807
0808
0809
0810
0811 exprivate void * ndrx_svq_timeout_thread(void* arg)
0812 {
0813 int retpoll;
0814 int ret = EXSUCCEED;
0815 int i, moc, donext;
0816 int err;
0817 ndrx_svq_mon_cmd_t cmd;
0818 ndrx_svq_ev_t *ev;
0819 sigset_t set;
0820 struct pollfd *fdtabmo_tmp = NULL;
0821
0822
0823 _Nunset_error();
0824
0825
0826
0827 if (EXSUCCEED!=sigfillset(&set))
0828 {
0829 err = errno;
0830 NDRX_LOG(log_error, "Failed to fill signal array: %s", strerror(err));
0831 userlog("Failed to fill signal array: %s", strerror(err));
0832 EXFAIL_OUT(ret);
0833 }
0834
0835 if (EXSUCCEED!=sigdelset(&set, NDRX_SVQ_SIG))
0836 {
0837 err = errno;
0838 NDRX_LOG(log_error, "Failed to delete signal %d: %s",
0839 NDRX_SVQ_SIG, strerror(err));
0840 userlog("Failed to delete signal %d: %s",
0841 NDRX_SVQ_SIG, strerror(err));
0842 EXFAIL_OUT(ret);
0843 }
0844
0845 if (EXSUCCEED!=pthread_sigmask(SIG_BLOCK, &set, NULL))
0846 {
0847 err = errno;
0848 NDRX_LOG(log_error, "Failed to block all signals but %d for even thread: %s",
0849 NDRX_SVQ_SIG, strerror(err));
0850 userlog("Failed to block all signals but %d for even thread: %s",
0851 NDRX_SVQ_SIG, strerror(err));
0852 EXFAIL_OUT(ret);
0853 }
0854
0855
0856
0857
0858
0859
0860
0861
0862
0863
0864
0865
0866
0867 int syncfd = EXFALSE;
0868
0869
0870
0871
0872
0873
0874
0875
0876
0877
0878 while (!M_shutdown)
0879 {
0880
0881 NDRX_LOG(6, "About to poll for: %d ms nrfds=%d",
0882 M_scanunit, M_mon.nrfds);
0883
0884 moc=0;
0885 donext=EXTRUE;
0886
0887
0888 MUTEX_LOCK_V(M_mon_lock_fd);
0889
0890 for (i=0; i<M_mon.nrfds; i++)
0891 {
0892
0893
0894
0895
0896 if (syncfd || i<1)
0897 {
0898 M_mon.fdtabmo[moc].revents = 0;
0899 M_mon.fdtabmo[moc].fd = M_mon.fdtab[i].fd;
0900 M_mon.fdtabmo[moc].events = M_mon.fdtab[i].events;
0901
0902 NDRX_LOG(6, "moc=%d %p %p fd=%d", moc,
0903 M_mon.fdtabmo, M_mon.fdtab, M_mon.fdtabmo[moc].fd);
0904 moc++;
0905 }
0906 }
0907
0908
0909 if (NULL!=fdtabmo_tmp)
0910 {
0911 NDRX_FPFREE(fdtabmo_tmp);
0912 }
0913
0914 fdtabmo_tmp = NDRX_FPMALLOC(sizeof(struct pollfd)*moc, 0);
0915
0916 if (NULL==fdtabmo_tmp)
0917 {
0918 NDRX_LOG(log_error, "Failed to malloc %d: %s",
0919 sizeof(struct pollfd)*moc, strerror(errno));
0920 userlog("Failed to malloc %d: %s",
0921 sizeof(struct pollfd)*moc, strerror(errno));
0922
0923 MUTEX_UNLOCK_V(M_mon_lock_fd);
0924 EXFAIL_OUT(ret);
0925 }
0926
0927
0928 memcpy(fdtabmo_tmp, M_mon.fdtabmo, sizeof(struct pollfd)*moc);
0929
0930 MUTEX_UNLOCK_V(M_mon_lock_fd);
0931
0932 retpoll = poll( fdtabmo_tmp, moc, M_scanunit);
0933
0934 NDRX_LOG(6, "poll() ret = %d", retpoll);
0935 if (EXFAIL==retpoll)
0936 {
0937 err = errno;
0938
0939 if (err==EINTR)
0940 {
0941 NDRX_LOG(log_warn, "poll() interrupted... ignore...: %s",
0942 strerror(err));
0943 }
0944 else
0945 {
0946 NDRX_LOG(log_error, "System V auxiliary event poll() got error: %s",
0947 strerror(err));
0948 userlog("System V auxiliary event poll() got error: %s",
0949 strerror(err));
0950 EXFAIL_OUT(ret);
0951 }
0952 }
0953 else if (0==retpoll)
0954 {
0955
0956
0957
0958 NDRX_LOG(6, "Got timeout...");
0959
0960
0961
0962
0963 if (EXSUCCEED!=ndrx_svq_mqd_hash_dispatch())
0964 {
0965 NDRX_LOG(log_error, "Failed to dispatch events - general failure "
0966 "- reboot to avoid lockups!");
0967 userlog("Failed to dispatch events - general failure "
0968 "- reboot to avoid lockups!");
0969
0970
0971 exit(EXFAIL);
0972 }
0973
0974 }
0975 else for (i=0; i<moc && donext; i++)
0976 {
0977 if (fdtabmo_tmp[i].revents)
0978 {
0979 NDRX_LOG(log_debug, "%d fd=%d revents=%d", i,
0980 fdtabmo_tmp[i].fd, (int)fdtabmo_tmp[i].revents);
0981 if (PIPE_POLL_IDX==i)
0982 {
0983
0984
0985
0986
0987
0988 if (EXFAIL==read(M_mon.evpipe[READ], &cmd, sizeof(cmd)))
0989 {
0990 err = errno;
0991
0992 NDRX_LOG(log_error, "Failed to receive command block by "
0993 "System V monitoring thread: %s", strerror(err));
0994 userlog("Failed to receive command block by "
0995 "System V monitoring thread: %s", strerror(err));
0996 EXFAIL_OUT(ret);
0997 }
0998
0999 NDRX_LOG(log_debug, "Got command: %d %p flags=%d",
1000 cmd.cmd, cmd.mqd, cmd.flags);
1001
1002
1003 if (NDRX_SVQ_MONF_SYNCFD & cmd.flags)
1004 {
1005 syncfd = EXTRUE;
1006 }
1007
1008 switch (cmd.cmd)
1009 {
1010 case NDRX_SVQ_MON_TOUT:
1011
1012 if (cmd.abs_timeout.tv_sec > 0 ||
1013 cmd.abs_timeout.tv_nsec > 0)
1014 {
1015 NDRX_LOG(log_debug, "register timeout %p "
1016 "stamp_seq=%ld sec %ld nsec %ld",
1017 cmd.mqd, cmd.stamp_seq,
1018 (long)cmd.abs_timeout.tv_sec,
1019 (long)cmd.abs_timeout.tv_nsec);
1020
1021 if (EXSUCCEED!=ndrx_svq_mqd_hash_add(cmd.mqd,
1022 &(cmd.stamp_time), cmd.stamp_seq, &(cmd.abs_timeout)))
1023 {
1024 NDRX_LOG(log_error, "Failed to register timeout for %p!",
1025 cmd.mqd);
1026 userlog("Failed to register timeout for %p!",
1027 cmd.mqd);
1028 EXFAIL_OUT(ret);
1029 }
1030 }
1031
1032 break;
1033 case NDRX_SVQ_MON_ADDFD:
1034
1035 NDRX_LOG(log_info, "register fd %d for %p events %ld",
1036 cmd.fd, cmd.mqd, (long)cmd.events);
1037 if (EXSUCCEED!=ndrx_svq_fd_hash_add(cmd.fd, cmd.mqd,
1038 cmd.events))
1039 {
1040 NDRX_LOG(log_error, "Failed to register timeout for %p!",
1041 cmd.mqd);
1042 userlog("Failed to register timeout for %p!",
1043 cmd.mqd);
1044 EXFAIL_OUT(ret);
1045 }
1046
1047 break;
1048
1049 case NDRX_SVQ_MON_RMFD:
1050 NDRX_LOG(log_info, "Deregister fd %d from polling",
1051 cmd.fd);
1052 ndrx_svq_fd_hash_del(cmd.fd);
1053
1054 donext=EXFALSE;
1055 break;
1056 case NDRX_SVQ_MON_TERM:
1057 NDRX_LOG(log_info, "Terminate request...");
1058 goto out;
1059 break;
1060 }
1061
1062 }
1063 else
1064 {
1065 ndrx_svq_fd_hash_t *fdh = ndrx_svq_fd_hash_find(fdtabmo_tmp[i].fd);
1066
1067 if (NULL==fdh)
1068 {
1069 NDRX_LOG(log_error, "File descriptor %d not registered"
1070 " int System V poller - FAIL", fdtabmo_tmp[i].fd);
1071 userlog("File descriptor %d not registered"
1072 " int System V poller - FAIL", fdtabmo_tmp[i].fd);
1073 EXFAIL_OUT(ret);
1074 }
1075
1076
1077
1078
1079 if (NULL==(ev = NDRX_FPMALLOC(sizeof(*ev), 0)))
1080 {
1081 err = errno;
1082 NDRX_LOG(log_error, "Failed to malloc ndrx_svq_ev_t: %s",
1083 strerror(err));
1084 userlog("Failed to malloc ndrx_svq_ev_t: %s",
1085 strerror(err));
1086 EXFAIL_OUT(ret);
1087 }
1088
1089 ev->data = NULL;
1090 ev->datalen = 0;
1091 ev->ev = NDRX_SVQ_EV_FD;
1092 ev->fd = fdtabmo_tmp[i].fd;
1093 ev->revents = fdtabmo_tmp[i].revents;
1094 ev->prev = NULL;
1095 ev->next = NULL;
1096
1097
1098
1099 ret = ndrx_svq_mqd_put_event(fdh->mqd, &ev);
1100
1101 if (NULL!=ev)
1102 {
1103 NDRX_FPFREE(ev);
1104 }
1105
1106 if (EXSUCCEED!=ret)
1107 {
1108 NDRX_LOG(log_error, "Failed to put FD event for %p - FAIL",
1109 fdh->mqd);
1110 EXFAIL_OUT(ret);
1111 }
1112
1113
1114 syncfd = EXFALSE;
1115 }
1116 }
1117 }
1118 }
1119
1120 out:
1121
1122 MUTEX_LOCK_V(M_mon_lock_fd);
1123 if (NULL!=M_mon.fdtab)
1124 {
1125 NDRX_FREE(M_mon.fdtab);
1126 }
1127
1128 if (NULL!=M_mon.fdtabmo)
1129 {
1130 NDRX_FREE(M_mon.fdtabmo);
1131 }
1132
1133 MUTEX_UNLOCK_V(M_mon_lock_fd);
1134
1135 if (NULL!=fdtabmo_tmp)
1136 {
1137 NDRX_FPFREE(fdtabmo_tmp);
1138 }
1139
1140
1141
1142
1143
1144 M_alive = EXFALSE;
1145
1146 if (EXSUCCEED!=ret)
1147 {
1148 NDRX_LOG(log_error, "System V Queue monitoring thread faced "
1149 "unhandled error - terminate!");
1150 userlog("System V Queue monitoring thread faced unhandled error - terminate!");
1151 exit(ret);
1152 }
1153
1154 return NULL;
1155 }
1156
1157
1158
1159
1160 expublic void ndrx_svq_event_exit(int detach)
1161 {
1162 NDRX_LOG(log_debug, "Terminating event thread...");
1163
1164 if (M_alive)
1165 {
1166 ndrx_svq_moncmd_term();
1167
1168
1169 if (0==pthread_equal(pthread_self(), M_mon.evthread))
1170 {
1171 NDRX_LOG(log_debug, "Join evthread...");
1172 pthread_join(M_mon.evthread, NULL);
1173 NDRX_LOG(log_debug, "Join evthread... (done)");
1174 }
1175 }
1176
1177 if (detach)
1178 {
1179
1180 ndrx_svqshm_detach();
1181 }
1182 }
1183
1184
1185
1186
1187 expublic void ndrx_svq_event_atexit(void)
1188 {
1189 ndrx_svq_event_exit(EXTRUE);
1190 }
1191
1192
1193
1194
1195
1196 expublic void ndrx_svq_fork_prepare(void)
1197 {
1198 NDRX_LOG(log_debug, "Preparing System V Aux thread for fork");
1199
1200 if (EXFAIL==M_mon.evpipe[READ] && EXFAIL==M_mon.evpipe[WRITE])
1201 {
1202 NDRX_LOG(log_debug, "evpipe not open -> nothing to close");
1203 goto out;
1204 }
1205
1206 ndrx_svq_event_exit(EXFALSE);
1207
1208
1209 if (EXSUCCEED!=close(M_mon.evpipe[READ]))
1210 {
1211 NDRX_LOG(log_error, "Failed to close READ PIPE %d: %s",
1212 M_mon.evpipe[READ], strerror(errno));
1213 }
1214 else
1215 {
1216 M_mon.evpipe[READ] = EXFAIL;
1217 }
1218
1219 if (EXSUCCEED!=close(M_mon.evpipe[WRITE]))
1220 {
1221 NDRX_LOG(log_error, "Failed to close WRITE PIPE %d: %s",
1222 M_mon.evpipe[WRITE], strerror(errno));
1223 }
1224 else
1225 {
1226 M_mon.evpipe[WRITE] = EXFAIL;
1227 }
1228
1229
1230
1231
1232
1233
1234
1235 out:
1236 return;
1237 }
1238
1239
1240
1241
1242 exprivate void ndrx_svq_fork_resume_child(void)
1243 {
1244
1245 }
1246
1247
1248
1249
1250 exprivate void ndrx_svq_fork_resume(void)
1251 {
1252 int err;
1253 int ret=EXSUCCEED;
1254 pthread_attr_t pthread_custom_attr;
1255
1256 NDRX_LOG(log_debug, "Restoring System V Aux thread after fork %d", (int)getpid());
1257
1258
1259
1260
1261
1262
1263 if (EXFAIL==pipe(M_mon.evpipe))
1264 {
1265 err = errno;
1266 NDRX_LOG(log_error, "pipe failed: %s", strerror(err));
1267 EXFAIL_OUT(ret);
1268 }
1269
1270 if (EXFAIL==fcntl(M_mon.evpipe[READ], F_SETFL,
1271 fcntl(M_mon.evpipe[READ], F_GETFL) | O_NONBLOCK))
1272 {
1273 err = errno;
1274 NDRX_LOG(log_error, "fcntl READ pipe set O_NONBLOCK failed: %s",
1275 strerror(err));
1276 EXFAIL_OUT(ret);
1277 }
1278
1279 if (NULL==(M_mon.fdtab = NDRX_CALLOC(M_mon.nrfds, sizeof(struct pollfd))))
1280 {
1281 err = errno;
1282 NDRX_LOG(log_error, "calloc for pollfd failed: %s", strerror(err));
1283 EXFAIL_OUT(ret);
1284 }
1285
1286 if (NULL==(M_mon.fdtabmo = NDRX_CALLOC(M_mon.nrfds, sizeof(struct pollfd))))
1287 {
1288 err = errno;
1289 NDRX_LOG(log_error, "calloc for pollfd mo failed: %s", strerror(err));
1290 EXFAIL_OUT(ret);
1291 }
1292
1293
1294
1295
1296 M_mon.fdtab[PIPE_POLL_IDX].fd = M_mon.evpipe[READ];
1297 M_mon.fdtab[PIPE_POLL_IDX].events = POLLIN;
1298
1299 M_mon.nrfds = 1;
1300
1301
1302 NDRX_LOG(log_debug, "System V Monitoring pipes fd read:%d write:%d",
1303 M_mon.evpipe[READ], M_mon.evpipe[WRITE]);
1304
1305 pthread_attr_init(&pthread_custom_attr);
1306 ndrx_platf_stack_set(&pthread_custom_attr);
1307 M_alive=EXTRUE;
1308 if (EXSUCCEED!=(ret=pthread_create(&(M_mon.evthread), &pthread_custom_attr,
1309 ndrx_svq_timeout_thread, NULL)))
1310 {
1311 M_alive=EXFALSE;
1312
1313 NDRX_PLATF_DIAG(NDRX_DIAG_PTHREAD_CREATE, errno, "SystemV Event thread");
1314
1315 EXFAIL_OUT(ret);
1316 }
1317
1318 out:
1319
1320 if (EXSUCCEED!=ret)
1321 {
1322 NDRX_LOG(log_error, "System V AUX thread resume after fork failed - abort!");
1323 userlog("System V AUX thread resume after fork failed - abort!");
1324
1325 exit(EXFAIL);
1326 }
1327
1328 }
1329
1330
1331
1332
1333
1334 expublic int ndrx_svq_scanunit_set(int ms)
1335 {
1336 int prev_val = M_scanunit;
1337
1338 M_scanunit_was_init=EXTRUE;
1339 M_scanunit=ms;
1340
1341 return prev_val;
1342 }
1343
1344
1345
1346
1347
1348 expublic int ndrx_svq_event_init(void)
1349 {
1350 int err;
1351 int ret = EXSUCCEED;
1352 static int first = EXTRUE;
1353 pthread_attr_t pthread_custom_attr;
1354
1355
1356
1357 struct sigaction act;
1358
1359 NDRX_LOG(log_debug, "About to init System V Eventing sub-system");
1360 memset(&act, 0, sizeof(act));
1361
1362
1363
1364 sigemptyset(&act.sa_mask);
1365
1366 act.sa_handler = ndrx_svq_signal_action;
1367 act.sa_flags = SA_NODEFER;
1368
1369 if (EXSUCCEED!=sigaction(NDRX_SVQ_SIG, &act, 0))
1370 {
1371 err = errno;
1372 NDRX_LOG(log_error, "Failed to init sigaction: %s" ,strerror(err));
1373 EXFAIL_OUT(ret);
1374 }
1375
1376
1377 if (first)
1378 {
1379 if (!M_scanunit_was_init)
1380 {
1381 char *p = getenv(CONF_NDRX_SCANUNIT);
1382
1383 if (NULL!=p)
1384 {
1385 if (!ndrx_is_numberic(p))
1386 {
1387 NDRX_LOG(log_error, "ERROR ! %s is not number! Got [%s]",
1388 CONF_NDRX_SCANUNIT, p);
1389
1390 userlog("ERROR ! %s is not number! Got [%s]",
1391 CONF_NDRX_SCANUNIT, p);
1392 EXFAIL_OUT(ret);
1393 }
1394
1395 M_scanunit = atoi(p);
1396
1397 if (M_scanunit < 1)
1398 {
1399 NDRX_LOG(log_error, "ERROR ! %s is than min %d ms! Got %d",
1400 CONF_NDRX_SCANUNIT, CONF_NDRX_SCANUNIT_MIN, M_scanunit);
1401
1402 userlog("ERROR ! %s is than min %d ms! Got %d",
1403 CONF_NDRX_SCANUNIT, CONF_NDRX_SCANUNIT_MIN, M_scanunit);
1404 EXFAIL_OUT(ret);
1405 }
1406 }
1407 }
1408 NDRX_LOG(log_info, "System-V Scan unit set to %d ms", M_scanunit);
1409 }
1410
1411
1412
1413
1414
1415
1416 memset(&M_mon, 0, sizeof(M_mon));
1417
1418
1419 if (EXFAIL==pipe(M_mon.evpipe))
1420 {
1421 err = errno;
1422 NDRX_LOG(log_error, "pipe failed: %s", strerror(err));
1423 EXFAIL_OUT(ret);
1424 }
1425
1426 if (EXFAIL==fcntl(M_mon.evpipe[READ], F_SETFL,
1427 fcntl(M_mon.evpipe[READ], F_GETFL) | O_NONBLOCK))
1428 {
1429 err = errno;
1430 NDRX_LOG(log_error, "fcntl READ pipe set O_NONBLOCK failed: %s",
1431 strerror(err));
1432 EXFAIL_OUT(ret);
1433 }
1434
1435 M_mon.nrfds = 1;
1436 if (NULL==(M_mon.fdtab = NDRX_CALLOC(M_mon.nrfds, sizeof(struct pollfd))))
1437 {
1438 err = errno;
1439 NDRX_LOG(log_error, "calloc for pollfd failed: %s", strerror(err));
1440 EXFAIL_OUT(ret);
1441 }
1442
1443 if (NULL==(M_mon.fdtabmo = NDRX_CALLOC(M_mon.nrfds, sizeof(struct pollfd))))
1444 {
1445 err = errno;
1446 NDRX_LOG(log_error, "calloc for pollfd mo failed: %s", strerror(err));
1447 EXFAIL_OUT(ret);
1448 }
1449
1450
1451 M_mon.fdtab[PIPE_POLL_IDX].fd = M_mon.evpipe[READ];
1452 M_mon.fdtab[PIPE_POLL_IDX].events = POLLIN;
1453
1454
1455 NDRX_LOG(log_debug, "System V Monitoring pipes fd read:%d write:%d",
1456 M_mon.evpipe[READ], M_mon.evpipe[WRITE]);
1457
1458 pthread_attr_init(&pthread_custom_attr);
1459 ndrx_platf_stack_set(&pthread_custom_attr);
1460
1461 M_alive=EXTRUE;
1462 if (EXSUCCEED!=(ret=pthread_create(&(M_mon.evthread), &pthread_custom_attr,
1463 ndrx_svq_timeout_thread, NULL)))
1464 {
1465 M_alive=EXFALSE;
1466 NDRX_PLATF_DIAG(NDRX_DIAG_PTHREAD_CREATE, errno, "SystemV Event thread");
1467 EXFAIL_OUT(ret);
1468 }
1469
1470
1471
1472 if (first)
1473 {
1474 #if 0
1475 - OS will remove all the resources used.
1476 this code causes core dumps, if some process performs exit() in the middle
1477 of processing.
1478
1479 if (EXSUCCEED!=atexit(ndrx_svq_event_atexit))
1480 {
1481 err = errno;
1482 NDRX_LOG(log_error, "Failed to register ndrx_svq_event_exit with atexit(): %s",
1483 strerror(err));
1484 userlog("Failed to register ndrx_svq_event_exit with atexit(): %s",
1485 strerror(err));
1486 EXFAIL_OUT(ret);
1487 }
1488 #endif
1489
1490 if (EXSUCCEED!=(ret=ndrx_atfork(ndrx_svq_fork_prepare,
1491
1492 ndrx_svq_fork_resume, ndrx_svq_fork_resume_child)))
1493 {
1494 M_alive=EXFALSE;
1495 NDRX_LOG(log_error, "Failed to register fork handlers: %s",
1496 strerror(ret));
1497 userlog("Failed to register fork handlers: %s", strerror(ret));
1498 EXFAIL_OUT(ret);
1499 }
1500
1501
1502 first = EXFALSE;
1503 }
1504
1505 out:
1506 errno = err;
1507 return ret;
1508 }
1509
1510
1511
1512
1513
1514
1515 exprivate int ndrx_svq_moncmd_send(ndrx_svq_mon_cmd_t *cmd)
1516 {
1517 int ret = EXSUCCEED;
1518 int err = 0;
1519
1520 if (M_mon.evpipe[WRITE] != EXFAIL)
1521 {
1522 if (EXFAIL==write (M_mon.evpipe[WRITE], (char *)cmd,
1523 sizeof(ndrx_svq_mon_cmd_t)))
1524 {
1525 err = errno;
1526 NDRX_LOG(log_error, "Error ! write fail: %s", strerror(err));
1527 userlog("Error ! write fail: %s", strerror(errno));
1528 EXFAIL_OUT(ret);
1529 }
1530 }
1531 else
1532 {
1533 NDRX_LOG(log_info, "No event thread -> pipe closed.");
1534 }
1535
1536 out:
1537 errno = err;
1538 return ret;
1539 }
1540
1541
1542
1543
1544
1545
1546
1547
1548
1549
1550 expublic int ndrx_svq_moncmd_tout(mqd_t mqd, ndrx_stopwatch_t *stamp_time,
1551 unsigned long stamp_seq, struct timespec *abs_timeout, int syncfd)
1552 {
1553 ndrx_svq_mon_cmd_t cmd;
1554
1555 memset(&cmd, 0, sizeof(cmd));
1556
1557 cmd.cmd = NDRX_SVQ_MON_TOUT;
1558 cmd.mqd = mqd;
1559 cmd.stamp_time = *stamp_time;
1560 cmd.stamp_seq = stamp_seq;
1561 cmd.abs_timeout = *abs_timeout;
1562
1563 if (syncfd)
1564 {
1565 cmd.flags|=NDRX_SVQ_MONF_SYNCFD;
1566 }
1567
1568 return ndrx_svq_moncmd_send(&cmd);
1569 }
1570
1571
1572
1573
1574
1575
1576
1577
1578 expublic int ndrx_svq_moncmd_addfd(mqd_t mqd, int fd, uint32_t events)
1579 {
1580 ndrx_svq_mon_cmd_t cmd;
1581
1582 memset(&cmd, 0, sizeof(cmd));
1583
1584 cmd.cmd = NDRX_SVQ_MON_ADDFD;
1585 cmd.mqd = mqd;
1586 cmd.fd = fd;
1587 cmd.events = events;
1588
1589 return ndrx_svq_moncmd_send(&cmd);
1590 }
1591
1592
1593
1594
1595
1596
1597 expublic int ndrx_svq_moncmd_rmfd(int fd)
1598 {
1599 ndrx_svq_mon_cmd_t cmd;
1600
1601 memset(&cmd, 0, sizeof(cmd));
1602
1603 cmd.cmd = NDRX_SVQ_MON_RMFD;
1604 cmd.fd = fd;
1605
1606 return ndrx_svq_moncmd_send(&cmd);
1607 }
1608
1609
1610
1611
1612
1613 expublic int ndrx_svq_moncmd_term(void)
1614 {
1615 ndrx_svq_mon_cmd_t cmd;
1616 int ret;
1617
1618 memset(&cmd, 0, sizeof(cmd));
1619
1620 cmd.cmd = NDRX_SVQ_MON_TERM;
1621
1622 ret=ndrx_svq_moncmd_send(&cmd);
1623
1624
1625
1626 return ret;
1627 }
1628
1629
1630
1631
1632
1633
1634
1635
1636
1637
1638
1639
1640
1641
1642
1643 expublic int ndrx_svq_event_sndrcv(mqd_t mqd, char *ptr, ssize_t *maxlen,
1644 struct timespec *abs_timeout, ndrx_svq_ev_t **ev, int is_send, int syncfd)
1645 {
1646 int ret = EXSUCCEED;
1647 int err=0;
1648 int msgflg;
1649 int len;
1650 ndrx_svq_ev_t cur_stamp;
1651
1652
1653 if (mqd->attr.mq_flags & O_NONBLOCK)
1654 {
1655 NDRX_LOG(log_debug, "O_NONBLOCK set");
1656 msgflg = IPC_NOWAIT;
1657 }
1658 else
1659 {
1660 msgflg = 0;
1661 }
1662
1663 *ev = NULL;
1664 len = *maxlen;
1665
1666
1667
1668
1669
1670
1671
1672
1673
1674
1675 ndrx_stopwatch_reset(&(mqd->stamp_time));
1676
1677 NDRX_SPIN_LOCK_V((mqd->stamplock));
1678
1679 mqd->stamp_seq++;
1680 cur_stamp.stamp_seq = mqd->stamp_seq;
1681
1682 memcpy(&(mqd->stamp_time), &cur_stamp.stamp_time, sizeof(cur_stamp.stamp_time));
1683
1684 NDRX_SPIN_UNLOCK_V(mqd->stamplock);
1685
1686
1687
1688
1689 NDRX_LOG(log_debug, "timeout tv_sec=%ld tv_nsec=%ld",
1690 (long)abs_timeout->tv_sec, (long)abs_timeout->tv_nsec);
1691
1692 if (0!=abs_timeout->tv_nsec || 0!=abs_timeout->tv_sec || syncfd)
1693 {
1694
1695
1696
1697
1698
1699
1700
1701
1702
1703
1704 if (syncfd)
1705 {
1706
1707 if (EXSUCCEED!=ndrx_svq_moncmd_tout(mqd, &(mqd->stamp_time), mqd->stamp_seq,
1708 abs_timeout, syncfd))
1709 {
1710 err = EFAULT;
1711 NDRX_LOG(log_error, "Failed to request timeout to ndrx_svq_moncmd_tout()");
1712 userlog("Failed to request timeout to ndrx_svq_moncmd_tout()");
1713 EXFAIL_OUT(ret);
1714 }
1715 }
1716 else
1717 {
1718
1719
1720
1721 if (EXSUCCEED!=ndrx_svq_mqd_hash_add(mqd, &(mqd->stamp_time), mqd->stamp_seq,
1722 abs_timeout))
1723 {
1724 err = EFAULT;
1725 NDRX_LOG(log_error, "Failed to request timeout to ndrx_svq_mqd_hash_add()");
1726 userlog("Failed to request timeout to ndrx_svq_mqd_hash_add()");
1727 EXFAIL_OUT(ret);
1728 }
1729 }
1730 }
1731 else
1732 {
1733 NDRX_LOG(log_debug, "timeout not needed... mqd=%p", mqd);
1734 }
1735
1736
1737
1738
1739 NDRX_SPIN_LOCK_V((mqd->rcvlockb4));
1740
1741
1742
1743
1744
1745 MUTEX_LOCK_V(mqd->qlock);
1746
1747
1748
1749
1750
1751
1752
1753
1754
1755
1756
1757 while (NULL!=mqd->eventq &&
1758 NDRX_SVQ_EV_TOUT==mqd->eventq->ev &&
1759 !(NDRX_SVQ_TOUT_MATCH((mqd->eventq), (&cur_stamp))))
1760 {
1761
1762 *ev = mqd->eventq;
1763 DL_DELETE(mqd->eventq, *ev);
1764 NDRX_FPFREE(*ev);
1765 *ev = NULL;
1766 }
1767
1768 if (NULL!=mqd->eventq)
1769 {
1770 *ev = mqd->eventq;
1771 DL_DELETE(mqd->eventq, *ev);
1772 MUTEX_UNLOCK_V(mqd->qlock);
1773 NDRX_SPIN_UNLOCK_V(mqd->rcvlockb4);
1774
1775 NDRX_LOG(log_info, "Got event in q %p: %d", mqd, (*ev)->ev);
1776 EXFAIL_OUT(ret);
1777 }
1778
1779
1780
1781
1782
1783
1784 mqd->thread = pthread_self();
1785 M_signalled = EXFALSE;
1786
1787 NDRX_SPIN_LOCK_V((mqd->rcvlock));
1788
1789 MUTEX_UNLOCK_V(mqd->qlock);
1790
1791
1792
1793
1794
1795
1796
1797 if (!M_signalled)
1798 {
1799
1800 if (is_send)
1801 {
1802 ret=msgsnd (mqd->qid, ptr, NDRX_SVQ_INLEN(len), msgflg);
1803 }
1804 else
1805 {
1806 ret=msgrcv (mqd->qid, ptr, NDRX_SVQ_INLEN(len), 0, msgflg);
1807 }
1808
1809 if (EXFAIL==ret)
1810 {
1811 err=errno;
1812 }
1813 }
1814 else
1815 {
1816
1817
1818
1819 ret=EXFAIL;
1820 err=EINTR;
1821 }
1822
1823
1824
1825 NDRX_SPIN_UNLOCK_V(mqd->rcvlock);
1826 NDRX_SPIN_UNLOCK_V(mqd->rcvlockb4);
1827
1828 MUTEX_LOCK_V(mqd->barrier);
1829 MUTEX_UNLOCK_V(mqd->barrier);
1830
1831
1832 if (EXFAIL!=ret)
1833 {
1834
1835
1836
1837 if (!is_send)
1838 {
1839 *maxlen = NDRX_SVQ_OUTLEN(ret);
1840 }
1841
1842 ret=EXSUCCEED;
1843 goto out;
1844 }
1845 else
1846 {
1847 if (EINTR!=err)
1848 {
1849 int lev = log_error;
1850
1851 if (ENOMSG==err)
1852 {
1853 lev=log_debug;
1854 }
1855 NDRX_LOG(lev, "MQ op fail qid:%d len:%d msgflg: %d: %s",
1856 mqd->qid, len, msgflg, strerror(err));
1857 EXFAIL_OUT(ret);
1858 }
1859 else
1860 {
1861 NDRX_LOG(log_debug, "(EINTR) MQ op fail qid:%d len:%d msgflg: %d: %s",
1862 mqd->qid, len, msgflg, strerror(err));
1863 }
1864 }
1865
1866
1867 ret=EXSUCCEED;
1868
1869
1870 MUTEX_LOCK_V(mqd->qlock);
1871
1872
1873 while (NULL!=mqd->eventq &&
1874 NDRX_SVQ_EV_TOUT==mqd->eventq->ev &&
1875 !(NDRX_SVQ_TOUT_MATCH((mqd->eventq), (&cur_stamp))))
1876 {
1877
1878 *ev = mqd->eventq;
1879 DL_DELETE(mqd->eventq, *ev);
1880 NDRX_FPFREE(*ev);
1881 *ev = NULL;
1882 }
1883
1884
1885 if (NULL!=mqd->eventq)
1886 {
1887 *ev = mqd->eventq;
1888 DL_DELETE(mqd->eventq, *ev);
1889 MUTEX_UNLOCK_V(mqd->qlock);
1890
1891 NDRX_LOG(log_info, "Got event in q %p: %d", mqd, (*ev)->ev);
1892
1893
1894 EXFAIL_OUT(ret);
1895 }
1896
1897
1898 MUTEX_UNLOCK_V(mqd->qlock);
1899
1900 out:
1901
1902
1903 ndrx_svq_mqd_hash_del(mqd);
1904
1905
1906
1907
1908
1909 if (EXSUCCEED==ret && NULL==*ev && EINTR==err)
1910 {
1911 NDRX_LOG(log_error, "Interrupted by external signal, M_signalled=%d", M_signalled);
1912 ret=EXFAIL;
1913 }
1914
1915 errno = err;
1916
1917 return ret;
1918 }
1919
1920
1921