0001
0002
0003
0004
0005
0006
0007
0008
0009
0010
0011
0012
0013
0014
0015
0016
0017
0018
0019
0020
0021
0022
0023
0024
0025
0026
0027
0028
0029
0030
0031
0032
0033
0034
0035
0036
0037
0038
0039 #include <stdlib.h>
0040 #include <stdio.h>
0041 #include <stdarg.h>
0042 #include <sys/types.h>
0043 #include <sys/stat.h>
0044 #include <fcntl.h>
0045 #include <errno.h>
0046 #include <string.h>
0047 #include <sys/time.h>
0048
0049 #include <ndrstandard.h>
0050 #include "sys_emqueue.h"
0051 #include "sys_unix.h"
0052 #include "ndebug.h"
0053
0054 #include <stddef.h>
0055 #include <errno.h>
0056 #include <fcntl.h>
0057 #include <unistd.h>
0058 #include <stdio.h>
0059 #include <sys/mman.h>
0060 #include <signal.h>
0061 #include <thlock.h>
0062 #include <limits.h>
0063 #include <exhash.h>
0064 #include <nstopwatch.h>
0065 #include <nstd_tls.h>
0066
0067
0068
/**
 * Acquire the queue header mutex (emqhdr->emqh_lock).
 *
 * The macro relies on two locals being in scope where it is expanded:
 * `n' (receives the pthread_mutex_lock() result) and `emqhdr' (the queue
 * header whose lock is taken).
 *
 * If locking fails, the error is written to the debug log and the user log,
 * errno is set to the returned error number and the *enclosing function*
 * returns EXFAIL.
 */
#define LOCK_Q if ( (n = pthread_mutex_lock(&emqhdr->emqh_lock)) != 0)\
{\
    NDRX_LOG(log_error, "EMQ: pthread_mutex_lock failed: %s", strerror(n));\
    userlog("EMQ: pthread_mutex_lock failed: %s", strerror(n));\
    errno = n;\
    return EXFAIL;\
}

/**
 * Iteration limit for the loop in emq_open() that polls (sleeping one second
 * per pass) until the S_IXUSR bit of an existing queue file is cleared.
 */
#define MAX_TRIES 10
0078
0079
/**
 * One entry of the registry of open queue descriptors (see M_qd_hash).
 * An entry is added by qd_exhash_add() and removed by qd_hash_del().
 */
struct qd_hash
{
    void *qd;           /**< descriptor value (mqd_t cast to void *), the hash key */
    EX_hash_handle hh;  /**< handle member for the EXHASH_* macros (presumably; declared elsewhere) */
};
typedef struct qd_hash qd_hash_t;
0086
0087
/**
 * Attributes applied by emq_open() when a queue is created and the caller
 * passed a NULL attribute pointer.
 * NOTE(review): the initializer is positional; presumably it means
 * mq_flags=0, mq_maxmsg=128, mq_msgsize=1024, mq_curmsgs=0 - confirm against
 * the declaration of struct mq_attr.
 */
exprivate struct mq_attr defattr = { 0, 128, 1024, 0 };
/** Lock taken around every access to M_qd_hash in this file */
exprivate MUTEX_LOCKDECL(M_lock);
/** Head of the hash holding the currently registered queue descriptors */
exprivate qd_hash_t *M_qd_hash = NULL;
0091
0092
0093
0094
0095
0096
0097
0098
0099
0100
0101
0102
0103
0104
0105
0106
0107
0108
0109
0110
0111
0112
0113
0114
0115
0116
0117
0118
0119 #ifdef EX_OS_DARWIN
0120
0121
0122
0123
0124
0125 expublic int ndrx_pthread_cond_timedwait(pthread_cond_t *restrict cond,
0126 pthread_mutex_t *restrict mutex,
0127 const struct timespec *restrict abstime)
0128 {
0129 int attempt = 0;
0130
0131 int ret;
0132 ndrx_osx_pthread_cond_t *p = (ndrx_osx_pthread_cond_t *)cond;
0133 do
0134 {
0135 if (attempt > 0)
0136 {
0137
0138 usleep(ndrx_rand() % 1000);
0139 }
0140
0141 p->busy = NULL;
0142 ret = pthread_cond_timedwait(cond, mutex, abstime);
0143
0144 attempt++;
0145
0146 } while (EINVAL==ret && attempt < NDRX_OSX_COND_FIX_ATTEMPTS);
0147
0148 return ret;
0149
0150 }
0151
0152
0153
0154
0155
0156 expublic int ndrx_pthread_cond_wait(pthread_cond_t *restrict cond,
0157 pthread_mutex_t *restrict mutex)
0158 {
0159 int attempt = 0;
0160 int ret;
0161 ndrx_osx_pthread_cond_t *p = (ndrx_osx_pthread_cond_t *)cond;
0162 do
0163 {
0164 if (attempt > 0)
0165 {
0166
0167 usleep(ndrx_rand() % 1000);
0168 }
0169
0170 p->busy = NULL;
0171 ret = pthread_cond_wait(cond, mutex);
0172
0173 attempt++;
0174
0175 } while (EINVAL==ret && attempt < NDRX_OSX_COND_FIX_ATTEMPTS);
0176
0177 return ret;
0178 }
0179
0180 #else
0181
/* When EX_OS_DARWIN is not defined no wrapper functions are compiled; the
 * ndrx_ names map straight to the pthread condition wait calls. */
