0001
0002
0003
0004
0005
0006
0007
0008
0009
0010
0011
0012
0013
0014
0015
0016
0017
0018
0019
0020
0021
0022
0023
0024
0025
0026
0027
0028
0029
0030
0031
0032
0033
0034
0035
0036
0037
0038 #include <stdio.h>
0039 #include <stdlib.h>
0040 #include <time.h>
0041
0042 #include <unistd.h>
0043 #include <stdarg.h>
0044 #include <ctype.h>
0045 #include <memory.h>
0046 #include <errno.h>
0047 #include <signal.h>
0048 #include <limits.h>
0049 #include <pthread.h>
0050 #include <string.h>
0051
0052 #include <ndrstandard.h>
0053 #include <ndebug.h>
0054 #include <nstdutil.h>
0055 #include <limits.h>
0056
0057 #include <sys_unix.h>
0058 #include <sys_mqueue.h>
0059 #include <sys_svq.h>
0060 #include <fcntl.h>
0061
0062 #include <atmi.h>
0063 #include <atmi_int.h>
0064
0065
0066
0067
0068
0069
0070
0071
0072
0073 typedef struct
0074 {
0075 char svcnm[MAXTIDENT+1];
0076 mqd_t mqd;
0077 int idx;
0078 EX_hash_handle hh;
0079 } ndrx_svq_pollsvc_t;
0080
0081
0082
0083
0084
0085
0086 exprivate char M_mainqstr[NDRX_MAX_Q_SIZE+1] = "";
0087
0088
0089
0090
0091 exprivate mqd_t M_mainq = NULL;
0092
0093
0094
0095
0096 exprivate ndrx_svq_pollsvc_t * M_svcmap = NULL;
0097
0098
0099
0100
0101 exprivate int M_accept_any = EXFALSE;
0102
0103
0104
0105
0106 exprivate int M_nrfds = 0;
0107
0108
0109
0110
0111 NDRX_SPIN_LOCKDECL(M_nrfds_lock);
0112
0113
0114
0115
0116
0117
0118
0119
0120 expublic int ndrx_epoll_down(int force)
0121 {
0122 int ret = EXSUCCEED;
0123
0124 ret=ndrx_svqshm_down(force);
0125
0126 out:
0127 return ret;
0128 }
0129
0130
0131
0132
0133
0134 expublic int ndrx_epoll_shmdetach(void)
0135 {
0136
0137 ndrx_svq_event_exit(EXTRUE);
0138
0139 return EXSUCCEED;
0140 }
0141
0142
0143
0144
0145
0146 expublic mqd_t ndrx_svq_mainq_get(void)
0147 {
0148 return M_mainq;
0149 }
0150
0151
0152
0153
0154
0155 expublic int ndrx_epoll_resid_get(void)
0156 {
0157 return M_mainq->qid;
0158 }
0159
0160
0161
0162
0163
0164
0165
0166
0167
0168 expublic int ndrx_epoll_service_translate(char *send_q, char *q_prefix,
0169 char *svc, int resid)
0170 {
0171 return ndrx_svqshm_get_qid(resid, send_q, NDRX_MAX_Q_SIZE+1);
0172 }
0173
0174
0175
0176
0177
0178 expublic void ndrx_epoll_mainq_set(char *qstr)
0179 {
0180 NDRX_STRCPY_SAFE(M_mainqstr, qstr);
0181 NDRX_LOG(log_debug, "Setting System V main dispatching queue: [%s]", qstr);
0182 }
0183
0184
0185
0186
0187
0188
0189 expublic int ndrx_epoll_shallopenq(int idx)
0190 {
0191 if (ATMI_SRV_ADMIN_Q==idx || ATMI_SRV_REPLY_Q==idx)
0192 {
0193 return EXTRUE;
0194 }
0195
0196 return EXFALSE;
0197 }
0198
0199
0200
0201
0202
0203
0204 exprivate ndrx_svq_pollsvc_t * ndrx_epoll_getsvc(char *svcnm)
0205 {
0206 ndrx_svq_pollsvc_t *ret = NULL;
0207 ndrx_svq_pollsvc_t *elt = NULL;
0208
0209 EXHASH_FIND_STR(M_svcmap, svcnm, ret);
0210
0211 if (NULL==ret && M_accept_any)
0212 {
0213 EXHASH_ITER(hh, M_svcmap, ret, elt)
0214 {
0215 if (ret->idx > ATMI_SRV_REPLY_Q)
0216 {
0217 NDRX_LOG(log_debug, "Accepting any msg svcnm [%s] mapped to [%s]",
0218 svcnm, ret->svcnm);
0219 goto out;
0220 }
0221 }
0222 ret = NULL;
0223 }
0224
0225 if (NULL==ret)
0226 {
0227 NDRX_LOG(log_error, "Failed to find queue definition for [%s] service",
0228 svcnm);
0229 }
0230
0231 out:
0232 return ret;
0233 }
0234
0235
0236
0237
0238
0239 exprivate ndrx_svq_pollsvc_t * ndrx_epoll_getfirstna(void)
0240 {
0241 ndrx_svq_pollsvc_t *el = NULL, *elt;
0242
0243 EXHASH_ITER(hh, M_svcmap, el, elt)
0244 {
0245 if (ATMI_SRV_ADMIN_Q!=el->idx)
0246 {
0247 break;
0248 }
0249 }
0250
0251 out:
0252 return el;
0253 }
0254
0255
0256
0257
0258
0259
0260
0261
0262
0263
0264 expublic mqd_t ndrx_epoll_service_add(char *svcnm, int idx, mqd_t mq_exits)
0265 {
0266 int ret = EXSUCCEED;
0267 mqd_t mq = NULL;
0268 ndrx_svq_pollsvc_t *el = NULL;
0269 int err;
0270 char *adminsvc = NDRX_SVC_ADMIN;
0271 char *replysvc = NDRX_SVC_REPLY;
0272
0273 if (NULL==(el=NDRX_MALLOC(sizeof(*el))))
0274 {
0275 err = errno;
0276 NDRX_LOG(log_error, "Failed to malloc %d bytes: %s",
0277 sizeof(*el), strerror(err));
0278 userlog("Failed to malloc %d bytes: %s", sizeof(*el),
0279 strerror(err));
0280 EXFAIL_OUT(ret);
0281 }
0282
0283
0284
0285 if (ATMI_SRV_ADMIN_Q==idx)
0286 {
0287 svcnm = adminsvc;
0288 mq = mq_exits;
0289
0290
0291
0292
0293
0294 NDRX_LOG(log_debug, "About to init admin thread...");
0295 if (EXSUCCEED!=ndrx_svqadmin_init(mq))
0296 {
0297 NDRX_LOG(log_error, "Failed to init admin queue");
0298 userlog("Failed to init admin queue");
0299 EXFAIL_OUT(ret);
0300 }
0301
0302 }
0303 else if (ATMI_SRV_REPLY_Q==idx)
0304 {
0305 svcnm = replysvc;
0306 mq = mq_exits;
0307 }
0308 else if ((mqd_t)EXFAIL!=mq_exits)
0309 {
0310 mq = mq_exits;
0311 }
0312 else if (NULL==(mq=NDRX_MALLOC(1)))
0313 {
0314 err = errno;
0315 NDRX_LOG(log_error, "Failed to malloc 1 byte: %s", strerror(err));
0316 userlog("Failed to malloc 1 byte: %s", strerror(err));
0317 EXFAIL_OUT(ret);
0318 }
0319
0320 if (0==strncmp(svcnm, NDRX_SVC_BRIDGE, NDRX_SVC_BRIDGE_STATLEN))
0321 {
0322 NDRX_LOG(log_info, "Accepting any msg");
0323 M_accept_any = EXTRUE;
0324 }
0325
0326 el->mqd = mq;
0327 el->idx = idx;
0328
0329 NDRX_STRCPY_SAFE(el->svcnm, svcnm);
0330
0331
0332 EXHASH_ADD_STR( M_svcmap, svcnm, el );
0333
0334 NDRX_LOG(log_debug, "[%s] mapped to %p idx %d", el->svcnm, el->mqd, el->idx);
0335
0336 out:
0337
0338 if (EXSUCCEED!=ret && NULL!=mq)
0339 {
0340 NDRX_FREE((char *)mq);
0341 mq=NULL;
0342 }
0343
0344 if (EXSUCCEED!=ret && NULL!=el)
0345 {
0346 NDRX_FREE((char *)el);
0347 }
0348
0349 errno=err;
0350 return mq;
0351 }
0352
0353
0354
0355
0356
0357 expublic int ndrx_epoll_sys_init(void)
0358 {
0359 int ret = EXSUCCEED;
0360
0361 NDRX_SPIN_INIT_V(M_nrfds_lock);
0362
0363
0364 if (EXSUCCEED!=ndrx_svqshm_init(EXFALSE))
0365 {
0366 NDRX_LOG(log_error, "Failed to init System V Aux thread/SHM");
0367 EXFAIL_OUT(ret);
0368 }
0369
0370 out:
0371 return ret;
0372 }
0373
0374
0375
0376
0377 expublic void ndrx_epoll_sys_uninit(void)
0378 {
0379 return;
0380 }
0381
0382
0383
0384
0385
0386 expublic char * ndrx_epoll_mode(void)
0387 {
0388 static char *mode = "SystemV";
0389
0390 return mode;
0391 }
0392
0393
0394
0395
0396
0397
0398
0399
0400
0401 expublic int ndrx_epoll_ctl(int epfd, int op, int fd,
0402 struct ndrx_epoll_event *event)
0403 {
0404 int ret = EXSUCCEED;
0405 int err = 0;
0406
0407
0408
0409 switch (op)
0410 {
0411 case EX_EPOLL_CTL_ADD:
0412 if (EXSUCCEED!=(ret = ndrx_svq_moncmd_addfd(M_mainq, fd, event->events)))
0413 {
0414 err = errno;
0415 NDRX_LOG(log_error, "Failed to add fd %d to mqd %p for polling: %s",
0416 fd, M_mainq);
0417 }
0418
0419 NDRX_SPIN_LOCK_V(M_nrfds_lock);
0420 M_nrfds++;
0421 NDRX_SPIN_UNLOCK_V(M_nrfds_lock);
0422 break;
0423 case EX_EPOLL_CTL_DEL:
0424 if (EXSUCCEED!=(ret = ndrx_svq_moncmd_rmfd(fd)))
0425 {
0426 err = errno;
0427 NDRX_LOG(log_error, "Failed to remove fd %d to mqd %p for polling: %s",
0428 fd, M_mainq);
0429 }
0430
0431 NDRX_SPIN_LOCK_V(M_nrfds_lock);
0432 M_nrfds--;
0433 NDRX_SPIN_UNLOCK_V(M_nrfds_lock);
0434 break;
0435 default:
0436 NDRX_LOG(log_warn, "Unsupported operation: %d", op);
0437 errno=EINVAL;
0438 EXFAIL_OUT(ret);
0439 break;
0440 }
0441 out:
0442 errno = err;
0443 return ret;
0444 }
0445
0446
0447
0448
0449
0450
0451
0452
0453
0454 expublic int ndrx_epoll_ctl_mq(int epfd, int op, mqd_t fd,
0455 struct ndrx_epoll_event *event)
0456 {
0457 int ret = EXSUCCEED;
0458 ndrx_svq_pollsvc_t *el, *elt;
0459
0460
0461
0462
0463
0464 NDRX_LOG(log_debug, "Op %d on mqd=%p from poller", op, fd);
0465 if (EX_EPOLL_CTL_DEL==op)
0466 {
0467 EXHASH_ITER(hh, M_svcmap, el, elt)
0468 {
0469 if (el->mqd==fd)
0470 {
0471
0472 if (ATMI_SRV_ADMIN_Q==el->idx)
0473 {
0474 if (EXSUCCEED!=ndrx_svqadmin_deinit())
0475 {
0476 NDRX_LOG(log_error, "Failed to terminate ADMIN thread!");
0477 EXFAIL_OUT(ret);
0478 }
0479 }
0480
0481 EXHASH_DEL(M_svcmap, el);
0482
0483 if (!ndrx_epoll_shallopenq(el->idx))
0484 {
0485 NDRX_LOG(log_info, "Free up virtual mqd %p", el->mqd);
0486 NDRX_FREE((char *)el->mqd);
0487 }
0488 NDRX_FREE(el);
0489 }
0490 }
0491 }
0492
0493 out:
0494 return ret;
0495 }
0496
0497
0498
0499
0500
0501
0502 expublic int ndrx_epoll_create(int size)
0503 {
0504 int ret = 1;
0505 int err;
0506 struct mq_attr attr;
0507
0508 memset(&attr, 0, sizeof(attr));
0509
0510
0511
0512
0513 M_mainq = ndrx_mq_open(M_mainqstr, O_CREAT, 0660, &attr);
0514
0515 if ((mqd_t)EXFAIL==M_mainq)
0516 {
0517 err = errno;
0518 NDRX_LOG(log_error, "Failed to open main System V poller queue!");
0519 EXFAIL_OUT(ret);
0520 }
0521
0522
0523 if (EXSUCCEED!=ndrx_svqshm_ctl(NULL, M_mainq->qid, IPC_SET,
0524 NDRX_SVQ_MAP_RQADDR, NULL))
0525 {
0526 NDRX_LOG(log_error, "Failed to mark qid %d as request address type",
0527 M_mainq->qid);
0528 EXFAIL_OUT(ret);
0529 }
0530
0531
0532 out:
0533 return ret;
0534 }
0535
0536
0537
0538
0539 expublic int ndrx_epoll_close(int fd)
0540 {
0541 ndrx_svq_pollsvc_t *el, *elt;
0542
0543
0544
0545
0546
0547 EXHASH_ITER(hh, M_svcmap, el, elt)
0548 {
0549 EXHASH_DEL(M_svcmap, el);
0550 NDRX_FREE(el);
0551 }
0552
0553 return EXSUCCEED;
0554 }
0555
0556
0557
0558
0559
0560
0561
0562
0563
0564
0565
0566
0567
0568
0569
0570 expublic int ndrx_epoll_wait(int epfd, struct ndrx_epoll_event *events,
0571 int maxevents, int timeout, char **buf, int *buf_len)
0572 {
0573 int ret = EXSUCCEED;
0574 ssize_t rcvlen = *buf_len;
0575 ndrx_svq_ev_t *ev = NULL;
0576 int err = 0;
0577 int gottout = EXFALSE;
0578 ndrx_svq_pollsvc_t *svc;
0579 tp_command_call_t *call;
0580 tp_command_generic_t *gen_command;
0581 struct timespec tm;
0582
0583
0584 M_mainq->thread = pthread_self();
0585
0586
0587
0588 if (-1==timeout)
0589 {
0590
0591 tm.tv_sec = 0;
0592 tm.tv_nsec = 0;
0593 }
0594 else
0595 {
0596 clock_gettime(CLOCK_REALTIME, &tm);
0597 tm.tv_sec += (timeout / 1000);
0598 }
0599
0600 if (EXFAIL==ndrx_svq_event_sndrcv( M_mainq, *buf, &rcvlen,
0601 &tm, &ev, EXFALSE, M_nrfds))
0602 {
0603 err = errno;
0604 if (NULL!=ev)
0605 {
0606 switch (ev->ev)
0607 {
0608 case NDRX_SVQ_EV_TOUT:
0609 NDRX_LOG(log_debug, "Timed out");
0610 gottout = EXTRUE;
0611 break;
0612 case NDRX_SVQ_EV_DATA:
0613
0614
0615 NDRX_LOG(log_info, "Admin queue sends us something "
0616 "bytes %d", (int)ev->datalen);
0617
0618 events[0].is_mqd = EXTRUE;
0619
0620
0621 if (*buf_len < ev->datalen)
0622 {
0623 NDRX_LOG(log_error, "Receive from FD %d bytes, but max buffer %d",
0624 ev->datalen, *buf_len);
0625 userlog("Receive from FD %d bytes, but max buffer %d",
0626 ev->datalen, *buf_len);
0627
0628 err=EMSGSIZE;
0629 EXFAIL_OUT(ret);
0630 }
0631
0632 *buf_len = ev->datalen;
0633
0634
0635 NDRX_SYSBUF_FREE(*buf);
0636
0637 *buf = ev->data;
0638 ev->data = NULL;
0639
0640
0641
0642
0643
0644 if (NULL==(svc=ndrx_epoll_getsvc(NDRX_SVC_ADMIN)))
0645 {
0646 err=EFAULT;
0647 NDRX_LOG(log_error, "Missing admin [%s] queue def!",
0648 NDRX_SVC_ADMIN);
0649 EXFAIL_OUT(ret);
0650 }
0651
0652 events[0].data.mqd = svc->mqd;
0653 break;
0654 case NDRX_SVQ_EV_FD:
0655 NDRX_LOG(log_info, "File descriptor %d sends us something "
0656 "revents=%d bytes %d", ev->fd, (int)ev->revents,
0657 (int)ev->datalen);
0658 events[0].is_mqd = EXFALSE;
0659 events[0].events = ev->revents;
0660 events[0].data.fd = ev->fd;
0661
0662
0663 if (*buf_len < ev->datalen)
0664 {
0665 NDRX_LOG(log_error, "Receive from FD %d bytes, but max buffer %d",
0666 ev->datalen, *buf_len);
0667 userlog("Receive from FD %d bytes, but max buffer %d",
0668 ev->datalen, *buf_len);
0669
0670 err=EMSGSIZE;
0671 EXFAIL_OUT(ret);
0672 }
0673
0674 if (ev->datalen > 0)
0675 {
0676 *buf_len = ev->datalen;
0677
0678 NDRX_SYSBUF_FREE(*buf);
0679 *buf = ev->data;
0680 ev->data=NULL;
0681 }
0682 break;
0683
0684 default:
0685 NDRX_LOG(log_error, "Unexpected event: %d", ev->ev);
0686 err = EBADF;
0687 break;
0688 }
0689 }
0690 else
0691 {
0692
0693 if (ENOMSG==err)
0694 {
0695 NDRX_LOG(log_debug, "msgrcv(qid=%d) failed: %s", M_mainq->qid,
0696 strerror(err));
0697 err = EAGAIN;
0698 }
0699 else
0700 {
0701 NDRX_LOG(log_error, "msgrcv(qid=%d) failed: %s", M_mainq->qid,
0702 strerror(err));
0703 }
0704 EXFAIL_OUT(ret);
0705 }
0706 }
0707 else
0708 {
0709 gen_command = (tp_command_generic_t *)*buf;
0710
0711
0712 NDRX_LOG(log_debug, "Got message from main SysV queue %d "
0713 "bytes gencommand: %hd", rcvlen, gen_command->command_id);
0714
0715 switch (gen_command->command_id)
0716 {
0717 case ATMI_COMMAND_CONNECT:
0718 case ATMI_COMMAND_TPCALL:
0719 case ATMI_COMMAND_CONNRPLY:
0720 case ATMI_COMMAND_TPREPLY:
0721
0722 call = (tp_command_call_t *)*buf;
0723
0724 NDRX_LOG(log_debug, "Lookup service: [%s]", call->name);
0725 if (NULL==(svc=ndrx_epoll_getsvc(call->name)))
0726 {
0727 err=EAGAIN;
0728 NDRX_DUMP(log_error, "!!! Missing queue def - dumpg", *buf, rcvlen);
0729 NDRX_LOG(log_error, "!!! Missing queue def for [%s] data "
0730 "len %d- dropping "
0731 "msg - is all servers on RQADDR serving all services?",
0732 call->name, rcvlen);
0733 userlog("!!! Missing queue def for [%s] "
0734 "data len %d - dropping msg - is "
0735 "all servers on RQADDR serving all services?",
0736 call->name, rcvlen);
0737 EXFAIL_OUT(ret);
0738 }
0739 events[0].data.mqd = svc->mqd;
0740 break;
0741 default:
0742
0743 if (NULL==(svc=ndrx_epoll_getfirstna()))
0744 {
0745 err=EFAULT;
0746 NDRX_LOG(log_error, "Cannot find any non admin Q for event");
0747 userlog("Cannot find any non admin Q for event");
0748 EXFAIL_OUT(ret);
0749 }
0750
0751 events[0].data.mqd = svc->mqd;
0752
0753 break;
0754 }
0755
0756 events[0].is_mqd = EXTRUE;
0757
0758 NDRX_LOG(log_debug, "event mapped to %p (mqd subst)",
0759 events[0].data.mqd);
0760 *buf_len = rcvlen;
0761 }
0762
0763 out:
0764
0765 if (NULL!=ev)
0766 {
0767 if (NULL!=ev->data)
0768 {
0769 NDRX_SYSBUF_FREE(ev->data);
0770 }
0771
0772 NDRX_FPFREE(ev);
0773 }
0774
0775 if (EXSUCCEED==ret)
0776 {
0777 if (gottout)
0778 {
0779 errno = EAGAIN;
0780 return 0;
0781 }
0782 else
0783 {
0784 return 1;
0785 }
0786 }
0787 else
0788 {
0789 errno = err;
0790 return EXFAIL;
0791 }
0792 }
0793
0794
0795
0796
0797
0798 expublic int ndrx_epoll_errno(void)
0799 {
0800 return errno;
0801 }
0802
0803
0804
0805
0806
0807
0808 expublic char * ndrx_poll_strerror(int err)
0809 {
0810 return strerror(err);
0811 }
0812
0813