Back to home page

Enduro/X

 
 

    


0001 /**
0002  * @brief Tmsrv server - transaction monitor
0003  *   After that log transaction to hash & to disk for tracking the stuff...
0004  *   TODO: We should have similar control like "TP_COMMIT_CONTROL" -
0005  *   either return after stuff logged or after really commit completed.
0006  *   Error handling:
0007  *   - System errors we will track via ATMI interface error functions
0008  *   - XA errors will be tracked via XA error interface
0009  *   Should we call xa_end for joined transactions? See:
0010  *   https://www-01.ibm.com/support/knowledgecenter/SSFKSJ_7.0.1/com.ibm.mq.amqzag.doc/fa13870_.htm
0011  *   TODO: count the XA_RETRY as part of the transaction retry counter.
0012  *   i.e. if state is not changed counter++
0013  *   have a new flag for max count to return heuristic and move to background.
0014  *
0015  * @file tmsrv.c
0016  */
0017 /* -----------------------------------------------------------------------------
0018  * Enduro/X Middleware Platform for Distributed Transaction Processing
0019  * Copyright (C) 2009-2016, ATR Baltic, Ltd. All Rights Reserved.
0020  * Copyright (C) 2017-2023, Mavimax, Ltd. All Rights Reserved.
0021  * This software is released under one of the following licenses:
0022  * AGPL (with Java and Go exceptions) or Mavimax's license for commercial use.
0023  * See LICENSE file for full text.
0024  * -----------------------------------------------------------------------------
0025  * AGPL license:
0026  *
0027  * This program is free software; you can redistribute it and/or modify it under
0028  * the terms of the GNU Affero General Public License, version 3 as published
0029  * by the Free Software Foundation;
0030  *
0031  * This program is distributed in the hope that it will be useful, but WITHOUT ANY
0032  * WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR A
0033  * PARTICULAR PURPOSE. See the GNU Affero General Public License, version 3
0034  * for more details.
0035  *
0036  * You should have received a copy of the GNU Affero General Public License along 
0037  * with this program; if not, write to the Free Software Foundation, Inc.,
0038  * 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
0039  *
0040  * -----------------------------------------------------------------------------
0041  * A commercial use license is available from Mavimax, Ltd
0042  * contact@mavimax.com
0043  * -----------------------------------------------------------------------------
0044  */
0045 #include <ndrx_config.h>
0046 #include <stdio.h>
0047 #include <stdlib.h>
0048 #include <string.h>
0049 #include <errno.h>
0050 #include <regex.h>
0051 #include <utlist.h>
0052 #include <unistd.h>
0053 
0054 #include <ndebug.h>
0055 #include <atmi.h>
0056 #include <atmi_int.h>
0057 #include <typed_buf.h>
0058 #include <ndrstandard.h>
0059 #include <ubf.h>
0060 #include <Exfields.h>
0061 
0062 #include <exnet.h>
0063 #include <ndrxdcmn.h>
0064 
0065 #include "tmsrv.h"
0066 #include "../libatmisrv/srv_int.h"
0067 #include "tperror.h"
0068 #include "userlog.h"
0069 #include <xa_cmn.h>
0070 #include <exthpool.h>
0071 #include <ubfutil.h>
0072 #include <sys_test.h>
0073 #include <singlegrp.h>
0074 /*---------------------------Externs------------------------------------*/
0075 /*---------------------------Macros-------------------------------------*/
0076 /*---------------------------Enums--------------------------------------*/
0077 /*---------------------------Typedefs-----------------------------------*/
0078 /*---------------------------Globals------------------------------------*/
0079 expublic tmsrv_cfg_t G_tmsrv_cfg; /* process wide tmsrv settings; zeroed and filled in by tpsvrinit() */
0080 /*---------------------------Statics------------------------------------*/
0081 exprivate int M_init_ok = EXFALSE; /* set to EXTRUE as the last step of tpsvrinit(); tpsvrdone() stops the threads only when set */
0082 
0083 /*
0084  * Thread private data
0085  */
0086 exprivate __thread ndrx_stopwatch_t M_ping_stopwatch; /* reset by tm_thread_init() and after each ping started by tm_ping_db() */
0087 exprivate __thread int M_thread_first = EXTRUE; /* EXTRUE until the thread has run tm_thread_init() */
0088 exprivate __thread XID M_ping_xid; /* run pings by this non existent xid (generated in tm_thread_init()) */
0089 
0090 /* allow only one timeout check at the same time... */
0091 exprivate int volatile M_into_toutchk = EXFALSE; /* EXTRUE while tx_tout_check_th() is scanning; changed under M_into_toutchk_lock */
0092 exprivate MUTEX_LOCKDECL(M_into_toutchk_lock);
0093 
0094 /*---------------------------Prototypes---------------------------------*/
0095 exprivate int tm_tout_check(void); /* periodic callback registered in tpsvrinit(), defined further below */
0096 
0097 /**
0098  * Initialize thread
0099  */
0100 expublic void tm_thread_init(void)
0101 {
0102     if (EXSUCCEED!=tpinit(NULL))
0103     {
0104         NDRX_LOG(log_error, "Failed to init worker client");
0105         userlog("tmsrv: Failed to init worker client");
0106         exit(1);
0107     }
0108 
0109     /* Bug #161 We shall run xa_open() here too, because it is per thread
0110      * config.
0111      */
0112     if (EXSUCCEED!=tpopen())
0113     {
0114         NDRX_LOG(log_error, "Worker thread failed to tpopen() - nothing to do, "
0115                 "process will exit");
0116         userlog("Worker thread failed to tpopen() - nothing to do, "
0117                 "process will exit");
0118         exit(1);
0119     }
0120 
0121     ndrx_stopwatch_reset(&M_ping_stopwatch);
0122     
0123     atmi_xa_new_xid(&M_ping_xid);
0124     
0125 }
0126 
0127 /**
0128  * Close the thread session
0129  */
0130 expublic void tm_thread_uninit(void)
0131 {
0132     tpclose();
0133     tpterm();
0134 }
0135 
0136 /**
0137  * Tmsrv service entry (working thread)
0138  * @param p_svc - data & len used only...!
0139  */
0140 void TPTMSRV_TH (void *ptr, int *p_finish_off)
0141 {
0142     /* Ok we should not handle the commands 
0143      * TPBEGIN...
0144      */
0145     int ret=EXSUCCEED;
0146     thread_server_t *thread_data = (thread_server_t *)ptr;
0147     char cmd = EXEOS;
0148     int cd;
0149     long allocsz;
0150     /**************************************************************************/
0151     /*                        THREAD CONTEXT RESTORE                          */
0152     /**************************************************************************/
0153     UBFH *p_ub = (UBFH *)thread_data->buffer;
0154     
0155     /* Do the ATMI init, if needed */
0156     if (M_thread_first)
0157     {
0158         tm_thread_init();
0159         M_thread_first = EXFALSE;
0160     }
0161     
0162     /* run the ping (will be skipped if thread is already pinged in time) */
0163     if (G_tmsrv_cfg.ping_time > 0)
0164     {
0165         tm_ping_db(NULL, NULL);
0166     }
0167     
0168     /* restore context. */
0169     if (EXSUCCEED!=tpsrvsetctxdata(thread_data->context_data, SYS_SRV_THREAD))
0170     {
0171         userlog("tmsrv: Failed to set context");
0172         NDRX_LOG(log_error, "Failed to set context");
0173         exit(1);
0174     }
0175     
0176     cd = thread_data->cd;
0177     /* free up the transport data.*/
0178     NDRX_FREE(thread_data->context_data);
0179     NDRX_FREE(thread_data);
0180     /**************************************************************************/
0181     
0182     /* get some more space! 
0183      */
0184     if (Bunused (p_ub) < 4096)
0185     {
0186         p_ub = (UBFH *)tprealloc ((char *)p_ub, allocsz=(Bsizeof (p_ub) + 4096));
0187         
0188         if (NULL==p_ub)
0189         {
0190             NDRX_LOG(log_error, "Failed realloc UBF to %d bytes: %s", 
0191                     allocsz, tpstrerror(tperrno));
0192             EXFAIL_OUT(ret);
0193         }
0194     }
0195     
0196     ndrx_debug_dump_UBF(log_info, "TPTMSRV call buffer:", p_ub);
0197     
0198     if (Bget(p_ub, TMCMD, 0, (char *)&cmd, 0L))
0199     {
0200         NDRX_LOG(log_error, "Failed to read command code!");
0201         ret=EXFAIL;
0202         goto out;
0203     }
0204     
0205     /* TODO: We need TMFLAGS */
0206     
0207     NDRX_LOG(log_info, "Got command code: [%c]", cmd);
0208     
0209     switch(cmd)
0210     {
0211         case ATMI_XA_TPBEGIN:
0212             
0213             /* start new tran... */
0214             if (EXSUCCEED!=tm_tpbegin(p_ub))
0215             {
0216                 ret=EXFAIL;
0217                 goto out;
0218             }
0219             break;
0220         case ATMI_XA_TPCOMMIT:
0221             
0222             if (EXSUCCEED!=tm_tpcommit(p_ub))
0223             {
0224                 ret=EXFAIL;
0225                 goto out;
0226             }
0227             break;
0228         case ATMI_XA_TPABORT:
0229             
0230             if (EXSUCCEED!=tm_tpabort(p_ub))
0231             {
0232                 ret=EXFAIL;
0233                 goto out;
0234             }
0235             break;
0236         case ATMI_XA_PRINTTRANS:
0237             
0238             /* request for printing active transactions 
0239              * we shall allocate the buffer to max possible size
0240              */
0241             
0242             p_ub = (UBFH *)tprealloc ((char *)p_ub, allocsz=
0243                     (NDRX_MSGSIZEMAX-NDRX_MSGSIZEMAX_OVERHD));
0244         
0245             if (NULL==p_ub)
0246             {
0247                 NDRX_LOG(log_error, "Failed realloc UBF to %d bytes: %s", 
0248                         allocsz, tpstrerror(tperrno));
0249                 EXFAIL_OUT(ret);
0250             }
0251 
0252             if (EXSUCCEED!=tm_tpprinttrans(p_ub, cd))
0253             {
0254                 ret=EXFAIL;
0255                 goto out;
0256             }
0257             break;
0258         case ATMI_XA_ABORTTRANS:
0259             
0260             /* request for printing active transactions */
0261             if (EXSUCCEED!=tm_aborttrans(p_ub))
0262             {
0263                 ret=EXFAIL;
0264                 goto out;
0265             }
0266             break;
0267         case ATMI_XA_STATUS:
0268             
0269             /* request for printing active transactions */
0270             if (EXSUCCEED!=tm_status(p_ub))
0271             {
0272                 ret=EXFAIL;
0273                 goto out;
0274             }
0275             break;
0276         case ATMI_XA_COMMITTRANS:
0277             
0278             if (EXSUCCEED!=tm_committrans(p_ub))
0279             {
0280                 ret=EXFAIL;
0281                 goto out;
0282             }
0283             break;
0284         case ATMI_XA_TMPREPARE:
0285             
0286             /* prepare the stuff locally */
0287             if (EXSUCCEED!=tm_tmprepare(p_ub))
0288             {
0289                 ret=EXFAIL;
0290                 goto out;
0291             }
0292             break;
0293         case ATMI_XA_TMCOMMIT:
0294             
0295             /* prepare the stuff locally */
0296             if (EXSUCCEED!=tm_tmcommit(p_ub))
0297             {
0298                 ret=EXFAIL;
0299                 goto out;
0300             }
0301             break;
0302         case ATMI_XA_TMABORT:
0303             
0304             /* abort the stuff locally */
0305             if (EXSUCCEED!=tm_tmabort(p_ub))
0306             {
0307                 ret=EXFAIL;
0308                 goto out;
0309             }
0310             break;
0311         case ATMI_XA_TMFORGET:
0312             /* forget the stuff locally */
0313             if (EXSUCCEED!=tm_tmforget(p_ub))
0314             {
0315                 ret=EXFAIL;
0316                 goto out;
0317             }
0318             break;
0319         case ATMI_XA_TMREGISTER:
0320             /* Some binary is telling as the different RM is involved
0321              * in transaction.
0322              */
0323             if (EXSUCCEED!=tm_tmregister(p_ub))
0324             {
0325                 ret=EXFAIL;
0326                 goto out;
0327             }
0328             break;
0329         case ATMI_XA_RMSTATUS:
0330             /* Report the status of resource manager involvement in transaction
0331              */
0332             if (EXSUCCEED!=tm_rmstatus(p_ub))
0333             {
0334                 ret=EXFAIL;
0335                 goto out;
0336             }
0337             break;
0338         case ATMI_XA_RECOVERLOCAL:
0339             if (EXFAIL==tm_recoverlocal(p_ub, cd))
0340             {
0341                 EXFAIL_OUT(ret);
0342             }
0343             break;
0344         case ATMI_XA_COMMITLOCAL:
0345         case ATMI_XA_ABORTLOCAL:
0346         case ATMI_XA_FORGETLOCAL:
0347             
0348             if (EXSUCCEED!=tm_proclocal(cmd, p_ub, cd))
0349             {
0350                 ret=EXFAIL;
0351                 goto out;
0352             }
0353             
0354             break;
0355         default:
0356             NDRX_LOG(log_error, "Unsupported command code: [%c]", cmd);
0357             ret=EXFAIL;
0358             break;
0359     }
0360     
0361 out:
0362             
0363     /* Approve the request if all ok */
0364     if (EXSUCCEED==ret)
0365     {
0366         atmi_xa_approve(p_ub);
0367     }
0368 
0369     if (EXSUCCEED!=ret && XA_RDONLY==atmi_xa_get_reason())
0370     {
0371         NDRX_LOG(log_debug, "Marking READ ONLY = SUCCEED");
0372         ret=EXSUCCEED;
0373     }
0374 
0375     ndrx_debug_dump_UBF(log_info, "TPTMSRV return buffer:", p_ub);
0376 
0377     tpreturn(  ret==EXSUCCEED?TPSUCCESS:TPFAIL,
0378                 0L,
0379                 (char *)p_ub,
0380                 0L,
0381                 0L);
0382     
0383     /* note this is thread and it does not do long jump */
0384     /* 
0385      * If there was very long processing session and ping is required (detected
0386      * by func). Then run it here.
0387      */
0388     if (G_tmsrv_cfg.ping_time > 0)
0389     {
0390         tm_ping_db(NULL, NULL);
0391     }
0392 }
0393 
0394 /**
0395  * Entry point for service (main thread)
0396  * @param p_svc
0397  */
0398 void TPTMSRV (TPSVCINFO *p_svc)
0399 {
0400     int ret=EXSUCCEED;
0401     UBFH *p_ub = (UBFH *)p_svc->data; /* this is auto-buffer */
0402     long size;
0403     char btype[16];
0404     char stype[16];
0405     thread_server_t *thread_data = NDRX_MALLOC(sizeof(thread_server_t));
0406     
0407     if (NULL==thread_data)
0408     {
0409         userlog("Failed to malloc memory - %s!", strerror(errno));
0410         NDRX_LOG(log_error, "Failed to malloc memory");
0411         EXFAIL_OUT(ret);
0412     }
0413     
0414     if (0==(size = tptypes (p_svc->data, btype, stype)))
0415     {
0416         NDRX_LOG(log_error, "Zero buffer received!");
0417         userlog("Zero buffer received!");
0418         NDRX_FREE(thread_data);
0419         EXFAIL_OUT(ret);
0420     }
0421     
0422 #if 0
0423         - Why?
0424     /* not using sub-type - on tpreturn/forward for thread it will be auto-free */
0425     thread_data->buffer =  tpalloc(btype, NULL, size);
0426     
0427     if (NULL==thread_data->buffer)
0428     {
0429         NDRX_LOG(log_error, "tpalloc failed of type %s size %ld", btype, size);
0430         EXFAIL_OUT(ret);
0431     }
0432     
0433     /* copy off the data */
0434     memcpy(thread_data->buffer, p_svc->data, size);
0435 #endif
0436     thread_data->buffer=p_svc->data;
0437     thread_data->cd = p_svc->cd;
0438     thread_data->context_data = tpsrvgetctxdata();
0439     
0440     /* submit the job to thread pool: */
0441     ndrx_thpool_add_work(G_tmsrv_cfg.thpool, (void*)TPTMSRV_TH, (void *)thread_data);
0442     
0443 out:
0444     if (EXSUCCEED==ret)
0445     {
0446         ndrx_thpool_wait_one(G_tmsrv_cfg.thpool);
0447         
0448         tpcontinue();
0449     }
0450     else
0451     {
0452         /* return error back */
0453         tpreturn(  TPFAIL,
0454                 0L,
0455                 (char *)p_ub,
0456                 0L,
0457                 0L);
0458     }
0459 
0460 }
0461 
0462 /*
0463  * Do initialization
0464  */
0465 int tpsvrinit(int argc, char **argv)
0466 {
0467     int ret=EXSUCCEED;
0468     signed char c;
0469     char *p;
0470     char svcnm[MAXTIDENT+1];
0471     NDRX_LOG(log_debug, "tpsvrinit called");
0472     
0473     memset(&G_tmsrv_cfg, 0, sizeof(G_tmsrv_cfg));
0474     
0475     G_tmsrv_cfg.ping_mode_jointran = EXTRUE;
0476     G_tmsrv_cfg.housekeeptime = TMSRV_HOUSEKEEP_DEFAULT;
0477     G_tmsrv_cfg.vnodeid=tpgetnodeid();
0478     G_tmsrv_cfg.logparse_attempts=LOGPARSE_ATTEMPTS_DFLT;
0479     
0480     /* Parse command line  */
0481     while ((c = getopt(argc, argv, "n:P:t:s:l:c:m:p:r:Rh:X:a:")) != -1)
0482     {
0483 
0484         if (optarg)
0485         {
0486             NDRX_LOG(log_debug, "%c = [%s]", c, optarg);
0487         }
0488         else
0489         {
0490             NDRX_LOG(log_debug, "got %c", c);
0491         }
0492 
0493         switch(c)
0494         {
0495             case 'a':
0496                 G_tmsrv_cfg.logparse_attempts=atoi(optarg);
0497                 break;
0498             case 'X':
0499                 G_tmsrv_cfg.chkdisk_time=atoi(optarg);
0500 
0501                 if (G_tmsrv_cfg.chkdisk_time)
0502                 {
0503                     NDRX_LOG(log_info, "Check disk logs set to %d sec",
0504                                 G_tmsrv_cfg.chkdisk_time);
0505                 }
0506                 break;
0507             case 'n':
0508                 G_tmsrv_cfg.vnodeid = atol(optarg);
0509                 NDRX_LOG(log_info, "Virtual Enduro/X Cluster Node ID set to %ld",
0510                             G_tmsrv_cfg.vnodeid);
0511                 break;
0512             case 't': 
0513                 G_tmsrv_cfg.dflt_timeout = atol(optarg);
0514                 NDRX_LOG(log_debug, "Default transaction time-out "
0515                             "set to: [%ld]", G_tmsrv_cfg.dflt_timeout);
0516                 break;
0517                 /* status directory: */
0518             case 'l': 
0519                 NDRX_STRCPY_SAFE(G_tmsrv_cfg.tlog_dir, optarg);
0520                 NDRX_LOG(log_debug, "Status directory "
0521                             "set to: [%s]", G_tmsrv_cfg.tlog_dir);
0522                 break;
0523             case 's': 
0524                 G_tmsrv_cfg.scan_time = atoi(optarg);
0525                 break;
0526             case 'c': 
0527                 /* Time for time-out checking... */
0528                 G_tmsrv_cfg.tout_check_time = atoi(optarg);
0529                 break;
0530             case 'm': 
0531                 G_tmsrv_cfg.max_tries = atol(optarg);
0532                 break;
0533             case 'p': 
0534                 G_tmsrv_cfg.threadpoolsize = atol(optarg);
0535                 break;
0536             case 'r': 
0537                 G_tmsrv_cfg.xa_retries = atoi(optarg);
0538                 break;
0539             case 'R':
0540                 /* in this case use tran listing (xa_recover)*/
0541                 G_tmsrv_cfg.ping_mode_jointran = EXFALSE;
0542                 break;
0543             case 'h':
0544                 G_tmsrv_cfg.housekeeptime = atoi(optarg);
0545                 break;
0546             case 'P':
0547                 /* Ping will run with timeout timer interval...
0548                  * will work with RECON flags (which must be set for this case)
0549                  */
0550                 G_tmsrv_cfg.ping_time = atoi(optarg);
0551 
0552                 if (G_tmsrv_cfg.ping_time < 0)
0553                 {
0554                     NDRX_LOG(log_error, "ERROR ! invalid value %d for -P. Must be >=0",
0555                             G_tmsrv_cfg.ping_time);
0556                     EXFAIL_OUT(ret);
0557                 }
0558                 
0559                 break;
0560             default:
0561                 /*return FAIL;*/
0562                 break;
0563         }
0564     }
0565     
0566     /* Check the parameters & default them if needed */
0567     if (0>=G_tmsrv_cfg.scan_time)
0568     {
0569         G_tmsrv_cfg.scan_time = SCAN_TIME_DFLT;
0570     }
0571     
0572     if (0>=G_tmsrv_cfg.max_tries)
0573     {
0574         G_tmsrv_cfg.max_tries = MAX_TRIES_DFTL;
0575     }
0576     
0577     if (0>=G_tmsrv_cfg.tout_check_time)
0578     {
0579         G_tmsrv_cfg.tout_check_time = TOUT_CHECK_TIME;
0580     }
0581     
0582     if (0>=G_tmsrv_cfg.threadpoolsize)
0583     {
0584         G_tmsrv_cfg.threadpoolsize = THREADPOOL_DFLT;
0585     }
0586     
0587     if (0>=G_tmsrv_cfg.xa_retries)
0588     {
0589         G_tmsrv_cfg.xa_retries = XA_RETRIES_DFLT;
0590     }
0591     
0592     if (EXEOS==G_tmsrv_cfg.tlog_dir[0])
0593     {
0594         userlog("TMS log dir not set!");
0595         NDRX_LOG(log_error, "TMS log dir not set!");
0596         EXFAIL_OUT(ret);
0597     }
0598     NDRX_LOG(log_debug, "Recovery scan time set to [%d]",
0599                             G_tmsrv_cfg.scan_time);
0600     
0601     NDRX_LOG(log_debug, "Tx max tries set to [%d]",
0602                             G_tmsrv_cfg.max_tries);
0603     
0604     NDRX_LOG(log_debug, "Worker pool size [%d] threads",
0605                             G_tmsrv_cfg.threadpoolsize);
0606     
0607     NDRX_LOG(log_debug, "Foreground retries in stage [%d]",
0608                             G_tmsrv_cfg.xa_retries);
0609     
0610     NDRX_LOG(log_debug, "Housekeep time for corrupted logs: [%d] (sec)",
0611                             G_tmsrv_cfg.housekeeptime);
0612                             
0613     NDRX_LOG(log_debug, "Log parse attempts: [%d]",
0614                             G_tmsrv_cfg.logparse_attempts); 
0615 
0616     NDRX_LOG(log_debug, "About to initialize XA!");
0617     
0618     if (EXSUCCEED!=atmi_xa_init()) /* will open next... */
0619     {
0620         NDRX_LOG(log_error, "Failed to initialize XA driver!");
0621         EXFAIL_OUT(ret);
0622     }
0623     
0624     if (G_tmsrv_cfg.ping_time > 0)
0625     {
0626 
0627         NDRX_LOG(log_info, "DB PING & connection recovery enabled, interval: %d "
0628                 "(same as -c number)", 
0629                 G_tmsrv_cfg.tout_check_time);
0630         
0631         if (G_tmsrv_cfg.ping_mode_jointran)
0632         {
0633             NDRX_LOG(log_warn, "PING by JOIN to non existent transaction");
0634         }
0635         else
0636         {
0637             NDRX_LOG(log_warn, "PING by RECOVER transaction");
0638         }
0639         
0640         if (G_atmi_env.xa_recon_times <=1)
0641         {
0642             NDRX_LOG(log_always, "ERROR ! Using -P (ping) to be effective, please "
0643                     "ensure that NDRX_XA_FLAGS=RECON... is set to tries count > 1!");
0644             EXFAIL_OUT(ret);
0645         }
0646     }
0647     else
0648     {
0649         NDRX_LOG(log_info, "DB PING disabled (-P not set)");
0650     }
0651 
0652     /* we should open the XA  */
0653     
0654     NDRX_LOG(log_debug, "About to Open XA Entry!");
0655     ret = atmi_xa_open_entry();
0656     if( XA_OK != ret )
0657     {
0658         userlog("xa_open failed error %d", ret);
0659         NDRX_LOG(log_error, "xa_open failed");
0660     }
0661     else
0662     {
0663         NDRX_LOG(log_error, "xa_open ok");
0664         ret = EXSUCCEED;
0665     }
0666                 
0667     /* very generic version/only Resource ID known */
0668     
0669     snprintf(svcnm, sizeof(svcnm), NDRX_SVC_RM, G_atmi_env.xa_rmid);
0670     
0671     if (EXSUCCEED!=tpadvertise(svcnm, TPTMSRV))
0672     {
0673         NDRX_LOG(log_error, "Failed to advertise %s service!", svcnm);
0674         EXFAIL_OUT(ret);
0675     }
0676     
0677     /* generic instance: */
0678     snprintf(svcnm, sizeof(svcnm), NDRX_SVC_TM, (int)G_tmsrv_cfg.vnodeid, G_atmi_env.xa_rmid);
0679     
0680     if (EXSUCCEED!=tpadvertise(svcnm, TPTMSRV))
0681     {
0682         NDRX_LOG(log_error, "Failed to advertise %s service!", svcnm);
0683         EXFAIL_OUT(ret);
0684     }
0685     
0686     /* specific instance */
0687     snprintf(svcnm, sizeof(svcnm), NDRX_SVC_TM_I, (int)G_tmsrv_cfg.vnodeid, G_atmi_env.xa_rmid, 
0688             G_server_conf.srv_id);
0689     
0690     if (EXSUCCEED!=tpadvertise(svcnm, TPTMSRV))
0691     {
0692         NDRX_LOG(log_error, "Failed to advertise %s service!", svcnm);
0693         EXFAIL_OUT(ret);
0694     }
0695     
0696     if (NULL==(G_tmsrv_cfg.thpool = ndrx_thpool_init(G_tmsrv_cfg.threadpoolsize,
0697             NULL, NULL, NULL, 0, NULL)))
0698     {
0699         NDRX_LOG(log_error, "Failed to initialize thread pool (cnt: %d)!", 
0700                 G_tmsrv_cfg.threadpoolsize);
0701         EXFAIL_OUT(ret);
0702     }
0703     
0704     /* Start the background processing */
0705     if (EXSUCCEED!=background_process_init())
0706     {
0707         NDRX_LOG(log_error, "Failed to creat background txn tout/completion thread");
0708         EXFAIL_OUT(ret);
0709     }
0710     
0711     /* Register timer check (needed for time-out detection) */
0712     if (EXSUCCEED!=tpext_addperiodcb(G_tmsrv_cfg.tout_check_time, tm_tout_check))
0713     {
0714         NDRX_LOG(log_error, "tpext_addperiodcb failed: %s",
0715                         tpstrerror(tperrno));
0716         EXFAIL_OUT(ret);
0717     }
0718     
0719     M_init_ok = EXTRUE;
0720     
0721 out:
0722     return ret;
0723 }
0724 
0725 /**
0726  * Do de-initialization
0727  */
0728 void tpsvrdone(void)
0729 {
0730     int i;
0731     NDRX_LOG(log_debug, "tpsvrdone called - requesting "
0732             "background thread shutdown...");
0733     
0734     G_bacground_req_shutdown = EXTRUE;
0735     
0736     if (M_init_ok)
0737     {
0738         background_wakeup();
0739 
0740         /* Terminate the threads */
0741         for (i=0; i<G_tmsrv_cfg.threadpoolsize; i++)
0742         {
0743             ndrx_thpool_add_work(G_tmsrv_cfg.thpool, (void *)tm_thread_shutdown, NULL);
0744         }
0745         
0746         /* Wait to complete */
0747         pthread_join(G_bacground_thread, NULL);
0748 
0749         /* Wait for threads to finish */
0750         ndrx_thpool_wait(G_tmsrv_cfg.thpool);
0751         ndrx_thpool_destroy(G_tmsrv_cfg.thpool);
0752     }
0753     atmi_xa_close_entry(EXFALSE);
0754     
0755 }
0756 
0757 /**
0758  * Periodic main thread callback for 
0759  * (will be done by threadpoll)
0760  * @return 
0761  */
0762 exprivate void tx_tout_check_th(void *ptr)
0763 {
0764     long tspent;
0765     atmi_xa_log_list_t *tx_list;
0766     atmi_xa_log_list_t *el, *tmp;
0767     atmi_xa_tx_info_t xai;
0768     atmi_xa_log_t *p_tl;
0769     int in_progress;
0770     
0771     MUTEX_LOCK_V(M_into_toutchk_lock);
0772     
0773     in_progress=M_into_toutchk;
0774     
0775     /* do lock if was free */
0776     if (!in_progress)
0777     {
0778         M_into_toutchk=EXTRUE;
0779     }
0780             
0781     MUTEX_UNLOCK_V(M_into_toutchk_lock);
0782     
0783     if (in_progress)
0784     {
0785         /* nothing todo... */
0786         goto out;
0787     }
0788     
0789     /* Create a copy of hash, iterate and check each tx for timeout condition
0790      * If so then initiate internal abort call
0791      */
0792     NDRX_LOG(log_dump, "Timeout check (processing...)");
0793     
0794     /* Do the ATMI init, if needed 
0795      */
0796     if (M_thread_first)
0797     {
0798         tm_thread_init();
0799         M_thread_first = EXFALSE;
0800     }
0801     
0802     tx_list = tms_copy_hash2list(COPY_MODE_FOREGROUND | COPY_MODE_ACQLOCK);
0803         
0804     LL_FOREACH_SAFE(tx_list,el,tmp)
0805     {
0806         NDRX_LOG(log_debug, "Checking [%s]...", el->p_tl.tmxid);
0807         if ((tspent = ndrx_stopwatch_get_delta_sec(&el->p_tl.ttimer)) > 
0808                 el->p_tl.txtout && XA_TX_STAGE_ACTIVE==el->p_tl.txstage)
0809         {
0810             
0811             if (NULL!=(p_tl = tms_log_get_entry(el->p_tl.tmxid, 0, NULL)))
0812             {
0813                 if (XA_TX_STAGE_ACTIVE==p_tl->txstage)
0814                 {
0815                     XA_TX_COPY((&xai), p_tl);
0816                     
0817                     NDRX_LOG(log_error, "XID [%s] timed out "
0818                         "(spent %ld, limit: %ld sec) - aborting...!", 
0819                         el->p_tl.tmxid, tspent, 
0820                         el->p_tl.txtout);
0821             
0822                     userlog("XID [%s] timed out "
0823                             "(spent %ld, limit: %ld sec) - aborting...!", 
0824                             el->p_tl.tmxid, tspent, 
0825                             el->p_tl.txtout);
0826 
0827                     tms_log_stage(p_tl, XA_TX_STAGE_ABORTING, EXTRUE);
0828                     
0829                     /* NOTE: We might want to move this to background processing
0830                      * because for example, oracle in some cases does long aborts...
0831                      * thus it slows down general processing
0832                      * BUT: if we want to move it to background, we should protect
0833                      * transaction log from concurrent access, e.g.
0834                      * - background does the abort()
0835                      * - meanwhile foreground calls commit()
0836                      * This can be reached with per transaction locking...
0837                      */
0838                     tm_drive(&xai, p_tl, XA_OP_ROLLBACK, EXFAIL, 0L);
0839                 }
0840                 else
0841                 {
0842                     NDRX_LOG(log_error, "XID [%s] was-tout but found in progress "
0843                         "(txstage %hd spent %ld, limit: %ld sec) - aborting...!", 
0844                         p_tl->tmxid, p_tl->txstage, tspent, p_tl->txtout);
0845                 }
0846             }
0847         }
0848         LL_DELETE(tx_list,el);
0849         NDRX_FREE(el);
0850     }
0851 out:    
0852                 
0853     /* if was not in progress then we locked  */
0854     MUTEX_LOCK_V(M_into_toutchk_lock);
0855 
0856     if (!in_progress)
0857     {
0858         M_into_toutchk=EXFALSE;
0859     }   
0860 
0861     MUTEX_UNLOCK_V(M_into_toutchk_lock);
0862     
0863     return;
0864 }
0865 
0866 /**
0867  * Run the DB ping...
0868  * It will try to list the transactions from DB. Non invasive method.
0869  * The recover entry will automatically reconnect to DB if connection failed
0870  * We will try to join non existent transaction...
0871  */
0872 expublic void tm_ping_db(void *ptr, int *p_finish_off)
0873 {
0874     int delta = ndrx_stopwatch_get_delta_sec(&M_ping_stopwatch);
0875     int ret;
0876     unsigned long tid = (unsigned long)ndrx_gettid();
0877     int is_ping_ok;
0878     
0879     /* Do the ATMI init, if needed */
0880     if (M_thread_first)
0881     {
0882         tm_thread_init();
0883         M_thread_first = EXFALSE;
0884     }
0885     
0886     if (delta >= G_tmsrv_cfg.ping_time)
0887     {
0888         ndrx_stopwatch_reset(&M_ping_stopwatch);
0889         NDRX_LOG(log_debug, "RMID: %hd TID: %lu: Running ping", 
0890                 G_atmi_env.xa_rmid, tid);
0891         
0892         if (G_tmsrv_cfg.ping_mode_jointran)
0893         {
0894             if (EXSUCCEED==(ret = atmi_xa_start_entry(&M_ping_xid, TMJOIN, EXTRUE)) || 
0895                 atmi_xa_get_reason()!=XAER_NOTA)
0896             {
0897                 is_ping_ok = EXFALSE;
0898             }
0899             else
0900             {
0901                 is_ping_ok = EXTRUE;
0902             }
0903         }
0904         else
0905         {
0906             if (0>(ret = atmi_xa_recover_entry(&M_ping_xid, 1, G_atmi_env.xa_rmid, 
0907                 TMSTARTRSCAN|TMENDRSCAN)))
0908             {
0909                 is_ping_ok = EXFALSE;
0910             }
0911             else
0912             {
0913                 is_ping_ok = EXTRUE;
0914             }
0915         }
0916         
0917         if (!is_ping_ok)
0918         {
0919             /* Ping error/ulog */
0920             NDRX_LOG(log_error, "RMID: %hd TID: %lu ERROR ! DB PING FAILED: %s", 
0921                     G_atmi_env.xa_rmid, tid, tpstrerror(tperrno));
0922             userlog("RMID: %hd TID: %lu ERROR ! DB PING FAILED: %s", 
0923                     G_atmi_env.xa_rmid, tid, tpstrerror(tperrno));
0924         }
0925         else
0926         {
0927             /* for tests needs higher debug level to reduce space */
0928             NDRX_LOG(NDRX_SYSTEST_ENBLD?log_error:log_debug,
0929         "RMID %hd TID: %lu: PING OK %d", 
0930                 G_atmi_env.xa_rmid, tid, ret);
0931         }
0932     }
0933 
0934 }
0935 
0936 /**
0937  * Callback routine for scheduled timeout checks.
0938  * @return 
0939  */
0940 exprivate int tm_tout_check(void)
0941 {
0942     int i;
0943     NDRX_LOG(log_dump, "Timeout check (submit job...)");
0944     
0945     ndrx_thpool_add_work(G_tmsrv_cfg.thpool, (void*)tx_tout_check_th, NULL);
0946     
0947     /* RUN PINGs... over the all threads... */
0948     if (G_tmsrv_cfg.ping_time > 0)
0949     {
0950         for (i=0; i<G_tmsrv_cfg.threadpoolsize; i++)
0951         {
0952             ndrx_thpool_add_work(G_tmsrv_cfg.thpool, (void*)tm_ping_db, NULL);
0953         }
0954     }
0955     
0956     return EXSUCCEED;
0957 }
0958 
0959 /**
0960  * Shutdown the thread
0961  * @param arg
0962  * @param p_finish_off
0963  */
0964 expublic void tm_thread_shutdown(void *ptr, int *p_finish_off)
0965 {
0966     tm_thread_uninit();
0967     
0968     *p_finish_off = EXTRUE;
0969 }
0970 
0971 /* vim: set ts=4 sw=4 et smartindent: */