0001
0002
0003
0004
0005
0006
0007
0008
0009
0010
0011
0012
0013
0014
0015
0016
0017
0018
0019
0020
0021
0022
0023
0024
0025
0026
0027
0028
0029
0030
0031
0032
0033
0034 #include <ndrx_config.h>
0035 #include <stdio.h>
0036 #include <stdlib.h>
0037 #include <string.h>
0038 #include <unistd.h> /* for getopt */
0039 #include <errno.h>
0040 #include <regex.h>
0041 #include <utlist.h>
0042
0043 #include <ndebug.h>
0044 #include <atmi.h>
0045 #include <atmi_int.h>
0046 #include <typed_buf.h>
0047 #include <ndrstandard.h>
0048 #include <ubf.h>
0049 #include <Exfields.h>
0050
0051 #include <exnet.h>
0052 #include <ndrxdcmn.h>
0053
0054 #include "tmqd.h"
0055 #include "tperror.h"
0056 #include "userlog.h"
0057 #include <xa_cmn.h>
0058 #include <exthpool.h>
0059 #include "qcommon.h"
0060 #include "cconfig.h"
0061 #include <ubfutil.h>
0062 #include <thlock.h>
0063 #include "qtran.h"
0064 #include "../libatmisrv/srv_int.h"
0065
0066
0067
0068
0069
0070 expublic tmqueue_cfg_t G_tmqueue_cfg;
0071
0072 exprivate int M_init_ok = EXFALSE;
0073 exprivate __thread int M_thread_first = EXTRUE;
0074
0075
0076 exprivate int volatile M_into_toutchk = EXFALSE;
0077 exprivate MUTEX_LOCKDECL(M_into_toutchk_lock);
0078
0079
0080 exprivate int M_shutdown_ok = EXFALSE;
0081
0082
0083 exprivate int *M_shutdown_ind = NULL;
0084
0085
0086
0087
0088
0089 exprivate int tx_tout_check(void);
0090 exprivate void tm_chk_one_free_thread(void *ptr, int *p_finish_off);
0091 exprivate void tm_chk_one_free_thread_notif(void *ptr, int *p_finish_off);
0092
0093
0094
0095
0096 expublic void tmq_thread_init(void)
0097 {
0098 if (EXSUCCEED!=tpinit(NULL))
0099 {
0100 NDRX_LOG(log_error, "Failed to init worker client");
0101 userlog("tmsrv: Failed to init worker client");
0102 exit(1);
0103 }
0104
0105 if (EXSUCCEED!=tpopen())
0106 {
0107 NDRX_LOG(log_error, "Worker thread failed to tpopen() - nothing to do, "
0108 "process will exit");
0109 userlog("Worker thread failed to tpopen() - nothing to do, "
0110 "process will exit");
0111 exit(1);
0112 }
0113
0114 }
0115
0116
0117
0118
0119 expublic void tmq_thread_uninit(void)
0120 {
0121 NDRX_LOG(log_debug, "Into tmq_thread_uninit");
0122 tpclose();
0123 tpterm();
0124 }
0125
0126
0127
0128
0129
0130 void TMQUEUE_TH (void *ptr, int *p_finish_off)
0131 {
0132
0133
0134
0135 int ret=EXSUCCEED;
0136 ndrx_thread_server_t *thread_data = (ndrx_thread_server_t *)ptr;
0137 char cmd = EXEOS;
0138 int cd;
0139 int int_diag = 0;
0140
0141
0142
0143
0144 UBFH *p_ub = (UBFH *)thread_data->buffer;
0145
0146
0147 if (M_thread_first)
0148 {
0149 tmq_thread_init();
0150 M_thread_first = EXFALSE;
0151 }
0152
0153
0154 if (EXSUCCEED!=tpsrvsetctxdata(thread_data->context_data, SYS_SRV_THREAD))
0155 {
0156 userlog("tmqueue: Failed to set context");
0157 NDRX_LOG(log_error, "Failed to set context");
0158 exit(1);
0159 }
0160
0161 cd = thread_data->cd;
0162
0163 NDRX_FREE(thread_data->context_data);
0164 NDRX_FREE(thread_data);
0165
0166
0167 if (EXSUCCEED!=ndrx_sv_latejoin())
0168 {
0169 NDRX_LOG(log_error, "Failed to manual-join!");
0170 int_diag|=TMQ_INT_DIAG_EJOIN;
0171 goto out;
0172 }
0173
0174
0175
0176
0177 if (Bunused (p_ub) < 4096)
0178 {
0179 p_ub = (UBFH *)tprealloc ((char *)p_ub, Bsizeof (p_ub) + 4096);
0180 }
0181
0182 ndrx_debug_dump_UBF(log_info, "TMQUEUE call buffer:", p_ub);
0183
0184 if (Bget(p_ub, EX_QCMD, 0, (char *)&cmd, 0L))
0185 {
0186 NDRX_LOG(log_error, "Failed to read command code!");
0187 ret=EXFAIL;
0188 goto out;
0189 }
0190 NDRX_LOG(log_info, "Got command code: [%c]", cmd);
0191
0192 switch(cmd)
0193 {
0194 case TMQ_CMD_ENQUEUE:
0195
0196
0197 if (EXSUCCEED!=tmq_enqueue(p_ub, &int_diag))
0198 {
0199 EXFAIL_OUT(ret);
0200 }
0201 break;
0202 case TMQ_CMD_DEQUEUE:
0203
0204
0205 if (EXSUCCEED!=tmq_dequeue(&p_ub, &int_diag))
0206 {
0207 EXFAIL_OUT(ret);
0208 }
0209 break;
0210 case TMQ_CMD_MQLQ:
0211
0212 if (EXSUCCEED!=tmq_mqlq(p_ub, cd))
0213 {
0214 EXFAIL_OUT(ret);
0215 }
0216 break;
0217 case TMQ_CMD_MQLC:
0218
0219 if (EXSUCCEED!=tmq_mqlc(p_ub, cd))
0220 {
0221 EXFAIL_OUT(ret);
0222 }
0223 break;
0224 case TMQ_CMD_MQLM:
0225
0226 if (EXSUCCEED!=tmq_mqlm(p_ub, cd))
0227 {
0228 EXFAIL_OUT(ret);
0229 }
0230 break;
0231 case TMQ_CMD_MQRC:
0232
0233 if (EXSUCCEED!=tmq_mqrc(p_ub))
0234 {
0235 EXFAIL_OUT(ret);
0236 }
0237
0238 break;
0239 case TMQ_CMD_MQCH:
0240
0241 if (EXSUCCEED!=tmq_mqch(p_ub))
0242 {
0243 EXFAIL_OUT(ret);
0244 }
0245 break;
0246 case TMQ_CMD_STARTTRAN:
0247 case TMQ_CMD_ABORTTRAN:
0248 case TMQ_CMD_PREPARETRAN:
0249 case TMQ_CMD_COMMITRAN:
0250 case TMQ_CMD_CHK_MEMLOG:
0251 case TMQ_CMD_CHK_MEMLOG2:
0252
0253
0254 if (XA_OK!=ndrx_xa_qminiservce(p_ub, cmd))
0255 {
0256 EXFAIL_OUT(ret);
0257 }
0258
0259 break;
0260
0261 default:
0262 NDRX_LOG(log_error, "Unsupported command code: [%c]", cmd);
0263 ret=EXFAIL;
0264 break;
0265 }
0266
0267 out:
0268
0269
0270
0271
0272 if (int_diag & TMQ_INT_DIAG_EJOIN)
0273 {
0274 tpreturn( TPFAIL,
0275 TPETRAN,
0276 NULL,
0277 0L,
0278 TPSOFTERR);
0279 }
0280 else
0281 {
0282 ndrx_debug_dump_UBF(log_info, "TMQUEUE return buffer:", p_ub);
0283
0284 tpreturn( ret==EXSUCCEED?TPSUCCESS:TPFAIL,
0285 0L,
0286 (char *)p_ub,
0287 0L,
0288 0L);
0289 }
0290 }
0291
0292
0293
0294
0295
0296
0297
0298 exprivate void tx_tout_check_th(void *ptr, int *p_finish_off)
0299 {
0300 long tspent;
0301 qtran_log_list_t *tx_list;
0302 qtran_log_list_t *el, *tmp;
0303 qtran_log_t *p_tl;
0304 int in_progress;
0305 int locke;
0306 XID xid;
0307
0308
0309
0310
0311 MUTEX_LOCK_V(M_into_toutchk_lock);
0312
0313 in_progress=M_into_toutchk;
0314
0315
0316 if (!in_progress)
0317 {
0318 M_into_toutchk=EXTRUE;
0319 }
0320
0321 MUTEX_UNLOCK_V(M_into_toutchk_lock);
0322
0323 if (in_progress)
0324 {
0325
0326 goto out;
0327 }
0328
0329 NDRX_LOG(log_dump, "Timeout check (processing...)");
0330
0331
0332
0333 if (M_thread_first)
0334 {
0335 tmq_thread_init();
0336 M_thread_first = EXFALSE;
0337 }
0338
0339 tx_list = tmq_copy_hash2list(COPY_MODE_FOREGROUND | COPY_MODE_ACQLOCK);
0340
0341 LL_FOREACH_SAFE(tx_list,el,tmp)
0342 {
0343 NDRX_LOG(log_debug, "Checking [%s]...", el->p_tl.tmxid);
0344 if ((tspent = ndrx_stopwatch_get_delta_sec(&el->p_tl.ttimer)) >
0345 G_tmqueue_cfg.ses_timeout && XA_TX_STAGE_ACTIVE==el->p_tl.txstage)
0346 {
0347
0348
0349 if (NULL!=(p_tl = tmq_log_get_entry(el->p_tl.tmxid, 0, &locke)))
0350 {
0351 if (XA_TX_STAGE_ACTIVE==p_tl->txstage)
0352 {
0353
0354 NDRX_LOG(log_error, "TMXID Q [%s] timed out "
0355 "(spent %ld, limit: %ld sec) - aborting...!",
0356 el->p_tl.tmxid, tspent,
0357 G_tmqueue_cfg.dflt_timeout);
0358
0359 userlog("TMXID Q [%s] timed out "
0360 "(spent %ld, limit: %ld sec) - aborting...!",
0361 el->p_tl.tmxid, tspent,
0362 G_tmqueue_cfg.dflt_timeout);
0363
0364
0365
0366 el->p_tl.is_abort_only=EXTRUE;
0367
0368 if (NULL==atmi_xa_deserialize_xid((unsigned char *)el->p_tl.tmxid, &xid))
0369 {
0370 NDRX_LOG(log_error, "Failed to deserialize tmxid [%s]",
0371 el->p_tl.tmxid);
0372 tmq_log_unlock(p_tl);
0373 goto next;
0374 }
0375
0376
0377 if (EXSUCCEED!=atmi_xa_rollback_entry(&xid, 0))
0378 {
0379 NDRX_LOG(log_error, "Failed to abort tmxid:[%s]",
0380 el->p_tl.tmxid);
0381 tmq_log_unlock(p_tl);
0382 goto next;
0383 }
0384
0385
0386
0387 }
0388 else
0389 {
0390 NDRX_LOG(log_error, "Q TMXID [%s] was-tout but found not active "
0391 "(txstage %hd spent %ld, limit: %ld sec) - skipping!",
0392 el->p_tl.tmxid, el->p_tl.txstage, tspent, G_tmqueue_cfg.dflt_timeout);
0393 }
0394 }
0395 }
0396 next:
0397 LL_DELETE(tx_list,el);
0398 NDRX_FPFREE(el);
0399
0400 }
0401
0402
0403 out:
0404
0405
0406 MUTEX_LOCK_V(M_into_toutchk_lock);
0407
0408 if (!in_progress)
0409 {
0410 M_into_toutchk=EXFALSE;
0411 }
0412
0413 MUTEX_UNLOCK_V(M_into_toutchk_lock);
0414
0415 return;
0416 }
0417
0418
0419
0420
0421
0422
0423
0424 exprivate int tm_tout_check(void)
0425 {
0426 NDRX_LOG(log_dump, "Timeout check (submit job...)");
0427
0428
0429 if (NULL==M_shutdown_ind)
0430 {
0431
0432 if (G_tmqueue_cfg.ses_timeout > 0)
0433 {
0434
0435 ndrx_thpool_add_work(G_tmqueue_cfg.notifthpool, (void*)tx_tout_check_th, NULL);
0436 }
0437
0438
0439 if (G_tmqueue_cfg.chkdisk_time > 0 &&
0440 tmq_chkdisk_stopwatch_get_delta_sec() >=G_tmqueue_cfg.chkdisk_time )
0441 {
0442
0443 ndrx_thpool_add_work(G_tmqueue_cfg.notifthpool, (void*)G_tmq_chkdisk_th,
0444 &G_tmqueue_cfg.chkdisk_time);
0445
0446
0447 tmq_chkdisk_stopwatch_reset();
0448 }
0449
0450 }
0451 else if (M_shutdown_ok)
0452 {
0453 ndrx_sv_do_shutdown("Async shutdown", M_shutdown_ind);
0454 }
0455
0456 return EXSUCCEED;
0457 }
0458
0459
0460
0461
0462
0463
0464 void TMQUEUE (TPSVCINFO *p_svc)
0465 {
0466 int ret=EXSUCCEED;
0467 UBFH *p_ub = (UBFH *)p_svc->data;
0468 long size;
0469 char btype[16];
0470 char stype[16];
0471 ndrx_thread_server_t *thread_data = NDRX_MALLOC(sizeof(ndrx_thread_server_t));
0472 char cmd = EXEOS;
0473
0474 if (NULL==thread_data)
0475 {
0476 userlog("Failed to malloc memory - %s!", strerror(errno));
0477 NDRX_LOG(log_error, "Failed to malloc memory");
0478 EXFAIL_OUT(ret);
0479 }
0480
0481 if (0==(size = tptypes (p_svc->data, btype, stype)))
0482 {
0483 NDRX_LOG(log_error, "Zero buffer received!");
0484 userlog("Zero buffer received!");
0485 EXFAIL_OUT(ret);
0486 }
0487
0488 if (EXSUCCEED!=Bget(p_ub, EX_QCMD, 0, (char *)&cmd, 0L))
0489 {
0490 NDRX_LOG(log_error, "Failed to read command code!");
0491 userlog("Failed to read command code!");
0492 ret=EXFAIL;
0493 goto out;
0494 }
0495
0496 thread_data->buffer = p_svc->data;
0497 thread_data->cd = p_svc->cd;
0498
0499 if (NULL==(thread_data->context_data = tpsrvgetctxdata()))
0500 {
0501 NDRX_LOG(log_error, "Failed to get context data!");
0502 userlog("Failed to get context data!");
0503 EXFAIL_OUT(ret);
0504 }
0505
0506
0507
0508
0509 if (cmd==TMQ_CMD_STARTTRAN||
0510 cmd==TMQ_CMD_PREPARETRAN||
0511 cmd==TMQ_CMD_ABORTTRAN||
0512 cmd==TMQ_CMD_COMMITRAN ||
0513 cmd==TMQ_CMD_CHK_MEMLOG ||
0514 cmd==TMQ_CMD_CHK_MEMLOG2)
0515 {
0516 ndrx_thpool_add_work(G_tmqueue_cfg.notifthpool, (void*)TMQUEUE_TH, (void *)thread_data);
0517 }
0518 else
0519 {
0520 ndrx_thpool_add_work(G_tmqueue_cfg.thpool, (void*)TMQUEUE_TH, (void *)thread_data);
0521 }
0522
0523 out:
0524 if (EXSUCCEED==ret)
0525 {
0526
0527 tpcontinue();
0528 }
0529 else
0530 {
0531
0532 tpreturn( TPFAIL,
0533 0L,
0534 (char *)p_ub,
0535 0L,
0536 0L);
0537 }
0538 }
0539
0540
0541
0542
0543
0544
0545 exprivate void shutdowncb_th(void *ptr)
0546 {
0547 int i;
0548
0549 NDRX_LOG(log_info, "Async shutdown started...");
0550
0551
0552 pthread_join(G_forward_thread, NULL);
0553
0554 for (i=0; i<G_tmqueue_cfg.fwdpoolsize; i++)
0555 {
0556 ndrx_thpool_add_work(G_tmqueue_cfg.fwdthpool, (void *)tmq_thread_shutdown, NULL);
0557 }
0558
0559 ndrx_thpool_wait(G_tmqueue_cfg.fwdthpool);
0560
0561 M_shutdown_ok=EXTRUE;
0562 }
0563
0564
0565
0566
0567
0568
0569
0570 exprivate int shutdowncb(int *shutdown_req)
0571 {
0572
0573 int freethreads=EXFAIL, i;
0574 M_shutdown_ind = shutdown_req;
0575
0576 if (M_init_ok)
0577 {
0578
0579 G_forward_req_shutdown = EXTRUE;
0580 forward_shutdown_wake();
0581
0582
0583
0584
0585
0586 for (i=0; i<20 && !ndrx_G_forward_req_shutdown_ack; i++)
0587 {
0588 usleep(10000);
0589 }
0590
0591 if (ndrx_G_forward_req_shutdown_ack &&
0592 (G_tmqueue_cfg.fwdpoolsize==(freethreads=ndrx_thpool_nr_not_working(G_tmqueue_cfg.fwdthpool)))
0593 )
0594 {
0595 pthread_join(G_forward_thread, NULL);
0596
0597 for (i=0; i<G_tmqueue_cfg.fwdpoolsize; i++)
0598 {
0599 ndrx_thpool_add_work(G_tmqueue_cfg.fwdthpool, (void *)tmq_thread_shutdown, NULL);
0600 }
0601
0602
0603 ndrx_sv_do_shutdown("Quick shutdown", shutdown_req);
0604
0605 }
0606 else
0607 {
0608
0609 NDRX_LOG(log_warn, "Async shutdown path (ack=%d free_fwd_threads=%d)",
0610 ndrx_G_forward_req_shutdown_ack, freethreads);
0611
0612 ndrx_thpool_add_work(G_tmqueue_cfg.shutdownseq, (void*)shutdowncb_th, NULL);
0613
0614 }
0615 }
0616
0617 return EXSUCCEED;
0618 }
0619
0620
0621
0622
0623 int tpsvrinit(int argc, char **argv)
0624 {
0625 int ret=EXSUCCEED;
0626 signed char c;
0627 char svcnm[MAXTIDENT+1];
0628 NDRX_LOG(log_debug, "tpsvrinit called");
0629
0630 memset(&G_tmqueue_cfg, 0, sizeof(G_tmqueue_cfg));
0631
0632
0633
0634
0635
0636 G_tmqueue_cfg.ses_timeout=EXFAIL;
0637 G_tmqueue_cfg.vnodeid=tpgetnodeid();
0638
0639
0640 while ((c = getopt(argc, argv, "q:m:s:p:t:f:u:c:T:Nn:X:")) != -1)
0641 {
0642 if (optarg)
0643 {
0644 NDRX_LOG(log_debug, "%c = [%s]", c, optarg);
0645 }
0646 else
0647 {
0648 NDRX_LOG(log_debug, "got %c", c);
0649 }
0650
0651 switch(c)
0652 {
0653 case 'X':
0654 G_tmqueue_cfg.chkdisk_time=atoi(optarg);
0655
0656 NDRX_LOG(log_info, "Check disk messages set to %d sec",
0657 G_tmqueue_cfg.chkdisk_time);
0658 break;
0659 case 'n':
0660 G_tmqueue_cfg.vnodeid = atol(optarg);
0661 NDRX_LOG(log_info, "Virtual Enduro/X Cluster Node ID set to %ld",
0662 G_tmqueue_cfg.vnodeid);
0663 break;
0664 case 'N':
0665 G_tmqueue_cfg.no_chkrun = EXTRUE;
0666 NDRX_LOG(log_info, "Will not forward trigger queue run.");
0667 break;
0668 case 'm':
0669
0670
0671 NDRX_LOG(log_error, "ERROR ! Please convert queue settings to NDRX_XA_OPEN_STR (datadir=,qspace=)");
0672 EXFAIL_OUT(ret);
0673
0674 break;
0675
0676 case 'q':
0677
0678 NDRX_STRCPY_SAFE(G_tmqueue_cfg.qconfig, optarg);
0679 NDRX_LOG(log_error, "Loading q config: [%s]", G_tmqueue_cfg.qconfig);
0680 if (EXSUCCEED!=tmq_reload_conf(G_tmqueue_cfg.qconfig))
0681 {
0682 NDRX_LOG(log_error, "Failed to read config for: [%s]", G_tmqueue_cfg.qconfig);
0683 EXFAIL_OUT(ret);
0684 }
0685 break;
0686 case 's':
0687 G_tmqueue_cfg.scan_time = atoi(optarg);
0688 break;
0689 case 'p':
0690 G_tmqueue_cfg.threadpoolsize = atol(optarg);
0691 break;
0692 case 'u':
0693 G_tmqueue_cfg.notifpoolsize = atol(optarg);
0694 break;
0695 case 'f':
0696 G_tmqueue_cfg.fwdpoolsize = atol(optarg);
0697 break;
0698 case 't':
0699 G_tmqueue_cfg.dflt_timeout = atol(optarg);
0700 break;
0701 case 'T':
0702 G_tmqueue_cfg.ses_timeout = atol(optarg);
0703 break;
0704 case 'c':
0705
0706 G_tmqueue_cfg.tout_check_time = atoi(optarg);
0707 break;
0708 default:
0709
0710 break;
0711 }
0712 }
0713
0714 if (ndrx_get_G_cconfig())
0715 {
0716 if (EXSUCCEED!=tmq_reload_conf(NULL))
0717 {
0718 NDRX_LOG(log_error, "Failed to read CCONFIG's @queue section!");
0719 EXFAIL_OUT(ret);
0720 }
0721 }
0722
0723
0724 if (0>=G_tmqueue_cfg.scan_time)
0725 {
0726 G_tmqueue_cfg.scan_time = SCAN_TIME_DFLT;
0727 }
0728
0729 if (0>=G_tmqueue_cfg.threadpoolsize)
0730 {
0731 G_tmqueue_cfg.threadpoolsize = THREADPOOL_DFLT;
0732 }
0733
0734 if (0>=G_tmqueue_cfg.notifpoolsize)
0735 {
0736 G_tmqueue_cfg.notifpoolsize = THREADPOOL_DFLT;
0737 }
0738
0739 if (0>=G_tmqueue_cfg.fwdpoolsize)
0740 {
0741 G_tmqueue_cfg.fwdpoolsize = THREADPOOL_DFLT;
0742 }
0743
0744 if (0>=G_tmqueue_cfg.dflt_timeout)
0745 {
0746 G_tmqueue_cfg.dflt_timeout = TXTOUT_DFLT;
0747 }
0748
0749 if (0>G_tmqueue_cfg.ses_timeout)
0750 {
0751 G_tmqueue_cfg.ses_timeout = SES_TOUT_DFLT;
0752 }
0753
0754 if (0>=G_tmqueue_cfg.tout_check_time)
0755 {
0756 G_tmqueue_cfg.tout_check_time = TOUT_CHECK_TIME;
0757 }
0758
0759 NDRX_LOG(log_info, "Recovery scan time set to [%d]",
0760 G_tmqueue_cfg.scan_time);
0761
0762 NDRX_LOG(log_info, "Worker pool size [%d] threads",
0763 G_tmqueue_cfg.threadpoolsize);
0764
0765 NDRX_LOG(log_info, "Worker notify-loop-back pool size [%d] threads",
0766 G_tmqueue_cfg.notifpoolsize);
0767
0768 NDRX_LOG(log_info, "Forward pool size [%d] threads",
0769 G_tmqueue_cfg.fwdpoolsize);
0770
0771 NDRX_LOG(log_info, "Local transaction tout set to: [%ld]",
0772 G_tmqueue_cfg.dflt_timeout );
0773
0774 NDRX_LOG(log_info, "Session transaction tout set to: [%ld]",
0775 G_tmqueue_cfg.ses_timeout);
0776
0777 NDRX_LOG(log_info, "Periodic timeout-check time: [%d]",
0778 G_tmqueue_cfg.tout_check_time);
0779
0780
0781 NDRX_LOG(log_debug, "About to Open XA Entry!");
0782
0783 if (EXSUCCEED!=tpopen())
0784 {
0785 EXFAIL_OUT(ret);
0786 }
0787
0788
0789
0790
0791 if (EXSUCCEED!=tmq_load_msgs())
0792 {
0793 EXFAIL_OUT(ret);
0794 }
0795
0796
0797 if (EXSUCCEED!=tmq_log_abortall())
0798 {
0799 EXFAIL_OUT(ret);
0800 }
0801
0802
0803
0804
0805
0806
0807
0808
0809
0810
0811
0812
0813 snprintf(svcnm, sizeof(svcnm), NDRX_SVC_TMQ, G_tmqueue_cfg.vnodeid, tpgetsrvid());
0814
0815 if (EXSUCCEED!=tpadvertise(svcnm, TMQUEUE))
0816 {
0817 NDRX_LOG(log_error, "Failed to advertise %s service!", svcnm);
0818 EXFAIL_OUT(ret);
0819 }
0820
0821 if (EXSUCCEED!=tpadvertise(ndrx_G_qspacesvc, TMQUEUE))
0822 {
0823 NDRX_LOG(log_error, "Failed to advertise %s service!", svcnm);
0824 EXFAIL_OUT(ret);
0825 }
0826
0827 if (EXSUCCEED!=tmq_fwd_stat_init())
0828 {
0829 NDRX_LOG(log_error, "Failed to init forward statistics");
0830 EXFAIL_OUT(ret);
0831 }
0832
0833
0834 if (NULL==(G_tmqueue_cfg.thpool = ndrx_thpool_init(G_tmqueue_cfg.threadpoolsize,
0835 NULL, NULL, NULL, 0, NULL)))
0836 {
0837 NDRX_LOG(log_error, "Failed to initialize thread pool (cnt: %d)!",
0838 G_tmqueue_cfg.threadpoolsize);
0839 EXFAIL_OUT(ret);
0840 }
0841
0842 if (NULL==(G_tmqueue_cfg.notifthpool = ndrx_thpool_init(G_tmqueue_cfg.notifpoolsize,
0843 NULL, NULL, NULL, 0, NULL)))
0844 {
0845 NDRX_LOG(log_error, "Failed to initialize udpate thread pool (cnt: %d)!",
0846 G_tmqueue_cfg.notifpoolsize);
0847 EXFAIL_OUT(ret);
0848 }
0849
0850
0851 if (NULL==(G_tmqueue_cfg.fwdthpool = ndrx_thpool_init(G_tmqueue_cfg.fwdpoolsize,
0852 NULL, NULL, NULL, 0, NULL)))
0853 {
0854 NDRX_LOG(log_error, "Failed to initialize fwd thread pool (cnt: %d)!",
0855 G_tmqueue_cfg.fwdpoolsize);
0856 EXFAIL_OUT(ret);
0857 }
0858
0859 if (NULL==(G_tmqueue_cfg.shutdownseq = ndrx_thpool_init(1,
0860 NULL, NULL, NULL, 0, NULL)))
0861 {
0862 NDRX_LOG(log_error, "Failed to initialize shutdown thread pool!");
0863 EXFAIL_OUT(ret);
0864 }
0865
0866
0867 if (EXSUCCEED!=tpext_addperiodcb(G_tmqueue_cfg.tout_check_time, tm_tout_check))
0868 {
0869 NDRX_LOG(log_error, "tpext_addperiodcb failed: %s",
0870 tpstrerror(tperrno));
0871 EXFAIL_OUT(ret);
0872 }
0873
0874
0875
0876
0877 if (EXSUCCEED!=ndrx_tpext_addbshutdowncb(shutdowncb))
0878 {
0879 NDRX_LOG(log_error, "Failed to add shutdown sequencer callback!");
0880 EXFAIL_OUT(ret);
0881 }
0882
0883
0884 if (EXSUCCEED!=forward_process_init())
0885 {
0886 NDRX_LOG(log_error, "Failed to initialize fwd process thread");
0887 EXFAIL_OUT(ret);
0888 }
0889
0890 tmq_chkdisk_stopwatch_reset();
0891
0892
0893 M_init_ok=EXTRUE;
0894
0895 out:
0896 return ret;
0897 }
0898
0899
0900
0901
0902 void tpsvrdone(void)
0903 {
0904 int i;
0905
0906 NDRX_LOG(log_debug, "tpsvrdone called - requesting "
0907 "background thread shutdown...");
0908
0909 if (M_init_ok)
0910 {
0911
0912
0913 for (i=0; i<G_tmqueue_cfg.threadpoolsize; i++)
0914 {
0915 ndrx_thpool_add_work(G_tmqueue_cfg.thpool, (void *)tmq_thread_shutdown, NULL);
0916 }
0917
0918
0919 for (i=0; i<G_tmqueue_cfg.notifpoolsize; i++)
0920 {
0921 ndrx_thpool_add_work(G_tmqueue_cfg.notifthpool, (void *)tmq_thread_shutdown, NULL);
0922 }
0923
0924
0925 ndrx_thpool_add_work(G_tmqueue_cfg.shutdownseq, (void *)tmq_thread_shutdown, NULL);
0926
0927
0928 ndrx_thpool_wait(G_tmqueue_cfg.shutdownseq);
0929 ndrx_thpool_destroy(G_tmqueue_cfg.shutdownseq);
0930
0931
0932
0933
0934
0935 ndrx_thpool_destroy(G_tmqueue_cfg.fwdthpool);
0936
0937 }
0938
0939 tpclose();
0940
0941 }
0942
0943
0944
0945
0946
0947
0948 expublic void tmq_thread_shutdown(void *ptr, int *p_finish_off)
0949 {
0950 tmq_thread_uninit();
0951
0952 *p_finish_off = EXTRUE;
0953 }
0954