0001
0002
0003
0004
0005
0006
0007
0008
0009
0010
0011
0012
0013
0014
0015
0016
0017
0018
0019
0020
0021
0022
0023
0024
0025
0026
0027
0028
0029
0030
0031
0032
0033
0034
0035
0036
0037
0038 #include <stdio.h>
0039 #include <stdlib.h>
0040 #include <string.h>
0041 #include <errno.h>
0042 #include <regex.h>
0043 #include <utlist.h>
0044 #include <dirent.h>
0045 #include <pthread.h>
0046 #include <signal.h>
0047
0048 #include <ndebug.h>
0049 #include <atmi.h>
0050 #include <atmi_int.h>
0051 #include <typed_buf.h>
0052 #include <ndrstandard.h>
0053 #include <ubf.h>
0054 #include <Exfields.h>
0055 #include <tperror.h>
0056 #include <exnet.h>
0057 #include <ndrxdcmn.h>
0058
0059 #include "tmqd.h"
0060 #include "../libatmisrv/srv_int.h"
0061 #include "nstdutil.h"
0062 #include "userlog.h"
0063 #include <xa_cmn.h>
0064 #include <atmi_int.h>
0065 #include <ndrxdiag.h>
0066 #include "qtran.h"
0067 #include <atmi_tls.h>
0068 #include <ndrx_ddr.h>
0069 #include <assert.h>
0070
0071
0072
0073
0074
0075 expublic pthread_t G_forward_thread;
0076 expublic int volatile G_forward_req_shutdown = EXFALSE;
0077 expublic int volatile ndrx_G_forward_req_shutdown_ack = EXFALSE;
0078
0079
0080 exprivate MUTEX_LOCKDECL(M_wait_mutex);
0081 exprivate pthread_cond_t M_wait_cond = PTHREAD_COND_INITIALIZER;
0082
0083 exprivate __thread int M_is_xa_open = EXFALSE;
0084
0085
0086 exprivate fwd_qlist_t *M_next_fwd_q_list = NULL;
0087 exprivate fwd_qlist_t *M_next_fwd_q_cur = NULL;
0088 exprivate int M_had_msg = EXFALSE;
0089 exprivate int M_any_busy = EXFALSE;
0090 exprivate int M_num_busy = 0;
0091
0092 exprivate MUTEX_LOCKDECL(M_forward_lock);
0093
0094 exprivate int M_force_sleep = EXFALSE;
0095
0096
0097
0098 expublic int volatile ndrx_G_fwd_into_sleep=EXFALSE;
0099
0100
0101 expublic int volatile ndrx_G_fwd_into_poolsleep=EXFALSE;
0102
0103
0104 expublic int volatile ndrx_G_fwd_force_wake = EXFALSE;
0105
0106
0107
0108
0109
0110
0111
0112
0113
0114 expublic void ndrx_forward_chkrun(tmq_memmsg_t *mmsg)
0115 {
0116 fwd_stats_t *p_stats;
0117
0118
0119 if (G_tmqueue_cfg.no_chkrun)
0120 {
0121 return;
0122 }
0123
0124
0125 if (ndrx_G_fwd_force_wake)
0126 {
0127 return;
0128 }
0129
0130
0131 if (!ndrx_G_fwd_into_sleep && !ndrx_G_fwd_into_poolsleep)
0132 {
0133 return;
0134 }
0135
0136 if (G_forward_req_shutdown)
0137 {
0138 return;
0139 }
0140
0141
0142 if (!TMQ_AUTOQ_ISAUTO(mmsg->qconf->autoq))
0143 {
0144 return;
0145 }
0146
0147
0148
0149
0150
0151 if ( (mmsg->qstate & NDRX_TMQ_LOC_CURQ
0152
0153 || mmsg->qstate & NDRX_TMQ_LOC_FUTQ && (mmsg->msg->qctl.flags &TPQTIME_ABS)
0154 && mmsg->msg->qctl.deq_time <= time(NULL) )
0155
0156 &&mmsg->qconf->workers > tmq_fwd_busy_cnt(mmsg->msg->hdr.qname, &p_stats))
0157 {
0158
0159 ndrx_G_fwd_force_wake=EXTRUE;
0160
0161 if (ndrx_G_fwd_into_sleep)
0162 {
0163
0164 pthread_cond_signal(&M_wait_cond);
0165 }
0166 else if (ndrx_G_fwd_into_poolsleep)
0167 {
0168
0169 ndrx_thpool_signal_one(G_tmqueue_cfg.fwdthpool);
0170 }
0171 }
0172 }
0173
0174
0175
0176
0177 expublic void forward_lock(void)
0178 {
0179 MUTEX_LOCK_V(M_forward_lock);
0180 }
0181
0182
0183
0184
0185 expublic void forward_unlock(void)
0186 {
0187 MUTEX_UNLOCK_V(M_forward_lock);
0188 }
0189
0190
0191
0192
0193
0194 exprivate void thread_sleep(int sleep_sec)
0195 {
0196 struct timespec wait_time;
0197 struct timeval now;
0198 int rt;
0199
0200 gettimeofday(&now,NULL);
0201
0202 wait_time.tv_sec = now.tv_sec+sleep_sec;
0203 wait_time.tv_nsec = now.tv_usec*1000;
0204
0205 MUTEX_LOCK_V(M_wait_mutex);
0206
0207
0208 if (!G_forward_req_shutdown)
0209 {
0210 rt = pthread_cond_timedwait(&M_wait_cond, &M_wait_mutex, &wait_time);
0211 }
0212
0213 MUTEX_UNLOCK_V(M_wait_mutex);
0214 }
0215
0216
0217
0218
0219 expublic void forward_shutdown_wake(void)
0220 {
0221 MUTEX_LOCK_V(M_wait_mutex);
0222 pthread_cond_signal(&M_wait_cond);
0223 MUTEX_UNLOCK_V(M_wait_mutex);
0224 }
0225
0226
0227
0228
0229 exprivate void fwd_q_list_rm(void)
0230 {
0231 fwd_qlist_t *elt, *tmp;
0232
0233 if (NULL!=M_next_fwd_q_list)
0234 {
0235 DL_FOREACH_SAFE(M_next_fwd_q_list,elt,tmp)
0236 {
0237 DL_DELETE(M_next_fwd_q_list,elt);
0238 NDRX_FREE(elt);
0239 }
0240 }
0241 }
0242
0243
0244
0245
0246
0247
0248
0249
0250 exprivate fwd_msg_t * get_next_msg(void)
0251 {
0252 fwd_msg_t * ret = NULL;
0253 tmq_msg_t * ret_msg = NULL;
0254 long qerr = EXSUCCEED;
0255 char msgbuf[128];
0256 int again;
0257 static unsigned long seq = 0;
0258
0259 do
0260 {
0261 again=EXFALSE;
0262
0263 if (NULL==M_next_fwd_q_list || NULL == M_next_fwd_q_cur)
0264 {
0265
0266 M_had_msg=EXFALSE;
0267 M_any_busy=EXFALSE;
0268 M_num_busy = 0;
0269 fwd_q_list_rm();
0270
0271
0272 M_next_fwd_q_list = tmq_get_qlist(EXTRUE, EXFALSE);
0273
0274 if (NULL!=M_next_fwd_q_list)
0275 {
0276 M_next_fwd_q_cur = M_next_fwd_q_list;
0277 }
0278 }
0279
0280
0281
0282
0283 while (NULL!=M_next_fwd_q_cur)
0284 {
0285 fwd_stats_t *p_stats;
0286 fwd_qlist_t *q_cur = M_next_fwd_q_cur;
0287
0288 int busy = tmq_fwd_busy_cnt(M_next_fwd_q_cur->qname, &p_stats);
0289
0290 if (EXFAIL==busy)
0291 {
0292 NDRX_LOG(log_error, "Failed to get stats for [%s] - memory error",
0293 M_next_fwd_q_cur->qname);
0294 userlog("Failed to get stats for [%s] - memory error",
0295 M_next_fwd_q_cur->qname);
0296
0297 exit(-1);
0298 }
0299
0300 NDRX_LOG(log_info, "mon: %s %ld/%ld/%d/%d",
0301 M_next_fwd_q_cur->qname
0302 ,M_next_fwd_q_cur->numenq
0303 ,M_next_fwd_q_cur->numdeq
0304 ,busy
0305 , M_next_fwd_q_cur->workers);
0306
0307
0308 ret=NULL;
0309 ret_msg=NULL;
0310
0311
0312 M_num_busy+=busy;
0313
0314 if (busy >= M_next_fwd_q_cur->workers)
0315 {
0316
0317 M_any_busy=EXTRUE;
0318 }
0319
0320 else if (NULL==(ret_msg=tmq_msg_dequeue(M_next_fwd_q_cur->qname, 0, EXTRUE,
0321 &qerr, msgbuf, sizeof(msgbuf), NULL, NULL)))
0322 {
0323 NDRX_LOG(log_debug, "Not messages for dequeue qerr=%ld: %s",
0324 qerr, msgbuf);
0325 }
0326 else
0327 {
0328 NDRX_LOG(log_debug, "Dequeued message");
0329 M_had_msg=EXTRUE;
0330 }
0331
0332 M_next_fwd_q_cur = M_next_fwd_q_cur->next;
0333
0334
0335
0336
0337 if (NULL!=ret_msg)
0338 {
0339 ret = NDRX_FPMALLOC(sizeof(fwd_msg_t), 0);
0340
0341 if (NULL==ret)
0342 {
0343 int err = errno;
0344 NDRX_LOG(log_error, "Failed to malloc %d bytes: %s - termiante",
0345 sizeof(fwd_msg_t), strerror(err));
0346 userlog("Failed to malloc %d bytes: %s - terminate",
0347 sizeof(fwd_msg_t), strerror(err));
0348 exit(-1);
0349 }
0350 ret->msg=ret_msg;
0351 ret->stats=p_stats;
0352 ret->sync=q_cur->sync;
0353 seq++;
0354
0355 ret->seq = seq;
0356
0357
0358 if (ret->sync)
0359 {
0360 tmq_fwd_sync_add(ret);
0361 }
0362
0363 break;
0364 }
0365
0366 }
0367
0368
0369 if (NULL==ret)
0370 {
0371
0372
0373
0374
0375 if (M_had_msg)
0376 {
0377 NDRX_LOG(log_debug, "Had messages in previous run, scan Qs again");
0378 again = EXTRUE;
0379 }
0380 else if (M_any_busy)
0381 {
0382 NDRX_LOG(log_debug, "All Qs/threads busy to the limit wait for slot...");
0383
0384
0385
0386
0387
0388
0389
0390
0391
0392
0393
0394
0395
0396
0397
0398
0399 ndrx_G_fwd_into_poolsleep=EXTRUE;
0400
0401 ndrx_thpool_timedwait_less(G_tmqueue_cfg.fwdthpool,
0402 M_num_busy, G_tmqueue_cfg.scan_time*1000, (int *)&ndrx_G_fwd_force_wake);
0403
0404
0405 ndrx_G_fwd_into_poolsleep=EXFALSE;
0406 ndrx_G_fwd_force_wake=EXFALSE;
0407
0408 again = EXTRUE;
0409 }
0410 }
0411
0412
0413 } while (again && !G_forward_req_shutdown && !M_force_sleep);
0414
0415 out:
0416 return ret;
0417 }
0418
0419
0420 #define RELOCK do {\
0421 p_tl = tmq_log_get_entry(tmxid, NDRX_LOCK_WAIT_TIME, NULL);\
0422 \
0423 if (NULL==p_tl)\
0424 {\
0425 NDRX_LOG(log_error, "Fatal error ! Expected to have transaction for [%s]",\
0426 tmxid);\
0427 userlog("Fatal error ! Expected to have transaction for [%s]",\
0428 tmxid);\
0429 EXFAIL_OUT(ret);\
0430 }\
0431 } while (0)
0432
0433
0434 #define UNLOCK do {\
0435 tmq_log_unlock(p_tl);\
0436 p_tl=NULL;\
0437 } while (0)
0438
0439
0440
0441
0442
0443 #define GET_TL do {\
0444 \
0445 tmxid = G_atmi_tls->qdisk_tls->filename_base;\
0446 p_tl = tmq_log_get_entry(tmxid, NDRX_LOCK_WAIT_TIME, NULL);\
0447 \
0448 if (NULL==p_tl)\
0449 {\
0450 NDRX_LOG(log_error, "Fatal error ! Expected to have transaction for [%s]",\
0451 tmxid);\
0452 userlog("Fatal error ! Expected to have transaction for [%s]",\
0453 tmxid);\
0454 EXFAIL_OUT(ret);\
0455 }\
0456 } while (0)
0457
0458
0459
0460
0461 #define WRITE_DEL do {\
0462 \
0463 cmd_block.hdr.command_code = TMQ_STORCMD_DEL;\
0464
0465
0466
0467
0468 \
0469 if (EXSUCCEED!=tmq_storage_write_cmd_block((char *)&cmd_block, \
0470 "Removing completed message...", NULL, NULL))\
0471 {\
0472 NDRX_LOG(log_error, "Failed to issue complete/remove command to xa for msgid_str [%s]", \
0473 msgid_str);\
0474 userlog("Failed to issue complete/remove command to xa for msgid_str [%s]", \
0475 msgid_str);\
0476
0477
0478 \
0479 tmq_unlock_msg_by_msgid(msg->hdr.msgid, 0);\
0480 EXFAIL_OUT(ret);\
0481 }\
0482 } while (0)
0483
0484
0485
0486
0487
0488 expublic void thread_process_forward (void *ptr, int *p_finish_off)
0489 {
0490 int ret = EXSUCCEED;
0491 fwd_msg_t * fwd = (fwd_msg_t *)ptr;
0492 tmq_msg_t * msg = fwd->msg;
0493 tmq_qconfig_t qconf;
0494 char *call_buf = NULL;
0495 long call_len = 0;
0496
0497 char *rply_buf = NULL;
0498 long rply_len = 0;
0499
0500 typed_buffer_descr_t *descr;
0501 char msgid_str[TMMSGIDLEN_STR+1];
0502 char *fn = "thread_process_forward";
0503 int tperr;
0504 union tmq_block cmd_block;
0505 int tout, tout_autotran;
0506 int sent_ok=EXFALSE;
0507 char svcnm[XATMI_SERVICE_NAME_LENGTH+1];
0508 char qname[TMQNAMELEN+1];
0509 char *tmxid=NULL;
0510 int cd;
0511 int msg_released = EXFALSE;
0512 qtran_log_t *p_tl=NULL;
0513 int autotran=EXFALSE;
0514 unsigned long trantime=0;
0515
0516 if (!M_is_xa_open)
0517 {
0518 if (EXSUCCEED!=tpopen())
0519 {
0520 NDRX_LOG(log_error, "Failed to tpopen() by worker thread: %s",
0521 tpstrerror(tperrno));
0522 userlog("Failed to tpopen() by worker thread: %s", tpstrerror(tperrno));
0523
0524
0525 exit(1);
0526 }
0527 else
0528 {
0529 M_is_xa_open = EXTRUE;
0530 }
0531 }
0532
0533
0534 NDRX_STRCPY_SAFE(qname, msg->hdr.qname);
0535 tmq_msgid_serialize(msg->hdr.msgid, msgid_str);
0536
0537 NDRX_LOG(log_info, "%s enter for msgid_str: [%s]", fn, msgid_str);
0538
0539
0540
0541
0542 if (EXSUCCEED!=tmq_qconf_get_with_default_static(msg->hdr.qname, &qconf))
0543 {
0544
0545 NDRX_LOG(log_error, "Failed to get qconf for [%s]", msg->hdr.qname);
0546 tmq_unlock_msg_by_msgid(msg->hdr.msgid, 0);
0547 EXFAIL_OUT(ret);
0548 }
0549
0550 if (qconf.txtout > EXFAIL)
0551 {
0552 tout_autotran = tout = qconf.txtout;
0553 NDRX_LOG(log_info, "txtout set to %d sec", tout);
0554 }
0555 else
0556 {
0557 tout_autotran = tout = G_tmqueue_cfg.dflt_timeout;
0558 NDRX_LOG(log_info, "txtout defaulted to %d sec", tout);
0559 }
0560
0561
0562
0563 if (0==strcmp(qconf.svcnm, TMQ_QUEUE_SERVICE))
0564 {
0565 NDRX_STRCPY_SAFE(svcnm, qconf.qname);
0566 }
0567 else
0568 {
0569 NDRX_STRCPY_SAFE(svcnm, qconf.svcnm);
0570 }
0571
0572
0573
0574
0575 if (TMQ_AUTOQ_AUTOTX!=qconf.autoq)
0576 {
0577 int ddr_ret = ndrx_ddr_service_get(svcnm, &autotran, &trantime);
0578
0579 if (EXFAIL==ddr_ret)
0580 {
0581 NDRX_LOG(log_always, "Service info failed [%s]", svcnm);
0582 tmq_unlock_msg_by_msgid(msg->hdr.msgid, 0);
0583 EXFAIL_OUT(ret);
0584 }
0585 else if (autotran && trantime > 0)
0586 {
0587
0588 tout_autotran = trantime;
0589 NDRX_LOG(log_debug, "autoq=y, svc [%s] uses auto-tran tout: %d",
0590 svcnm, tout_autotran);
0591 }
0592 }
0593
0594
0595
0596
0597
0598
0599
0600
0601
0602 if (EXSUCCEED!=ndrx_mbuf_prepare_incoming(msg->msg,
0603 msg->len,
0604 &call_buf,
0605 &call_len,
0606 0, 0))
0607 {
0608 NDRX_LOG(log_always, "Failed to allocate buffer");
0609 tmq_unlock_msg_by_msgid(msg->hdr.msgid, 0);
0610 EXFAIL_OUT(ret);
0611 }
0612
0613 memset(&cmd_block, 0, sizeof(cmd_block));
0614 memcpy(&cmd_block.hdr, &msg->hdr, sizeof(cmd_block.hdr));
0615
0616 if (TMQ_AUTOQ_AUTOTX==qconf.autoq)
0617 {
0618 NDRX_LOG(log_debug, "Service invocation shall be performed in "
0619 "transactional mode...");
0620
0621
0622 assert(EXSUCCEED==tpsblktime(tout, TPBLK_ALL));
0623
0624 if (EXSUCCEED!=tpbegin(tout, 0))
0625 {
0626 userlog("Failed to start tran: %s", tpstrerror(tperrno));
0627 NDRX_LOG(log_error, "Failed to start tran!");
0628
0629
0630 tmq_unlock_msg_by_msgid(msg->hdr.msgid, 0);
0631 EXFAIL_OUT(ret);
0632 }
0633
0634
0635
0636
0637
0638 WRITE_DEL;
0639 GET_TL;
0640
0641 p_tl->cmds->no_unlock=EXTRUE;
0642 UNLOCK;
0643 }
0644 else
0645 {
0646
0647 assert(EXSUCCEED==tpsblktime(tout_autotran, TPBLK_ALL));
0648 }
0649
0650
0651
0652
0653
0654
0655
0656 if (fwd->sync)
0657 {
0658 tmq_fwd_sync_wait(fwd);
0659 }
0660
0661 NDRX_LOG(log_info, "Sending request to service: [%s] sync_seq=%lu", svcnm, fwd->seq);
0662
0663 cd = tpacall (svcnm, call_buf, call_len, 0);
0664
0665
0666 if (TMQ_SYNC_TPACALL==fwd->sync)
0667 {
0668 tmq_fwd_sync_notify(fwd);
0669 NDRX_LOG(log_debug, "Sync notified (tpacall) sync_seq=%lu", fwd->seq);
0670 msg_released = EXTRUE;
0671 }
0672
0673
0674 if (EXFAIL==cd || EXFAIL==tpgetrply (&cd, (char **)&rply_buf, &rply_len, 0))
0675 {
0676 tperr = tperrno;
0677 NDRX_LOG(log_error, "%s failed: %s", svcnm, tpstrerror(tperr));
0678
0679
0680
0681
0682
0683 if (tpgetlev())
0684 {
0685 NDRX_LOG(log_error, "Abort current transaction for counter increment");
0686 tpabort(0L);
0687 }
0688 }
0689 else
0690 {
0691 sent_ok=EXTRUE;
0692 }
0693
0694 NDRX_LOG(log_info, "Service answer %s for %s", (sent_ok?"ok":"fail"), msgid_str);
0695
0696
0697
0698
0699 if (tout!=tout_autotran)
0700 {
0701 assert(EXSUCCEED==tpsblktime(tout, TPBLK_ALL));
0702 }
0703
0704
0705
0706
0707 if (!tpgetlev())
0708 {
0709 if (EXSUCCEED!=tpbegin(tout, 0))
0710 {
0711 userlog("Failed to start tran: %s", tpstrerror(tperrno));
0712 NDRX_LOG(log_error, "Failed to start tran!");
0713 tmq_unlock_msg_by_msgid(msg->hdr.msgid, 0);
0714 EXFAIL_OUT(ret);
0715 }
0716 }
0717
0718
0719
0720
0721 if (sent_ok)
0722 {
0723
0724
0725 if (TMQ_AUTOQ_AUTOTX==qconf.autoq)
0726 {
0727
0728 GET_TL;
0729 p_tl->cmds->no_unlock=EXFALSE;
0730 }
0731 else
0732 {
0733 WRITE_DEL;
0734 GET_TL;
0735 }
0736
0737 tmq_update_q_stats(msg->hdr.qname, 1, 0);
0738
0739
0740 if (msg->qctl.flags & TPQREPLYQ)
0741 {
0742 TPQCTL ctl;
0743
0744 NDRX_LOG(log_warn, "TPQREPLYQ defined, sending answer buffer to "
0745 "[%s] q in [%s] namespace",
0746 msg->qctl.replyqueue, msg->hdr.qspace);
0747
0748
0749 memset(&ctl, 0, sizeof(ctl));
0750
0751
0752
0753
0754
0755
0756
0757
0758 UNLOCK;
0759 ret = tpenqueue (msg->hdr.qspace, msg->qctl.replyqueue, &ctl, rply_buf, rply_len, 0);
0760 RELOCK;
0761
0762 if (EXSUCCEED!=ret)
0763 {
0764 if (TPEDIAGNOSTIC==tperrno)
0765 {
0766 NDRX_LOG(log_error, "Failed to enqueue to replyqueue [%s]: %s diag: %d:%s",
0767 msg->qctl.replyqueue, tpstrerror(tperrno),
0768 msg->qctl.diagnostic, msg->qctl.diagmsg);
0769 userlog("Failed to enqueue to replyqueue [%s]: %s diag: %d:%s",
0770 msg->qctl.replyqueue, tpstrerror(tperrno),
0771 msg->qctl.diagnostic, msg->qctl.diagmsg);
0772 }
0773 else
0774 {
0775 NDRX_LOG(log_error, "Failed to enqueue to replyqueue [%s]: %s",
0776 msg->qctl.replyqueue, tpstrerror(tperrno));
0777 userlog("Failed to enqueue to replyqueue [%s]: %s",
0778 msg->qctl.replyqueue, tpstrerror(tperrno));
0779 }
0780
0781
0782
0783 EXFAIL_OUT(ret);
0784 }
0785 }
0786
0787 }
0788 else
0789 {
0790
0791 msg->trycounter++;
0792 NDRX_LOG(log_warn, "Message [%s] tries %ld, max: %d",
0793 msgid_str, msg->trycounter, qconf.tries);
0794 ndrx_utc_tstamp2(&msg->trytstamp, &msg->trytstamp_usec);
0795
0796 if (msg->trycounter>=qconf.tries)
0797 {
0798 NDRX_LOG(log_error, "Message [%s] expired", msgid_str);
0799
0800
0801 tmq_update_q_stats(msg->hdr.qname, 0, 1);
0802 cmd_block.hdr.command_code = TMQ_STORCMD_DEL;
0803 if (EXSUCCEED!=tmq_storage_write_cmd_block((char *)&cmd_block,
0804 "Removing expired message...", NULL, NULL))
0805 {
0806 NDRX_LOG(log_error, "Failed to issue complete/remove command to xa for msgid_str [%s]",
0807 msgid_str);
0808 userlog("Failed to issue complete/remove command to xa for msgid_str [%s]",
0809 msgid_str);
0810
0811
0812
0813
0814 tmq_unlock_msg_by_msgid(msg->hdr.msgid, 0);
0815 EXFAIL_OUT(ret);
0816 }
0817
0818
0819 GET_TL;
0820
0821 if (msg->qctl.flags & TPQFAILUREQ && NULL!=rply_buf)
0822 {
0823 TPQCTL ctl;
0824 NDRX_LOG(log_warn, "TPQFAILUREQ defined and non NULL reply, enqueue answer buffer to "
0825 "[%s] q in [%s] namespace",
0826 msg->qctl.failurequeue, msg->hdr.qspace);
0827
0828
0829
0830
0831 memcpy(&ctl, &msg->qctl, sizeof(ctl));
0832
0833
0834
0835
0836 UNLOCK;
0837 ret = tpenqueue (msg->hdr.qspace, msg->qctl.failurequeue, &ctl, rply_buf, rply_len, 0);
0838 RELOCK;
0839
0840 if (EXSUCCEED!=ret)
0841 {
0842 if (TPEDIAGNOSTIC==tperrno)
0843 {
0844 NDRX_LOG(log_error, "Failed to enqueue to failurequeue [%s]: %s diag: %d:%s",
0845 msg->qctl.replyqueue, tpstrerror(tperrno),
0846 msg->qctl.diagnostic, msg->qctl.diagmsg);
0847 userlog("Failed to enqueue to failurequeue [%s]: %s diag: %d:%s",
0848 msg->qctl.replyqueue, tpstrerror(tperrno),
0849 msg->qctl.diagnostic, msg->qctl.diagmsg);
0850 }
0851 else
0852 {
0853 NDRX_LOG(log_error, "Failed to enqueue to failurequeue [%s]: %s",
0854 msg->qctl.replyqueue, tpstrerror(tperrno));
0855 userlog("Failed to enqueue to failurequeue [%s]: %s",
0856 msg->qctl.replyqueue, tpstrerror(tperrno));
0857 }
0858
0859
0860
0861
0862
0863
0864 EXFAIL_OUT(ret);
0865 }
0866 }
0867
0868 if (EXEOS!=qconf.errorq[0])
0869 {
0870 TPQCTL ctl;
0871 NDRX_LOG(log_warn, "ERRORQ defined, enqueue request buffer to "
0872 "[%s] q in [%s] namespace", qconf.errorq, msg->hdr.qspace);
0873
0874
0875
0876
0877 memcpy(&ctl, &msg->qctl, sizeof(ctl));
0878
0879 UNLOCK;
0880 ret = tpenqueue (msg->hdr.qspace, qconf.errorq, &ctl, call_buf, call_len, 0);
0881 RELOCK;
0882
0883 if (EXSUCCEED!=ret)
0884 {
0885 NDRX_LOG(log_error, "Failed to enqueue to errorq [%s]: %s",
0886 qconf.errorq, tpstrerror(tperrno));
0887 userlog("Failed to enqueue to errorq [%s]: %s",
0888 qconf.errorq, tpstrerror(tperrno));
0889
0890
0891 EXFAIL_OUT(ret);
0892 }
0893 }
0894 }
0895 else
0896 {
0897
0898 msg->qctl.flags |= TPQTIME_ABS;
0899 if ( 0 == msg->trycounter )
0900 {
0901 msg->qctl.deq_time = time(NULL) + qconf.waitinit;
0902 }
0903 else
0904 {
0905 int retry_inc = qconf.waitretry * msg->trycounter;
0906 if ( retry_inc > qconf.waitretrymax )
0907 {
0908 retry_inc = qconf.waitretrymax;
0909 }
0910 msg->qctl.deq_time = time(NULL) + retry_inc;
0911 }
0912
0913
0914 UPD_MSG((&cmd_block.upd), msg);
0915
0916 cmd_block.hdr.command_code = TMQ_STORCMD_UPD;
0917
0918 if (EXSUCCEED!=tmq_storage_write_cmd_block((char *)&cmd_block,
0919 "Update message command", NULL, NULL))
0920 {
0921 NDRX_LOG(log_error, "Failed to issue update command to xa for msgid_str [%s]",
0922 msgid_str);
0923 userlog("Failed to issue update command to xa for msgid_str [%s]",
0924 msgid_str);
0925
0926
0927
0928
0929 tmq_unlock_msg_by_msgid(msg->hdr.msgid, 0);
0930 EXFAIL_OUT(ret);
0931 }
0932
0933
0934 GET_TL;
0935
0936 }
0937 }
0938
0939 out:
0940
0941
0942 if (NULL!=p_tl)
0943 {
0944 tmq_log_unlock(p_tl);
0945 }
0946
0947
0948
0949
0950 if (tpgetlev())
0951 {
0952 if (EXSUCCEED==ret)
0953 {
0954 if (EXSUCCEED!=tpcommit(0L))
0955 {
0956 NDRX_LOG(log_error, "Failed to commit => aborting + force sleep");
0957 userlog("Failed to commit => aborting + force sleep");
0958 M_force_sleep=EXTRUE;
0959 tpabort(0L);
0960 }
0961 }
0962 else
0963 {
0964 NDRX_LOG(log_error, "System failure during msg processing => aborting + force sleep");
0965 userlog("System failure during msg processing => aborting + force sleep");
0966 tpabort(0L);
0967 M_force_sleep=EXTRUE;
0968 }
0969 }
0970 else if (EXSUCCEED!=ret)
0971 {
0972 NDRX_LOG(log_error, "System failure => force sleep");
0973 M_force_sleep=EXTRUE;
0974 }
0975
0976
0977 if (fwd->sync && !msg_released)
0978 {
0979 tmq_fwd_sync_notify(fwd);
0980 NDRX_LOG(log_debug, "Sync notified (tpcommit) sync_seq=%lu", fwd->seq);
0981 }
0982
0983 if (NULL!=call_buf)
0984 {
0985 tpfree(call_buf);
0986 }
0987
0988 if (NULL!=rply_buf)
0989 {
0990 tpfree(rply_buf);
0991 }
0992
0993
0994 tmq_fwd_busy_dec(fwd->stats);
0995
0996 NDRX_FPFREE(fwd);
0997
0998
0999 assert(EXSUCCEED==tpsblktime(0, TPBLK_ALL));
1000
1001 return;
1002 }
1003
1004
1005
1006
1007
1008
1009 expublic int forward_loop(void)
1010 {
1011 int ret = EXSUCCEED;
1012 int normal_sleep;
1013 fwd_msg_t * fwd_msg;
1014
1015
1016
1017
1018
1019 while(!G_forward_req_shutdown)
1020 {
1021 fwd_msg = NULL;
1022
1023
1024 ndrx_thpool_wait_one(G_tmqueue_cfg.fwdthpool);
1025
1026 normal_sleep=EXFALSE;
1027 if (!M_force_sleep)
1028 {
1029
1030 fwd_msg = get_next_msg();
1031
1032
1033 if (NULL!=fwd_msg)
1034 {
1035
1036 tmq_fwd_busy_inc(fwd_msg->stats);
1037 ndrx_thpool_add_work(G_tmqueue_cfg.fwdthpool, (void*)thread_process_forward, (void *)fwd_msg);
1038 }
1039 else
1040 {
1041 normal_sleep=EXTRUE;
1042 }
1043 }
1044
1045
1046 if (normal_sleep || M_force_sleep)
1047 {
1048
1049
1050
1051 NDRX_LOG(log_debug, "background - sleep %d forced=%d",
1052 G_tmqueue_cfg.scan_time, M_force_sleep);
1053
1054 if (!M_force_sleep)
1055 {
1056 ndrx_G_fwd_into_sleep=EXTRUE;
1057 }
1058
1059 thread_sleep(G_tmqueue_cfg.scan_time);
1060
1061
1062 M_force_sleep=EXFALSE;
1063 ndrx_G_fwd_into_sleep=EXFALSE;
1064
1065 ndrx_G_fwd_force_wake=EXFALSE;
1066 }
1067 }
1068
1069
1070 ndrx_G_forward_req_shutdown_ack = EXTRUE;
1071
1072
1073 fwd_q_list_rm();
1074
1075 out:
1076 return ret;
1077 }
1078
1079
1080
1081
1082
1083 expublic void * forward_process(void *arg)
1084 {
1085 NDRX_LOG(log_error, "***********BACKGROUND PROCESS START ********");
1086
1087 tmq_thread_init();
1088 forward_loop();
1089 tmq_thread_uninit();
1090
1091 NDRX_LOG(log_error, "***********BACKGROUND PROCESS END **********");
1092
1093 return NULL;
1094 }
1095
1096
1097
1098
1099
1100 expublic int forward_process_init(void)
1101 {
1102 int ret = EXSUCCEED;
1103
1104 pthread_attr_t pthread_custom_attr;
1105 pthread_attr_init(&pthread_custom_attr);
1106
1107
1108 ndrx_platf_stack_set(&pthread_custom_attr);
1109 if (EXSUCCEED!=pthread_create(&G_forward_thread, &pthread_custom_attr,
1110 forward_process, NULL))
1111 {
1112 NDRX_PLATF_DIAG(NDRX_DIAG_PTHREAD_CREATE, errno, "forward_process_init");
1113 EXFAIL_OUT(ret);
1114 }
1115 out:
1116 return ret;
1117 }
1118