0001
0002
0003
0004
0005
0006
0007
0008
0009
0010
0011
0012
0013
0014
0015
0016
0017
0018
0019
0020
0021
0022
0023
0024
0025
0026
0027
0028
0029
0030
0031
0032
0033
0034 #include <stdio.h>
0035 #include <stdlib.h>
0036 #include <string.h>
0037 #include <errno.h>
0038 #include <regex.h>
0039 #include <utlist.h>
0040
0041 #include <ndebug.h>
0042 #include <atmi.h>
0043 #include <atmi_int.h>
0044 #include <typed_buf.h>
0045 #include <ndrstandard.h>
0046 #include <ubf.h>
0047 #include <Exfields.h>
0048
0049 #include <ndrxdcmn.h>
0050
0051 #include "tmqueue.h"
0052 #include "../libatmisrv/srv_int.h"
0053 #include "tperror.h"
0054 #include "nstdutil.h"
0055 #include <qcommon.h>
0056 #include "tmqd.h"
0057 #include "userlog.h"
0058 #include "cconfig.h"
0059 #include <ubfutil.h>
0060
0061
0062
0063
/**
 * Mark the call as "do not abort" when the operation failed with one of the
 * soft diagnostics (QMEINVAL or QMENOMSG).
 *
 * Expands in the caller's scope and expects these names to be visible there:
 * `ret` (operation result), `qctl_out` (response control block) and
 * `p_sysflags` (pointer to the system flags that receive NDRX_SYS_NOABORT).
 */
#define SET_NO_ABORT do {\
        if (EXSUCCEED!=ret && \
            (QMEINVAL==qctl_out.diagnostic || \
             QMENOMSG==qctl_out.diagnostic))\
        {\
            *p_sysflags|=NDRX_SYS_NOABORT;\
        }\
    } while (0)