#define ndrx_pthread_cond_wait pthread_cond_wait
#define ndrx_pthread_cond_timedwait pthread_cond_timedwait
0184
0185 #endif
0186
0187
0188
0189
0190
0191
0192
0193 exprivate int qd_exhash_add(mqd_t q)
0194 {
0195 int ret = EXSUCCEED;
0196 qd_hash_t * el = NDRX_FPMALLOC(sizeof(qd_hash_t), 0);
0197
0198 NDRX_LOG(log_dump, "Registering %p as mqd_t", q);
0199 if (NULL==el)
0200 {
0201 NDRX_LOG(log_error, "Failed to alloc: %s", strerror(errno));
0202 EXFAIL_OUT(ret);
0203 }
0204
0205 el->qd = (void *)q;
0206
0207 MUTEX_LOCK_V(M_lock);
0208
0209 EXHASH_ADD_PTR(M_qd_hash, qd, el);
0210 NDRX_LOG(log_dump, "added...");
0211
0212 MUTEX_UNLOCK_V(M_lock);
0213
0214 out:
0215
0216 return ret;
0217 }
0218
0219
0220
0221
0222
0223
0224 exprivate int qd_hash_chk(mqd_t qd)
0225 {
0226 qd_hash_t *ret = NULL;
0227
0228 NDRX_LOG(log_dump, "checking qd %p", qd);
0229
0230 MUTEX_LOCK_V(M_lock);
0231
0232 EXHASH_FIND_PTR( M_qd_hash, ((void **)&qd), ret);
0233
0234 MUTEX_UNLOCK_V(M_lock);
0235
0236 if (NULL!=ret)
0237 {
0238 return EXTRUE;
0239 }
0240 else
0241 {
0242 return EXFALSE;
0243 }
0244 }
0245
0246
0247
0248
0249
0250
0251 exprivate void qd_hash_del(mqd_t qd)
0252 {
0253 qd_hash_t *ret = NULL;
0254
0255 NDRX_LOG(log_dump, "Unregistering %p as mqd_t", qd);
0256
0257 MUTEX_LOCK_V(M_lock);
0258 EXHASH_FIND_PTR( M_qd_hash, ((void **)&qd), ret);
0259
0260 if (NULL!=ret)
0261 {
0262 EXHASH_DEL(M_qd_hash, ret);
0263 NDRX_FPFREE(ret);
0264 }
0265
0266 MUTEX_UNLOCK_V(M_lock);
0267 }
0268
0269
0270
0271
0272
0273
0274
0275
0276
0277 static char *get_path(const char *path, char *bufout, size_t bufoutsz)
0278 {
0279 static int first = 1;
0280 static char q_path[PATH_MAX]={EXEOS};
0281
0282 if (first)
0283 {
0284 char *p;
0285 if (NULL!=(p=getenv(CONF_NDRX_QPATH)))
0286 {
0287 NDRX_STRCPY_SAFE(q_path, p);
0288 }
0289
0290 first = 0;
0291 }
0292
0293 NDRX_STRCPY_SAFE_DST(bufout, q_path, bufoutsz);
0294 NDRX_STRCAT_S(bufout, bufoutsz, path);
0295
0296 return bufout;
0297 }
0298
0299
0300
0301
0302
0303
0304 expublic int emq_close(mqd_t emqd)
0305 {
0306 long msgsize, filesize;
0307 struct emq_hdr *emqhdr;
0308 struct mq_attr *attr;
0309 struct emq_info *emqinfo;
0310
0311 emqinfo = emqd;
0312
0313
0314
0315
0316 if (!qd_hash_chk((mqd_t) emqd))
0317 {
0318 NDRX_LOG(log_error, "Invalid queue descriptor: %p", emqd);
0319 errno = EBADF;
0320 return EXFAIL;
0321 }
0322
0323 emqhdr = emqinfo->emqi_hdr;
0324 attr = &emqhdr->emqh_attr;
0325
0326 if (emq_notify(emqd, NULL) != EXSUCCEED)
0327 {
0328 return EXFAIL;
0329 }
0330
0331 msgsize = NDRX_EMQ_MSGSIZE(attr->mq_msgsize);
0332 filesize = sizeof(struct emq_hdr) + (attr->mq_maxmsg *
0333 (sizeof(struct emq_msg_hdr) + msgsize));
0334
0335 NDRX_LOG(log_dump, "Before munmap()");
0336
0337 if (munmap(emqinfo->emqi_hdr, filesize) == -1)
0338 {
0339 return EXFAIL;
0340 }
0341
0342 NDRX_LOG(log_dump, "After munmap()");
0343
0344 qd_hash_del(emqd);
0345 NDRX_FPFREE(emqinfo);
0346
0347 NDRX_LOG(log_dump, "into: emq_close ret 0");
0348
0349 return EXSUCCEED;
0350 }
0351
0352
0353
0354
0355 expublic int emq_getattr(mqd_t emqd, struct mq_attr *emqstat)
0356 {
0357 int n;
0358 struct emq_hdr *emqhdr;
0359 struct mq_attr *attr;
0360 struct emq_info *emqinfo;
0361
0362 NDRX_LOG(log_dump, "into: emq_getattr");
0363
0364
0365
0366
0367 if (!qd_hash_chk((mqd_t) emqd))
0368 {
0369 NDRX_LOG(log_error, "Invalid queue descriptor: %p", emqd);
0370 errno = EBADF;
0371 return EXFAIL;
0372 }
0373
0374 emqinfo = emqd;
0375 emqhdr = emqinfo->emqi_hdr;
0376 attr = &emqhdr->emqh_attr;
0377
0378 LOCK_Q;
0379
0380
0381 emqstat->mq_flags = emqinfo->emqi_flags;
0382 emqstat->mq_maxmsg = attr->mq_maxmsg;
0383 emqstat->mq_msgsize = attr->mq_msgsize;
0384 emqstat->mq_curmsgs = attr->mq_curmsgs;
0385
0386 MUTEX_UNLOCK_V(emqhdr->emqh_lock);
0387 NDRX_LOG(log_dump, "into: emq_getattr ret 0");
0388 return EXSUCCEED;
0389 }
0390
0391
0392
0393
0394
0395
0396
0397 expublic int emq_notify(mqd_t emqd, const struct sigevent *notification)
0398 {
0399 int n;
0400 pid_t pid;
0401 struct emq_hdr *emqhdr;
0402 struct emq_info *emqinfo;
0403
0404 if (!qd_hash_chk((mqd_t) emqd))
0405 {
0406 NDRX_LOG(log_error, "Invalid queue descriptor: %p", emqd);
0407 errno = EBADF;
0408 return EXFAIL;
0409 }
0410
0411 emqinfo = emqd;
0412 emqhdr = emqinfo->emqi_hdr;
0413
0414 LOCK_Q;
0415
0416 pid = getpid();
0417 if (notification == NULL)
0418 {
0419 if (emqhdr->emqh_pid == pid)
0420 {
0421 emqhdr->emqh_pid = 0;
0422 }
0423 }
0424 else
0425 {
0426 if (emqhdr->emqh_pid != 0)
0427 {
0428 if (kill(emqhdr->emqh_pid, 0) != -1 || errno != ESRCH)
0429 {
0430 errno = EBUSY;
0431 goto err;
0432 }
0433 }
0434 emqhdr->emqh_pid = pid;
0435 emqhdr->emqh_event = *notification;
0436 }
0437 MUTEX_UNLOCK_V(emqhdr->emqh_lock);
0438 return EXSUCCEED;
0439
0440 err:
0441 MUTEX_UNLOCK_V(emqhdr->emqh_lock);
0442 return EXFAIL;
0443
0444 }
0445
0446
0447
0448
0449
0450
0451
0452
0453 expublic mqd_t emq_open(const char *pathname, int oflag, ...)
0454 {
0455 int i, fd, nonblock, created, save_errno;
0456 long msgsize, filesize, index;
0457 va_list ap;
0458 mode_t mode;
0459 char *mptr;
0460 struct emq_msg_hdr *msghdr;
0461 struct mq_attr *attr;
0462 struct emq_info *emqinfo;
0463 struct stat statbuff;
0464 struct emq_hdr *emqhdr;
0465 char emq_x[PATH_MAX+1];
0466 pthread_mutexattr_t mattr;
0467 pthread_condattr_t cattr;
0468 mptr = (char *) MAP_FAILED;
0469
0470 created = EXFALSE;
0471 nonblock = oflag & O_NONBLOCK;
0472 oflag &= ~O_NONBLOCK;
0473 emqinfo = NULL;
0474 NDRX_LOG(log_dump, "into: emq_open");
0475
0476
0477 again:
0478 if (oflag & O_CREAT)
0479 {
0480 va_start(ap, oflag);
0481
0482 mode = va_arg(ap, int) & ~S_IXUSR;
0483 attr = va_arg(ap, struct mq_attr *);
0484 va_end(ap);
0485
0486
0487 fd = open(get_path(pathname, emq_x, sizeof(emq_x)),
0488 oflag | O_EXCL | O_RDWR, mode | S_IXUSR);
0489
0490 if (fd < 0)
0491 {
0492 if (errno == EEXIST && (oflag & O_EXCL) == 0)
0493 {
0494 goto exists;
0495 }
0496 else
0497 {
0498 return((mqd_t) EXFAIL);
0499 }
0500 }
0501
0502 created = EXTRUE;
0503
0504 if (attr == NULL)
0505 {
0506 attr = &defattr;
0507 }
0508 else
0509 {
0510 if (attr->mq_maxmsg <= 0 || attr->mq_msgsize <= 0)
0511 {
0512 errno = EINVAL;
0513 goto err;
0514 }
0515 }
0516
0517 msgsize = NDRX_EMQ_MSGSIZE(attr->mq_msgsize);
0518
0519 filesize = sizeof(struct emq_hdr) + (attr->mq_maxmsg *
0520 (sizeof(struct emq_msg_hdr) + msgsize));
0521
0522 if (EXFAIL == lseek(fd, filesize - 1, SEEK_SET))
0523 {
0524 goto err;
0525 }
0526
0527 if (EXFAIL == write(fd, "", 1))
0528 {
0529 goto err;
0530 }
0531
0532 mptr = mmap(NULL, filesize, PROT_READ | PROT_WRITE,
0533 MAP_SHARED, fd, 0);
0534 if (mptr == MAP_FAILED)
0535 {
0536 goto err;
0537 }
0538
0539
0540 if ( (emqinfo = NDRX_FPMALLOC(sizeof(struct emq_info), 0)) == NULL)
0541 {
0542 goto err;
0543 }
0544
0545 emqinfo->emqi_hdr = emqhdr = (struct emq_hdr *) mptr;
0546 emqinfo->emqi_flags = nonblock;
0547
0548 emqhdr->emqh_attr.mq_flags = 0;
0549 emqhdr->emqh_attr.mq_maxmsg = attr->mq_maxmsg;
0550 emqhdr->emqh_attr.mq_msgsize = attr->mq_msgsize;
0551 emqhdr->emqh_attr.mq_curmsgs = 0;
0552 emqhdr->emqh_nwait = 0;
0553 emqhdr->emqh_pid = 0;
0554 emqhdr->emqh_head = 0;
0555 index = sizeof(struct emq_hdr);
0556 emqhdr->emqh_free = index;
0557
0558 for (i = 0; i < attr->mq_maxmsg - 1; i++)
0559 {
0560 msghdr = (struct emq_msg_hdr *) &mptr[index];
0561 index += sizeof(struct emq_msg_hdr) + msgsize;
0562 msghdr->msg_next = index;
0563 }
0564
0565 msghdr = (struct emq_msg_hdr *) &mptr[index];
0566
0567 msghdr->msg_next = 0;
0568
0569 if ( (i = pthread_mutexattr_init(&mattr)) != 0)
0570 {
0571 goto pthreaderr;
0572 }
0573
0574 if ((i=pthread_mutexattr_setpshared(&mattr, PTHREAD_PROCESS_SHARED)) < 0)
0575 {
0576 NDRX_LOG(log_error, "Failed to set attribute PTHREAD_PROCESS_SHARED: %s", strerror(i));
0577 userlog("Failed to set attribute PTHREAD_PROCESS_SHARED: %s", strerror(i));
0578 goto pthreaderr;
0579 }
0580
0581 #if defined(NDRX_MUTEX_DEBUG) || defined(EX_OS_DARWIN)
0582 if((i = pthread_mutexattr_settype(&mattr, PTHREAD_MUTEX_ERRORCHECK)) < 0)
0583 {
0584 NDRX_LOG(log_error, "Failed to set attribute ERRORCHECK: %s", strerror(i));
0585 userlog("Failed to set attribute ERRORCHECK: %s", strerror(i));
0586 goto pthreaderr;
0587 }
0588 #endif
0589
0590 i = pthread_mutex_init(&emqhdr->emqh_lock, &mattr);
0591
0592 pthread_mutexattr_destroy(&mattr);
0593 if (i != 0)
0594 {
0595 NDRX_LOG(log_error, "Failed to pthread_mutex_init: %s", strerror(i));
0596 userlog("Failed to pthread_mutex_init: %s", strerror(i));
0597 goto pthreaderr;
0598 }
0599
0600 if ( (i = pthread_condattr_init(&cattr)) != 0)
0601 {
0602 goto pthreaderr;
0603 }
0604
0605 pthread_condattr_setpshared(&cattr, PTHREAD_PROCESS_SHARED);
0606 i = pthread_cond_init(&emqhdr->emqh_wait, &cattr);
0607 pthread_condattr_destroy(&cattr);
0608
0609 if (i != 0)
0610 {
0611 goto pthreaderr;
0612 }
0613
0614 if (EXFAIL==fchmod(fd, mode))
0615 {
0616 goto err;
0617 }
0618
0619 close(fd);
0620
0621 if (EXSUCCEED!=qd_exhash_add((mqd_t) emqinfo))
0622 {
0623 NDRX_LOG(log_error, "Failed to add mqd_t to hash!");
0624 errno = ENOMEM;
0625 }
0626 NDRX_LOG(log_dump, "into: emq_open ret ok");
0627 return((mqd_t) emqinfo);
0628 }
0629 exists:
0630
0631
0632 if ( (fd = open(get_path(pathname, emq_x, sizeof(emq_x)), O_RDWR)) < 0)
0633 {
0634 if (errno == ENOENT && (oflag & O_CREAT))
0635 {
0636 goto again;
0637 }
0638
0639 goto err;
0640 }
0641
0642
0643 for (i = 0; i < MAX_TRIES; i++)
0644 {
0645 if (EXFAIL == stat(get_path(pathname, emq_x, sizeof(emq_x)), &statbuff))
0646 {
0647 if (errno == ENOENT && (oflag & O_CREAT))
0648 {
0649 close(fd);
0650 goto again;
0651 }
0652 goto err;
0653 }
0654
0655 if ((statbuff.st_mode & S_IXUSR) == 0)
0656 {
0657 break;
0658 }
0659
0660 sleep(1);
0661 }
0662
0663 if (i == MAX_TRIES)
0664 {
0665 errno = ETIMEDOUT;
0666 goto err;
0667 }
0668
0669 filesize = statbuff.st_size;
0670
0671 mptr = mmap(NULL, filesize, PROT_READ | PROT_WRITE, MAP_SHARED, fd, 0);
0672
0673 if (mptr == MAP_FAILED)
0674 {
0675 goto err;
0676 }
0677
0678 close(fd);
0679
0680
0681 if ( (emqinfo = NDRX_FPMALLOC(sizeof(struct emq_info), 0)) == NULL)
0682 {
0683 goto err;
0684 }
0685 emqinfo->emqi_hdr = (struct emq_hdr *) mptr;
0686 emqinfo->emqi_flags = nonblock;
0687
0688 if (EXSUCCEED!=qd_exhash_add((mqd_t) emqinfo))
0689 {
0690 NDRX_LOG(log_error, "Failed to add mqd_t to hash!");
0691 errno = ENOMEM;
0692 }
0693
0694 NDRX_LOG(log_dump, "into: emq_open ret ok");
0695 return((mqd_t) emqinfo);
0696 pthreaderr:
0697 errno = i;
0698 err:
0699
0700 save_errno = errno;
0701
0702 if (created)
0703 {
0704 unlink(get_path(pathname, emq_x, sizeof(emq_x)));
0705 }
0706
0707 if (mptr != MAP_FAILED)
0708 {
0709 munmap(mptr, filesize);
0710 }
0711
0712 if (emqinfo != NULL)
0713 {
0714 NDRX_FPFREE(emqinfo);
0715 }
0716
0717 close(fd);
0718
0719 NDRX_LOG(log_dump, "into: emq_open ret -1");
0720
0721 errno = save_errno;
0722 return((mqd_t) -1);
0723 }
0724
0725
0726
0727
0728
0729
0730
0731
0732
0733
0734
0735
0736 expublic ssize_t emq_timedreceive(mqd_t emqd, char *ptr, size_t maxlen, unsigned int *priop,
0737 const struct timespec *__abs_timeout)
0738 {
0739 int n;
0740 long index;
0741 char *mptr;
0742 ssize_t len;
0743 struct emq_hdr *emqhdr;
0744 struct mq_attr *attr;
0745 struct emq_msg_hdr *msghdr;
0746 struct emq_info *emqinfo;
0747
0748 NDRX_LOG(log_dump, "into: emq_timedreceive");
0749
0750 if (!qd_hash_chk((mqd_t) emqd))
0751 {
0752 NDRX_LOG(log_error, "Invalid queue descriptor: %p", emqd);
0753 errno = EBADF;
0754 return EXFAIL;
0755 }
0756
0757 emqinfo = emqd;
0758
0759 emqhdr = emqinfo->emqi_hdr;
0760 mptr = (char *) emqhdr;
0761 attr = &emqhdr->emqh_attr;
0762
0763 LOCK_Q;
0764
0765 if (maxlen < (size_t)attr->mq_msgsize)
0766 {
0767 errno = EMSGSIZE;
0768 goto err;
0769 }
0770
0771 if (attr->mq_curmsgs == 0)
0772 {
0773
0774 if (emqinfo->emqi_flags & O_NONBLOCK)
0775 {
0776 errno = EAGAIN;
0777 goto err;
0778 }
0779
0780
0781 emqhdr->emqh_nwait++;
0782 while (attr->mq_curmsgs == 0)
0783 {
0784 NDRX_LOG(log_dump, "queue empty on %p", emqd);
0785
0786 if (NULL==__abs_timeout)
0787 {
0788 if (EXSUCCEED!=(n=ndrx_pthread_cond_wait(&emqhdr->emqh_wait,
0789 &emqhdr->emqh_lock)))
0790 {
0791
0792 NDRX_LOG(log_error, "%s: pthread_cond_wait failed %d: %s",
0793 __func__, n, strerror(n));
0794 userlog("%s: pthread_cond_wait failed %d: %s",
0795 __func__, n, strerror(n));
0796 errno = n;
0797
0798 emqhdr->emqh_nwait--;
0799 goto err;
0800 }
0801 }
0802 else
0803 {
0804
0805 NDRX_LOG(log_dump, "timed wait...");
0806
0807
0808
0809
0810
0811 if (EXSUCCEED!=(n=ndrx_pthread_cond_timedwait(&emqhdr->emqh_wait,
0812 &emqhdr->emqh_lock, __abs_timeout)))
0813 {
0814
0815 if (n!=ETIMEDOUT)
0816 {
0817
0818 NDRX_LOG(log_error, "%s: ndrx_pthread_cond_timedwait failed %d: %s",
0819 __func__, n, strerror(n));
0820 userlog("%s: ndrx_pthread_cond_timedwait failed %d: %s",
0821 __func__, n, strerror(n));
0822 }
0823 else
0824 {
0825 NDRX_LOG(log_dump, "ETIMEDOUT: attr->mq_curmsgs = %ld", attr->mq_curmsgs);
0826 }
0827
0828 errno = n;
0829 emqhdr->emqh_nwait--;
0830 goto err;
0831 }
0832 }
0833 }
0834 emqhdr->emqh_nwait--;
0835 }
0836
0837 if ( (index = emqhdr->emqh_head) == 0)
0838 {
0839 NDRX_LOG(log_error, "emq_timedreceive: curmsgs = %ld; head = 0", attr->mq_curmsgs);
0840 abort();
0841 }
0842
0843 msghdr = (struct emq_msg_hdr *) &mptr[index];
0844 emqhdr->emqh_head = msghdr->msg_next;
0845 len = msghdr->msg_len;
0846 memcpy(ptr, msghdr + 1, len);
0847
0848 if (priop != NULL)
0849 {
0850 *priop = msghdr->msg_prio;
0851 }
0852
0853
0854 msghdr->msg_next = emqhdr->emqh_free;
0855 emqhdr->emqh_free = index;
0856
0857
0858
0859
0860 pthread_cond_signal(&emqhdr->emqh_wait);
0861
0862 attr->mq_curmsgs--;
0863
0864 MUTEX_UNLOCK_V(emqhdr->emqh_lock);
0865
0866 NDRX_LOG(log_dump, "emq_timedreceive - got something len=%d stats: %ld wait: %ld",
0867 len, attr->mq_curmsgs, emqhdr->emqh_nwait);
0868 return(len);
0869
0870 err:
0871
0872 MUTEX_UNLOCK_V(emqhdr->emqh_lock);
0873 n = errno;
0874 NDRX_LOG(log_dump, "emq_timedreceive - failed: %s stats: %ld wait: %ld",
0875 strerror(errno), attr->mq_curmsgs, emqhdr->emqh_nwait);
0876 errno = n;
0877
0878 return EXFAIL;
0879 }
0880
0881
0882
0883
0884
0885
0886
0887
0888
0889
0890 expublic int emq_timedsend(mqd_t emqd, const char *ptr, size_t len, unsigned int prio,
0891 const struct timespec *__abs_timeout)
0892 {
0893 int n;
0894 long index, freeindex;
0895 char *mptr;
0896 struct sigevent *sigev;
0897 struct emq_hdr *emqhdr;
0898 struct mq_attr *attr;
0899 struct emq_msg_hdr *msghdr, *nmsghdr, *pmsghdr;
0900 struct emq_info *emqinfo;
0901
0902 NDRX_LOG(log_dump, "into: emq_timedsend");
0903
0904 if (!qd_hash_chk((mqd_t) emqd))
0905 {
0906 NDRX_LOG(log_error, "Invalid queue descriptor: %p", emqd);
0907 errno = EBADF;
0908 return EXFAIL;
0909 }
0910
0911 emqinfo = emqd;
0912 emqhdr = emqinfo->emqi_hdr;
0913 mptr = (char *) emqhdr;
0914 attr = &emqhdr->emqh_attr;
0915
0916 LOCK_Q;
0917
0918 if (len > (size_t)attr->mq_msgsize)
0919 {
0920 errno = EMSGSIZE;
0921 goto err;
0922 }
0923
0924 if (attr->mq_curmsgs >= attr->mq_maxmsg)
0925 {
0926
0927 if (emqinfo->emqi_flags & O_NONBLOCK)
0928 {
0929 errno = EAGAIN;
0930 goto err;
0931 }
0932
0933 while (attr->mq_curmsgs >= attr->mq_maxmsg)
0934 {
0935 NDRX_LOG(log_dump, "waiting on q %p", emqd);
0936
0937 if (NULL==__abs_timeout)
0938 {
0939 if (EXSUCCEED!=(n=ndrx_pthread_cond_wait(&emqhdr->emqh_wait,
0940 &emqhdr->emqh_lock)))
0941 {
0942 NDRX_LOG(log_error, "%s: pthread_cond_wait failed %d: %s",
0943 __func__, n, strerror(n));
0944 userlog("%s: pthread_cond_wait failed %d: %s",
0945 __func__, n, strerror(n));
0946
0947
0948 errno = n;
0949 return -1;
0950 }
0951 }
0952 else
0953 {
0954
0955 NDRX_LOG(log_dump, "timed wait...");
0956
0957 if (EXSUCCEED!=(n=ndrx_pthread_cond_timedwait(&emqhdr->emqh_wait,
0958 &emqhdr->emqh_lock, __abs_timeout)))
0959 {
0960 if (n!=ETIMEDOUT)
0961 {
0962 NDRX_LOG(log_error, "%s: ndrx_pthread_cond_timedwait failed %d: %s",
0963 __func__, n, strerror(n));
0964 userlog("%s: ndrx_pthread_cond_timedwait failed %d: %s",
0965 __func__, n, strerror(n));
0966 errno = n;
0967 return -1;
0968 }
0969 NDRX_LOG(log_error, "ETIMEDOUT: attr->mq_curmsgs = %ld", attr->mq_curmsgs);
0970
0971
0972 errno = n;
0973 goto err;
0974 }
0975 }
0976 NDRX_LOG(log_info, "%p - accessed ok", emqd);
0977 }
0978 }
0979
0980 if ( (freeindex = emqhdr->emqh_free) == 0)
0981 {
0982 userlog("emq_send: curmsgs = %ld; free = 0", attr->mq_curmsgs);
0983 }
0984
0985 nmsghdr = (struct emq_msg_hdr *) &mptr[freeindex];
0986 nmsghdr->msg_prio = prio;
0987 nmsghdr->msg_len = len;
0988 memcpy(nmsghdr + 1, ptr, len);
0989 emqhdr->emqh_free = nmsghdr->msg_next;
0990
0991
0992 index = emqhdr->emqh_head;
0993 pmsghdr = (struct emq_msg_hdr *) &(emqhdr->emqh_head);
0994
0995 while (index != 0)
0996 {
0997 msghdr = (struct emq_msg_hdr *) &mptr[index];
0998
0999 if (prio > msghdr->msg_prio)
1000 {
1001 nmsghdr->msg_next = index;
1002 pmsghdr->msg_next = freeindex;
1003 break;
1004 }
1005 index = msghdr->msg_next;
1006 pmsghdr = msghdr;
1007 }
1008
1009 if (index == 0)
1010 {
1011 pmsghdr->msg_next = freeindex;
1012 nmsghdr->msg_next = 0;
1013 }
1014
1015
1016 if (attr->mq_curmsgs == 0)
1017 {
1018 if (emqhdr->emqh_pid != 0 && emqhdr->emqh_nwait == 0)
1019 {
1020 sigev = &emqhdr->emqh_event;
1021
1022 if (sigev->sigev_notify == SIGEV_SIGNAL)
1023 {
1024 kill(emqhdr->emqh_pid, sigev->sigev_signo);
1025 }
1026 emqhdr->emqh_pid = 0;
1027 }
1028 }
1029
1030
1031
1032
1033 pthread_cond_signal(&emqhdr->emqh_wait);
1034 attr->mq_curmsgs++;
1035
1036 MUTEX_UNLOCK_V(emqhdr->emqh_lock);
1037 NDRX_LOG(log_dump, "into: emq_timedsend - return 0 stats: %ld wait: %ld",
1038 attr->mq_curmsgs, emqhdr->emqh_nwait);
1039 return EXSUCCEED;
1040
1041 err:
1042 MUTEX_UNLOCK_V(emqhdr->emqh_lock);
1043
1044 n = errno;
1045 NDRX_LOG(log_dump, "into: emq_timedsend - return -1: %s stats: %ld wait: %ld",
1046 strerror(n), attr->mq_curmsgs, emqhdr->emqh_nwait);
1047 errno = n;
1048 return EXFAIL;
1049 }
1050
1051
1052
1053
1054
1055
1056
1057
1058
1059 expublic int emq_send(mqd_t emqd, const char *ptr, size_t len, unsigned int prio)
1060 {
1061 return emq_timedsend(emqd, ptr, len, prio, NULL);
1062 }
1063
1064
1065
1066
1067
1068
1069
1070
1071
1072 expublic ssize_t emq_receive(mqd_t emqd, char *ptr, size_t maxlen, unsigned int *priop)
1073 {
1074 return emq_timedreceive(emqd, ptr, maxlen, priop, NULL);
1075 }
1076
1077
1078
1079
1080
1081
1082
1083
1084 expublic int emq_setattr(mqd_t emqd, const struct mq_attr *emqstat, struct mq_attr *oemqstat)
1085 {
1086 int n;
1087 struct emq_hdr *emqhdr;
1088 struct mq_attr *attr;
1089 struct emq_info *emqinfo;
1090
1091 NDRX_LOG(log_dump, "into: emq_setattr");
1092
1093 if (!qd_hash_chk((mqd_t) emqd))
1094 {
1095 NDRX_LOG(log_error, "Invalid queue descriptor: %p", emqd);
1096 errno = EBADF;
1097 return EXFAIL;
1098 }
1099
1100 emqinfo = emqd;
1101 emqhdr = emqinfo->emqi_hdr;
1102 attr = &emqhdr->emqh_attr;
1103
1104 LOCK_Q;
1105
1106 if (oemqstat != NULL)
1107 {
1108 oemqstat->mq_flags = emqinfo->emqi_flags;
1109 oemqstat->mq_maxmsg = attr->mq_maxmsg;
1110 oemqstat->mq_msgsize = attr->mq_msgsize;
1111 oemqstat->mq_curmsgs = attr->mq_curmsgs;
1112 }
1113
1114 if (emqstat->mq_flags & O_NONBLOCK)
1115 {
1116 emqinfo->emqi_flags |= O_NONBLOCK;
1117 }
1118 else
1119 {
1120 emqinfo->emqi_flags &= ~O_NONBLOCK;
1121 }
1122
1123 MUTEX_UNLOCK_V(emqhdr->emqh_lock);
1124 NDRX_LOG(log_dump, "into: emq_setattr - return 0");
1125 return EXSUCCEED;
1126 }
1127
1128
1129
1130
1131
1132
1133 expublic int emq_unlink(const char *pathname)
1134 {
1135 char emq_x[PATH_MAX+1];
1136 NDRX_LOG(log_dump, "into: emq_unlink");
1137
1138 if (unlink(get_path(pathname, emq_x, sizeof(emq_x))) == -1)
1139 {
1140 NDRX_LOG(log_dump, "into: emq_unlink ret -1");
1141 return EXFAIL;
1142 }
1143 NDRX_LOG(log_dump, "into: emq_unlink ret 0");
1144 return EXSUCCEED;
1145 }
1146
1147
1148