0001
0002
0003
0004
0005
0006
0007
0008
0009
0010
0011
0012
0013
0014
0015
0016
0017
0018
0019
0020
0021
0022
0023
0024
0025
0026
0027
0028
0029
0030
0031
0032
0033
0034
0035
0036
0037
0038 #include <ndrx_config.h>
0039
0040 #ifdef EX_OS_FREEBSD
0041 #define _WITH_GETLINE
0042 #endif
0043
0044 #include <stdio.h>
0045 #include <stdlib.h>
0046 #include <string.h>
0047 #include <ctype.h>
0048 #include <errno.h>
0049 #include <regex.h>
0050 #include <utlist.h>
0051
0052 #include <ndebug.h>
0053 #include <atmi.h>
0054 #include <atmi_int.h>
0055 #include <typed_buf.h>
0056 #include <ndrstandard.h>
0057 #include <ubf.h>
0058 #include <Exfields.h>
0059
0060 #include <exnet.h>
0061 #include <ndrxdcmn.h>
0062
0063 #include "tmqd.h"
0064 #include "../libatmisrv/srv_int.h"
0065 #include "tperror.h"
0066 #include "userlog.h"
0067 #include <xa_cmn.h>
0068 #include <exthpool.h>
0069 #include "nstdutil.h"
0070 #include "tmqueue.h"
0071 #include "cconfig.h"
0072 #include <rbtree.h>
0073 #include "qtran.h"
0074
0075
0076 #define MAX_TOKEN_SIZE 64
0077 #define WORKERS_DEFAULT 1
0078
0079 #define TMQ_QC_NAME "name"
0080 #define TMQ_QC_SVCNM "svcnm"
0081 #define TMQ_QC_TRIES "tries"
0082 #define TMQ_QC_AUTOQ "autoq"
0083 #define TMQ_QC_WAITINIT "waitinit"
0084 #define TMQ_QC_WAITRETRY "waitretry"
0085 #define TMQ_QC_WAITRETRYINC "waitretryinc"
0086 #define TMQ_QC_WAITRETRYMAX "waitretrymax"
0087 #define TMQ_QC_MEMONLY "memonly"
0088 #define TMQ_QC_MODE "mode"
0089 #define TMQ_QC_TXTOUT "txtout"
0090 #define TMQ_QC_ERRORQ "errorq"
0091 #define TMQ_QC_WORKERS "workers"
0092 #define TMQ_QC_SYNC "sync"
0093
0094 #define EXHASH_FIND_STR_H2(head,findstr,out) \
0095 EXHASH_FIND(h2,head,findstr,strlen(findstr),out)
0096
0097 #define EXHASH_ADD_STR_H2(head,strfield,add) \
0098 EXHASH_ADD(h2,head,strfield,strlen(add->strfield),add)
0099
0100 #define EXHASH_DEL_H2(head,delptr) \
0101 EXHASH_DELETE(h2,head,delptr)
0102
0103
0104
0105
0106 #define TMQ_COR_GETMSG(ptr) ((tmq_memmsg_t *)((char *)ptr - EXOFFSET(tmq_memmsg_t, cor)))
0107
0108
0109
0110
0111
0112
0113
0114 expublic tmq_memmsg_t *G_msgid_hash = NULL;
0115
0116
0117 expublic tmq_qhash_t *G_qhash = NULL;
0118
0119
0120
0121
0122 exprivate MUTEX_LOCKDECL(M_q_lock);
0123
0124
0125 expublic tmq_qconfig_t *G_qconf = NULL;
0126
0127
0128 exprivate tmq_memmsg_t* tmq_get_msg_by_msgid_str(char *msgid_str);
0129
0130
0131
0132
0133
0134
0135 expublic int tmq_msgid_exists(char *msgid_str)
0136 {
0137 tmq_memmsg_t *ret;
0138
0139 MUTEX_LOCK_V(M_q_lock);
0140 EXHASH_FIND_STR( G_msgid_hash, msgid_str, ret);
0141 MUTEX_UNLOCK_V(M_q_lock);
0142
0143 return (NULL!=ret)?EXTRUE:EXFALSE;
0144 }
0145
0146
0147
0148
0149
0150
0151
0152
0153
0154
0155 exprivate int process_block(char *tmxid, union tmq_block **p_block, int state, int seqno)
0156 {
0157 int ret = EXSUCCEED;
0158
0159
0160
0161
0162
0163 if (TMQ_TXSTATE_ACTIVE==state || TMQ_TXSTATE_PREPARED==state)
0164 {
0165 char entry_status;
0166
0167 if (TMQ_TXSTATE_PREPARED==state)
0168 {
0169 entry_status = XA_RM_STATUS_PREP;
0170 }
0171 else
0172 {
0173 entry_status = XA_RM_STATUS_ACTIVE;
0174 }
0175
0176 if (EXSUCCEED!=tmq_log_addcmd(tmxid, seqno, (char *)*p_block, entry_status))
0177 {
0178 NDRX_LOG(log_error, "Failed to add tmxid [%s] seqno %d",
0179 tmxid, seqno);
0180 EXFAIL_OUT(ret);
0181 }
0182 }
0183
0184 switch((*p_block)->hdr.command_code)
0185 {
0186 case TMQ_STORCMD_NEWMSG:
0187
0188 if (EXSUCCEED!=tmq_msg_add((tmq_msg_t **)p_block, EXTRUE, NULL, NULL))
0189 {
0190 NDRX_LOG(log_error, "Failed to enqueue!");
0191 EXFAIL_OUT(ret);
0192 }
0193
0194 break;
0195 case TMQ_STORCMD_DUM:
0196
0197 break;
0198 default:
0199
0200 if (EXSUCCEED!=tmq_lock_msg((*p_block)->hdr.msgid))
0201 {
0202 if (TMQ_TXSTATE_PREPARED==state
0203 && TMQ_STORCMD_DEL==(*p_block)->hdr.command_code)
0204 {
0205 NDRX_LOG(log_info, "Delete command and message not found - assume deleted");
0206 }
0207 else
0208 {
0209 NDRX_LOG(log_error, "Failed to lock message!");
0210 EXFAIL_OUT(ret);
0211 }
0212 }
0213
0214 break;
0215 }
0216
0217 out:
0218
0219 if (NULL!=*p_block)
0220 {
0221 NDRX_FREE((char *)*p_block);
0222 *p_block = NULL;
0223 }
0224 return ret;
0225 }
0226
0227
0228
0229
0230
0231
0232
0233 expublic int tmq_load_msgs(void)
0234 {
0235 int ret = EXSUCCEED;
0236
0237 NDRX_LOG(log_info, "Reading messages from disk...");
0238
0239 if (EXSUCCEED!=tmq_storage_get_blocks(process_block, (short)G_tmqueue_cfg.vnodeid,
0240 (short)tpgetsrvid()))
0241 {
0242 EXFAIL_OUT(ret);
0243 }
0244
0245 out:
0246 NDRX_LOG(log_info, "tmq_load_msgs return %d", ret);
0247 return ret;
0248 }
0249
0250
0251
0252
0253
0254
0255 expublic int tmq_dum_add(char *tmxid)
0256 {
0257 tmq_msg_dum_t dum;
0258 int ret = EXSUCCEED;
0259
0260
0261
0262
0263
0264 tmq_setup_cmdheader_dum(&dum.hdr, NULL, G_tmqueue_cfg.vnodeid,
0265 tpgetsrvid(), ndrx_G_qspace, 0);
0266 dum.hdr.command_code = TMQ_STORCMD_DUM;
0267
0268
0269 if (EXSUCCEED!=tmq_storage_write_cmd_block((char *)&dum,
0270 "Dummy transaction marker", tmxid, NULL))
0271 {
0272 NDRX_LOG(log_error, "Failed to write dummy command block to disk [%s]", tmxid);
0273 EXFAIL_OUT(ret);
0274 }
0275
0276 out:
0277 return ret;
0278 }
0279
0280
0281
0282
0283
0284
0285 expublic int tmq_setup_cmdheader_dum(tmq_cmdheader_t *hdr, char *qname,
0286 short nodeid, short srvid, char *qspace, long flags)
0287 {
0288 int ret = EXSUCCEED;
0289
0290 NDRX_STRCPY_SAFE(hdr->qspace, qspace);
0291
0292 hdr->command_code = TMQ_STORCMD_NEWMSG;
0293 NDRX_STRNCPY(hdr->magic, TMQ_MAGIC, TMQ_MAGIC_LEN);
0294
0295
0296 hdr->nodeid = nodeid;
0297 hdr->srvid = srvid;
0298 hdr->flags = flags;
0299 memset(hdr->reserved, 0, sizeof(hdr->reserved));
0300 memset(hdr->msgid, 0, sizeof(hdr->msgid));
0301 NDRX_STRNCPY(hdr->magic2, TMQ_MAGIC2, TMQ_MAGIC_LEN);
0302
0303 out:
0304 return ret;
0305 }
0306
0307
0308
0309
0310
0311
0312 expublic int tmq_setup_cmdheader_newmsg(tmq_cmdheader_t *hdr, char *qname,
0313 short nodeid, short srvid, char *qspace, long flags)
0314 {
0315 int ret = EXSUCCEED;
0316
0317 NDRX_STRCPY_SAFE(hdr->qspace, qspace);
0318
0319 hdr->command_code = TMQ_STORCMD_NEWMSG;
0320 NDRX_STRNCPY(hdr->magic, TMQ_MAGIC, TMQ_MAGIC_LEN);
0321
0322
0323 hdr->nodeid = nodeid;
0324 hdr->srvid = srvid;
0325 hdr->flags = flags;
0326 memset(hdr->reserved, 0, sizeof(hdr->reserved));
0327 NDRX_STRNCPY(hdr->magic2, TMQ_MAGIC2, TMQ_MAGIC_LEN);
0328
0329 tmq_msgid_gen(hdr->msgid);
0330
0331 out:
0332 return ret;
0333 }
0334
0335
0336
0337
0338
0339
0340
0341 expublic void tmq_msgid_gen(char *msgid)
0342 {
0343 exuuid_t uuid_val;
0344 short node_id = (short) G_tmqueue_cfg.vnodeid;
0345 short srv_id = (short) G_srv_id;
0346
0347 memset(msgid, 0, TMMSGIDLEN);
0348
0349
0350 ndrx_cid_generate((unsigned char)tpgetnodeid(), uuid_val);
0351
0352 memcpy(msgid, uuid_val, sizeof(exuuid_t));
0353
0354 memcpy(msgid
0355 +sizeof(exuuid_t)
0356 ,(char *)&(node_id), sizeof(short));
0357 memcpy(msgid
0358 +sizeof(exuuid_t)
0359 +sizeof(short)
0360 ,(char *)&(srv_id), sizeof(short));
0361
0362 NDRX_LOG(log_debug, "MSGID: struct size: %d", sizeof(exuuid_t)+sizeof(short)+ sizeof(short));
0363 }
0364
0365
0366
0367
0368
0369
0370
0371
0372 exprivate int load_param(tmq_qconfig_t * qconf, char *key, char *value)
0373 {
0374 int ret = EXSUCCEED;
0375
0376 NDRX_LOG(log_info, "loading q param: [%s] = [%s]", key, value);
0377
0378
0379
0380
0381
0382
0383
0384
0385 if (0==strcmp(key, TMQ_QC_SVCNM))
0386 {
0387 NDRX_STRCPY_SAFE(qconf->svcnm, value);
0388 }
0389 else if (0==strcmp(key, TMQ_QC_ERRORQ))
0390 {
0391 NDRX_STRCPY_SAFE(qconf->errorq, value);
0392 }
0393 else if (0==strcmp(key, TMQ_QC_TRIES))
0394 {
0395 int ival = atoi(value);
0396 if (!ndrx_isint(value) || ival < 0)
0397 {
0398 NDRX_LOG(log_error, "Invalid value [%s] for key [%s] (must be int>=0)",
0399 value, key);
0400 EXFAIL_OUT(ret);
0401 }
0402
0403 qconf->tries = ival;
0404 }
0405 else if (0==strcmp(key, TMQ_QC_AUTOQ))
0406 {
0407 char val = toupper(value[0]);
0408
0409 if (NULL==strchr(TMQ_AUTOQ_ALLFLAGS, val))
0410 {
0411 NDRX_LOG(log_error, "Invalid value [%c] for key [%s] (allowed: [%s])",
0412 val, key, TMQ_AUTOQ_ALLFLAGS);
0413 EXFAIL_OUT(ret);
0414 }
0415
0416 qconf->autoq = val;
0417
0418 }
0419 else if (0==strcmp(key, TMQ_QC_WAITINIT))
0420 {
0421 int ival = atoi(value);
0422 if (!ndrx_isint(value) || ival < 0)
0423 {
0424 NDRX_LOG(log_error, "Invalid value [%s] for key [%s] (must be int>=0)",
0425 value, key);
0426 EXFAIL_OUT(ret);
0427 }
0428
0429 qconf->waitinit = ival;
0430 }
0431 else if (0==strcmp(key, TMQ_QC_WAITRETRY))
0432 {
0433 int ival = atoi(value);
0434 if (!ndrx_isint(value) || ival < 0)
0435 {
0436 NDRX_LOG(log_error, "Invalid value [%s] for key [%s] (must be int>=0)",
0437 value, key);
0438 EXFAIL_OUT(ret);
0439 }
0440
0441 qconf->waitretry = ival;
0442 }
0443 else if (0==strcmp(key, TMQ_QC_WAITRETRYINC))
0444 {
0445 NDRX_LOG(log_warn, "Ignoring [%s]", TMQ_QC_WAITRETRYINC);
0446 }
0447 else if (0==strcmp(key, TMQ_QC_WAITRETRYMAX))
0448 {
0449 int ival = atoi(value);
0450 if (!ndrx_isint(value) || ival < 0)
0451 {
0452 NDRX_LOG(log_error, "Invalid value [%s] for key [%s] (must be int>=0)",
0453 value, key);
0454 EXFAIL_OUT(ret);
0455 }
0456
0457 qconf->waitretrymax = ival;
0458 }
0459 else if (0==strcmp(key, TMQ_QC_WORKERS))
0460 {
0461 int ival = atoi(value);
0462 if (!ndrx_isint(value) || ival < 1)
0463 {
0464 NDRX_LOG(log_error, "Invalid value [%s] for key [%s] (must be int>=1)",
0465 value, key);
0466 EXFAIL_OUT(ret);
0467 }
0468
0469 qconf->workers = ival;
0470 }
0471 else if (0==strcmp(key, TMQ_QC_SYNC))
0472 {
0473 int ival = ndrx_args_confirm(value);
0474
0475 if (EXFAIL==ival)
0476 {
0477 if (1==strlen(value) && NULL!=strstr(TMQ_ARGS_COMMIT, value))
0478 {
0479 ival = TMQ_SYNC_TPCOMMIT;
0480 }
0481 else
0482 {
0483 NDRX_LOG(log_error, "Invalid value [%s] for %s", value, TMQ_QC_SYNC);
0484 EXFAIL_OUT(ret);
0485 }
0486 }
0487 else if (ival)
0488 {
0489 ival = TMQ_SYNC_TPACALL;
0490 }
0491 else
0492 {
0493 ival=TMQ_SYNC_NONE;
0494 }
0495
0496 qconf->sync = ival;
0497
0498 }
0499 else if (0==strcmp(key, TMQ_QC_MEMONLY))
0500 {
0501
0502
0503
0504
0505
0506
0507
0508 }
0509 else if (0==strcmp(key, TMQ_QC_TXTOUT))
0510 {
0511
0512 qconf->txtout = atoi(value);
0513 }
0514 else if (0==strcmp(key, TMQ_QC_MODE))
0515 {
0516 if (0==strcmp(value, "fifo") || 0==strcmp(value, "FIFO"))
0517 {
0518 qconf->mode = TMQ_MODE_FIFO;
0519 }
0520 else if (0==strcmp(value, "lifo") || 0==strcmp(value, "LIFO") )
0521 {
0522 qconf->mode = TMQ_MODE_LIFO;
0523 }
0524 else
0525 {
0526 NDRX_LOG(log_error, "Not supported Q mode (must be fifo or lifo)",
0527 value, key);
0528 EXFAIL_OUT(ret);
0529 }
0530 }
0531 else
0532 {
0533 NDRX_LOG(log_error, "Unknown Q config setting = [%s]", key);
0534 EXFAIL_OUT(ret);
0535 }
0536
0537 out:
0538
0539 return ret;
0540 }
0541
0542
0543
0544
0545
0546
0547 exprivate tmq_qconfig_t * tmq_qconf_get(char *qname)
0548 {
0549 tmq_qconfig_t *ret = NULL;
0550
0551 EXHASH_FIND_STR( G_qconf, qname, ret);
0552
0553 return ret;
0554 }
0555
0556
0557
0558
0559
0560
0561
0562
0563
0564 expublic tmq_qconfig_t * tmq_qconf_get_with_default(char *qname, int *p_is_defaulted)
0565 {
0566
0567 tmq_qconfig_t * ret = tmq_qconf_get(qname);
0568
0569 if (NULL==ret)
0570 {
0571 NDRX_LOG(log_info, "Q config [%s] not found, trying to default to [%s]",
0572 qname, TMQ_DEFAULT_Q);
0573 if (NULL==(ret = tmq_qconf_get(TMQ_DEFAULT_Q)))
0574 {
0575 NDRX_LOG(log_error, "Q [%s] is not defined and default config [%s] not found!",
0576 qname, TMQ_DEFAULT_Q);
0577 userlog("Q [%s] is not defined and default config [%s] not found!",
0578 qname, TMQ_DEFAULT_Q);
0579 }
0580 else if (NULL!=p_is_defaulted)
0581 {
0582 *p_is_defaulted = EXTRUE;
0583 }
0584 }
0585
0586 return ret;
0587 }
0588
0589
0590
0591
0592
0593
0594
0595
0596
0597
0598 expublic int tmq_build_q_def(char *qname, int *p_is_defaulted, char *out_buf, size_t out_bufsz)
0599 {
0600 tmq_qconfig_t * qdef = NULL;
0601 int ret = EXSUCCEED;
0602
0603 MUTEX_LOCK_V(M_q_lock);
0604
0605 if (NULL==(qdef=tmq_qconf_get_with_default(qname, p_is_defaulted)))
0606 {
0607 EXFAIL_OUT(ret);
0608 }
0609
0610 snprintf(out_buf, out_bufsz, "%s,svcnm=%s,autoq=%c,tries=%d,waitinit=%d,waitretry=%d,"
0611 "waitretrymax=%d,mode=%s,txtout=%d,workers=%d",
0612 qdef->qname,
0613 qdef->svcnm,
0614 qdef->autoq,
0615 qdef->tries,
0616 qdef->waitinit,
0617 qdef->waitretry,
0618 qdef->waitretrymax,
0619 qdef->mode == TMQ_MODE_LIFO?"lifo":"fifo",
0620 qdef->txtout,
0621 qdef->workers);
0622
0623 if (EXEOS!=qdef->errorq[0])
0624 {
0625 int len = strlen(out_buf);
0626 snprintf(out_buf+len, out_bufsz-len, ",errorq=%s", qdef->errorq);
0627 }
0628
0629 if (qdef->sync)
0630 {
0631 int len = strlen(out_buf);
0632 char setting;
0633
0634 if (TMQ_SYNC_TPACALL==qdef->sync)
0635 {
0636 setting='y';
0637 }
0638 else
0639 {
0640 setting='c';
0641 }
0642
0643 snprintf(out_buf+len, out_bufsz-len, ",%s=%c", TMQ_QC_SYNC, setting);
0644 }
0645
0646 out:
0647 MUTEX_UNLOCK_V(M_q_lock);
0648
0649 return ret;
0650 }
0651
0652
0653
0654
0655
0656
0657
0658
0659 expublic int tmq_qconf_get_with_default_static(char *qname, tmq_qconfig_t *qconf_out)
0660 {
0661 int ret = EXSUCCEED;
0662 tmq_qconfig_t * tmp = NULL;
0663
0664 MUTEX_LOCK_V(M_q_lock);
0665
0666 tmp = tmq_qconf_get(qname);
0667
0668 if (NULL==tmp)
0669 {
0670 NDRX_LOG(log_warn, "Q config [%s] not found, trying to default to [%s]",
0671 qname, TMQ_DEFAULT_Q);
0672 if (NULL==(tmp = tmq_qconf_get(TMQ_DEFAULT_Q)))
0673 {
0674 NDRX_LOG(log_error, "Q [%s] is not defined and default config [%s] not found!",
0675 qname, TMQ_DEFAULT_Q);
0676 userlog("Q [%s] is not defined and default config [%s] not found!",
0677 qname, TMQ_DEFAULT_Q);
0678 EXFAIL_OUT(ret);
0679 }
0680
0681 }
0682
0683 memcpy(qconf_out, tmp, sizeof(*qconf_out));
0684
0685 out:
0686 MUTEX_UNLOCK_V(M_q_lock);
0687
0688 return ret;
0689 }
0690
0691
0692
0693
0694
0695
0696 exprivate int tmq_qconf_delete(char *qname)
0697 {
0698 int ret = EXSUCCEED;
0699 tmq_qconfig_t *qconf;
0700
0701 if (NULL!=(qconf=tmq_qconf_get(qname)))
0702 {
0703 EXHASH_DEL( G_qconf, qconf);
0704 NDRX_FREE(qconf);
0705 }
0706 else
0707 {
0708 NDRX_LOG(log_warn, "[%s] - queue not found", qname);
0709 }
0710
0711 out:
0712 return ret;
0713 }
0714
0715
0716
0717
0718
0719
0720 expublic int tmq_reload_conf(char *cf)
0721 {
0722 FILE *f = NULL;
0723 #ifdef HAVE_GETLINE
0724 char *line = NULL;
0725 #else
0726 char line[PATH_MAX];
0727 #endif
0728 size_t len = 0;
0729 int ret = EXSUCCEED;
0730 ssize_t read;
0731 ndrx_inicfg_section_keyval_t * csection = NULL, *val = NULL, *val_tmp = NULL;
0732
0733 if (NULL!=ndrx_get_G_cconfig() &&
0734 EXSUCCEED==ndrx_cconfig_get(NDRX_CONF_SECTION_QUEUE, &csection))
0735 {
0736 EXHASH_ITER(hh, csection, val, val_tmp)
0737 {
0738 if (EXSUCCEED!=tmq_qconf_addupd(val->val, val->key))
0739 {
0740 EXFAIL_OUT(ret);
0741 }
0742 }
0743 }
0744 else
0745 {
0746 if (NULL==(f=NDRX_FOPEN(cf, "r")))
0747 {
0748 NDRX_LOG(log_error, "Failed to open [%s]:%s", cf, strerror(errno));
0749 EXFAIL_OUT(ret);
0750 }
0751
0752 #ifdef HAVE_GETLINE
0753
0754 while (EXFAIL!=(read = getline(&line, &len, f)))
0755 #else
0756 len = sizeof(line);
0757 while (NULL!=fgets(line, len, f))
0758 #endif
0759 {
0760 ndrx_str_strip(line, " \n\r\t");
0761
0762
0763 if ('#'==*line || EXEOS==*line)
0764 {
0765 continue;
0766 }
0767
0768 if (EXSUCCEED!=tmq_qconf_addupd(line, NULL))
0769 {
0770 EXFAIL_OUT(ret);
0771 }
0772 }
0773 }
0774
0775
0776 out:
0777
0778
0779 #ifdef HAVE_GETLINE
0780 if (NULL!=line)
0781 {
0782 NDRX_FREE(line);
0783 }
0784 #endif
0785
0786 if (NULL!=csection)
0787 {
0788 ndrx_keyval_hash_free(csection);
0789 }
0790
0791 if (NULL!=f)
0792 {
0793 NDRX_FCLOSE(f);
0794 }
0795
0796 return ret;
0797 }
0798
0799
0800
0801
0802
0803
0804
0805
0806
0807 expublic int tmq_qconf_addupd(char *qconfstr, char *name)
0808 {
0809 tmq_qconfig_t * qconf=NULL;
0810 tmq_qconfig_t * dflt=NULL;
0811 char * p;
0812 char * p2;
0813 int got_default = EXFALSE;
0814 int qupdate = EXFALSE;
0815 char buf[MAX_TOKEN_SIZE];
0816 int ret = EXSUCCEED;
0817
0818 NDRX_LOG(log_info, "Add new Q: [%s]", qconfstr);
0819
0820 MUTEX_LOCK_V(M_q_lock);
0821
0822 if (NULL==name)
0823 {
0824 p = strtok (qconfstr,",");
0825 }
0826 else
0827 {
0828 p = name;
0829 }
0830
0831 if (NULL!=p)
0832 {
0833 NDRX_LOG(log_info, "Got token: [%s]", p);
0834 NDRX_STRCPY_SAFE(buf, p);
0835
0836 NDRX_LOG(log_debug, "Q name: [%s]", buf);
0837
0838 if (NULL== (qconf = tmq_qconf_get(buf)))
0839 {
0840 NDRX_LOG(log_info, "Q not found, adding: [%s]", buf);
0841 qconf= NDRX_CALLOC(1, sizeof(tmq_qconfig_t));
0842
0843
0844 qconf->mode = TMQ_MODE_FIFO;
0845 qconf->txtout = EXFAIL;
0846 qconf->workers = WORKERS_DEFAULT;
0847 if (NULL!=(dflt=tmq_qconf_get(TMQ_DEFAULT_Q)))
0848 {
0849 memcpy(qconf, dflt, sizeof(*dflt));
0850 got_default = EXTRUE;
0851 }
0852
0853 NDRX_STRCPY_SAFE(qconf->qname, buf);
0854 }
0855 else
0856 {
0857 NDRX_LOG(log_info, "Q found, updating: [%s]", buf);
0858 qupdate = EXTRUE;
0859 }
0860 }
0861 else
0862 {
0863 NDRX_LOG(log_error, "Missing Q name");
0864 EXFAIL_OUT(ret);
0865 }
0866
0867 if (NULL==name)
0868 {
0869 p = strtok (NULL, ",");
0870 }
0871 else
0872 {
0873 p = strtok (qconfstr, ",");
0874 }
0875
0876 while (p != NULL)
0877 {
0878 NDRX_LOG(log_info, "Got pair [%s]", p);
0879
0880 NDRX_STRCPY_SAFE(buf, p);
0881
0882 p2 = strchr(buf, '=');
0883
0884 if (NULL == p2)
0885 {
0886 NDRX_LOG(log_error, "Invalid key=value token [%s] expected '='", buf);
0887
0888 userlog("Error defining queue (%s) expected in '=' in token (%s)",
0889 qconfstr, buf);
0890 EXFAIL_OUT(ret);
0891 }
0892 *p2 = EXEOS;
0893 p2++;
0894
0895 if (EXEOS==*p2)
0896 {
0897 NDRX_LOG(log_error, "Empty value for token [%s]", buf);
0898 userlog("Error defining queue (%s) invalid value for token (%s)",
0899 qconfstr, buf);
0900 EXFAIL_OUT(ret);
0901 }
0902
0903
0904
0905
0906 if (EXSUCCEED!=load_param(qconf, buf, p2))
0907 {
0908 NDRX_LOG(log_error, "Failed to load token [%s]/[%s]", buf, p2);
0909 userlog("Error defining queue (%s) failed to load token [%s]/[%s]",
0910 buf, p2);
0911 EXFAIL_OUT(ret);
0912 }
0913
0914 p = strtok (NULL, ",");
0915 }
0916
0917
0918
0919 if (0==strcmp(qconf->qname, TMQ_DEFAULT_Q) && got_default)
0920 {
0921 NDRX_LOG(log_error, "Missing [%s] param", TMQ_QC_NAME);
0922
0923 EXFAIL_OUT(ret);
0924 }
0925
0926
0927 if (!qupdate)
0928 {
0929 EXHASH_ADD_STR( G_qconf, qname, qconf );
0930 }
0931
0932 out:
0933
0934
0935 if (EXSUCCEED!=ret && NULL!=qconf && !qupdate)
0936 {
0937 NDRX_LOG(log_warn, "qconf -> free");
0938 NDRX_FREE(qconf);
0939 }
0940
0941 MUTEX_UNLOCK_V(M_q_lock);
0942 return ret;
0943
0944 }
0945
0946
0947
0948
0949
0950
0951 exprivate tmq_qhash_t * tmq_qhash_get(char *qname)
0952 {
0953 tmq_qhash_t * ret = NULL;
0954
0955 EXHASH_FIND_STR( G_qhash, qname, ret);
0956
0957 return ret;
0958 }
0959
0960
0961
0962
0963
0964
0965 exprivate tmq_qhash_t * tmq_qhash_new(char *qname)
0966 {
0967 tmq_qhash_t * ret = NDRX_CALLOC(1, sizeof(tmq_qhash_t));
0968
0969 if (NULL==ret)
0970 {
0971 NDRX_LOG(log_error, "Failed to alloc tmq_qhash_t: %s", strerror(errno));
0972 userlog("Failed to alloc tmq_qhash_t: %s", strerror(errno));
0973 goto out;
0974 }
0975
0976 NDRX_STRCPY_SAFE(ret->qname, qname);
0977
0978 EXHASH_ADD_STR( G_qhash, qname, ret );
0979
0980
0981 ndrx_rbt_init(&ret->q, tmq_rbt_cmp_cur, tmq_rbt_combine_cur, NULL, ret);
0982 ndrx_rbt_init(&ret->q_fut, tmq_rbt_cmp_fut, tmq_rbt_combine_fut, NULL, ret);
0983
0984 out:
0985 return ret;
0986 }
0987
0988
0989
0990
0991
0992
0993
0994
0995
0996
0997
0998
0999 expublic int tmq_msg_add(tmq_msg_t **msg, int is_recovery, TPQCTL *diag, int *int_diag)
1000 {
1001 int ret = EXSUCCEED;
1002 int is_locked = EXFALSE;
1003 tmq_qhash_t *qhash;
1004 tmq_memmsg_t *mmsg = NDRX_CALLOC(1, sizeof(tmq_memmsg_t));
1005 tmq_qconfig_t * qconf;
1006 char msgid_str[TMMSGIDLEN_STR+1];
1007 char corrid_str[TMCORRIDLEN_STR+1];
1008
1009 MUTEX_LOCK_V(M_q_lock);
1010 is_locked = EXTRUE;
1011
1012 qhash = tmq_qhash_get((*msg)->hdr.qname);
1013 qconf = tmq_qconf_get_with_default((*msg)->hdr.qname, NULL);
1014
1015 if (NULL==mmsg)
1016 {
1017 NDRX_LOG(log_error, "Failed to alloc tmq_memmsg_t: %s", strerror(errno));
1018 userlog("Failed to alloc tmq_memmsg_t: %s", strerror(errno));
1019 EXFAIL_OUT(ret);
1020 }
1021
1022 if (NULL==qconf)
1023 {
1024 NDRX_LOG(log_error, "queue config not found! Cannot enqueue!");
1025 userlog("queue config not found! Cannot enqueue!");
1026
1027 if (NULL!=diag)
1028 {
1029 diag->diagnostic = QMEBADQUEUE;
1030 snprintf(diag->diagmsg, sizeof(diag->diagmsg), "Queue [%s] not defined",
1031 (*msg)->hdr.qname);
1032 }
1033 EXFAIL_OUT(ret);
1034 }
1035
1036 mmsg->msg = *msg;
1037
1038
1039 tmq_msgid_serialize(mmsg->msg->hdr.msgid, msgid_str);
1040 NDRX_LOG(log_info, "Adding to G_msgid_hash [%s]", msgid_str);
1041
1042 if (NULL!=tmq_get_msg_by_msgid_str(msgid_str))
1043 {
1044 NDRX_LOG(log_error, "Message with msgid [%s] already exists!", msgid_str);
1045 userlog("Message with msgid [%s] already exists!", msgid_str);
1046 EXFAIL_OUT(ret);
1047 }
1048
1049
1050 if (NULL==qhash && NULL==(qhash=tmq_qhash_new((*msg)->hdr.qname)))
1051 {
1052 NDRX_LOG(log_error, "Failed to get/create qhash entry for Q [%s]",
1053 (*msg)->hdr.qname);
1054 EXFAIL_OUT(ret);
1055 }
1056
1057 NDRX_STRCPY_SAFE(mmsg->msgid_str, msgid_str);
1058
1059 if (mmsg->msg->qctl.flags & TPQCORRID)
1060 {
1061 tmq_msgid_serialize((*msg)->qctl.corrid, corrid_str);
1062 NDRX_STRCPY_SAFE(mmsg->corrid_str, corrid_str);
1063 NDRX_LOG(log_debug, "Adding to corrid_hash [%s] of queue [%s]",
1064 corrid_str, (*msg)->hdr.qname);
1065 }
1066
1067
1068 if (EXSUCCEED!=(ret=ndrx_infl_addmsg(qconf, qhash, mmsg)))
1069 {
1070 NDRX_LOG(log_error, "ndrx_infl_addmsg failed with %d", ret);
1071 goto out;
1072 }
1073
1074
1075
1076 MUTEX_UNLOCK_V(M_q_lock);
1077 is_locked = EXFALSE;
1078
1079
1080
1081
1082
1083
1084 if (!qconf->memonly)
1085 {
1086
1087 if (!is_recovery)
1088 {
1089 if (EXSUCCEED!=tmq_storage_write_cmd_newmsg(mmsg->msg, int_diag))
1090 {
1091 NDRX_LOG(log_error, "Failed to add message to persistent store!");
1092
1093
1094 if (NULL!=diag)
1095 {
1096 diag->diagnostic = QMEOS;
1097 snprintf(diag->diagmsg, sizeof(diag->diagmsg), "Failed to persist msg");
1098 }
1099
1100 EXFAIL_OUT(ret);
1101 }
1102 }
1103 }
1104 else
1105 {
1106 NDRX_LOG(log_info, "Mem only Q, not persisting.");
1107 }
1108
1109
1110
1111
1112 MUTEX_LOCK_V(M_q_lock);
1113 qhash->numenq++;
1114 MUTEX_UNLOCK_V(M_q_lock);
1115
1116 NDRX_LOG(log_info, "Message with id [%s] successfully enqueued to [%s] "
1117 "queue (DEBUG: locked %ld)",
1118 tmq_msgid_serialize((*msg)->hdr.msgid, msgid_str), (*msg)->hdr.qname,
1119 mmsg->msg->lockthreadid);
1120
1121
1122 *msg = NULL;
1123
1124 out:
1125
1126 if (is_locked)
1127 {
1128 MUTEX_UNLOCK_V(M_q_lock);
1129 is_locked=EXFALSE;
1130 }
1131
1132
1133
1134
1135
1136
1137
1138 if (EXSUCCEED!=ret && mmsg!=NULL)
1139 {
1140
1141 MUTEX_LOCK_V(M_q_lock);
1142
1143
1144 ndrx_infl_delmsg(mmsg);
1145
1146 MUTEX_UNLOCK_V(M_q_lock);
1147
1148 NDRX_FREE(mmsg);
1149 mmsg=NULL;
1150 }
1151
1152
1153 if (NULL==mmsg)
1154 {
1155 NDRX_FREE((*msg));
1156 *msg = NULL;
1157 }
1158
1159 return ret;
1160 }
1161
1162
1163
1164
1165
1166
1167
1168
1169
1170 expublic tmq_msg_t * tmq_msg_dequeue(char *qname, long flags, int is_auto, long *diagnostic,
1171 char *diagmsg, size_t diagmsgsz, char *corrid_str, int *int_diag)
1172 {
1173 tmq_qhash_t *qhash;
1174 tmq_corhash_t *corhash;
1175 tmq_memmsg_t *node = NULL;
1176 tmq_msg_t * ret = NULL;
1177 tmq_msg_del_t block;
1178 char msgid_str[TMMSGIDLEN_STR+1];
1179 tmq_qconfig_t *qconf;
1180 int is_locked=EXFALSE;
1181
1182 *diagnostic=EXSUCCEED;
1183
1184 NDRX_LOG(log_debug, "FIFO/LIFO dequeue for [%s]", qname);
1185 MUTEX_LOCK_V(M_q_lock);
1186 is_locked=EXTRUE;
1187
1188
1189
1190
1191
1192
1193
1194
1195
1196 if (NULL==(qconf=tmq_qconf_get_with_default(qname, NULL)))
1197 {
1198
1199 NDRX_LOG(log_error, "Failed to get q config [%s]",
1200 qname);
1201
1202 *diagnostic=QMEBADQUEUE;
1203 snprintf(diagmsg, diagmsgsz, "Queue [%s] not defined", qname);
1204
1205 goto out;
1206 }
1207
1208
1209 if (NULL==(qhash = tmq_qhash_get(qname)))
1210 {
1211 NDRX_LOG(log_warn, "Q [%s] is NULL/empty", qname);
1212 *diagnostic=QMENOMSG;
1213 snprintf(diagmsg, diagmsgsz, "Q [%s] is NULL/empty", qname);
1214 goto out;
1215 }
1216
1217 NDRX_LOG(log_debug, "mode corrid_str[%s]: %s", corrid_str?corrid_str:"N/A",
1218 TMQ_MODE_LIFO == qconf->mode?"LIFO":"FIFO");
1219
1220
1221 if (NULL!=corrid_str && NULL==(corhash = tmq_cor_find(qhash, corrid_str)))
1222 {
1223 NDRX_LOG(log_info, "Q [%s] corrid_str [%s] not defined",
1224 qname, corrid_str);
1225 snprintf(diagmsg, diagmsgsz, "Q [%s] corrid_str [%s] not defined",
1226 qname, corrid_str);
1227 *diagnostic=QMENOMSG;
1228 goto out;
1229 }
1230
1231
1232 ndrx_infl_fut2cur(qhash);
1233
1234 if (TMQ_MODE_LIFO == qconf->mode)
1235 {
1236
1237 if (NULL!=corrid_str)
1238 {
1239
1240 node = TMQ_COR_GETMSG(ndrx_rbt_rightmost(&corhash->corq));
1241 }
1242 else
1243 {
1244 node = (tmq_memmsg_t*)ndrx_rbt_rightmost(&qhash->q);
1245 }
1246 }
1247 else
1248 {
1249
1250 if (NULL!=corrid_str)
1251 {
1252
1253 node = TMQ_COR_GETMSG(ndrx_rbt_leftmost(&corhash->corq));
1254 }
1255 else
1256 {
1257 node = (tmq_memmsg_t*)ndrx_rbt_leftmost(&qhash->q);
1258 }
1259 }
1260
1261 if (NULL!=node)
1262 {
1263 ret=node->msg;
1264 }
1265
1266 if (NULL==ret)
1267 {
1268 NDRX_LOG(log_debug, "Q [%s] is empty or all msgs locked", qname);
1269 goto out;
1270 }
1271
1272
1273
1274 tmq_msgid_serialize(ret->hdr.msgid, msgid_str);
1275 NDRX_LOG(log_info, "Dequeued message: [%s]", msgid_str);
1276 NDRX_DUMP(log_debug, "Dequeued message", ret->msg, ret->len);
1277
1278
1279 ret->lockthreadid = ndrx_gettid();
1280 if (EXSUCCEED!=ndrx_infl_mov2infl(node))
1281 {
1282 NDRX_LOG(log_error, "Failed to move msg to inflight!");
1283 ret = NULL;
1284 *diagnostic=QMEOS;
1285 NDRX_STRCPY_SAFE_DST(diagmsg, "tmq_dequeue: disk write error!", diagmsgsz);
1286 goto out;
1287 }
1288
1289 MUTEX_UNLOCK_V(M_q_lock);
1290 is_locked=EXFALSE;
1291
1292
1293 if (!(flags & TPQPEEK) && !is_auto)
1294 {
1295
1296 memcpy(&block.hdr, &ret->hdr, sizeof(ret->hdr));
1297 block.hdr.command_code = TMQ_STORCMD_DEL;
1298
1299 if (EXSUCCEED!=tmq_storage_write_cmd_block((char *)&block,
1300 "Removing dequeued message", NULL, int_diag))
1301 {
1302 NDRX_LOG(log_error, "Failed to remove msg...");
1303
1304 MUTEX_LOCK_V(M_q_lock);
1305
1306 ret->lockthreadid = 0;
1307
1308 ndrx_infl_mov2cur(node);
1309
1310 MUTEX_UNLOCK_V(M_q_lock);
1311
1312 ret = NULL;
1313 *diagnostic=QMEOS;
1314 NDRX_STRCPY_SAFE_DST(diagmsg, "tmq_dequeue: disk write error!", diagmsgsz);
1315 goto out;
1316 }
1317 }
1318
1319 out:
1320
1321 if (is_locked)
1322 {
1323 MUTEX_UNLOCK_V(M_q_lock);
1324 }
1325
1326
1327 if (NULL==ret && EXSUCCEED==*diagnostic)
1328 {
1329 NDRX_STRCPY_SAFE_DST(diagmsg, "tmq_dequeue: no message in Q!", diagmsgsz);
1330 *diagnostic=QMENOMSG;
1331 }
1332
1333 return ret;
1334 }
1335
1336
1337
1338
1339
1340
1341
1342
1343 expublic tmq_msg_t * tmq_msg_dequeue_by_msgid(char *msgid, long flags, long *diagnostic,
1344 char *diagmsg, size_t diagmsgsz, int *int_diag)
1345 {
1346 tmq_msg_t * ret = NULL;
1347 tmq_msg_del_t del;
1348 char msgid_str[TMMSGIDLEN_STR+1];
1349 tmq_memmsg_t *mmsg;
1350 int is_locked=EXFALSE;
1351
1352 *diagnostic=EXSUCCEED;
1353
1354 MUTEX_LOCK_V(M_q_lock);
1355 is_locked=EXTRUE;
1356
1357
1358
1359 tmq_msgid_serialize(msgid, msgid_str);
1360 NDRX_LOG(log_info, "MSGID: Dequeuing message by [%s]", msgid_str);
1361
1362 if (NULL==(mmsg = tmq_get_msg_by_msgid_str(msgid_str)))
1363 {
1364 NDRX_LOG(log_error, "Message not found by msgid_str [%s]", msgid_str);
1365 goto out;
1366 }
1367
1368 ret = mmsg->msg;
1369
1370 NDRX_DUMP(log_debug, "Dequeued message", ret->msg, ret->len);
1371
1372
1373 if (0!=ret->lockthreadid)
1374 {
1375 NDRX_LOG(log_error, "Message is busy (locked by thread [%llu])", ret->lockthreadid);
1376 ret = NULL;
1377 goto out;
1378 }
1379
1380
1381 ret->lockthreadid = ndrx_gettid();
1382
1383
1384 ndrx_infl_mov2infl(mmsg);
1385
1386
1387 MUTEX_UNLOCK_V(M_q_lock);
1388 is_locked=EXFALSE;
1389
1390
1391 memcpy(&del.hdr, &ret->hdr, sizeof(ret->hdr));
1392
1393 del.hdr.command_code = TMQ_STORCMD_DEL;
1394
1395 if (!(flags & TPQPEEK))
1396 {
1397 if (EXSUCCEED!=tmq_storage_write_cmd_block((char *)&del,
1398 "Removing dequeued message", NULL, int_diag))
1399 {
1400 NDRX_LOG(log_error, "Failed to remove msg...");
1401
1402 MUTEX_LOCK_V(M_q_lock);
1403 ret->lockthreadid = 0;
1404 ndrx_infl_mov2cur(mmsg);
1405 MUTEX_UNLOCK_V(M_q_lock);
1406 ret = NULL;
1407 *diagnostic=QMEOS;
1408 NDRX_STRCPY_SAFE_DST(diagmsg, "tmq_dequeue: disk write error!", diagmsgsz);
1409 goto out;
1410 }
1411 }
1412
1413 out:
1414 if (is_locked)
1415 {
1416 MUTEX_UNLOCK_V(M_q_lock);
1417 }
1418
1419
1420 if (NULL==ret && EXSUCCEED==*diagnostic)
1421 {
1422 NDRX_STRCPY_SAFE_DST(diagmsg, "tmq_dequeue: no message in Q!", diagmsgsz);
1423 *diagnostic=QMENOMSG;
1424 }
1425
1426 return ret;
1427 }
1428
1429
1430
1431
1432
1433
1434 exprivate tmq_memmsg_t* tmq_get_msg_by_msgid_str(char *msgid_str)
1435 {
1436 tmq_memmsg_t *ret;
1437
1438 EXHASH_FIND_STR( G_msgid_hash, msgid_str, ret);
1439
1440 return ret;
1441 }
1442
1443
1444
1445
1446
1447
1448 exprivate void tmq_remove_msg(tmq_memmsg_t *mmsg)
1449 {
1450 char msgid_str[TMMSGIDLEN_STR+1];
1451 tmq_msgid_serialize(mmsg->msg->hdr.msgid, msgid_str);
1452
1453 tmq_qhash_t *qhash = tmq_qhash_get(mmsg->msg->hdr.qname);
1454
1455 NDRX_LOG(log_info, "Removing msgid [%s] from [%s] q", msgid_str, mmsg->msg->hdr.qname);
1456
1457 if (NULL!=qhash)
1458 {
1459 qhash->numdeq++;
1460 }
1461
1462 ndrx_infl_delmsg(mmsg);
1463
1464 NDRX_FREE(mmsg->msg);
1465 NDRX_FREE(mmsg);
1466 }
1467
1468
1469
1470
1471
1472
1473
1474
1475
1476 expublic int tmq_unlock_msg(union tmq_upd_block *b)
1477 {
1478 int ret = EXSUCCEED;
1479 char msgid_str[TMMSGIDLEN_STR+1];
1480 tmq_memmsg_t* mmsg;
1481
1482 tmq_msgid_serialize(b->hdr.msgid, msgid_str);
1483
1484 NDRX_LOG(log_info, "Unlocking/updating: %s", msgid_str);
1485
1486 MUTEX_LOCK_V(M_q_lock);
1487
1488
1489 if (TMQ_STORCMD_DUM!=b->hdr.command_code)
1490 {
1491 mmsg = tmq_get_msg_by_msgid_str(msgid_str);
1492
1493 if (NULL==mmsg)
1494 {
1495 NDRX_LOG(log_error, "Message not found: [%s] - no update", msgid_str);
1496
1497
1498
1499
1500 goto out;
1501 }
1502 }
1503
1504 switch (b->hdr.command_code)
1505 {
1506 case TMQ_STORCMD_DEL:
1507 NDRX_LOG(log_info, "Removing message...");
1508 tmq_remove_msg(mmsg);
1509 mmsg = NULL;
1510 break;
1511 case TMQ_STORCMD_UPD:
1512 UPD_MSG((mmsg->msg), (&b->upd));
1513
1514 case TMQ_STORCMD_NEWMSG:
1515 case TMQ_STORCMD_UNLOCK:
1516 NDRX_LOG(log_info, "Unlocking message...");
1517 mmsg->msg->lockthreadid = 0;
1518
1519 ndrx_infl_mov2cur(mmsg);
1520
1521 ndrx_forward_chkrun(mmsg);
1522
1523 break;
1524 case TMQ_STORCMD_DUM:
1525
1526 break;
1527 default:
1528 NDRX_LOG(log_info, "Unknown command [%c]", b->hdr.command_code);
1529 EXFAIL_OUT(ret);
1530 break;
1531 }
1532
1533 out:
1534 MUTEX_UNLOCK_V(M_q_lock);
1535 return ret;
1536 }
1537
1538
1539
1540
1541
1542
1543
1544 expublic int tmq_unlock_msg_by_msgid(char *msgid, int chkrun)
1545 {
1546 int ret = EXSUCCEED;
1547 char msgid_str[TMMSGIDLEN_STR+1];
1548 tmq_memmsg_t* mmsg;
1549
1550 tmq_msgid_serialize(msgid, msgid_str);
1551
1552 NDRX_LOG(log_info, "Unlocking/updating: %s", msgid_str);
1553
1554 MUTEX_LOCK_V(M_q_lock);
1555
1556 mmsg = tmq_get_msg_by_msgid_str(msgid_str);
1557
1558 if (NULL==mmsg)
1559 {
1560 NDRX_LOG(log_error, "Message not found: [%s] - no update", msgid_str);
1561 EXFAIL_OUT(ret);
1562 }
1563
1564 mmsg->msg->lockthreadid = 0;
1565 ndrx_infl_mov2cur(mmsg);
1566
1567 if (chkrun)
1568 {
1569 ndrx_forward_chkrun(mmsg);
1570 }
1571
1572 out:
1573 MUTEX_UNLOCK_V(M_q_lock);
1574 return ret;
1575 }
1576
1577
1578
1579
1580
1581
1582
1583 expublic int tmq_lock_msg(char *msgid)
1584 {
1585 int ret = EXSUCCEED;
1586 char msgid_str[TMMSGIDLEN_STR+1];
1587 tmq_memmsg_t* mmsg;
1588
1589 tmq_msgid_serialize(msgid, msgid_str);
1590
1591 NDRX_LOG(log_info, "Locking: %s", msgid_str);
1592
1593 MUTEX_LOCK_V(M_q_lock);
1594
1595 mmsg = tmq_get_msg_by_msgid_str(msgid_str);
1596
1597 if (NULL==mmsg)
1598 {
1599 NDRX_LOG(log_error, "Message not found: [%s] - no update", msgid_str);
1600 EXFAIL_OUT(ret);
1601 }
1602
1603
1604 mmsg->msg->lockthreadid = ndrx_gettid();
1605 ndrx_infl_mov2infl(mmsg);
1606
1607 out:
1608 MUTEX_UNLOCK_V(M_q_lock);
1609 return ret;
1610 }
1611
1612 #if 0
1613
1614
1615
1616
1617
1618
1619 expublic int q_msg_sort(tmq_memmsg_t *q1, tmq_memmsg_t *q2)
1620 {
1621
1622 return ndrx_compare3(q1->msg->msgtstamp, q1->msg->msgtstamp_usec, q1->msg->msgtstamp_cntr,
1623 q2->msg->msgtstamp, q2->msg->msgtstamp_usec, q2->msg->msgtstamp_cntr);
1624
1625 }
1626 #endif
1627
1628
1629
1630
1631 expublic fwd_qlist_t *tmq_get_qlist(int auto_only, int incl_def)
1632 {
1633 fwd_qlist_t * ret = NULL;
1634 fwd_qlist_t * tmp = NULL;
1635
1636 tmq_qhash_t *q, *qtmp;
1637 tmq_qconfig_t *qconf;
1638
1639 tmq_qconfig_t *qc, *qctmp;
1640
1641 MUTEX_LOCK_V(M_q_lock);
1642
1643 EXHASH_ITER(hh, G_qhash, q, qtmp)
1644 {
1645 if (NULL!=(qconf=tmq_qconf_get_with_default(q->qname, NULL)) &&
1646 ((auto_only && TMQ_AUTOQ_ISAUTO(qconf->autoq)) || !auto_only))
1647 {
1648 if (NULL==(tmp = NDRX_CALLOC(1, sizeof(fwd_qlist_t))))
1649 {
1650 int err = errno;
1651 NDRX_LOG(log_error, "Failed to alloc: %s", strerror(err));
1652 userlog("Failed to alloc: %s", strerror(err));
1653 ret = NULL;
1654 goto out;
1655 }
1656
1657 NDRX_STRCPY_SAFE(tmp->qname, q->qname);
1658 tmp->succ = q->succ;
1659 tmp->fail = q->fail;
1660
1661 tmp->numenq = q->numenq;
1662 tmp->numdeq = q->numdeq;
1663 tmp->workers = qconf->workers;
1664 tmp->sync = qconf->sync;
1665
1666 DL_APPEND(ret, tmp);
1667 }
1668
1669 }
1670
1671
1672
1673
1674
1675 if (incl_def)
1676 {
1677 EXHASH_ITER(hh, G_qconf, qc, qctmp)
1678 {
1679 if (NULL==tmq_qhash_get(qc->qname))
1680 {
1681 if (NULL==(tmp = NDRX_CALLOC(1, sizeof(fwd_qlist_t))))
1682 {
1683 int err = errno;
1684 NDRX_LOG(log_error, "Failed to alloc: %s", strerror(err));
1685 userlog("Failed to alloc: %s", strerror(err));
1686 ret = NULL;
1687 goto out;
1688 }
1689 NDRX_LOG(log_debug, "tmq_get_qlist: %s", qc->qname);
1690 NDRX_STRCPY_SAFE(tmp->qname, qc->qname);
1691 DL_APPEND(ret, tmp);
1692 }
1693 }
1694 }
1695
1696 out:
1697 MUTEX_UNLOCK_V(M_q_lock);
1698 return ret;
1699 }
1700
1701
1702
1703
1704
1705 expublic tmq_memmsg_t *tmq_get_msglist(char *qname)
1706 {
1707 tmq_qhash_t *qhash;
1708 tmq_memmsg_t *node;
1709 tmq_memmsg_t * ret = NULL;
1710 tmq_memmsg_t * tmp = NULL;
1711 tmq_msg_t * msg = NULL;
1712 ndrx_rbt_tree_iterator_t iter;
1713 ndrx_rbt_tree_t *rbt_trees[2];
1714 int i;
1715
1716 NDRX_LOG(log_debug, "tmq_get_msglist listing for [%s]", qname);
1717 MUTEX_LOCK_V(M_q_lock);
1718
1719 if (NULL==(qhash = tmq_qhash_get(qname)))
1720 {
1721 NDRX_LOG(log_warn, "Q [%s] is NULL/empty", qname);
1722 goto out;
1723 }
1724
1725
1726
1727
1728
1729 node = qhash->q_infligh;
1730
1731 do
1732 {
1733 if (NULL!=node)
1734 {
1735 if (NULL==(tmp = NDRX_CALLOC(1, sizeof(tmq_memmsg_t))))
1736 {
1737 int err = errno;
1738 NDRX_LOG(log_error, "Failed to alloc: %s", strerror(err));
1739 userlog("Failed to alloc: %s", strerror(err));
1740 ret = NULL;
1741 goto out;
1742 }
1743
1744 if (NULL==(msg = NDRX_MALLOC(sizeof(tmq_msg_t))))
1745 {
1746 int err = errno;
1747 NDRX_LOG(log_error, "Failed to alloc: %s", strerror(err));
1748 userlog("Failed to alloc: %s", strerror(err));
1749 ret = NULL;
1750 goto out;
1751 }
1752
1753 memcpy(msg, node->msg, sizeof(tmq_msg_t));
1754 tmp->msg = msg;
1755
1756 DL_APPEND(ret, tmp);
1757
1758
1759 node = node->next;
1760 }
1761 }
1762 while (NULL!=node && node!=qhash->q_infligh);
1763
1764
1765 rbt_trees[0] = &qhash->q;
1766 rbt_trees[1] = &qhash->q_fut;
1767 for (i=0; i<2; i++)
1768 {
1769 ndrx_rbt_begin_iterate(rbt_trees[i], LeftRightWalk, &iter);
1770 while (NULL!=(node=(tmq_memmsg_t *)ndrx_rbt_iterate(&iter)))
1771 {
1772 if (NULL==(tmp = NDRX_CALLOC(1, sizeof(tmq_memmsg_t))))
1773 {
1774 int err = errno;
1775 NDRX_LOG(log_error, "Failed to alloc: %s", strerror(err));
1776 userlog("Failed to alloc: %s", strerror(err));
1777 ret = NULL;
1778 goto out;
1779 }
1780
1781 if (NULL==(msg = NDRX_MALLOC(sizeof(tmq_msg_t))))
1782 {
1783 int err = errno;
1784 NDRX_LOG(log_error, "Failed to alloc: %s", strerror(err));
1785 userlog("Failed to alloc: %s", strerror(err));
1786 ret = NULL;
1787 goto out;
1788 }
1789
1790 memcpy(msg, node->msg, sizeof(tmq_msg_t));
1791 tmp->msg = msg;
1792
1793 DL_APPEND(ret, tmp);
1794
1795 }
1796 }
1797
1798 out:
1799 MUTEX_UNLOCK_V(M_q_lock);
1800 return ret;
1801 }
1802
1803
1804
1805
1806
1807
1808
1809
1810
1811 expublic int tmq_update_q_stats(char *qname, long succ_diff, long fail_diff)
1812 {
1813 tmq_qhash_t *q;
1814 MUTEX_LOCK_V(M_q_lock);
1815
1816 if (NULL!=(q = tmq_qhash_get(qname)))
1817 {
1818 q->succ += succ_diff;
1819 q->fail += fail_diff;
1820 }
1821
1822 out:
1823
1824 MUTEX_UNLOCK_V(M_q_lock);
1825
1826 return EXSUCCEED;
1827 }
1828
1829
1830
1831
1832
1833
1834
1835
1836 expublic void tmq_get_q_stats(char *qname, long *p_msgs, long *p_locked)
1837 {
1838 tmq_qhash_t *qhash;
1839 tmq_memmsg_t *node;
1840 ndrx_rbt_tree_iterator_t iter;
1841 ndrx_rbt_tree_t *rbt_trees[2];
1842
1843 MUTEX_LOCK_V(M_q_lock);
1844
1845 if (NULL!=(qhash = tmq_qhash_get(qname)))
1846 {
1847 node = qhash->q_infligh;
1848
1849 do
1850 {
1851 if (NULL!=node)
1852 {
1853 *p_msgs = *p_msgs +1 ;
1854 if (node->msg->lockthreadid)
1855 {
1856 *p_locked = *p_locked +1 ;
1857 }
1858
1859 node = node->next;
1860 }
1861
1862 }
1863 while (NULL!=node && node!=qhash->q_infligh);
1864
1865
1866 rbt_trees[0] = &qhash->q;
1867 rbt_trees[1] = &qhash->q_fut;
1868 for (int i=0; i<2; i++)
1869 {
1870 ndrx_rbt_begin_iterate(rbt_trees[i], LeftRightWalk, &iter);
1871 while (NULL!=(node=(tmq_memmsg_t *)ndrx_rbt_iterate(&iter)))
1872 {
1873 *p_msgs = *p_msgs +1 ;
1874 }
1875 }
1876 }
1877
1878 MUTEX_UNLOCK_V(M_q_lock);
1879 }
1880
1881
1882
1883
1884
1885