0076
0077
0078
0079
0080
0081 exprivate __thread int M_is_xa_open = EXFALSE;
0082 exprivate MUTEX_LOCKDECL(M_tstamp_lock);
0083
0084
0085
0086
0087
0088
0089
0090
0091
0092
0093
0094
0095 expublic int tmq_enqueue(UBFH *p_ub, int *int_diag)
0096 {
0097 int ret = EXSUCCEED;
0098 tmq_msg_t *p_msg = NULL;
0099 char *data = NULL;
0100 BFLDLEN len = 0;
0101 TPQCTL qctl_out;
0102 static volatile int first = EXTRUE;
0103 int local_tx = EXFALSE;
0104
0105
0106 static volatile long t_sec = 0;
0107 static volatile long t_usec = 0;
0108
0109
0110
0111
0112 static volatile int t_cntr=0;
0113
0114
0115 NDRX_LOG(log_debug, "Into tmq_enqueue()");
0116
0117 memset(&qctl_out, 0, sizeof(qctl_out));
0118
0119 ndrx_debug_dump_UBF(log_info, "tmq_enqueue called with", p_ub);
0120
0121 if (!M_is_xa_open)
0122 {
0123 if (EXSUCCEED!=tpopen())
0124 {
0125 NDRX_LOG(log_error, "Failed to tpopen() by worker thread: %s",
0126 tpstrerror(tperrno));
0127 userlog("Failed to tpopen() by worker thread: %s", tpstrerror(tperrno));
0128 }
0129 else
0130 {
0131 M_is_xa_open = EXTRUE;
0132 }
0133 }
0134
0135 if (!tpgetlev())
0136 {
0137 NDRX_LOG(log_debug, "Not in global transaction, starting local...");
0138 if (EXSUCCEED!=tpbegin(G_tmqueue_cfg.dflt_timeout, 0))
0139 {
0140 NDRX_LOG(log_error, "Failed to start global tx!");
0141 userlog("Failed to start global tx!");
0142 }
0143 else
0144 {
0145 NDRX_LOG(log_debug, "Transaction started ok...");
0146 local_tx = EXTRUE;
0147 }
0148 }
0149
0150 if (NULL==(data = Bgetalloc(p_ub, EX_DATA, 0, &len)))
0151 {
0152 NDRX_LOG(log_error, "Missing EX_DATA!");
0153 userlog("Missing EX_DATA!");
0154
0155 NDRX_STRCPY_SAFE(qctl_out.diagmsg, "Missing EX_DATA!");
0156 qctl_out.diagnostic = QMEINVAL;
0157
0158 EXFAIL_OUT(ret);
0159 }
0160
0161
0162
0163
0164 p_msg = NDRX_MALLOC(sizeof(tmq_msg_t)+len);
0165
0166 if (NULL==p_msg)
0167 {
0168 NDRX_LOG(log_error, "Failed to malloc tmq_msg_t!");
0169 userlog("Failed to malloc tmq_msg_t!");
0170
0171 NDRX_STRCPY_SAFE(qctl_out.diagmsg, "Failed to malloc tmq_msg_t!");
0172 qctl_out.diagnostic = QMEOS;
0173
0174 EXFAIL_OUT(ret);
0175 }
0176
0177 memset(p_msg, 0, sizeof(tmq_msg_t));
0178
0179 memcpy(p_msg->msg, data, len);
0180 p_msg->len = len;
0181
0182 NDRX_DUMP(log_debug, "Got message for Q: ", p_msg->msg, p_msg->len);
0183
0184 if (EXSUCCEED!=Bget(p_ub, EX_QNAME, 0, p_msg->hdr.qname, 0))
0185 {
0186 NDRX_LOG(log_error, "tmq_enqueue: failed to get EX_QNAME");
0187
0188 NDRX_STRCPY_SAFE(qctl_out.diagmsg, "tmq_enqueue: failed to get EX_QNAME!");
0189 qctl_out.diagnostic = QMEINVAL;
0190
0191 EXFAIL_OUT(ret);
0192 }
0193
0194
0195 if (EXSUCCEED!=tmq_tpqctl_from_ubf_enqreq(p_ub, &p_msg->qctl))
0196 {
0197 NDRX_LOG(log_error, "tmq_enqueue: failed convert ctl "
0198 "to internal UBF buf!");
0199 userlog("tmq_enqueue: failed convert ctl "
0200 "to internal UBF buf!");
0201
0202 NDRX_STRCPY_SAFE(qctl_out.diagmsg, "tmq_enqueue: failed convert ctl "
0203 "to internal UBF buf!");
0204 qctl_out.diagnostic = QMESYSTEM;
0205
0206 EXFAIL_OUT(ret);
0207 }
0208
0209
0210 tmq_setup_cmdheader_newmsg(&p_msg->hdr, p_msg->hdr.qname,
0211 G_tmqueue_cfg.vnodeid, G_server_conf.srv_id, ndrx_G_qspace, p_msg->qctl.flags);
0212
0213
0214 memcpy(qctl_out.msgid, p_msg->hdr.msgid, TMMSGIDLEN);
0215 memcpy(p_msg->qctl.msgid, p_msg->hdr.msgid, TMMSGIDLEN);
0216
0217 p_msg->lockthreadid = ndrx_gettid();
0218
0219 ndrx_utc_tstamp2(&p_msg->msgtstamp, &p_msg->msgtstamp_usec);
0220
0221 MUTEX_LOCK_V(M_tstamp_lock);
0222
0223 if (first)
0224 {
0225
0226
0227
0228
0229
0230
0231 t_cntr = ndrx_rand()%2500000;
0232 first=EXFALSE;
0233 }
0234
0235 if (p_msg->msgtstamp == t_sec && p_msg->msgtstamp_usec == t_usec)
0236 {
0237 t_cntr++;
0238 }
0239 else
0240 {
0241 t_sec = p_msg->msgtstamp;
0242 t_usec = p_msg->msgtstamp_usec;
0243 t_cntr = ndrx_rand()%2500000;
0244 }
0245 p_msg->msgtstamp_cntr = t_cntr;
0246 MUTEX_UNLOCK_V(M_tstamp_lock);
0247
0248 p_msg->status = TMQ_STATUS_ACTIVE;
0249
0250 NDRX_LOG(log_info, "Messag prepared ok, about to enqueue to [%s] Q...",
0251 p_msg->hdr.qname);
0252
0253 if (EXSUCCEED!=tmq_msg_add(&p_msg, EXFALSE, &qctl_out, int_diag))
0254 {
0255 NDRX_LOG(log_error, "tmq_enqueue: failed to enqueue!");
0256 userlog("tmq_enqueue: failed to enqueue!");
0257
0258
0259 if (EXSUCCEED==qctl_out.diagnostic)
0260 {
0261 NDRX_STRCPY_SAFE(qctl_out.diagmsg, "tmq_enqueue: failed to enqueue!");
0262 qctl_out.diagnostic = QMESYSTEM;
0263 }
0264
0265 EXFAIL_OUT(ret);
0266 }
0267
0268
0269 Bdel(p_ub, EX_DATA, 0);
0270
0271 out:
0272
0273 if (NULL!=data)
0274 {
0275 NDRX_FREE(data);
0276 }
0277
0278 if (EXSUCCEED!=ret && NULL!=p_msg)
0279 {
0280 NDRX_LOG(log_warn, "About to free p_msg!");
0281 NDRX_FREE(p_msg);
0282 }
0283
0284 if (local_tx)
0285 {
0286 if (EXSUCCEED!=ret)
0287 {
0288 NDRX_LOG(log_error, "Aborting transaction");
0289 tpabort(0);
0290 }
0291 else
0292 {
0293 NDRX_LOG(log_info, "Committing transaction!");
0294
0295 if (EXSUCCEED!=tpcommit(0))
0296 {
0297 NDRX_LOG(log_error, "Commit failed!");
0298 userlog("Commit failed!");
0299 NDRX_STRCPY_SAFE(qctl_out.diagmsg, "tmq_enqueue: commit failed!");
0300 qctl_out.diagnostic = QMESYSTEM;
0301 ret=EXFAIL;
0302 }
0303 }
0304 }
0305
0306
0307
0308
0309
0310 if (EXSUCCEED!=tmq_tpqctl_to_ubf_enqrsp(p_ub, &qctl_out))
0311 {
0312 NDRX_LOG(log_error, "tmq_enqueue: failed to generate response buffer!");
0313 userlog("tmq_enqueue: failed to generate response buffer!");
0314 ret=EXFAIL;
0315
0316 }
0317
0318 return ret;
0319 }
0320
0321
0322
0323
0324
0325
0326
0327 expublic int tmq_dequeue(UBFH **pp_ub, int *int_diag)
0328 {
0329
0330
0331
0332 int ret = EXSUCCEED;
0333 tmq_msg_t *p_msg = NULL;
0334 TPQCTL qctl_out, qctl_in;
0335 int local_tx = EXFALSE;
0336 char qname[TMQNAMELEN+1];
0337 long buf_realoc_size;
0338 char corrid_str[TMCORRIDLEN_STR+1];
0339 char *p_corrid_str = NULL;
0340
0341
0342 NDRX_LOG(log_debug, "Into tmq_dequeue()");
0343
0344 memset(&qctl_in, 0, sizeof(qctl_in));
0345 memset(&qctl_out, 0, sizeof(qctl_out));
0346
0347 ndrx_debug_dump_UBF(log_info, "tmq_dequeue called with", *pp_ub);
0348
0349 if (!M_is_xa_open)
0350 {
0351 if (EXSUCCEED!=tpopen())
0352 {
0353 NDRX_LOG(log_error, "Failed to tpopen() by worker thread: %s",
0354 tpstrerror(tperrno));
0355 userlog("Failed to tpopen() by worker thread: %s", tpstrerror(tperrno));
0356 }
0357 else
0358 {
0359 M_is_xa_open = EXTRUE;
0360 }
0361 }
0362
0363 if (EXSUCCEED!=tmq_tpqctl_from_ubf_deqreq(*pp_ub, &qctl_in))
0364 {
0365 NDRX_LOG(log_error, "tmq_dequeue: failed to read request qctl!");
0366 userlog("tmq_dequeue: failed to read request qctl!");
0367 EXFAIL_OUT(ret);
0368 }
0369
0370 if (!tpgetlev())
0371 {
0372 NDRX_LOG(log_debug, "Not in global transaction, starting local...");
0373 if (EXSUCCEED!=tpbegin(G_tmqueue_cfg.dflt_timeout, 0))
0374 {
0375 NDRX_LOG(log_error, "Failed to start global tx!");
0376 userlog("Failed to start global tx!");
0377 }
0378 else
0379 {
0380 NDRX_LOG(log_debug, "Transaction started ok...");
0381 local_tx = EXTRUE;
0382 }
0383 }
0384
0385 if (EXSUCCEED!=Bget(*pp_ub, EX_QNAME, 0, qname, 0))
0386 {
0387 NDRX_LOG(log_error, "tmq_dequeue: failed to get EX_QNAME");
0388 NDRX_STRCPY_SAFE(qctl_out.diagmsg, "tmq_dequeue: failed to get EX_QNAME!");
0389 qctl_out.diagnostic = QMEINVAL;
0390
0391 EXFAIL_OUT(ret);
0392 }
0393
0394
0395 NDRX_LOG(log_info, "qctl_req flags: %ld", qctl_in.flags);
0396
0397 if (qctl_in.flags & TPQGETBYMSGID)
0398 {
0399 if (NULL==(p_msg = tmq_msg_dequeue_by_msgid(qctl_in.msgid, qctl_in.flags,
0400 &qctl_out.diagnostic, qctl_out.diagmsg, sizeof(qctl_out.diagmsg), int_diag)))
0401 {
0402 char msgid_str[TMMSGIDLEN_STR+1];
0403 int lev = log_info;
0404 tmq_msgid_serialize(qctl_in.msgid, msgid_str);
0405
0406 if (qctl_out.diagnostic!=QMENOMSG)
0407 {
0408 lev=log_error;
0409 }
0410
0411 NDRX_LOG(lev, "tmq_dequeue: no message found for given msgid [%s] %ld: %s",
0412 msgid_str, qctl_out.diagnostic, qctl_out.diagmsg);
0413
0414 EXFAIL_OUT(ret);
0415 }
0416 }
0417 else
0418 {
0419
0420 if (qctl_in.flags & TPQGETBYCORRID)
0421 {
0422 tmq_msgid_serialize(qctl_in.corrid, corrid_str);
0423 p_corrid_str = corrid_str;
0424 }
0425
0426 if (NULL==(p_msg = tmq_msg_dequeue(qname, qctl_in.flags, EXFALSE,
0427 &qctl_out.diagnostic, qctl_out.diagmsg, sizeof(qctl_out.diagmsg),
0428 p_corrid_str, int_diag)))
0429 {
0430 int lev = log_info;
0431
0432 if (qctl_out.diagnostic!=QMENOMSG)
0433 {
0434 lev=log_error;
0435 }
0436
0437 NDRX_LOG(lev, "tmq_dequeue: no message in Q [%s] corrid_str [%s] %ld: %s", qname,
0438 NULL!=p_corrid_str?corrid_str:"N/A", qctl_out.diagnostic, qctl_out.diagmsg);
0439
0440 EXFAIL_OUT(ret);
0441 }
0442 }
0443
0444
0445 memcpy(&qctl_out, &p_msg->qctl, sizeof(qctl_out));
0446
0447 buf_realoc_size = Bused (*pp_ub) + p_msg->len + 1024;
0448
0449 if (NULL==(*pp_ub = (UBFH *)tprealloc ((char *)*pp_ub, buf_realoc_size)))
0450 {
0451 NDRX_LOG(log_error, "Failed to allocate buffer to size: %ld", buf_realoc_size);
0452 userlog("Failed to allocate buffer to size: %ld", buf_realoc_size);
0453 EXFAIL_OUT(ret);
0454 }
0455
0456 if (EXSUCCEED!=Bchg(*pp_ub, EX_DATA, 0, p_msg->msg, p_msg->len))
0457 {
0458 NDRX_LOG(log_error, "failed to set EX_DATA!");
0459 userlog("failed to set EX_DATA!");
0460
0461 NDRX_STRCPY_SAFE(qctl_out.diagmsg, "failed to set EX_DATA!");
0462 qctl_out.diagnostic = QMESYSTEM;
0463
0464
0465 if (TPQPEEK & qctl_in.flags)
0466 {
0467 tmq_unlock_msg_by_msgid(p_msg->qctl.msgid, 0);
0468 }
0469
0470 EXFAIL_OUT(ret);
0471 }
0472
0473
0474 if (TPQPEEK & qctl_in.flags &&
0475 EXSUCCEED!=tmq_unlock_msg_by_msgid(p_msg->qctl.msgid, EXTRUE))
0476 {
0477 NDRX_LOG(log_error, "Failed to unlock msg!");
0478 EXFAIL_OUT(ret);
0479 }
0480
0481 out:
0482
0483 if (local_tx)
0484 {
0485 if (EXSUCCEED!=ret)
0486 {
0487 NDRX_LOG(log_error, "Aborting transaction");
0488 tpabort(0);
0489 }
0490 else
0491 {
0492 NDRX_LOG(log_info, "Committing transaction!");
0493 if (EXSUCCEED!=tpcommit(0))
0494 {
0495 NDRX_LOG(log_error, "Commit failed!");
0496 userlog("Commit failed!");
0497 NDRX_STRCPY_SAFE(qctl_out.diagmsg, "tmq_dequeue: commit failed!");
0498 qctl_out.diagnostic = QMESYSTEM;
0499 ret=EXFAIL;
0500 }
0501 }
0502 }
0503
0504
0505
0506
0507
0508 if (EXSUCCEED!=tmq_tpqctl_to_ubf_deqrsp(*pp_ub, &qctl_out))
0509 {
0510 NDRX_LOG(log_error, "tmq_dequeue: failed to generate response buffer!");
0511 userlog("tmq_dequeue: failed to generate response buffer!");
0512 ret=EXFAIL;
0513 }
0514
0515 return ret;
0516 }
0517
0518
0519
0520
0521
0522
0523
0524
0525
0526
0527 expublic int tmq_mqlq(UBFH *p_ub, int cd)
0528 {
0529 int ret = EXSUCCEED;
0530 long revent;
0531 fwd_qlist_t *el, *tmp, *list;
0532 short nodeid = G_tmqueue_cfg.vnodeid;
0533 short srvid = tpgetsrvid();
0534 char *fn = "tmq_printqueue";
0535
0536
0537 if (NULL==(list = tmq_get_qlist(EXFALSE, EXTRUE)))
0538 {
0539 NDRX_LOG(log_info, "%s: No queues found", fn);
0540 }
0541 else
0542 {
0543 NDRX_LOG(log_info, "%s: Queues found", fn);
0544 }
0545
0546 DL_FOREACH_SAFE(list,el,tmp)
0547 {
0548 long msgs = 0;
0549 long locked = 0;
0550
0551 tmq_get_q_stats(el->qname, &msgs, &locked);
0552
0553 NDRX_LOG(log_debug, "returning %s/%s", ndrx_G_qspace, el->qname);
0554
0555 if (EXSUCCEED!=Bchg(p_ub, EX_QSPACE, 0, ndrx_G_qspace, 0L) ||
0556 EXSUCCEED!=Bchg(p_ub, EX_QNAME, 0, el->qname, 0L) ||
0557 EXSUCCEED!=Bchg(p_ub, TMNODEID, 0, (char *)&nodeid, 0L) ||
0558 EXSUCCEED!=Bchg(p_ub, TMSRVID, 0, (char *)&srvid, 0L) ||
0559 EXSUCCEED!=Bchg(p_ub, EX_QNUMMSG, 0, (char *)&msgs, 0L) ||
0560 EXSUCCEED!=Bchg(p_ub, EX_QNUMLOCKED, 0, (char *)&locked, 0L) ||
0561 EXSUCCEED!=Bchg(p_ub, EX_QNUMSUCCEED, 0, (char *)&el->succ, 0L) ||
0562 EXSUCCEED!=Bchg(p_ub, EX_QNUMFAIL, 0, (char *)&el->fail, 0L) ||
0563 EXSUCCEED!=Bchg(p_ub, EX_QNUMENQ, 0, (char *)&el->numenq, 0L) ||
0564 EXSUCCEED!=Bchg(p_ub, EX_QNUMDEQ, 0, (char *)&el->numdeq, 0L)
0565 )
0566 {
0567 NDRX_LOG(log_error, "failed to setup FB: %s", Bstrerror(Berror));
0568 EXFAIL_OUT(ret);
0569 }
0570 if (EXFAIL == tpsend(cd,
0571 (char *)p_ub,
0572 0L,
0573 0,
0574 &revent))
0575 {
0576 NDRX_LOG(log_error, "Send data failed [%s] %ld",
0577 tpstrerror(tperrno), revent);
0578 EXFAIL_OUT(ret);
0579 }
0580 else
0581 {
0582 NDRX_LOG(log_debug,"sent ok");
0583 }
0584
0585 DL_DELETE(list, el);
0586 NDRX_FREE((char *)el);
0587 }
0588
0589 out:
0590
0591 return ret;
0592 }
0593
0594
0595
0596
0597
0598
0599
0600 expublic int tmq_mqlc(UBFH *p_ub, int cd)
0601 {
0602 int ret = EXSUCCEED;
0603 long revent;
0604 fwd_qlist_t *el, *tmp, *list;
0605 short nodeid = G_tmqueue_cfg.vnodeid;
0606 short srvid = tpgetsrvid();
0607 char *fn = "tmq_mqlc";
0608 char qdef[TMQ_QDEF_MAX];
0609 char flags[128];
0610 int is_default = EXFALSE;
0611
0612
0613 if (NULL==(list = tmq_get_qlist(EXFALSE, EXTRUE)))
0614 {
0615 NDRX_LOG(log_info, "%s: No queues found", fn);
0616 }
0617 else
0618 {
0619 NDRX_LOG(log_info, "%s: Queues found", fn);
0620 }
0621
0622 DL_FOREACH_SAFE(list,el,tmp)
0623 {
0624 is_default = EXFALSE;
0625
0626 if (EXSUCCEED==tmq_build_q_def(el->qname, &is_default, qdef, sizeof(qdef)))
0627 {
0628 NDRX_LOG(log_debug, "returning %s/%s", ndrx_G_qspace, el->qname);
0629
0630 flags[0] = EXEOS;
0631
0632 if (is_default)
0633 {
0634 NDRX_STRCAT_S(flags, sizeof(flags), "D");
0635 }
0636
0637
0638 if (EXSUCCEED!=Bchg(p_ub, EX_QSPACE, 0, ndrx_G_qspace, 0L) ||
0639 EXSUCCEED!=Bchg(p_ub, EX_QNAME, 0, el->qname, 0L) ||
0640 EXSUCCEED!=Bchg(p_ub, TMNODEID, 0, (char *)&nodeid, 0L) ||
0641 EXSUCCEED!=Bchg(p_ub, TMSRVID, 0, (char *)&srvid, 0L) ||
0642 EXSUCCEED!=CBchg(p_ub, EX_DATA, 0, qdef, 0L, BFLD_STRING) ||
0643 EXSUCCEED!=Bchg(p_ub, EX_QSTRFLAGS, 0, flags, 0L)
0644 )
0645 {
0646 NDRX_LOG(log_error, "failed to setup FB: %s", Bstrerror(Berror));
0647 EXFAIL_OUT(ret);
0648 }
0649 if (EXFAIL == tpsend(cd,
0650 (char *)p_ub,
0651 0L,
0652 0,
0653 &revent))
0654 {
0655 NDRX_LOG(log_error, "Send data failed [%s] %ld",
0656 tpstrerror(tperrno), revent);
0657 EXFAIL_OUT(ret);
0658 }
0659 else
0660 {
0661 NDRX_LOG(log_debug,"sent ok");
0662 }
0663 }
0664
0665 DL_DELETE(list, el);
0666 NDRX_FREE((char *)el);
0667 }
0668
0669 out:
0670
0671 return ret;
0672 }
0673
0674
0675
0676
0677
0678
0679
0680 expublic int tmq_mqlm(UBFH *p_ub, int cd)
0681 {
0682 int ret = EXSUCCEED;
0683 long revent;
0684 tmq_memmsg_t *el = NULL, *tmp = NULL, *list = NULL;
0685 short nodeid = G_tmqueue_cfg.vnodeid;
0686 short srvid = tpgetsrvid();
0687 char *fn = "tmq_mqlm";
0688 char qname[TMQNAMELEN+1];
0689 short is_locked;
0690 char msgid_str[TMMSGIDLEN_STR+1];
0691
0692
0693
0694 if (EXSUCCEED!=Bget(p_ub, EX_QNAME, 0, qname, 0L))
0695 {
0696 NDRX_LOG(log_error, "Failed to get qname");
0697 EXFAIL_OUT(ret);
0698 }
0699
0700 if (NULL==(list = tmq_get_msglist(qname)))
0701 {
0702 NDRX_LOG(log_info, "%s: no messages in q", fn);
0703 }
0704 else
0705 {
0706 NDRX_LOG(log_info, "%s: messages found", fn);
0707 }
0708
0709 DL_FOREACH_SAFE(list,el,tmp)
0710 {
0711 if (el->msg->lockthreadid)
0712 {
0713 is_locked = EXTRUE;
0714 }
0715 else
0716 {
0717 is_locked = EXFALSE;
0718 }
0719
0720 tmq_msgid_serialize(el->msg->hdr.msgid, msgid_str);
0721
0722 if (EXSUCCEED!=Bchg(p_ub, TMNODEID, 0, (char *)&nodeid, 0L) ||
0723 EXSUCCEED!=Bchg(p_ub, TMSRVID, 0, (char *)&srvid, 0L) ||
0724 EXSUCCEED!=Bchg(p_ub, EX_QMSGIDSTR, 0, msgid_str, 0L) ||
0725 EXSUCCEED!=Bchg(p_ub, EX_TSTAMP1_STR, 0,
0726 ndrx_get_strtstamp2(0, el->msg->msgtstamp, el->msg->msgtstamp_usec), 0L) ||
0727 EXSUCCEED!=Bchg(p_ub, EX_TSTAMP2_STR, 0,
0728 ndrx_get_strtstamp2(1, el->msg->trytstamp, el->msg->trytstamp_usec), 0L) ||
0729 EXSUCCEED!=Bchg(p_ub, EX_QMSGTRIES, 0, (char *)&el->msg->trycounter, 0L) ||
0730 EXSUCCEED!=Bchg(p_ub, EX_QMSGLOCKED, 0, (char *)&is_locked, 0L)
0731 )
0732 {
0733 NDRX_LOG(log_error, "failed to setup FB: %s", Bstrerror(Berror));
0734 EXFAIL_OUT(ret);
0735 }
0736
0737 if (EXFAIL == tpsend(cd,
0738 (char *)p_ub,
0739 0L,
0740 0,
0741 &revent))
0742 {
0743 NDRX_LOG(log_error, "Send data failed [%s] %ld",
0744 tpstrerror(tperrno), revent);
0745 EXFAIL_OUT(ret);
0746 }
0747 else
0748 {
0749 NDRX_LOG(log_debug,"sent ok");
0750 }
0751
0752
0753 }
0754
0755 out:
0756
0757 DL_FOREACH_SAFE(list,el,tmp)
0758 {
0759 DL_DELETE(list, el);
0760 NDRX_FREE((char *)el->msg);
0761 NDRX_FREE((char *)el);
0762 }
0763 return ret;
0764 }
0765
0766
0767
0768
0769
0770
0771 expublic int tmq_mqrc(UBFH *p_ub)
0772 {
0773 int ret = EXSUCCEED;
0774
0775
0776 if (ndrx_get_G_cconfig())
0777 {
0778 ndrx_cconfig_reload();
0779 }
0780
0781 ret = tmq_reload_conf(G_tmqueue_cfg.qconfig);
0782
0783 out:
0784 return ret;
0785 }
0786
0787
0788
0789
0790
0791
0792 expublic int tmq_mqch(UBFH *p_ub)
0793 {
0794 int ret = EXSUCCEED;
0795 char conf[512];
0796 BFLDLEN len = sizeof(conf);
0797
0798 if (EXSUCCEED!=CBget(p_ub, EX_DATA, 0, conf, &len, BFLD_STRING))
0799 {
0800 NDRX_LOG(log_error, "Failed to get EX_DATA!");
0801 EXFAIL_OUT(ret);
0802 }
0803
0804 ret = tmq_qconf_addupd(conf, NULL);
0805
0806 out:
0807 return ret;
0808 }
0809