Back to home page

Enduro/X

 
 

    


0001 /**
0002  * @brief EnduroX Queue Server
0003  *
0004  * @file tmqueue.c
0005  */
0006 /* -----------------------------------------------------------------------------
0007  * Enduro/X Middleware Platform for Distributed Transaction Processing
0008  * Copyright (C) 2009-2016, ATR Baltic, Ltd. All Rights Reserved.
0009  * Copyright (C) 2017-2023, Mavimax, Ltd. All Rights Reserved.
0010  * This software is released under one of the following licenses:
0011  * AGPL (with Java and Go exceptions) or Mavimax's license for commercial use.
0012  * See LICENSE file for full text.
0013  * -----------------------------------------------------------------------------
0014  * AGPL license:
0015  *
0016  * This program is free software; you can redistribute it and/or modify it under
0017  * the terms of the GNU Affero General Public License, version 3 as published
0018  * by the Free Software Foundation;
0019  *
0020  * This program is distributed in the hope that it will be useful, but WITHOUT ANY
0021  * WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR A
0022  * PARTICULAR PURPOSE. See the GNU Affero General Public License, version 3
0023  * for more details.
0024  *
0025  * You should have received a copy of the GNU Affero General Public License along 
0026  * with this program; if not, write to the Free Software Foundation, Inc.,
0027  * 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
0028  *
0029  * -----------------------------------------------------------------------------
0030  * A commercial use license is available from Mavimax, Ltd
0031  * contact@mavimax.com
0032  * -----------------------------------------------------------------------------
0033  */
0034 #include <ndrx_config.h>
0035 #include <stdio.h>
0036 #include <stdlib.h>
0037 #include <string.h>
0038 #include <unistd.h>    /* for getopt */
0039 #include <errno.h>
0040 #include <regex.h>
0041 #include <utlist.h>
0042 
0043 #include <ndebug.h>
0044 #include <atmi.h>
0045 #include <atmi_int.h>
0046 #include <typed_buf.h>
0047 #include <ndrstandard.h>
0048 #include <ubf.h>
0049 #include <Exfields.h>
0050 
0051 #include <exnet.h>
0052 #include <ndrxdcmn.h>
0053 
0054 #include "tmqd.h"
0055 #include "tperror.h"
0056 #include "userlog.h"
0057 #include <xa_cmn.h>
0058 #include <exthpool.h>
0059 #include "qcommon.h"
0060 #include "cconfig.h"
0061 #include <ubfutil.h>
0062 #include <thlock.h>
0063 #include "qtran.h"
0064 #include "../libatmisrv/srv_int.h"
0065 /*---------------------------Externs------------------------------------*/
0066 /*---------------------------Macros-------------------------------------*/
0067 /*---------------------------Enums--------------------------------------*/
0068 /*---------------------------Typedefs-----------------------------------*/
0069 /*---------------------------Globals------------------------------------*/
0070 expublic tmqueue_cfg_t G_tmqueue_cfg;
0071 /*---------------------------Statics------------------------------------*/
0072 exprivate int M_init_ok = EXFALSE;
0073 exprivate __thread int M_thread_first = EXTRUE;
0074 
0075 /* allow only one timeout check at the same time... */
0076 exprivate int volatile M_into_toutchk = EXFALSE;
0077 exprivate MUTEX_LOCKDECL(M_into_toutchk_lock);
0078 
0079 /** mark that thrad pool is done with shutdown seq */
0080 exprivate int M_shutdown_ok = EXFALSE;
0081 
0082 /** global shutdown indicator */
0083 exprivate int *M_shutdown_ind = NULL;
0084 
0085 /** mark globally that timeout processing is in progress
0086  * do not collide with any worker already running...
0087  */
0088 /*---------------------------Prototypes---------------------------------*/
0089 exprivate int tx_tout_check(void);
0090 exprivate void tm_chk_one_free_thread(void *ptr, int *p_finish_off);
0091 exprivate void tm_chk_one_free_thread_notif(void *ptr, int *p_finish_off);
0092 
0093 /**
0094  * Initialize thread
0095  */
0096 expublic void tmq_thread_init(void)
0097 {
0098     if (EXSUCCEED!=tpinit(NULL))
0099     {
0100         NDRX_LOG(log_error, "Failed to init worker client");
0101         userlog("tmsrv: Failed to init worker client");
0102         exit(1);
0103     }
0104     
0105     if (EXSUCCEED!=tpopen())
0106     {
0107         NDRX_LOG(log_error, "Worker thread failed to tpopen() - nothing to do, "
0108                 "process will exit");
0109         userlog("Worker thread failed to tpopen() - nothing to do, "
0110                 "process will exit");
0111         exit(1);
0112     }
0113     
0114 }
0115 
0116 /**
0117  * Close the thread session
0118  */
0119 expublic void tmq_thread_uninit(void)
0120 {
0121     NDRX_LOG(log_debug, "Into tmq_thread_uninit");
0122     tpclose();
0123     tpterm();
0124 }
0125 
0126 /**
0127  * Tmqueue service entry (working thread)
0128  * @param p_svc - data & len used only...!
0129  */
0130 void TMQUEUE_TH (void *ptr, int *p_finish_off)
0131 {
0132     /* Ok we should not handle the commands 
0133      * TPBEGIN...
0134      */
0135     int ret=EXSUCCEED;
0136     ndrx_thread_server_t *thread_data = (ndrx_thread_server_t *)ptr;
0137     char cmd = EXEOS;
0138     int cd;
0139     int int_diag = 0;
0140     
0141     /**************************************************************************/
0142     /*                        THREAD CONTEXT RESTORE                          */
0143     /**************************************************************************/
0144     UBFH *p_ub = (UBFH *)thread_data->buffer;
0145     
0146     /* Do the ATMI init */
0147     if (M_thread_first)
0148     {
0149         tmq_thread_init();
0150         M_thread_first = EXFALSE;
0151     }
0152     
0153     /* restore context. */
0154     if (EXSUCCEED!=tpsrvsetctxdata(thread_data->context_data, SYS_SRV_THREAD))
0155     {
0156         userlog("tmqueue: Failed to set context");
0157         NDRX_LOG(log_error, "Failed to set context");
0158         exit(1);
0159     }
0160     
0161     cd = thread_data->cd;
0162     /* free up the transport data.*/
0163     NDRX_FREE(thread_data->context_data);
0164     NDRX_FREE(thread_data);
0165 
0166     /* try to join */
0167     if (EXSUCCEED!=ndrx_sv_latejoin())
0168     {
0169         NDRX_LOG(log_error, "Failed to manual-join!");
0170         int_diag|=TMQ_INT_DIAG_EJOIN;
0171         goto out;        
0172     }
0173 
0174     /**************************************************************************/
0175     
0176     /* get some more stuff! */
0177     if (Bunused (p_ub) < 4096)
0178     {
0179         p_ub = (UBFH *)tprealloc ((char *)p_ub, Bsizeof (p_ub) + 4096);
0180     }
0181     
0182     ndrx_debug_dump_UBF(log_info, "TMQUEUE call buffer:", p_ub);
0183     
0184     if (Bget(p_ub, EX_QCMD, 0, (char *)&cmd, 0L))
0185     {
0186         NDRX_LOG(log_error, "Failed to read command code!");
0187         ret=EXFAIL;
0188         goto out;
0189     }
0190     NDRX_LOG(log_info, "Got command code: [%c]", cmd);
0191     
0192     switch(cmd)
0193     {
0194         case TMQ_CMD_ENQUEUE:
0195             
0196             /* start new tran... */
0197             if (EXSUCCEED!=tmq_enqueue(p_ub, &int_diag))
0198             {
0199                 EXFAIL_OUT(ret);
0200             }
0201             break;
0202         case TMQ_CMD_DEQUEUE:
0203             
0204             /* start new tran... */
0205             if (EXSUCCEED!=tmq_dequeue(&p_ub, &int_diag))
0206             {
0207                 EXFAIL_OUT(ret);
0208             }
0209             break;
0210         case TMQ_CMD_MQLQ:
0211             
0212             if (EXSUCCEED!=tmq_mqlq(p_ub, cd))
0213             {
0214                 EXFAIL_OUT(ret);
0215             }
0216             break;
0217         case TMQ_CMD_MQLC:
0218             
0219             if (EXSUCCEED!=tmq_mqlc(p_ub, cd))
0220             {
0221                 EXFAIL_OUT(ret);
0222             }
0223             break;
0224         case TMQ_CMD_MQLM:
0225             
0226             if (EXSUCCEED!=tmq_mqlm(p_ub, cd))
0227             {
0228                 EXFAIL_OUT(ret);
0229             }
0230             break;
0231         case TMQ_CMD_MQRC:
0232             
0233             if (EXSUCCEED!=tmq_mqrc(p_ub))
0234             {
0235                 EXFAIL_OUT(ret);
0236             }
0237             
0238             break;
0239         case TMQ_CMD_MQCH:
0240             
0241             if (EXSUCCEED!=tmq_mqch(p_ub))
0242             {
0243                 EXFAIL_OUT(ret);
0244             }
0245             break;
0246         case TMQ_CMD_STARTTRAN:
0247         case TMQ_CMD_ABORTTRAN:
0248         case TMQ_CMD_PREPARETRAN:
0249         case TMQ_CMD_COMMITRAN:
0250         case TMQ_CMD_CHK_MEMLOG:
0251         case TMQ_CMD_CHK_MEMLOG2:
0252             
0253             /* start Q space transaction */
0254             if (XA_OK!=ndrx_xa_qminiservce(p_ub, cmd))
0255             {
0256                 EXFAIL_OUT(ret);
0257             }
0258             
0259             break;
0260             
0261         default:
0262             NDRX_LOG(log_error, "Unsupported command code: [%c]", cmd);
0263             ret=EXFAIL;
0264             break;
0265     }
0266     
0267 out:
0268 
0269     /* 
0270      * Generate TPETRAN in case if failed to join
0271      */
0272     if (int_diag & TMQ_INT_DIAG_EJOIN)
0273     {
0274         tpreturn(  TPFAIL,
0275                     TPETRAN,
0276                     NULL,
0277                     0L,
0278                     TPSOFTERR);
0279     }
0280     else
0281     {
0282         ndrx_debug_dump_UBF(log_info, "TMQUEUE return buffer:", p_ub);
0283 
0284         tpreturn(  ret==EXSUCCEED?TPSUCCESS:TPFAIL,
0285                     0L,
0286                     (char *)p_ub,
0287                     0L,
0288                     0L);
0289     }
0290 }
0291 
0292 
0293 /**
0294  * Periodic main thread callback for 
0295  * (will be done by threadpoll)
0296  * @return 
0297  */
0298 exprivate void tx_tout_check_th(void *ptr, int *p_finish_off)
0299 {
0300     long tspent;
0301     qtran_log_list_t *tx_list;
0302     qtran_log_list_t *el, *tmp;
0303     qtran_log_t *p_tl;
0304     int in_progress;
0305     int locke;
0306     XID xid;
0307     /* Create a copy of hash, iterate and check each tx for timeout condition
0308      * If so then initiate internal abort call
0309      */
0310     
0311     MUTEX_LOCK_V(M_into_toutchk_lock);
0312     
0313     in_progress=M_into_toutchk;
0314     
0315     /* do lock if was free */
0316     if (!in_progress)
0317     {
0318         M_into_toutchk=EXTRUE;
0319     }
0320             
0321     MUTEX_UNLOCK_V(M_into_toutchk_lock);
0322     
0323     if (in_progress)
0324     {
0325         /* nothing todo... */
0326         goto out;
0327     }
0328             
0329     NDRX_LOG(log_dump, "Timeout check (processing...)");
0330     
0331     /* Do the ATMI init, if needed 
0332      */
0333     if (M_thread_first)
0334     {
0335         tmq_thread_init();
0336         M_thread_first = EXFALSE;
0337     }
0338     
0339     tx_list = tmq_copy_hash2list(COPY_MODE_FOREGROUND | COPY_MODE_ACQLOCK);
0340         
0341     LL_FOREACH_SAFE(tx_list,el,tmp)
0342     {
0343         NDRX_LOG(log_debug, "Checking [%s]...", el->p_tl.tmxid);
0344         if ((tspent = ndrx_stopwatch_get_delta_sec(&el->p_tl.ttimer)) > 
0345                 G_tmqueue_cfg.ses_timeout && XA_TX_STAGE_ACTIVE==el->p_tl.txstage)
0346         {
0347             
0348             /* get the finally the entry and process... */
0349             if (NULL!=(p_tl = tmq_log_get_entry(el->p_tl.tmxid, 0, &locke)))
0350             {
0351                 if (XA_TX_STAGE_ACTIVE==p_tl->txstage)
0352                 {
0353                     
0354                     NDRX_LOG(log_error, "TMXID Q [%s] timed out "
0355                         "(spent %ld, limit: %ld sec) - aborting...!", 
0356                         el->p_tl.tmxid, tspent, 
0357                         G_tmqueue_cfg.dflt_timeout);
0358             
0359                     userlog("TMXID Q [%s] timed out "
0360                             "(spent %ld, limit: %ld sec) - aborting...!", 
0361                             el->p_tl.tmxid, tspent, 
0362                             G_tmqueue_cfg.dflt_timeout);
0363                     
0364                     /* do abort...! */
0365                     
0366                     el->p_tl.is_abort_only=EXTRUE;
0367                     
0368                     if (NULL==atmi_xa_deserialize_xid((unsigned char *)el->p_tl.tmxid, &xid))
0369                     {
0370                         NDRX_LOG(log_error, "Failed to deserialize tmxid [%s]", 
0371                                 el->p_tl.tmxid);
0372                         tmq_log_unlock(p_tl);
0373                         goto next;
0374                     }
0375                     
0376                     /* try to rollback the stuff...! */
0377                     if (EXSUCCEED!=atmi_xa_rollback_entry(&xid, 0))
0378                     {
0379                         NDRX_LOG(log_error, "Failed to abort tmxid:[%s]", 
0380                                 el->p_tl.tmxid);
0381                         tmq_log_unlock(p_tl);
0382                         goto next;
0383                     }
0384                     
0385                     /* Transaction must be removed at this point */
0386                     
0387                 }
0388                 else
0389                 {
0390                     NDRX_LOG(log_error, "Q TMXID [%s] was-tout but found not active "
0391                         "(txstage %hd spent %ld, limit: %ld sec) - skipping!", 
0392                         el->p_tl.tmxid, el->p_tl.txstage, tspent, G_tmqueue_cfg.dflt_timeout);
0393                 }
0394             }
0395         }
0396 next:
0397         LL_DELETE(tx_list,el);
0398         NDRX_FPFREE(el);
0399         
0400     }
0401     
0402     
0403 out:    
0404 
0405     /* if was not in progress then we locked  */
0406     MUTEX_LOCK_V(M_into_toutchk_lock);
0407 
0408     if (!in_progress)
0409     {
0410         M_into_toutchk=EXFALSE;
0411     }   
0412 
0413     MUTEX_UNLOCK_V(M_into_toutchk_lock);
0414     
0415     return;
0416 }
0417 
0418 /**
0419  * Callback routine for scheduled timeout checks.
0420  * TODO: if we add shutdown handlers, then check here is all completed
0421  * before we inject back the shutdown msg...
0422  * @return 
0423  */
0424 exprivate int tm_tout_check(void)
0425 {
0426     NDRX_LOG(log_dump, "Timeout check (submit job...)");
0427     
0428     /* Check transaction timeouts only if session timeout is not disabled */
0429     if (NULL==M_shutdown_ind)
0430     {
0431 
0432         if (G_tmqueue_cfg.ses_timeout > 0)
0433         {
0434             /* no shutdown requested... yet... */
0435             ndrx_thpool_add_work(G_tmqueue_cfg.notifthpool, (void*)tx_tout_check_th, NULL);
0436         }
0437 
0438         /* trigger disk checks */
0439         if (G_tmqueue_cfg.chkdisk_time > 0 &&
0440             tmq_chkdisk_stopwatch_get_delta_sec() >=G_tmqueue_cfg.chkdisk_time )
0441         {
0442             /* pass th ptr to func, so that it can reset it at the end of the run? */
0443             ndrx_thpool_add_work(G_tmqueue_cfg.notifthpool, (void*)G_tmq_chkdisk_th, 
0444                 &G_tmqueue_cfg.chkdisk_time);
0445 
0446             /* reset stopwatch to avoid false runs (i.e. if check run is long...) */
0447             tmq_chkdisk_stopwatch_reset();
0448         }
0449 
0450     }
0451     else if (M_shutdown_ok)
0452     {
0453         ndrx_sv_do_shutdown("Async shutdown", M_shutdown_ind);
0454     }
0455     
0456     return EXSUCCEED;
0457 }
0458 
0459 
0460 /**
0461  * Entry point for service (main thread)
0462  * @param p_svc
0463  */
0464 void TMQUEUE (TPSVCINFO *p_svc)
0465 {
0466     int ret=EXSUCCEED;
0467     UBFH *p_ub = (UBFH *)p_svc->data; /* this is auto-buffer */
0468     long size;
0469     char btype[16];
0470     char stype[16];
0471     ndrx_thread_server_t *thread_data = NDRX_MALLOC(sizeof(ndrx_thread_server_t));
0472     char cmd = EXEOS;
0473     
0474     if (NULL==thread_data)
0475     {
0476         userlog("Failed to malloc memory - %s!", strerror(errno));
0477         NDRX_LOG(log_error, "Failed to malloc memory");
0478         EXFAIL_OUT(ret);
0479     }
0480     
0481     if (0==(size = tptypes (p_svc->data, btype, stype)))
0482     {
0483         NDRX_LOG(log_error, "Zero buffer received!");
0484         userlog("Zero buffer received!");
0485         EXFAIL_OUT(ret);
0486     }
0487 
0488     if (EXSUCCEED!=Bget(p_ub, EX_QCMD, 0, (char *)&cmd, 0L))
0489     {
0490         NDRX_LOG(log_error, "Failed to read command code!");
0491         userlog("Failed to read command code!");
0492         ret=EXFAIL;
0493         goto out;
0494     }
0495 
0496     thread_data->buffer = p_svc->data; /*the buffer is not made free by thread */
0497     thread_data->cd = p_svc->cd;
0498     
0499     if (NULL==(thread_data->context_data = tpsrvgetctxdata()))
0500     {
0501         NDRX_LOG(log_error, "Failed to get context data!");
0502         userlog("Failed to get context data!");
0503         EXFAIL_OUT(ret);
0504     }
0505     
0506     /* submit the job to thread pool: 
0507      * For transaction finalization use different thread pool
0508      */
0509     if (cmd==TMQ_CMD_STARTTRAN||
0510             cmd==TMQ_CMD_PREPARETRAN||
0511             cmd==TMQ_CMD_ABORTTRAN||
0512             cmd==TMQ_CMD_COMMITRAN || 
0513             cmd==TMQ_CMD_CHK_MEMLOG || 
0514             cmd==TMQ_CMD_CHK_MEMLOG2)
0515     {
0516         ndrx_thpool_add_work(G_tmqueue_cfg.notifthpool, (void*)TMQUEUE_TH, (void *)thread_data);
0517     }
0518     else
0519     {
0520         ndrx_thpool_add_work(G_tmqueue_cfg.thpool, (void*)TMQUEUE_TH, (void *)thread_data);
0521     }
0522     
0523 out:
0524     if (EXSUCCEED==ret)
0525     {
0526         /* serve next.. */ 
0527         tpcontinue();
0528     }
0529     else
0530     {
0531         /* return error back */
0532         tpreturn(  TPFAIL,
0533                 0L,
0534                 (char *)p_ub,
0535                 0L,
0536                 0L);
0537     }
0538 }
0539 
0540 
0541 /**
0542  * Full shutdown, as forward does something...
0543  * @param ptr data, not used
0544  */
0545 exprivate void shutdowncb_th(void *ptr)
0546 {
0547     int i;
0548     
0549     NDRX_LOG(log_info, "Async shutdown started...");
0550         
0551     /* Wait to complete */
0552     pthread_join(G_forward_thread, NULL);
0553 
0554     for (i=0; i<G_tmqueue_cfg.fwdpoolsize; i++)
0555     {
0556         ndrx_thpool_add_work(G_tmqueue_cfg.fwdthpool, (void *)tmq_thread_shutdown, NULL);
0557     }
0558 
0559     ndrx_thpool_wait(G_tmqueue_cfg.fwdthpool);
0560     
0561     M_shutdown_ok=EXTRUE;
0562 }
0563 
0564 /**
0565  * Shutdown sequencer
0566  * So that we terminate all processing string in the right order
0567  * @param shutdown_req ptr to indicator
0568  * @return SUCCEED
0569  */
0570 exprivate int shutdowncb(int *shutdown_req)
0571 {
0572     /* submit shutdown job */
0573     int freethreads=EXFAIL, i;
0574     M_shutdown_ind = shutdown_req;
0575     
0576     if (M_init_ok)
0577     {
0578         /* request the shutdown */
0579         G_forward_req_shutdown = EXTRUE;
0580         forward_shutdown_wake();
0581         
0582         /* check the ack 
0583          * Sleep 0.2 sec..., let forward to wake up... & finish with 10ms
0584          * interval...
0585          */
0586         for (i=0; i<20 && !ndrx_G_forward_req_shutdown_ack; i++)
0587         {
0588             usleep(10000);
0589         }
0590         
0591         if (ndrx_G_forward_req_shutdown_ack &&
0592                 (G_tmqueue_cfg.fwdpoolsize==(freethreads=ndrx_thpool_nr_not_working(G_tmqueue_cfg.fwdthpool)))
0593                 )
0594         {   
0595             pthread_join(G_forward_thread, NULL);
0596             
0597             for (i=0; i<G_tmqueue_cfg.fwdpoolsize; i++)
0598             {
0599                 ndrx_thpool_add_work(G_tmqueue_cfg.fwdthpool, (void *)tmq_thread_shutdown, NULL);
0600             }
0601          
0602             /* terminate now */
0603             ndrx_sv_do_shutdown("Quick shutdown", shutdown_req);
0604             
0605         }
0606         else
0607         {
0608             /* async shutdown procedure */
0609             NDRX_LOG(log_warn, "Async shutdown path (ack=%d free_fwd_threads=%d)",
0610                     ndrx_G_forward_req_shutdown_ack, freethreads);
0611             
0612             ndrx_thpool_add_work(G_tmqueue_cfg.shutdownseq, (void*)shutdowncb_th, NULL);
0613             
0614         }
0615     }
0616     
0617     return EXSUCCEED;
0618 }
0619 
0620 /*
0621  * Do initialization
0622  */
0623 int tpsvrinit(int argc, char **argv)
0624 {
0625     int ret=EXSUCCEED;
0626     signed char c;
0627     char svcnm[MAXTIDENT+1];
0628     NDRX_LOG(log_debug, "tpsvrinit called");
0629     
0630     memset(&G_tmqueue_cfg, 0, sizeof(G_tmqueue_cfg));
0631     
0632     /* no setting applied.
0633      * 0 means -> no session timeout.
0634      * which in case of tmqueue forward enqueue failures will hang the transaction
0635      */
0636     G_tmqueue_cfg.ses_timeout=EXFAIL;
0637     G_tmqueue_cfg.vnodeid=tpgetnodeid();
0638     
0639     /* Parse command line  */
0640     while ((c = getopt(argc, argv, "q:m:s:p:t:f:u:c:T:Nn:X:")) != -1)
0641     {
0642         if (optarg)
0643         {
0644             NDRX_LOG(log_debug, "%c = [%s]", c, optarg);
0645         }
0646         else
0647         {
0648             NDRX_LOG(log_debug, "got %c", c);
0649         }
0650 
0651         switch(c)
0652         {
0653             case 'X':
0654                 G_tmqueue_cfg.chkdisk_time=atoi(optarg);
0655 
0656                 NDRX_LOG(log_info, "Check disk messages set to %d sec",
0657                                 G_tmqueue_cfg.chkdisk_time);
0658                 break;
0659             case 'n':
0660                 G_tmqueue_cfg.vnodeid = atol(optarg);
0661                 NDRX_LOG(log_info, "Virtual Enduro/X Cluster Node ID set to %ld",
0662                             G_tmqueue_cfg.vnodeid);
0663                 break;
0664             case 'N':
0665                 G_tmqueue_cfg.no_chkrun = EXTRUE;
0666                 NDRX_LOG(log_info, "Will not forward trigger queue run.");
0667                 break;
0668             case 'm': 
0669                 
0670                 /* Ask to convert: */
0671                 NDRX_LOG(log_error, "ERROR ! Please convert queue settings to NDRX_XA_OPEN_STR (datadir=,qspace=)");
0672                 EXFAIL_OUT(ret);
0673                 
0674                 break;
0675                 
0676             case 'q':
0677                 /* Add the queue */
0678                 NDRX_STRCPY_SAFE(G_tmqueue_cfg.qconfig, optarg);
0679                 NDRX_LOG(log_error, "Loading q config: [%s]", G_tmqueue_cfg.qconfig);
0680                 if (EXSUCCEED!=tmq_reload_conf(G_tmqueue_cfg.qconfig))
0681                 {
0682                     NDRX_LOG(log_error, "Failed to read config for: [%s]", G_tmqueue_cfg.qconfig);
0683                     EXFAIL_OUT(ret);
0684                 }
0685                 break;
0686             case 's': 
0687                 G_tmqueue_cfg.scan_time = atoi(optarg);
0688                 break;
0689             case 'p': 
0690                 G_tmqueue_cfg.threadpoolsize = atol(optarg);
0691                 break;
0692             case 'u': 
0693                 G_tmqueue_cfg.notifpoolsize = atol(optarg);
0694                 break;
0695             case 'f': 
0696                 G_tmqueue_cfg.fwdpoolsize = atol(optarg);
0697                 break;
0698             case 't': 
0699                 G_tmqueue_cfg.dflt_timeout = atol(optarg);
0700                 break;
0701             case 'T': 
0702                 G_tmqueue_cfg.ses_timeout  = atol(optarg);
0703                 break;
0704             case 'c': 
0705                 /* Time for time-out checking... */
0706                 G_tmqueue_cfg.tout_check_time = atoi(optarg);
0707                 break;
0708             default:
0709                 /*return FAIL;*/
0710                 break;
0711         }
0712     }
0713     
0714     if (ndrx_get_G_cconfig())
0715     {
0716         if (EXSUCCEED!=tmq_reload_conf(NULL))
0717         {
0718             NDRX_LOG(log_error, "Failed to read CCONFIG's @queue section!");
0719             EXFAIL_OUT(ret);
0720         }
0721     }
0722     
0723     /* Check the parameters & default them if needed */
0724     if (0>=G_tmqueue_cfg.scan_time)
0725     {
0726         G_tmqueue_cfg.scan_time = SCAN_TIME_DFLT;
0727     }
0728     
0729     if (0>=G_tmqueue_cfg.threadpoolsize)
0730     {
0731         G_tmqueue_cfg.threadpoolsize = THREADPOOL_DFLT;
0732     }
0733     
0734     if (0>=G_tmqueue_cfg.notifpoolsize)
0735     {
0736         G_tmqueue_cfg.notifpoolsize = THREADPOOL_DFLT;
0737     }
0738     
0739     if (0>=G_tmqueue_cfg.fwdpoolsize)
0740     {
0741         G_tmqueue_cfg.fwdpoolsize = THREADPOOL_DFLT;
0742     }
0743     
0744     if (0>=G_tmqueue_cfg.dflt_timeout)
0745     {
0746         G_tmqueue_cfg.dflt_timeout = TXTOUT_DFLT;
0747     }
0748     
0749     if (0>G_tmqueue_cfg.ses_timeout)
0750     {
0751         G_tmqueue_cfg.ses_timeout = SES_TOUT_DFLT;
0752     }
0753     
0754     if (0>=G_tmqueue_cfg.tout_check_time)
0755     {
0756         G_tmqueue_cfg.tout_check_time = TOUT_CHECK_TIME;
0757     }
0758     
0759     NDRX_LOG(log_info, "Recovery scan time set to [%d]",
0760                             G_tmqueue_cfg.scan_time);
0761     
0762     NDRX_LOG(log_info, "Worker pool size [%d] threads",
0763                             G_tmqueue_cfg.threadpoolsize);
0764     
0765     NDRX_LOG(log_info, "Worker  notify-loop-back pool size [%d] threads",
0766                             G_tmqueue_cfg.notifpoolsize);
0767     
0768     NDRX_LOG(log_info, "Forward pool size [%d] threads",
0769                             G_tmqueue_cfg.fwdpoolsize);
0770     
0771     NDRX_LOG(log_info, "Local transaction tout set to: [%ld]", 
0772             G_tmqueue_cfg.dflt_timeout );
0773     
0774     NDRX_LOG(log_info, "Session transaction tout set to: [%ld]", 
0775             G_tmqueue_cfg.ses_timeout);
0776     
0777     NDRX_LOG(log_info, "Periodic timeout-check time: [%d]", 
0778             G_tmqueue_cfg.tout_check_time);
0779     
0780     /* we should open the XA  */
0781     NDRX_LOG(log_debug, "About to Open XA Entry!");
0782     
0783     if (EXSUCCEED!=tpopen())
0784     {
0785         EXFAIL_OUT(ret);
0786     }
0787     
0788     /* we shall read the Q space now... */
0789     
0790     /* Recover the messages from disk */
0791     if (EXSUCCEED!=tmq_load_msgs())
0792     {
0793         EXFAIL_OUT(ret);
0794     }
0795     
0796     /* abort all active transactions! */
0797     if (EXSUCCEED!=tmq_log_abortall())
0798     {
0799         EXFAIL_OUT(ret);
0800     }
0801     
0802     /*
0803      * So QSPACE is Service name.
0804      * Each tmqsrv will advertize:
0805      * - <QSPACE> - For Auto queues, you can start multiple executables
0806      *            - For manual queues (doing tpdequeue()) - only one space is possible
0807      *            - processes does the queue mirroring in memory.
0808      * - <QSPACE>-<NODE_ID>-<SRVID> - To this XA driver will send ACKs.
0809      * 
0810      * Also.. when we will recover from disk we will have to ensure the correct order
0811      * of the enqueued messages. We can use time-stamp for doing ordering.
0812      */
0813     snprintf(svcnm, sizeof(svcnm), NDRX_SVC_TMQ, G_tmqueue_cfg.vnodeid, tpgetsrvid());
0814     
0815     if (EXSUCCEED!=tpadvertise(svcnm, TMQUEUE))
0816     {
0817         NDRX_LOG(log_error, "Failed to advertise %s service!", svcnm);
0818         EXFAIL_OUT(ret);
0819     }
0820 
0821     if (EXSUCCEED!=tpadvertise(ndrx_G_qspacesvc, TMQUEUE))
0822     {
0823         NDRX_LOG(log_error, "Failed to advertise %s service!", svcnm);
0824         EXFAIL_OUT(ret);
0825     }
0826     
0827     if (EXSUCCEED!=tmq_fwd_stat_init())
0828     {
0829         NDRX_LOG(log_error, "Failed to init forward statistics");
0830         EXFAIL_OUT(ret);
0831     }
0832     
0833     /* service request handlers */
0834     if (NULL==(G_tmqueue_cfg.thpool = ndrx_thpool_init(G_tmqueue_cfg.threadpoolsize,
0835             NULL, NULL, NULL, 0, NULL)))
0836     {
0837         NDRX_LOG(log_error, "Failed to initialize thread pool (cnt: %d)!", 
0838                 G_tmqueue_cfg.threadpoolsize);
0839         EXFAIL_OUT(ret);
0840     }
0841     
0842     if (NULL==(G_tmqueue_cfg.notifthpool = ndrx_thpool_init(G_tmqueue_cfg.notifpoolsize,
0843             NULL, NULL, NULL, 0, NULL)))
0844     {
0845         NDRX_LOG(log_error, "Failed to initialize udpate thread pool (cnt: %d)!", 
0846                 G_tmqueue_cfg.notifpoolsize);
0847         EXFAIL_OUT(ret);
0848     }
0849     
0850     /* q forward handlers */
0851     if (NULL==(G_tmqueue_cfg.fwdthpool = ndrx_thpool_init(G_tmqueue_cfg.fwdpoolsize,
0852             NULL, NULL, NULL, 0, NULL)))
0853     {
0854         NDRX_LOG(log_error, "Failed to initialize fwd thread pool (cnt: %d)!", 
0855                 G_tmqueue_cfg.fwdpoolsize);
0856         EXFAIL_OUT(ret);
0857     }
0858     
0859     if (NULL==(G_tmqueue_cfg.shutdownseq = ndrx_thpool_init(1,
0860             NULL, NULL, NULL, 0, NULL)))
0861     {
0862         NDRX_LOG(log_error, "Failed to initialize shutdown thread pool!");
0863         EXFAIL_OUT(ret);
0864     }
0865     
0866     /* Register timer check (needed for time-out detection) */
0867     if (EXSUCCEED!=tpext_addperiodcb(G_tmqueue_cfg.tout_check_time, tm_tout_check))
0868     {
0869         NDRX_LOG(log_error, "tpext_addperiodcb failed: %s",
0870                         tpstrerror(tperrno));
0871         EXFAIL_OUT(ret);
0872     }
0873     
0874     /* set shutdown callback handler
0875      * so that we can initiate shutdown sequence
0876      */
0877     if (EXSUCCEED!=ndrx_tpext_addbshutdowncb(shutdowncb))
0878     {
0879         NDRX_LOG(log_error, "Failed to add shutdown sequencer callback!");
0880         EXFAIL_OUT(ret);
0881     }
0882     
0883     /* Start the background processing */
0884     if (EXSUCCEED!=forward_process_init())
0885     {
0886         NDRX_LOG(log_error, "Failed to initialize fwd process thread");
0887         EXFAIL_OUT(ret);
0888     }
0889 
0890     tmq_chkdisk_stopwatch_reset();
0891 
0892     /* Bug #565 */
0893     M_init_ok=EXTRUE;
0894     
0895 out:
0896     return ret;
0897 }
0898 
0899 /**
0900  * Do de-initialization
0901  */
0902 void tpsvrdone(void)
0903 {
0904     int i;
0905     
0906     NDRX_LOG(log_debug, "tpsvrdone called - requesting "
0907             "background thread shutdown...");
0908     
0909     if (M_init_ok)
0910     {
0911         
0912         /* Terminate the threads (request) */
0913         for (i=0; i<G_tmqueue_cfg.threadpoolsize; i++)
0914         {
0915             ndrx_thpool_add_work(G_tmqueue_cfg.thpool, (void *)tmq_thread_shutdown, NULL);
0916         }
0917         
0918         /* update threads */
0919         for (i=0; i<G_tmqueue_cfg.notifpoolsize; i++)
0920         {
0921             ndrx_thpool_add_work(G_tmqueue_cfg.notifthpool, (void *)tmq_thread_shutdown, NULL);
0922         }
0923         
0924         /* terminate the showdown thread... */
0925         ndrx_thpool_add_work(G_tmqueue_cfg.shutdownseq, (void *)tmq_thread_shutdown, NULL);
0926         
0927         
0928         ndrx_thpool_wait(G_tmqueue_cfg.shutdownseq);
0929         ndrx_thpool_destroy(G_tmqueue_cfg.shutdownseq);
0930         
0931         
0932         /* all thread shall be terminated (so that notification is not sent to NULL)
0933          * in case of enqueue...
0934          */
0935         ndrx_thpool_destroy(G_tmqueue_cfg.fwdthpool);
0936         
0937     }
0938     
0939     tpclose();
0940     
0941 }
0942 
0943 /**
0944  * Shutdown the thread
0945  * @param arg
0946  * @param p_finish_off
0947  */
0948 expublic void tmq_thread_shutdown(void *ptr, int *p_finish_off)
0949 {
0950     tmq_thread_uninit();
0951     
0952     *p_finish_off = EXTRUE;
0953 }
0954 /* vim: set ts=4 sw=4 et smartindent: */