0001
0002
0003
0004
0005
0006
0007
0008
0009
0010
0011
0012
0013
0014
0015
0016
0017
0018
0019
0020
0021
0022
0023
0024
0025
0026
0027
0028
0029
0030
0031
0032
0033
0034
0035
0036
0037
0038
0039
0040
0041
0042
0043
0044 #include <ndrx_config.h>
0045
0046 #ifdef EX_OS_AIX
0047
0048 #define _MSGQSUPPORT 1
0049 #endif
0050
0051 #include <stdlib.h>
0052 #include <stdio.h>
0053 #include <fcntl.h> /* For O_* constants */
0054 #include <sys/ipc.h>
0055 #include <sys/msg.h>
0056 #include <sys/time.h>
0057
0058 #include <ndrstandard.h>
0059
0060 #include <nstopwatch.h>
0061 #include <nstd_tls.h>
0062 #include <exhash.h>
0063 #include <ndebug.h>
0064 #include <sys_svq.h>
0065
0066
0067 #include "sys_unix.h"
0068
0069
0070
0071
/**
 * Validate the queue descriptor of the enclosing function.
 *
 * Expects the enclosing function to provide a descriptor variable named
 * `mqd' and a result variable named `ret'. When the descriptor is NULL or
 * equals the (mqd_t)EXFAIL sentinel, the problem is logged, errno is set to
 * EINVAL and EXFAIL_OUT(ret) is invoked (defined elsewhere; every user in
 * this file supplies an `out:' label for it).
 *
 * Wrapped in do { } while (0) so that the macro behaves as a single statement
 * (safe inside un-braced if/else).
 */
#define VALIDATE_MQD do {\
        if (NULL==mqd || (mqd_t)EXFAIL==mqd)\
        {\
            NDRX_LOG(log_error, "Invalid queue descriptor %p", (void *)(mqd));\
            errno = EINVAL;\
            EXFAIL_OUT(ret);\
        }\
    } while (0)
0078
0079
0080
0081
/**
 * Convert the absolute deadline of the enclosing function into a
 * millisecond budget and arm the stopwatch with it.
 *
 * Uses these names from the enclosing scope: `timeval' (struct timeval),
 * `__abs_timeout' (const struct timespec *), `tout' (long, milliseconds until
 * the deadline, negative if already passed), `w' (ndrx_stopwatch_t),
 * `wait_left' (int) and `mqd' (only for logging).
 *
 * wait_left is the negated stopwatch delta right after arming; if it is not
 * positive the call is logged as already expired and callers are expected to
 * skip their wait loop (both users loop on wait_left>0).
 *
 * Wrapped in do { } while (0) so that all statements stay together when the
 * macro is used as a single statement.
 */
#define NDRX_SVAPOLL_TOUT_SET do {\
        gettimeofday(&timeval, NULL);\
        tout = ((__abs_timeout->tv_sec - timeval.tv_sec)*1000 +\
                (__abs_timeout->tv_nsec/1000 - timeval.tv_usec)/1000);\
        ndrx_stopwatch_timer_set(&w, (int)tout);\
        wait_left = ndrx_stopwatch_get_delta(&w) * -1;\
        if (wait_left<=0)\
        {\
            NDRX_LOG(log_error, "expired call: wait_left: %d tout: %ld qid: %d",\
                    wait_left, tout, mqd->qid);\
        }\
    } while (0)
