0001
0002
0003
0004
0005
0006
0007
0008
0009
0010
0011
0012
0013
0014
0015
0016
0017
0018
0019
0020
0021
0022
0023
0024
0025
0026
0027
0028
0029
0030
0031
0032
0033
0034
0035
0036
0037
0038
0039
0040 #include <stdlib.h>
0041 #include <stdio.h>
0042 #include <fcntl.h> /* For O_* constants */
0043 #include <ndrstandard.h>
0044
0045 #include <sys/types.h>
0046 #include <sys/ipc.h>
0047 #include <sys/msg.h>
0048
0049 #include <nstopwatch.h>
0050 #include <nstd_tls.h>
0051 #include <exhash.h>
0052 #include <ndebug.h>
0053 #include <sys_svq.h>
0054 #include "nstd_shm.h"
0055 #include <sys_svq.h>
0056 #include <ndrx_config.h>
0057 #include <lcfint.h>
0058
0059
0060
0061
0062
0063
0064
0065 #define INIT_ENTRY(X) \
0066 if (!ndrx_G_svqshm_init) \
0067 {\
0068 MUTEX_LOCK_V(ndrx_G_svqshm_init_lock);\
0069 \
0070 if (!ndrx_G_svqshm_init)\
0071 {\
0072 X = ndrx_svqshm_init(EXFALSE);\
0073 }\
0074 \
0075 MUTEX_UNLOCK_V(ndrx_G_svqshm_init_lock);\
0076 \
0077 if (EXFAIL==X)\
0078 {\
0079 NDRX_LOG(log_error, "Failed to create/attach System V Queues "\
0080 "mapping shared memory blocks!");\
0081 EXFAIL_OUT(X);\
0082 }\
0083 }
0084
0085
0086
0087
0088
0089
0090
0091
0092
0093
0094 #define RM_TTL_DLFT 10
0095
0096 #define SHM_ENT_NONE 0
0097 #define SHM_ENT_MATCH 1
0098 #define SHM_ENT_OLD 2
0099
0100
0101
0102
0103
0104 expublic volatile int ndrx_G_svqshm_init = EXFALSE;
0105
0106 exprivate MUTEX_LOCKDECL(ndrx_G_svqshm_init_lock);
0107
0108 exprivate ndrx_shm_t M_map_p2s = {.fd=0, .path=""};
0109 exprivate ndrx_shm_t M_map_s2p = {.fd=0, .path=""};
0110 exprivate ndrx_sem_t M_map_sem = {.semid=0};
0111
0112
0113
0114
0115
0116
0117
0118
0119
0120
0121 expublic int ndrx_svqshm_down(int force)
0122 {
0123 int ret = EXSUCCEED;
0124 int i;
0125 ndrx_svq_map_t *svq, *el;
0126
0127
0128
0129 INIT_ENTRY(ret);
0130
0131 #ifdef EX_USE_SYSVQ
0132
0133
0134
0135
0136
0137 ndrx_svqadmin_fork_prepare();
0138 ndrx_svq_fork_prepare();
0139
0140 #if 0
0141 - currently nothing there...
0142
0143
0144
0145
0146 ndrx_atfork_child();
0147 #endif
0148 #endif
0149
0150 if (force)
0151 {
0152
0153 svq = (ndrx_svq_map_t *) M_map_p2s.mem;
0154
0155
0156 for (i=0; i<ndrx_G_libnstd_cfg.queuesmax; i++)
0157 {
0158 el = NDRX_SVQ_INDEX(svq, i);
0159
0160 if (el->flags & NDRX_SVQ_MAP_ISUSED)
0161 {
0162 NDRX_LOG(log_error, "DOWN: Removing QID %d (%s) - should not be present!",
0163 el->qid, el->qstr);
0164 userlog("DOWN: Removing QID %d (%s) - should not be present!",
0165 el->qid, el->qstr);
0166 if (EXSUCCEED!=msgctl(el->qid, IPC_RMID, NULL))
0167 {
0168 int err = errno;
0169 NDRX_LOG(log_error, "got error when removing %d: %s - ignore",
0170 el->qid, strerror(err));
0171 userlog("got error when removing %d: %s - ignore",
0172 el->qid, strerror(err));
0173 }
0174 }
0175 }
0176 }
0177
0178 if (EXSUCCEED!=ndrx_shm_close(&M_map_p2s))
0179 {
0180 ret = EXFAIL;
0181 }
0182
0183 if (EXSUCCEED!=ndrx_shm_remove(&M_map_p2s))
0184 {
0185 ret = EXFAIL;
0186 }
0187
0188 if (EXSUCCEED!=ndrx_shm_close(&M_map_s2p))
0189 {
0190 ret = EXFAIL;
0191 }
0192
0193 if (EXSUCCEED!=ndrx_shm_remove(&M_map_s2p))
0194 {
0195 ret = EXFAIL;
0196 }
0197
0198 if (EXSUCCEED!=ndrx_sem_close(&M_map_sem))
0199 {
0200 ret = EXFAIL;
0201 }
0202
0203 if (EXSUCCEED!=ndrx_sem_remove(&M_map_sem, EXTRUE))
0204 {
0205 ret = EXFAIL;
0206 }
0207
0208 ndrx_G_svqshm_init = EXFALSE;
0209
0210 out:
0211 return ret;
0212 }
0213
0214
0215
0216
0217
0218
0219
0220
0221
0222 expublic int ndrx_svqshm_init(int attach_only)
0223 {
0224 int ret = EXSUCCEED;
0225
0226
0227 NDRX_LOG(log_debug, "SystemV queue init...");
0228
0229
0230 memset(&M_map_p2s, 0, sizeof(M_map_p2s));
0231 memset(&M_map_s2p, 0, sizeof(M_map_s2p));
0232
0233
0234
0235
0236 if (NULL==ndrx_G_libnstd_cfg.qprefix || EXEOS==ndrx_G_libnstd_cfg.qprefix[0])
0237 {
0238 NDRX_LOG(log_error, "%s is not set!", CONF_NDRX_QPREFIX);
0239 EXFAIL_OUT(ret);
0240 }
0241
0242 if (!ndrx_G_libnstd_cfg.ipckey)
0243 {
0244 NDRX_LOG(log_error, "%s is not set!", CONF_NDRX_IPCKEY);
0245 EXFAIL_OUT(ret);
0246 }
0247
0248
0249
0250 M_map_p2s.fd = EXFAIL;
0251 M_map_p2s.key = ndrx_G_libnstd_cfg.ipckey + NDRX_SHM_P2S_KEYOFSZ;
0252
0253 snprintf(M_map_p2s.path, sizeof(M_map_p2s.path), NDRX_SHM_P2S, ndrx_G_libnstd_cfg.qprefix);
0254 M_map_p2s.size = sizeof(ndrx_svq_map_t)*ndrx_G_libnstd_cfg.queuesmax;
0255
0256 M_map_s2p.fd = EXFAIL;
0257 M_map_s2p.key = ndrx_G_libnstd_cfg.ipckey + NDRX_SHM_S2P_KEYOFSZ;
0258
0259 snprintf(M_map_s2p.path, sizeof(M_map_s2p.path), NDRX_SHM_S2P, ndrx_G_libnstd_cfg.qprefix);
0260 M_map_s2p.size = sizeof(ndrx_svq_map_t)*ndrx_G_libnstd_cfg.queuesmax;
0261
0262
0263 if (attach_only)
0264 {
0265 if (EXSUCCEED!=ndrx_shm_attach(&M_map_p2s))
0266 {
0267 NDRX_LOG(log_error, "Failed to oattach shm [%s] - System V Queues cannot work",
0268 M_map_p2s.path);
0269 EXFAIL_OUT(ret);
0270 }
0271 }
0272 else if (EXSUCCEED!=ndrx_shm_open(&M_map_p2s, EXTRUE))
0273 {
0274 NDRX_LOG(log_error, "Failed to open shm [%s] - System V Queues cannot work",
0275 M_map_p2s.path);
0276 EXFAIL_OUT(ret);
0277 }
0278
0279 if (attach_only)
0280 {
0281 if (EXSUCCEED!=ndrx_shm_attach(&M_map_s2p))
0282 {
0283 NDRX_LOG(log_error, "Failed to attach shm [%s] - System V Queues cannot work",
0284 M_map_s2p.path);
0285 EXFAIL_OUT(ret);
0286 }
0287 }
0288 else if (EXSUCCEED!=ndrx_shm_open(&M_map_s2p, EXTRUE))
0289 {
0290 NDRX_LOG(log_error, "Failed to open shm [%s] - System V Queues cannot work",
0291 M_map_s2p.path);
0292 EXFAIL_OUT(ret);
0293 }
0294
0295 NDRX_LOG(log_debug, "M_map_p2s.mem=%p M_map_s2p=%p",
0296 M_map_p2s.mem, M_map_s2p.mem);
0297
0298 memset(&M_map_sem, 0, sizeof(M_map_sem));
0299
0300
0301 M_map_sem.key = ndrx_G_libnstd_cfg.ipckey + NDRX_SEM_SV5LOCKS;
0302
0303
0304
0305
0306
0307
0308
0309
0310
0311 M_map_sem.nrsems = 1;
0312 M_map_sem.maxreaders = ndrx_G_libnstd_cfg.svqreadersmax;
0313
0314 NDRX_LOG(log_debug, "Using service semaphore key: %d max readers: %d",
0315 M_map_sem.key, ndrx_G_libnstd_cfg.svqreadersmax);
0316
0317
0318 if (attach_only)
0319 {
0320 if (EXSUCCEED!=ndrx_sem_attach(&M_map_sem))
0321 {
0322 NDRX_LOG(log_error, "Failed to attach semaphore for System V queue "
0323 "map shared mem");
0324 EXFAIL_OUT(ret);
0325 }
0326 }
0327 else if (EXSUCCEED!=ndrx_sem_open(&M_map_sem, EXTRUE))
0328 {
0329 NDRX_LOG(log_error, "Failed to open semaphore for System V queue "
0330 "map shared mem");
0331 userlog("Failed to open semaphore for System V queue "
0332 "map shared mem");
0333 EXFAIL_OUT(ret);
0334 }
0335
0336 #ifdef EX_USE_SYSVQ
0337
0338 if (!attach_only && EXSUCCEED!=ndrx_svq_event_init())
0339 {
0340 NDRX_LOG(log_error, "Failed to init System V queue monitoring thread!");
0341 userlog("Failed to init System V queue monitoring thread!");
0342 EXFAIL_OUT(ret);
0343 }
0344 #endif
0345
0346 ndrx_G_svqshm_init = EXTRUE;
0347 out:
0348 return ret;
0349 }
0350
0351
0352
0353
0354
0355
0356
0357
0358 expublic int ndrx_svqshm_attach(void)
0359 {
0360 int ret = EXSUCCEED;
0361
0362 if (ndrx_G_svqshm_init)
0363 {
0364 NDRX_LOG(log_debug, "Already System V resources open");
0365 goto out;
0366 }
0367
0368 if (EXSUCCEED!=ndrx_svqshm_init(EXTRUE))
0369 {
0370 NDRX_LOG(log_error, "Failed to attach to System V resources");
0371 EXFAIL_OUT(ret);
0372 }
0373
0374 NDRX_LOG(log_debug, "Attached to shm/sem OK");
0375
0376 ret = EXTRUE;
0377
0378 out:
0379 return ret;
0380 }
0381
0382
0383
0384
0385 expublic void ndrx_svqshm_detach(void)
0386 {
0387 ndrx_shm_close(&M_map_p2s);
0388 ndrx_shm_close(&M_map_s2p);
0389 ndrx_sem_close(&M_map_sem);
0390
0391 ndrx_G_svqshm_init = EXFALSE;
0392 }
0393
0394
0395
0396
0397
0398
0399
0400
0401 expublic int ndrx_svqshm_shmres_get(ndrx_shm_t **map_p2s, ndrx_shm_t **map_s2p,
0402 ndrx_sem_t **map_sem, int *queuesmax)
0403 {
0404
0405 *map_p2s = &M_map_p2s;
0406 *map_s2p = &M_map_s2p;
0407 *map_sem = &M_map_sem;
0408 *queuesmax = ndrx_G_libnstd_cfg.queuesmax;
0409
0410 return ndrx_G_svqshm_init;
0411 }
0412
0413
0414
0415
0416
0417
0418
0419
0420 exprivate int qstr_key_hash(ndrx_lh_config_t *conf, void *key_get, size_t key_len)
0421 {
0422 return ndrx_hash_fn(key_get) % conf->elmmax;
0423 }
0424
0425
0426
0427
0428
0429
0430
0431
0432
0433 exprivate void qstr_key_debug(ndrx_lh_config_t *conf, void *key_get, size_t key_len,
0434 char *dbg_out, size_t dbg_len)
0435 {
0436 NDRX_STRCPY_SAFE_DST(dbg_out, key_get, dbg_len);
0437 }
0438
0439
0440
0441
0442
0443
0444
0445
0446 exprivate void val_debug(ndrx_lh_config_t *conf, int idx, char *dbg_out, size_t dbg_len)
0447 {
0448 snprintf(dbg_out, dbg_len, "%s/%d",
0449 NDRX_SVQ_INDEX((*conf->memptr), idx)->qstr,
0450 NDRX_SVQ_INDEX((*conf->memptr), idx)->qid);
0451
0452 }
0453
0454
0455
0456
0457
0458
0459
0460
0461
0462 exprivate int qstr_compare(ndrx_lh_config_t *conf, void *key_get, size_t key_len, int idx)
0463 {
0464 return strcmp(NDRX_SVQ_INDEX((*conf->memptr), idx)->qstr, key_get);
0465 }
0466
0467
0468
0469
0470
0471
0472
0473
0474
0475 exprivate int position_get_qstr(char *pathname, int oflag, int *pos,
0476 int *have_value)
0477 {
0478 static ndrx_lh_config_t conf;
0479 static int first = EXTRUE;
0480
0481 if (first)
0482 {
0483 conf.elmmax = ndrx_G_libnstd_cfg.queuesmax;
0484 conf.elmsz = sizeof(ndrx_svq_map_t);
0485 conf.flags_offset = EXOFFSET(ndrx_svq_map_t, flags);
0486 conf.memptr = (void **)&(M_map_p2s.mem);
0487 conf.p_key_hash=&qstr_key_hash;
0488 conf.p_key_debug=&qstr_key_debug;
0489 conf.p_val_debug=&val_debug;
0490 conf.p_compare=&qstr_compare;
0491 first = EXFALSE;
0492 }
0493
0494 return ndrx_lh_position_get(&conf, pathname, 0, oflag, pos, have_value, "qstr");
0495 }
0496
0497
0498
0499
0500
0501
0502
0503
0504 exprivate int qid_key_hash(ndrx_lh_config_t *conf, void *key_get, size_t key_len)
0505 {
0506 return *((int *)key_get) % conf->elmmax;
0507 }
0508
0509
0510
0511
0512
0513
0514
0515
0516
0517 exprivate void qid_key_debug(ndrx_lh_config_t *conf, void *key_get, size_t key_len,
0518 char *dbg_out, size_t dbg_len)
0519 {
0520 snprintf(dbg_out, dbg_len, "%d", *((int *)key_get));
0521 }
0522
0523
0524
0525
0526
0527
0528
0529
0530
0531 exprivate int qid_compare(ndrx_lh_config_t *conf, void *key_get, size_t key_len, int idx)
0532 {
0533
0534 if (NDRX_SVQ_INDEX((*conf->memptr), idx)->qid == *((int *)key_get))
0535 {
0536 return EXSUCCEED;
0537 }
0538 else
0539 {
0540 return EXFAIL;
0541 }
0542 }
0543
0544
0545
0546
0547
0548
0549
0550
0551 exprivate int position_get_qid(int qid, int oflag, int *pos,
0552 int *have_value)
0553 {
0554 static ndrx_lh_config_t conf;
0555 static int first = EXTRUE;
0556
0557 if (first)
0558 {
0559 conf.elmmax = ndrx_G_libnstd_cfg.queuesmax;
0560 conf.elmsz = sizeof(ndrx_svq_map_t);
0561 conf.flags_offset = EXOFFSET(ndrx_svq_map_t, flags);
0562 conf.memptr = (void **)&(M_map_s2p.mem);
0563 conf.p_key_hash=&qid_key_hash;
0564 conf.p_key_debug=&qid_key_debug;
0565 conf.p_val_debug=&val_debug;
0566 conf.p_compare=&qid_compare;
0567 first = EXFALSE;
0568 }
0569
0570 return ndrx_lh_position_get(&conf, &qid, 0, oflag, pos, have_value, "qid");
0571
0572 }
0573
0574
0575
0576
0577
0578
0579
0580
0581
0582
0583 expublic int ndrx_svqshm_get_status(ndrx_svq_status_t *status,
0584 int qid, int *pos, int *have_value)
0585 {
0586 int ret=SHM_ENT_NONE;
0587 int try = qid % ndrx_G_libnstd_cfg.queuesmax;
0588 int start = try;
0589 int overflow = EXFALSE;
0590 int iterations = 0;
0591 ndrx_svq_status_t *el;
0592
0593 *pos=EXFAIL;
0594 *have_value = EXFALSE;
0595
0596
0597
0598
0599
0600
0601
0602 while ((NDRX_SVQ_STATIDX(status, try)->flags & NDRX_SVQ_MAP_WASUSED)
0603 && (!overflow || (overflow && try < start)))
0604 {
0605
0606 el = NDRX_SVQ_STATIDX(status, try);
0607
0608 if (el->qid == qid)
0609 {
0610 *pos=try;
0611
0612 if (el->flags & NDRX_SVQ_MAP_ISUSED)
0613 {
0614 ret=SHM_ENT_MATCH;
0615 }
0616 else
0617 {
0618 ret=SHM_ENT_OLD;
0619 }
0620
0621
0622
0623 break;
0624 }
0625
0626 try++;
0627
0628
0629
0630
0631
0632 if (try>=ndrx_G_libnstd_cfg.queuesmax)
0633 {
0634 try = 0;
0635 overflow=EXTRUE;
0636 NDRX_LOG(log_debug, "Overflow reached for search of [%d]", qid);
0637 }
0638 iterations++;
0639
0640 NDRX_LOG(log_dump, "Trying %d for [%d]", try, qid);
0641 }
0642 switch (ret)
0643 {
0644 case SHM_ENT_OLD:
0645 *have_value = EXFALSE;
0646 ret = EXTRUE;
0647 break;
0648 case SHM_ENT_NONE:
0649
0650 if (overflow)
0651 {
0652 *have_value = EXFALSE;
0653 ret = EXFALSE;
0654 }
0655 else
0656 {
0657 *have_value = EXFALSE;
0658 ret = EXTRUE;
0659 }
0660
0661 break;
0662 case SHM_ENT_MATCH:
0663 *have_value = EXTRUE;
0664 ret = EXTRUE;
0665 break;
0666 default:
0667
0668 NDRX_LOG(log_error, "!!! should not get here...");
0669 break;
0670 }
0671
0672 *pos=try;
0673 NDRX_LOG(log_debug, "[%d] - result: %d, "
0674 "iterations: %d, pos: %d, have_value: %d",
0675 qid, ret, iterations, *pos, *have_value);
0676 return ret;
0677 }
0678
0679
0680
0681
0682
0683
0684
0685
0686 expublic int ndrx_svqshm_get_qid(int in_qid, char *out_qstr, int out_qstr_len)
0687 {
0688 int ret = EXSUCCEED;
0689 int found_2;
0690 int have_value_2;
0691 int pos_2;
0692 ndrx_svq_map_t *svq2;
0693 ndrx_svq_map_t *sm;
0694
0695 INIT_ENTRY(ret);
0696
0697 svq2 = (ndrx_svq_map_t *) M_map_s2p.mem;
0698
0699
0700 if (EXSUCCEED!=ndrx_sem_rwlock(&M_map_sem, 0, NDRX_SEM_TYP_READ))
0701 {
0702 goto out;
0703 }
0704
0705 found_2 = position_get_qid(in_qid, 0, &pos_2, &have_value_2);
0706
0707 if (have_value_2)
0708 {
0709 sm = NDRX_SVQ_INDEX(svq2, pos_2);
0710 NDRX_STRCPY_SAFE_DST(out_qstr, sm->qstr, out_qstr_len);
0711 }
0712 else
0713 {
0714 ret=EXFAIL;
0715 }
0716
0717 ndrx_sem_rwunlock(&M_map_sem, 0, NDRX_SEM_TYP_READ);
0718
0719
0720 out:
0721
0722 return ret;
0723 }
0724
0725
0726
0727
0728
0729
0730
0731
0732
0733
0734 expublic int ndrx_svqshm_get(char *qstr, mode_t mode, int oflag)
0735 {
0736 int ret = EXSUCCEED;
0737 int qid;
0738 int found;
0739 int have_value;
0740 int pos;
0741
0742 int found_2;
0743 int have_value_2;
0744 int pos_2;
0745
0746 int msgflag;
0747 int err = 0;
0748
0749 ndrx_svq_map_t *svq;
0750 ndrx_svq_map_t *svq2;
0751
0752 ndrx_svq_map_t *pm;
0753 ndrx_svq_map_t *sm;
0754
0755 INIT_ENTRY(ret);
0756
0757 svq = (ndrx_svq_map_t *) M_map_p2s.mem;
0758 svq2 = (ndrx_svq_map_t *) M_map_s2p.mem;
0759
0760
0761
0762
0763
0764
0765
0766
0767 if (!(oflag & O_CREAT))
0768 {
0769
0770 if (EXSUCCEED!=ndrx_sem_rwlock(&M_map_sem, 0, NDRX_SEM_TYP_READ))
0771 {
0772 goto out;
0773 }
0774
0775 found = position_get_qstr(qstr, oflag, &pos, &have_value);
0776
0777 if (have_value)
0778 {
0779 pm = NDRX_SVQ_INDEX(svq, pos);
0780 qid = pm->qid;
0781 }
0782
0783 ndrx_sem_rwunlock(&M_map_sem, 0, NDRX_SEM_TYP_READ);
0784
0785
0786 if (have_value)
0787 {
0788 if (oflag & (O_CREAT | O_EXCL))
0789 {
0790 NDRX_LOG(log_error, "Queue [%s] was requested with O_CREAT | O_EXCL, but "
0791 "it already exists at position with qid %d", qstr, ret);
0792 err = EEXIST;
0793 EXFAIL_OUT(ret);
0794 }
0795 else
0796 {
0797 NDRX_LOG(log_debug, "Queue [%s] mapped to qid %d", qstr, qid);
0798 }
0799
0800
0801 goto out;
0802 }
0803 else
0804 {
0805
0806 NDRX_LOG(log_debug, "Queue not found for: [%s]", qstr);
0807 err = ENOENT;
0808 EXFAIL_OUT(ret);
0809 }
0810 }
0811
0812
0813 NDRX_LOG(log_info, "[%s] queue not registered or write requested %d - Writ try...",
0814 qstr, oflag);
0815
0816
0817 if (EXSUCCEED!=ndrx_sem_rwlock(&M_map_sem, 0, NDRX_SEM_TYP_WRITE))
0818 {
0819 goto out;
0820 }
0821
0822 found = position_get_qstr(qstr, oflag, &pos, &have_value);
0823
0824
0825
0826 if (!found)
0827 {
0828 NDRX_LOG(log_error, "Location not found for [%s] - memory full?", qstr);
0829 userlog("Location not found for [%s] - memory full?", qstr);
0830 err = ENOMEM;
0831 EXFAIL_OUT(ret);
0832 }
0833
0834 pm = NDRX_SVQ_INDEX(svq, pos);
0835
0836
0837
0838 if (have_value)
0839 {
0840 qid = pm->qid;
0841
0842 if ( (oflag & O_CREAT) && (oflag & O_EXCL))
0843 {
0844
0845 ndrx_sem_rwunlock(&M_map_sem, 0, NDRX_SEM_TYP_WRITE);
0846 NDRX_LOG(log_error, "Queue [%s] was requested with O_CREAT | O_EXCL, but "
0847 "it already exists at position with qid %d", qstr, qid);
0848 err = EEXIST;
0849 EXFAIL_OUT(ret);
0850 }
0851 else
0852 {
0853
0854
0855
0856
0857
0858 qid = pm->qid;
0859
0860 found_2 = position_get_qid(qid, oflag, &pos_2, &have_value_2);
0861 sm = NDRX_SVQ_INDEX(svq2, pos_2);
0862
0863 ndrx_stopwatch_reset(&(pm->ctime));
0864 sm->ctime = pm->ctime;
0865
0866 ndrx_sem_rwunlock(&M_map_sem, 0, NDRX_SEM_TYP_WRITE);
0867
0868
0869 NDRX_LOG(log_info, "Queue [%s] mapped to qid %d",
0870 qstr, qid);
0871 }
0872
0873 goto out;
0874 }
0875
0876
0877
0878 msgflag=0;
0879
0880 if (oflag & O_CREAT)
0881 {
0882 msgflag|=IPC_CREAT;
0883 }
0884
0885 if (oflag & O_EXCL)
0886 {
0887 msgflag|=IPC_EXCL;
0888 }
0889
0890
0891 if (EXFAIL==(qid = msgget(IPC_PRIVATE, msgflag|mode)))
0892 {
0893 int err = errno;
0894 ndrx_sem_rwunlock(&M_map_sem, 0, NDRX_SEM_TYP_WRITE);
0895
0896
0897 NDRX_LOG(log_error, "Failed msgget: %s for [%s]", strerror(err), qstr);
0898 userlog("Failed msgget: %s for [%s]", strerror(err), qstr);
0899 EXFAIL_OUT(ret);
0900 }
0901
0902
0903 pm->qid = qid;
0904 NDRX_STRCPY_SAFE(pm->qstr, qstr);
0905 pm->flags = (NDRX_SVQ_MAP_ISUSED | NDRX_SVQ_MAP_WASUSED);
0906
0907
0908
0909
0910
0911
0912
0913 found_2 = position_get_qid(qid, oflag, &pos_2, &have_value_2);
0914
0915 if (!found_2)
0916 {
0917 NDRX_LOG(log_error, "Location not found for qid [%d] - memory full?", qid);
0918 userlog("Location not found for qid [%d] - memory full?", qid);
0919 err = ENOMEM;
0920 EXFAIL_OUT(ret);
0921 }
0922
0923 sm = NDRX_SVQ_INDEX(svq2, pos_2);
0924
0925 sm->qid = qid;
0926 NDRX_STRCPY_SAFE( sm->qstr, qstr);
0927 sm->flags = (NDRX_SVQ_MAP_ISUSED | NDRX_SVQ_MAP_WASUSED);
0928
0929
0930 ndrx_stopwatch_reset(&(pm->ctime));
0931 sm->ctime = pm->ctime;
0932
0933 ndrx_sem_rwunlock(&M_map_sem, 0, NDRX_SEM_TYP_WRITE);
0934
0935
0936 NDRX_LOG(log_debug, "Open queue: [%s] to system v: [%d]", qstr, qid);
0937
0938 out:
0939
0940 if (EXSUCCEED!=ret)
0941 {
0942 errno = err;
0943 return EXFAIL;
0944 }
0945 else
0946 {
0947 return qid;
0948 }
0949 }
0950
0951
0952
0953
0954
0955
0956
0957
0958
0959
0960
0961
0962
0963
0964
0965
0966
0967
0968
0969
0970
0971
0972
0973
0974
0975
0976
0977
0978
0979 expublic int ndrx_svqshm_ctl(char *qstr, int qid, int cmd, int arg1,
0980 int (*p_deletecb)(int qid, char *qstr))
0981 {
0982 int ret = EXSUCCEED;
0983
0984 int delta;
0985
0986 int found;
0987 int have_value;
0988 int pos;
0989
0990 int found_2;
0991 int have_value_2;
0992 int pos_2;
0993
0994 int err = 0;
0995 int is_locked = EXFALSE;
0996
0997 ndrx_svq_map_t *svq;
0998 ndrx_svq_map_t *svq2;
0999
1000
1001 ndrx_svq_map_t *pm;
1002 ndrx_svq_map_t *sm;
1003
1004 INIT_ENTRY(ret);
1005
1006 svq = (ndrx_svq_map_t *) M_map_p2s.mem;
1007 svq2 = (ndrx_svq_map_t *) M_map_s2p.mem;
1008
1009
1010 if (EXSUCCEED!=ndrx_sem_rwlock(&M_map_sem, 0, NDRX_SEM_TYP_WRITE))
1011 {
1012 goto out;
1013 }
1014
1015 is_locked = EXTRUE;
1016
1017 if (qstr)
1018 {
1019 found = position_get_qstr(qstr, 0, &pos, &have_value);
1020
1021 if (have_value)
1022 {
1023 pm = NDRX_SVQ_INDEX(svq, pos);
1024
1025 found_2 = position_get_qid(pm->qid, 0,
1026 &pos_2, &have_value_2);
1027
1028 if (!have_value_2)
1029 {
1030 NDRX_LOG(log_error, "qstr [%s] map not in sync with qid %d "
1031 "map (have_value_2 is false)",
1032 qstr, pm->qid);
1033 userlog("qstr [%s] map not in sync with qid %d "
1034 "map (have_value_2 is false)",
1035 qstr, pm->qid);
1036 err = EBADF;
1037 EXFAIL_OUT(ret);
1038 }
1039
1040 sm = NDRX_SVQ_INDEX(svq2, pos_2);
1041 }
1042
1043 }
1044 else if (EXFAIL!=qid)
1045 {
1046 found_2 = position_get_qid(qid, 0,
1047 &pos_2, &have_value_2);
1048
1049 if (have_value_2)
1050 {
1051 sm = NDRX_SVQ_INDEX(svq2, pos_2);
1052
1053 found = position_get_qstr(sm->qstr, 0, &pos, &have_value);
1054
1055 if (!have_value)
1056 {
1057 NDRX_LOG(log_error, "qstr [%s] map not in sync with qid %d "
1058 "map (have_value is false)",
1059 sm->qstr, qstr, qid);
1060
1061 userlog("qstr [%s] map not in sync with qid %d "
1062 "map (have_value is false)",
1063 sm->qstr, qstr, qid);
1064 err = EBADF;
1065 EXFAIL_OUT(ret);
1066 }
1067
1068 pm = NDRX_SVQ_INDEX(svq, pos);
1069
1070 }
1071 }
1072 else
1073 {
1074 NDRX_LOG(log_error, "qstr and qid are invalid => FAIL");
1075 err = EINVAL;
1076 EXFAIL_OUT(ret);
1077 }
1078
1079 switch (cmd)
1080 {
1081
1082 case IPC_SET:
1083
1084 NDRX_LOG(log_debug, "Adding flags %hd to qstr [%s]/qid %d",
1085 (short)arg1, pm->qstr, pm->qid);
1086 pm->flags |= (short)arg1;
1087 sm->flags |= (short)arg1;
1088
1089 if (NDRX_SVQ_MAP_RQADDR & arg1)
1090 {
1091
1092
1093
1094
1095
1096 ndrx_stopwatch_reset(&(pm->ctime));
1097 sm->ctime = pm->ctime;
1098 }
1099
1100 break;
1101 case IPC_RMID:
1102
1103 if (!have_value)
1104 {
1105 NDRX_LOG(log_info, "Queue not found [%s]/%d",
1106 qstr?qstr:"NULL", qid);
1107 err = ENOENT;
1108 EXFAIL_OUT(ret);
1109 }
1110
1111 if (EXFAIL!=arg1)
1112 {
1113 delta = ndrx_stopwatch_get_delta_sec( &(pm->ctime));
1114 }
1115
1116 if ( EXFAIL==arg1 || delta > arg1)
1117 {
1118 NDRX_LOG(log_info, "Unlinking queue: [%s]/%d (delta: %d, limit: %d)",
1119 pm->qstr, pm->qid, delta, arg1);
1120
1121
1122
1123 if (NULL!=p_deletecb && EXSUCCEED!=p_deletecb(pm->qid, pm->qstr))
1124 {
1125 NDRX_LOG(log_error, "Delete callback failed for [%s]/%d",
1126 pm->qstr, pm->qid);
1127 EXFAIL_OUT(ret);
1128 }
1129
1130 if (EXSUCCEED!=msgctl(pm->qid, IPC_RMID, NULL))
1131 {
1132 err = errno;
1133
1134 if (EIDRM!=err && EINVAL!=err)
1135 {
1136 NDRX_LOG(log_error, "got error when removing %d: %s",
1137 pm->qid, strerror(err));
1138 userlog("got error when removing %d: %s",
1139 pm->qid, strerror(err));
1140 EXFAIL_OUT(ret);
1141 }
1142 else
1143 {
1144 NDRX_LOG(log_warn, "got error when removing %d: %s - ignore",
1145 pm->qid, strerror(err));
1146 }
1147 }
1148
1149 NDRX_LOG(log_debug, "Removing ISUSED flag for P2S/S2P mem");
1150
1151 pm->flags &= ~(NDRX_SVQ_MAP_ISUSED);
1152 sm->flags &= ~(NDRX_SVQ_MAP_ISUSED);
1153 }
1154
1155 break;
1156
1157 default:
1158
1159 NDRX_LOG(log_error, "Unsupported command: %d", cmd);
1160 err=EINVAL;
1161 EXFAIL_OUT(ret);
1162 break;
1163 }
1164
1165
1166 out:
1167
1168 if (is_locked)
1169 {
1170 ndrx_sem_rwunlock(&M_map_sem, 0, NDRX_SEM_TYP_WRITE);
1171
1172 }
1173
1174 errno = err;
1175 return ret;
1176 }
1177
1178
1179
1180
1181
1182
1183
1184
1185
1186
1187 expublic string_list_t* ndrx_sys_mqueue_list_make_svq(char *qpath, int *return_status)
1188 {
1189 int retstat = EXSUCCEED;
1190 string_list_t* ret = NULL;
1191 int have_lock = EXFALSE;
1192 int i=0;
1193 ndrx_svq_map_t *svq;
1194 ndrx_svq_map_t *pm;
1195
1196 INIT_ENTRY(retstat);
1197
1198 svq = (ndrx_svq_map_t *) M_map_p2s.mem;
1199
1200 *return_status = EXSUCCEED;
1201
1202
1203 if (EXSUCCEED!=ndrx_sem_rwlock(&M_map_sem, 0, NDRX_SEM_TYP_READ))
1204 {
1205 goto out;
1206 }
1207
1208 have_lock = EXTRUE;
1209
1210 for (i=0;i<ndrx_G_libnstd_cfg.queuesmax;i++)
1211 {
1212 pm = NDRX_SVQ_INDEX(svq, i);
1213
1214 if (pm->flags & NDRX_SVQ_MAP_ISUSED)
1215 {
1216 if (EXSUCCEED!=ndrx_string_list_add(&ret, pm->qstr))
1217 {
1218 NDRX_LOG(log_error, "failed to add string to list [%s]",
1219 pm->qstr);
1220 *return_status = EXFAIL;
1221 goto out;
1222 }
1223 }
1224 }
1225
1226 out:
1227 if (have_lock)
1228 {
1229 ndrx_sem_rwunlock(&M_map_sem, 0, NDRX_SEM_TYP_READ);
1230
1231 }
1232
1233 return ret;
1234 }
1235
1236
1237
1238
1239
1240
1241
1242 expublic ndrx_svq_status_t* ndrx_svqshm_statusget(int *len, int ttl)
1243 {
1244 int ret = EXSUCCEED;
1245 ndrx_svq_status_t* block = NULL;
1246 int sz = sizeof(ndrx_svq_status_t) * ndrx_G_libnstd_cfg.queuesmax;
1247 int err;
1248 int have_lock = EXFALSE;
1249 int i=0;
1250 ndrx_svq_map_t *svq = (ndrx_svq_map_t *) M_map_s2p.mem;
1251 ndrx_svq_map_t *pm;
1252
1253 block = NDRX_MALLOC(sz);
1254
1255 if (NULL==block)
1256 {
1257 err = errno;
1258 NDRX_LOG(log_error, "Failed to malloc %d bytes: %s", sz, strerror(err));
1259 userlog("Failed to malloc %d bytes: %s", sz, strerror(err));
1260 EXFAIL_OUT(ret);
1261 }
1262
1263 *len = ndrx_G_libnstd_cfg.queuesmax;
1264
1265
1266 if (EXSUCCEED!=ndrx_sem_rwlock(&M_map_sem, 0, NDRX_SEM_TYP_READ))
1267 {
1268 EXFAIL_OUT(ret);
1269 }
1270
1271 have_lock = EXTRUE;
1272
1273 for (i=0;i<ndrx_G_libnstd_cfg.queuesmax;i++)
1274 {
1275 pm = NDRX_SVQ_INDEX(svq, i);
1276 block[i].flags = pm->flags;
1277 block[i].qid = pm->qid;
1278
1279 if (block[i].flags & NDRX_SVQ_MAP_ISUSED &&
1280 ndrx_stopwatch_get_delta_sec( &(pm->ctime)) > ttl)
1281 {
1282 block[i].flags |= NDRX_SVQ_MAP_SCHEDRM;
1283 }
1284
1285 NDRX_STRCPY_SAFE(block[i].qstr, pm->qstr);
1286 }
1287
1288 out:
1289
1290 if (have_lock)
1291 {
1292 ndrx_sem_rwunlock(&M_map_sem, 0, NDRX_SEM_TYP_READ);
1293
1294 }
1295
1296 return block;
1297 }
1298
1299