0093
0094
0095
0096
0097
0098
0099
0100
0101
0102
0103
0104
0105
0106
0107
0108 expublic int ndrx_svq_close(mqd_t mqd)
0109 {
0110 NDRX_LOG(log_debug, "close %p mqd", mqd);
0111
0112 if (NULL!=mqd && (mqd_t)EXFAIL!=mqd)
0113 {
0114 #ifdef EX_USE_SYSVQ
0115
0116 #if 0
0117
0118
0119
0120
0121
0122
0123 mqd->self = (char *)mqd;
0124 ndrx_svq_delref_add(mqd);
0125
0126 if (EXSUCCEED!=ndrx_svq_moncmd_close(mqd))
0127 {
0128 NDRX_LOG(log_error, "Failed to close queue %p", mqd);
0129 userlog("Failed to close queue %p", mqd);
0130 }
0131
0132
0133
0134
0135
0136 #endif
0137
0138 ndrx_svq_mqd_close(mqd);
0139 NDRX_FPFREE(mqd);
0140 #endif
0141
0142 #ifdef EX_USE_SVAPOLL
0143 NDRX_FPFREE(mqd);
0144 #endif
0145
0146 return EXSUCCEED;
0147 }
0148 else
0149 {
0150 NDRX_LOG(log_error, "Invalid mqd %p!", mqd);
0151 errno = EBADF;
0152 return EXFAIL;
0153 }
0154 }
0155
0156
0157
0158
0159
0160
0161
0162 expublic int ndrx_svq_getattr(mqd_t mqd, struct mq_attr *attr)
0163 {
0164 int ret = EXSUCCEED;
0165 int err = 0;
0166 struct msqid_ds buf;
0167
0168 VALIDATE_MQD;
0169
0170 if (NULL==attr)
0171 {
0172 NDRX_LOG(log_error, "Invalid attr is null", mqd);
0173 errno = EINVAL;
0174 EXFAIL_OUT(ret);
0175 }
0176
0177 memcpy(attr, &(mqd->attr), sizeof(*attr));
0178
0179
0180 if (EXSUCCEED!=msgctl(mqd->qid, IPC_STAT, &buf))
0181 {
0182 err = errno;
0183 NDRX_LOG(log_debug, "Failed to get queue qid %d stats: %s",
0184 mqd->qid, strerror(err));
0185 userlog("Failed to get queue qid %d stats: %s",
0186 mqd->qid, strerror(err));
0187 }
0188
0189 attr->mq_curmsgs = (long)buf.msg_qnum;
0190
0191 out:
0192 errno = err;
0193 return ret;
0194 }
0195
0196
0197
0198
0199
0200
0201
0202 expublic int ndrx_svq_notify(mqd_t mqd, const struct sigevent *notification)
0203 {
0204 NDRX_LOG(log_error, "mq_notify() not supported by System V queue emulation!");
0205 userlog("mq_notify() not supported by System V queue emulation!");
0206 return EXFAIL;
0207 }
0208
0209
0210
0211
0212
0213
0214
0215
0216
0217
0218
0219
0220
0221 expublic mqd_t ndrx_svq_open(const char *pathname, int oflag, mode_t mode,
0222 struct mq_attr *attr)
0223 {
0224
0225 mqd_t mq = (mqd_t)EXFAIL;
0226 int ret = EXSUCCEED;
0227 int errno_save;
0228
0229 NDRX_LOG(log_debug, "enter");
0230 mq = NDRX_FPMALLOC(sizeof(struct ndrx_svq_info), 0);
0231 memset(mq, 0, sizeof(struct ndrx_svq_info));
0232
0233 if (NULL==mq)
0234 {
0235 errno_save=errno;
0236 NDRX_LOG(log_error, "Failed to malloc %d bytes queue descriptor",
0237 (int)sizeof(struct ndrx_svq_info));
0238 userlog("%s: Failed to malloc %d bytes queue descriptor",
0239 __func__, (int)sizeof(struct ndrx_svq_info));
0240 mq = (mqd_t)EXFAIL;
0241 EXFAIL_OUT(ret);
0242 }
0243
0244
0245
0246
0247
0248 if (EXFAIL==(mq->qid = ndrx_svqshm_get((char *)pathname, mode, oflag)))
0249 {
0250 errno_save=errno;
0251 EXFAIL_OUT(ret);
0252 }
0253
0254
0255 NDRX_STRCPY_SAFE(mq->qstr, pathname);
0256 mq->mode = mode;
0257 if (NULL!=attr)
0258 {
0259 memcpy(&(mq->attr), attr, sizeof (*attr));
0260
0261
0262 if (oflag & O_NONBLOCK)
0263 {
0264 mq->attr.mq_flags|=O_NONBLOCK;
0265 NDRX_LOG(log_debug, "Opening in non blocked mode");
0266 }
0267 }
0268 #ifdef EX_USE_SYSVQ
0269
0270 NDRX_SPIN_INIT_V(mq->rcvlock);
0271 NDRX_SPIN_INIT_V(mq->rcvlockb4);
0272 NDRX_SPIN_INIT_V(mq->stamplock);
0273 MUTEX_VAR_INIT(mq->barrier);
0274 MUTEX_VAR_INIT(mq->qlock);
0275 #endif
0276
0277 out:
0278
0279 if (EXSUCCEED!=ret)
0280 {
0281 if (NULL!=(mqd_t)EXFAIL)
0282 {
0283 NDRX_FPFREE((char *)mq);
0284 mq = (mqd_t)EXFAIL;
0285 }
0286 }
0287
0288 NDRX_LOG(log_debug, "return %p/%ld", mq, (long)mq);
0289
0290 errno=errno_save;
0291 return mq;
0292 }
0293
0294
0295
0296
0297
0298
0299
0300
0301
0302
0303
0304 expublic ssize_t ndrx_svq_timedreceive(mqd_t mqd, char *ptr, size_t maxlen,
0305 unsigned int *priop, const struct timespec *__abs_timeout)
0306 {
0307
0308 #ifdef EX_USE_SYSVQ
0309 ssize_t ret = maxlen;
0310 ndrx_svq_ev_t *ev = NULL;
0311 int err = 0;
0312
0313 VALIDATE_MQD;
0314
0315
0316 mqd->thread = pthread_self();
0317
0318 if (EXSUCCEED!=ndrx_svq_event_sndrcv( mqd, ptr, &ret,
0319 (struct timespec *)__abs_timeout, &ev, EXFALSE, EXFALSE))
0320 {
0321 err = errno;
0322 if (NULL!=ev)
0323 {
0324 if (NDRX_SVQ_EV_TOUT==ev->ev)
0325 {
0326 NDRX_LOG(log_info, "Timed out");
0327 errno = ETIMEDOUT;
0328 }
0329 else
0330 {
0331 NDRX_LOG(log_error, "Unexpected event: %d", ev->ev);
0332 errno = EBADF;
0333 }
0334 }
0335 else
0336 {
0337
0338 if (ENOMSG==err)
0339 {
0340 NDRX_LOG(log_debug, "msgrcv(qid=%d) failed: %s", mqd->qid,
0341 strerror(err));
0342 errno = EAGAIN;
0343 }
0344 else
0345 {
0346 NDRX_LOG(log_error, "msgrcv(qid=%d) failed: %s", mqd->qid,
0347 strerror(err));
0348 }
0349 }
0350
0351 EXFAIL_OUT(ret);
0352 }
0353
0354 out:
0355
0356 if (NULL!=ev)
0357 {
0358
0359 err=errno;
0360 NDRX_FPFREE(ev);
0361 errno=err;
0362 }
0363
0364
0365 return ret;
0366 #endif
0367
0368 #ifdef EX_USE_SVAPOLL
0369
0370
0371
0372
0373
0374
0375
0376
0377 int wait_left;
0378 long tout;
0379 ndrx_stopwatch_t w;
0380 int ret;
0381 int err;
0382 long *l;
0383 struct timeval timeval;
0384
0385 VALIDATE_MQD;
0386
0387 NDRX_LOG(log_debug, "receiving msg mqd=%p, ptr=%p, maxlen=%d flags: %ld qid: %d",
0388 mqd, ptr, (int)maxlen, mqd->attr.mq_flags, mqd->qid);
0389
0390 if (maxlen<sizeof(long))
0391 {
0392 NDRX_LOG(log_error, "Invalid message size, the minimum is %d but got %d",
0393 (int)sizeof(long), (int)maxlen);
0394 errno = EINVAL;
0395 EXFAIL_OUT(ret);
0396 }
0397
0398 l = (long *)ptr;
0399 *l = 1;
0400
0401 ret=msgrcv(mqd->qid, ptr, NDRX_SVQ_INLEN(maxlen), 0, IPC_NOWAIT);
0402
0403
0404 if (EXFAIL==ret)
0405 {
0406 if (ENOMSG!=errno || mqd->attr.mq_flags & O_NONBLOCK)
0407 {
0408
0409 err = errno;
0410 NDRX_LOG(log_error, "msgrcv(qid=%d) failed: %s", mqd->qid,
0411 strerror(err));
0412
0413
0414 if (ENOMSG==err)
0415 {
0416 err = EAGAIN;
0417 }
0418
0419 errno = err;
0420 goto out;
0421 }
0422
0423 }
0424 else
0425 {
0426
0427 goto out;
0428 }
0429
0430 NDRX_SVAPOLL_TOUT_SET;
0431
0432
0433 errno=ETIMEDOUT;
0434 ret=EXFAIL;
0435
0436
0437 while (wait_left>0)
0438 {
0439
0440 struct ndrx_pollmsg msgs;
0441 unsigned long nfd = 1 << 16;
0442
0443 msgs.msgid=mqd->qid;
0444 msgs.reqevents = POLLIN;
0445 msgs.rtnevents = 0;
0446
0447 NDRX_LOG(log_debug, "wait_left: %d qid: %d", wait_left, mqd->qid);
0448 ret = poll((void *)&msgs, nfd, wait_left);
0449 err=errno;
0450 NDRX_LOG(log_debug, "poll ret: %d qid: %d wait_left: %d", ret, mqd->qid, wait_left);
0451 if (ret>0)
0452 {
0453
0454 if (EXFAIL==(ret = msgrcv(mqd->qid, ptr, NDRX_SVQ_INLEN(maxlen), 0, IPC_NOWAIT)))
0455 {
0456 err = errno;
0457
0458 if (ENOMSG==err)
0459 {
0460 NDRX_LOG(log_debug, "msgrcv(qid=%d) failed: %s", mqd->qid,
0461 strerror(err));
0462
0463 }
0464 else
0465 {
0466 NDRX_LOG(log_error, "msgrcv(qid=%d) failed: %s", mqd->qid,
0467 strerror(err));
0468 errno = err;
0469
0470 break;
0471 }
0472 }
0473 else
0474 {
0475
0476 break;
0477 }
0478 }
0479 else if (0==ret)
0480 {
0481 errno = ETIMEDOUT;
0482 ret=EXFAIL;
0483 break;
0484 }
0485 else
0486 {
0487
0488 NDRX_LOG(log_error, "poll (qid=%d) failed (wait_left: %d): %s", mqd->qid,
0489 wait_left, strerror(err));
0490
0491 userlog("poll (qid=%d) failed (wait_left: %d): %s", mqd->qid,
0492 wait_left, strerror(err));
0493 errno = err;
0494
0495
0496 break;
0497 }
0498
0499 wait_left = ndrx_stopwatch_get_delta(&w) * -1;
0500
0501 errno=ETIMEDOUT;
0502 ret=EXFAIL;
0503 }
0504
0505 out:
0506 if (ret>=0)
0507 {
0508 ret=NDRX_SVQ_OUTLEN(ret);
0509 }
0510
0511 return ret;
0512
0513 #endif
0514
0515 }
0516
0517
0518
0519
0520
0521
0522
0523
0524
0525
0526
0527
0528 expublic int ndrx_svq_timedsend(mqd_t mqd, const char *ptr, size_t len,
0529 unsigned int prio, const struct timespec *__abs_timeout)
0530 {
0531
0532 #ifdef EX_USE_SYSVQ
0533 ssize_t ret = len;
0534 ndrx_svq_ev_t *ev = NULL;
0535 int err = 0;
0536 long *l = (long *)ptr;
0537
0538 *l = 1;
0539
0540
0541
0542 VALIDATE_MQD;
0543
0544 mqd->thread = pthread_self();
0545
0546 if (EXSUCCEED!=ndrx_svq_event_sndrcv( mqd, (char *)ptr, &ret,
0547 (struct timespec *)__abs_timeout, &ev, EXTRUE, EXFALSE))
0548 {
0549 err = errno;
0550 if (NULL!=ev)
0551 {
0552 if (NDRX_SVQ_EV_TOUT==ev->ev)
0553 {
0554 NDRX_LOG(log_warn, "Timed out");
0555 errno = ETIMEDOUT;
0556 }
0557 else
0558 {
0559 NDRX_LOG(log_error, "Unexpected event: %d", ev->ev);
0560 errno = EBADF;
0561 }
0562 }
0563 else
0564 {
0565
0566 if (ENOMSG==err)
0567 {
0568 NDRX_LOG(log_debug, "msgsnd(qid=%d) failed: %s", mqd->qid,
0569 strerror(err));
0570 errno = EAGAIN;
0571 }
0572 else
0573 {
0574 NDRX_LOG(log_error, "msgsnd(qid=%d) failed: %s", mqd->qid,
0575 strerror(err));
0576 }
0577 }
0578
0579 EXFAIL_OUT(ret);
0580 }
0581
0582 ret = EXSUCCEED;
0583 out:
0584
0585 if (NULL!=ev)
0586 {
0587
0588 err=errno;
0589 NDRX_FPFREE(ev);
0590 errno=err;
0591 }
0592
0593
0594 return ret;
0595
0596 #endif
0597
0598 #ifdef EX_USE_SVAPOLL
0599
0600
0601
0602
0603
0604
0605 ndrx_stopwatch_t w;
0606 int wait_left;
0607 long tout;
0608 int ret;
0609 int err;
0610 long *l;
0611 struct timeval timeval;
0612
0613 VALIDATE_MQD;
0614
0615 if (len<sizeof(long))
0616 {
0617 NDRX_LOG(log_error, "Invalid message size, the minimum is %d but got %d",
0618 (int)sizeof(long), (int)len);
0619 errno = EINVAL;
0620 EXFAIL_OUT(ret);
0621 }
0622
0623 l = (long *)ptr;
0624 *l = 1;
0625
0626 ret = msgsnd(mqd->qid, ptr, NDRX_SVQ_INLEN(len), IPC_NOWAIT);
0627
0628
0629 if (EXFAIL == ret)
0630 {
0631 if (EAGAIN!=errno || mqd->attr.mq_flags & O_NONBLOCK)
0632 {
0633
0634 err = errno;
0635 NDRX_LOG(log_error, "msgsnd(qid=%d) failed: %s", mqd->qid,
0636 strerror(err));
0637 errno = err;
0638 goto out;
0639 }
0640 }
0641 else
0642 {
0643
0644 goto out;
0645 }
0646
0647 NDRX_SVAPOLL_TOUT_SET;
0648
0649
0650 errno=ETIMEDOUT;
0651 ret=EXFAIL;
0652
0653
0654
0655
0656 while (wait_left>0)
0657 {
0658
0659 struct ndrx_pollmsg msgs;
0660 unsigned long nfd = 1 << 16;
0661
0662 msgs.msgid=mqd->qid;
0663 msgs.reqevents = POLLOUT;
0664 msgs.rtnevents = 0;
0665
0666 NDRX_LOG(log_debug, "wait_left: %d qid: %d", wait_left, mqd->qid);
0667 ret = poll((void *)&msgs, nfd, wait_left);
0668 NDRX_LOG(log_debug, "poll ret=%d", ret);
0669
0670 if (ret>0)
0671 {
0672
0673 if (EXFAIL==(ret = msgsnd(mqd->qid, ptr, NDRX_SVQ_INLEN(len), IPC_NOWAIT)))
0674 {
0675 err=errno;
0676
0677
0678 if (EAGAIN==err)
0679 {
0680
0681
0682
0683
0684
0685
0686
0687
0688 usleep(1000);
0689
0690
0691 }
0692 else
0693 {
0694 NDRX_LOG(log_error, "msgrcv(qid=%d) failed: %s", mqd->qid,
0695 strerror(err));
0696 errno = err;
0697
0698
0699 break;
0700 }
0701 }
0702 else
0703 {
0704
0705 break;
0706 }
0707 }
0708 else if (0==ret)
0709 {
0710 errno = ETIMEDOUT;
0711 ret=EXFAIL;
0712 break;
0713 }
0714 else
0715 {
0716 err = errno;
0717
0718
0719 NDRX_LOG(log_error, "poll (qid=%d) failed (wait_left: %d): %s", mqd->qid,
0720 wait_left, strerror(err));
0721
0722 userlog("poll (qid=%d) failed (wait_left: %d): %s", mqd->qid,
0723 wait_left, strerror(err));
0724 errno = err;
0725
0726
0727 break;
0728 }
0729
0730 wait_left = ndrx_stopwatch_get_delta(&w) * -1;
0731
0732 errno=ETIMEDOUT;
0733 ret=EXFAIL;
0734 }
0735
0736 out:
0737
0738
0739 return ret;
0740
0741 #endif
0742 }
0743
0744
0745
0746
0747
0748
0749
0750
0751
0752
0753
0754
0755 expublic int ndrx_svq_send(mqd_t mqd, const char *ptr, size_t len,
0756 unsigned int prio)
0757 {
0758 int ret = EXSUCCEED;
0759 long *l;
0760 int msgflg;
0761
0762 NDRX_LOG(log_debug, "sending msg mqd=%p, qid=%d, ptr=%p, len=%d",
0763 mqd, mqd->qid, ptr, (int)len);
0764
0765 VALIDATE_MQD;
0766
0767 if (len<sizeof(long))
0768 {
0769 NDRX_LOG(log_error, "Invalid message size, the minimum is %d but got %d",
0770 (int)sizeof(long), (int)len);
0771 errno = EINVAL;
0772 EXFAIL_OUT(ret);
0773 }
0774
0775 l = (long *)ptr;
0776 *l = 1;
0777
0778 if (mqd->attr.mq_flags & O_NONBLOCK)
0779 {
0780 msgflg = IPC_NOWAIT;
0781 }
0782 else
0783 {
0784 msgflg = 0;
0785 }
0786
0787 ret = msgsnd(mqd->qid, ptr, NDRX_SVQ_INLEN(len), msgflg);
0788
0789
0790
0791 out:
0792 return ret;
0793 }
0794
0795
0796
0797
0798
0799
0800
0801
0802
0803
0804 expublic ssize_t ndrx_svq_receive(mqd_t mqd, char *ptr, size_t maxlen,
0805 unsigned int *priop)
0806 {
0807 int ret = EXSUCCEED;
0808 long *l;
0809 int msgflg;
0810
0811 VALIDATE_MQD;
0812
0813 NDRX_LOG(log_debug, "receiving msg mqd=%p, ptr=%p, maxlen=%d flags: %ld qid: %d",
0814 mqd, ptr, (int)maxlen, mqd->attr.mq_flags, mqd->qid);
0815
0816 if (maxlen<sizeof(long))
0817 {
0818 NDRX_LOG(log_error, "Invalid message size, the minimum is %d but got %d",
0819 (int)sizeof(long), (int)maxlen);
0820 errno = EINVAL;
0821 EXFAIL_OUT(ret);
0822 }
0823
0824 l = (long *)ptr;
0825 *l = 1;
0826
0827 if (mqd->attr.mq_flags & O_NONBLOCK)
0828 {
0829 msgflg = IPC_NOWAIT;
0830 }
0831 else
0832 {
0833 msgflg = 0;
0834 }
0835
0836 if (EXFAIL==(ret = msgrcv(mqd->qid, ptr, NDRX_SVQ_INLEN(maxlen), 0, msgflg)))
0837 {
0838 int err = errno;
0839
0840
0841 if (ENOMSG==err)
0842 {
0843 NDRX_LOG(log_debug, "msgrcv(qid=%d) failed: %s", mqd->qid,
0844 strerror(err));
0845 err = EAGAIN;
0846 }
0847 else
0848 {
0849 NDRX_LOG(log_error, "msgrcv(qid=%d) failed: %s", mqd->qid,
0850 strerror(err));
0851 }
0852
0853 errno = err;
0854 }
0855
0856
0857 if (ret>=0)
0858 {
0859 ret=NDRX_SVQ_OUTLEN(ret);
0860 }
0861
0862 out:
0863 return ret;
0864 }
0865
0866
0867
0868
0869
0870
0871
0872
0873 expublic int ndrx_svq_setattr(mqd_t mqd, const struct mq_attr *attr,
0874 struct mq_attr *oattr)
0875 {
0876 int ret = EXSUCCEED;
0877
0878 VALIDATE_MQD;
0879
0880 if (NULL==attr)
0881 {
0882 NDRX_LOG(log_error, "Invalid attr is null", mqd);
0883 errno = EINVAL;
0884 EXFAIL_OUT(ret);
0885 }
0886
0887
0888 if (NULL!=oattr)
0889 {
0890 memcpy(oattr, &(mqd->attr), sizeof(*oattr));
0891 }
0892
0893 memcpy(&(mqd->attr), attr, sizeof(*attr));
0894
0895 out:
0896 return ret;
0897 }
0898
0899
0900
0901
0902
0903
0904 expublic int ndrx_svq_unlink(const char *pathname)
0905 {
0906
0907
0908
0909
0910
0911 return ndrx_svqshm_ctl((char *)pathname, EXFAIL, IPC_RMID, EXFAIL, NULL);
0912
0913 }
0914
0915