Back to home page

Enduro/X

 
 

    


0001 /**
0002  * @brief Tmsrv server - transaction monitor
0003  *   After that log transaction to hash & to disk for tracking the stuff...
0004  *   TODO: We should have similar control like "TP_COMMIT_CONTROL" -
0005  *   either return after stuff logged or after really commit completed.
0006  *   Error handling:
0007  *   - System errors we will track via ATMI interface error functions
0008  *   - XA errors will be tracked via XA error interface
0009  *   Should we call xa_end for joined transactions? See:
0010  *   https://www-01.ibm.com/support/knowledgecenter/SSFKSJ_7.0.1/com.ibm.mq.amqzag.doc/fa13870_.htm
0011  *   TODO: count the XA_RETRY as part of the transaction retry counter.
0012  *   i.e. if state is not changed counter++
0013  *   have a new flag for max count to return heuristic and move to background.
0014  *
0015  * @file tmsrv.c
0016  */
0017 /* -----------------------------------------------------------------------------
0018  * Enduro/X Middleware Platform for Distributed Transaction Processing
0019  * Copyright (C) 2009-2016, ATR Baltic, Ltd. All Rights Reserved.
0020  * Copyright (C) 2017-2023, Mavimax, Ltd. All Rights Reserved.
0021  * This software is released under one of the following licenses:
0022  * AGPL (with Java and Go exceptions) or Mavimax's license for commercial use.
0023  * See LICENSE file for full text.
0024  * -----------------------------------------------------------------------------
0025  * AGPL license:
0026  *
0027  * This program is free software; you can redistribute it and/or modify it under
0028  * the terms of the GNU Affero General Public License, version 3 as published
0029  * by the Free Software Foundation;
0030  *
0031  * This program is distributed in the hope that it will be useful, but WITHOUT ANY
0032  * WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR A
0033  * PARTICULAR PURPOSE. See the GNU Affero General Public License, version 3
0034  * for more details.
0035  *
0036  * You should have received a copy of the GNU Affero General Public License along 
0037  * with this program; if not, write to the Free Software Foundation, Inc.,
0038  * 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
0039  *
0040  * -----------------------------------------------------------------------------
0041  * A commercial use license is available from Mavimax, Ltd
0042  * contact@mavimax.com
0043  * -----------------------------------------------------------------------------
0044  */
0045 #include <ndrx_config.h>
0046 #include <stdio.h>
0047 #include <stdlib.h>
0048 #include <string.h>
0049 #include <errno.h>
0050 #include <regex.h>
0051 #include <utlist.h>
0052 #include <unistd.h>
0053 
0054 #include <ndebug.h>
0055 #include <atmi.h>
0056 #include <atmi_int.h>
0057 #include <typed_buf.h>
0058 #include <ndrstandard.h>
0059 #include <ubf.h>
0060 #include <Exfields.h>
0061 
0062 #include <exnet.h>
0063 #include <ndrxdcmn.h>
0064 
0065 #include "tmsrv.h"
0066 #include "../libatmisrv/srv_int.h"
0067 #include "tperror.h"
0068 #include "userlog.h"
0069 #include <xa_cmn.h>
0070 #include <exthpool.h>
0071 #include <ubfutil.h>
0072 #include <sys_test.h>
0073 #include <singlegrp.h>
0074 /*---------------------------Externs------------------------------------*/
0075 /*---------------------------Macros-------------------------------------*/
0076 /*---------------------------Enums--------------------------------------*/
0077 /*---------------------------Typedefs-----------------------------------*/
0078 /*---------------------------Globals------------------------------------*/
/* Server configuration; zeroed and filled from the command line in tpsvrinit() */
expublic tmsrv_cfg_t G_tmsrv_cfg;
/*---------------------------Statics------------------------------------*/
/* Set to EXTRUE at the end of a successful tpsvrinit(); tpsvrdone() only
 * stops the background thread and the worker pool when this is set. */
exprivate int M_init_ok = EXFALSE;

/*
 * Thread private data
 */
/* Time since this thread's last DB ping; reset in tm_thread_init() and
 * each time tm_ping_db() actually runs a ping. */
exprivate __thread ndrx_stopwatch_t M_ping_stopwatch;
/* EXTRUE until this thread has executed tm_thread_init() */
exprivate __thread int M_thread_first = EXTRUE;
exprivate __thread XID M_ping_xid; /* run pings by this non existent xid */

/* allow only one timeout check at the same time... */
/* All reads/writes of M_into_toutchk in this file happen while holding
 * M_into_toutchk_lock (see tx_tout_check_th()). */
exprivate int volatile M_into_toutchk = EXFALSE;
exprivate MUTEX_LOCKDECL(M_into_toutchk_lock);

/*---------------------------Prototypes---------------------------------*/
exprivate int tm_tout_check(void);
0096 
0097 /**
0098  * Initialize thread
0099  */
0100 expublic void tm_thread_init(void)
0101 {
0102     if (EXSUCCEED!=tpinit(NULL))
0103     {
0104         NDRX_LOG(log_error, "Failed to init worker client");
0105         userlog("tmsrv: Failed to init worker client");
0106         exit(1);
0107     }
0108 
0109     /* Bug #161 We shall run xa_open() here too, because it is per thread
0110      * config.
0111      */
0112     if (EXSUCCEED!=tpopen())
0113     {
0114         NDRX_LOG(log_error, "Worker thread failed to tpopen() - nothing to do, "
0115                 "process will exit");
0116         userlog("Worker thread failed to tpopen() - nothing to do, "
0117                 "process will exit");
0118         exit(1);
0119     }
0120 
0121     ndrx_stopwatch_reset(&M_ping_stopwatch);
0122     
0123     atmi_xa_new_xid(&M_ping_xid);
0124     
0125 }
0126 
0127 /**
0128  * Close the thread session
0129  */
0130 expublic void tm_thread_uninit(void)
0131 {
0132     tpclose();
0133     tpterm();
0134 }
0135 
0136 /**
0137  * Tmsrv service entry (working thread)
0138  * @param p_svc - data & len used only...!
0139  */
0140 void TPTMSRV_TH (void *ptr, int *p_finish_off)
0141 {
0142     /* Ok we should not handle the commands 
0143      * TPBEGIN...
0144      */
0145     int ret=EXSUCCEED;
0146     thread_server_t *thread_data = (thread_server_t *)ptr;
0147     char cmd = EXEOS;
0148     int cd;
0149     long allocsz;
0150     /**************************************************************************/
0151     /*                        THREAD CONTEXT RESTORE                          */
0152     /**************************************************************************/
0153     UBFH *p_ub = (UBFH *)thread_data->buffer;
0154     
0155     /* Do the ATMI init, if needed */
0156     if (M_thread_first)
0157     {
0158         tm_thread_init();
0159         M_thread_first = EXFALSE;
0160     }
0161     
0162     /* run the ping (will be skipped if thread is already pinged in time) */
0163     if (G_tmsrv_cfg.ping_time > 0)
0164     {
0165         tm_ping_db(NULL, NULL);
0166     }
0167     
0168     /* restore context. */
0169     if (EXSUCCEED!=tpsrvsetctxdata(thread_data->context_data, SYS_SRV_THREAD))
0170     {
0171         userlog("tmsrv: Failed to set context");
0172         NDRX_LOG(log_error, "Failed to set context");
0173         exit(1);
0174     }
0175     
0176     cd = thread_data->cd;
0177     /* free up the transport data.*/
0178     NDRX_FREE(thread_data->context_data);
0179     NDRX_FREE(thread_data);
0180     /**************************************************************************/
0181     
0182     /* get some more space! 
0183      */
0184     if (Bunused (p_ub) < 4096)
0185     {
0186         p_ub = (UBFH *)tprealloc ((char *)p_ub, allocsz=(Bsizeof (p_ub) + 4096));
0187         
0188         if (NULL==p_ub)
0189         {
0190             NDRX_LOG(log_error, "Failed realloc UBF to %d bytes: %s", 
0191                     allocsz, tpstrerror(tperrno));
0192             EXFAIL_OUT(ret);
0193         }
0194     }
0195     
0196     ndrx_debug_dump_UBF(log_info, "TPTMSRV call buffer:", p_ub);
0197     
0198     if (Bget(p_ub, TMCMD, 0, (char *)&cmd, 0L))
0199     {
0200         NDRX_LOG(log_error, "Failed to read command code!");
0201         ret=EXFAIL;
0202         goto out;
0203     }
0204     
0205     /* TODO: We need TMFLAGS */
0206     
0207     NDRX_LOG(log_info, "Got command code: [%c]", cmd);
0208     
0209     switch(cmd)
0210     {
0211         case ATMI_XA_TPBEGIN:
0212             
0213             /* start new tran... */
0214             if (EXSUCCEED!=tm_tpbegin(p_ub))
0215             {
0216                 ret=EXFAIL;
0217                 goto out;
0218             }
0219             break;
0220         case ATMI_XA_TPCOMMIT:
0221             
0222             if (EXSUCCEED!=tm_tpcommit(p_ub))
0223             {
0224                 ret=EXFAIL;
0225                 goto out;
0226             }
0227             break;
0228         case ATMI_XA_TPABORT:
0229             
0230             if (EXSUCCEED!=tm_tpabort(p_ub))
0231             {
0232                 ret=EXFAIL;
0233                 goto out;
0234             }
0235             break;
0236         case ATMI_XA_PRINTTRANS:
0237             
0238             /* request for printing active transactions 
0239              * we shall allocate the buffer to max possible size
0240              */
0241             
0242             p_ub = (UBFH *)tprealloc ((char *)p_ub, allocsz=
0243                     (NDRX_MSGSIZEMAX-NDRX_MSGSIZEMAX_OVERHD));
0244         
0245             if (NULL==p_ub)
0246             {
0247                 NDRX_LOG(log_error, "Failed realloc UBF to %d bytes: %s", 
0248                         allocsz, tpstrerror(tperrno));
0249                 EXFAIL_OUT(ret);
0250             }
0251 
0252             if (EXSUCCEED!=tm_tpprinttrans(p_ub, cd))
0253             {
0254                 ret=EXFAIL;
0255                 goto out;
0256             }
0257             break;
0258         case ATMI_XA_ABORTTRANS:
0259             
0260             /* request for printing active transactions */
0261             if (EXSUCCEED!=tm_aborttrans(p_ub))
0262             {
0263                 ret=EXFAIL;
0264                 goto out;
0265             }
0266             break;
0267         case ATMI_XA_STATUS:
0268             
0269             /* request for printing active transactions */
0270             if (EXSUCCEED!=tm_status(p_ub))
0271             {
0272                 ret=EXFAIL;
0273                 goto out;
0274             }
0275             break;
0276         case ATMI_XA_COMMITTRANS:
0277             
0278             if (EXSUCCEED!=tm_committrans(p_ub))
0279             {
0280                 ret=EXFAIL;
0281                 goto out;
0282             }
0283             break;
0284         case ATMI_XA_TMPREPARE:
0285             
0286             /* prepare the stuff locally */
0287             if (EXSUCCEED!=tm_tmprepare(p_ub))
0288             {
0289                 ret=EXFAIL;
0290                 goto out;
0291             }
0292             break;
0293         case ATMI_XA_TMCOMMIT:
0294             
0295             /* prepare the stuff locally */
0296             if (EXSUCCEED!=tm_tmcommit(p_ub))
0297             {
0298                 ret=EXFAIL;
0299                 goto out;
0300             }
0301             break;
0302         case ATMI_XA_TMABORT:
0303             
0304             /* abort the stuff locally */
0305             if (EXSUCCEED!=tm_tmabort(p_ub))
0306             {
0307                 ret=EXFAIL;
0308                 goto out;
0309             }
0310             break;
0311         case ATMI_XA_TMFORGET:
0312             /* forget the stuff locally */
0313             if (EXSUCCEED!=tm_tmforget(p_ub))
0314             {
0315                 ret=EXFAIL;
0316                 goto out;
0317             }
0318             break;
0319         case ATMI_XA_TMREGISTER:
0320             /* Some binary is telling as the different RM is involved
0321              * in transaction.
0322              */
0323             if (EXSUCCEED!=tm_tmregister(p_ub))
0324             {
0325                 ret=EXFAIL;
0326                 goto out;
0327             }
0328             break;
0329         case ATMI_XA_RMSTATUS:
0330             /* Report the status of resource manager involvement in transaction
0331              */
0332             if (EXSUCCEED!=tm_rmstatus(p_ub))
0333             {
0334                 ret=EXFAIL;
0335                 goto out;
0336             }
0337             break;
0338         case ATMI_XA_RECOVERLOCAL:
0339             if (EXFAIL==tm_recoverlocal(p_ub, cd))
0340             {
0341                 EXFAIL_OUT(ret);
0342             }
0343             break;
0344         case ATMI_XA_COMMITLOCAL:
0345         case ATMI_XA_ABORTLOCAL:
0346         case ATMI_XA_FORGETLOCAL:
0347             
0348             if (EXSUCCEED!=tm_proclocal(cmd, p_ub, cd))
0349             {
0350                 ret=EXFAIL;
0351                 goto out;
0352             }
0353             
0354             break;
0355         default:
0356             NDRX_LOG(log_error, "Unsupported command code: [%c]", cmd);
0357             ret=EXFAIL;
0358             break;
0359     }
0360     
0361 out:
0362             
0363     /* Approve the request if all ok */
0364     if (EXSUCCEED==ret)
0365     {
0366         atmi_xa_approve(p_ub);
0367     }
0368 
0369     if (EXSUCCEED!=ret && XA_RDONLY==atmi_xa_get_reason())
0370     {
0371         NDRX_LOG(log_debug, "Marking READ ONLY = SUCCEED");
0372         ret=EXSUCCEED;
0373     }
0374 
0375     ndrx_debug_dump_UBF(log_info, "TPTMSRV return buffer:", p_ub);
0376 
0377     tpreturn(  ret==EXSUCCEED?TPSUCCESS:TPFAIL,
0378                 0L,
0379                 (char *)p_ub,
0380                 0L,
0381                 0L);
0382     
0383     /* note this is thread and it does not do long jump */
0384     /* 
0385      * If there was very long processing session and ping is required (detected
0386      * by func). Then run it here.
0387      */
0388     if (G_tmsrv_cfg.ping_time > 0)
0389     {
0390         tm_ping_db(NULL, NULL);
0391     }
0392 }
0393 
0394 /**
0395  * Entry point for service (main thread)
0396  * @param p_svc
0397  */
0398 void TPTMSRV (TPSVCINFO *p_svc)
0399 {
0400     int ret=EXSUCCEED;
0401     UBFH *p_ub = (UBFH *)p_svc->data; /* this is auto-buffer */
0402     long size;
0403     char btype[16];
0404     char stype[16];
0405     thread_server_t *thread_data = NDRX_MALLOC(sizeof(thread_server_t));
0406     
0407     if (NULL==thread_data)
0408     {
0409         userlog("Failed to malloc memory - %s!", strerror(errno));
0410         NDRX_LOG(log_error, "Failed to malloc memory");
0411         EXFAIL_OUT(ret);
0412     }
0413     
0414     if (0==(size = tptypes (p_svc->data, btype, stype)))
0415     {
0416         NDRX_LOG(log_error, "Zero buffer received!");
0417         userlog("Zero buffer received!");
0418         NDRX_FREE(thread_data);
0419         EXFAIL_OUT(ret);
0420     }
0421     
0422 #if 0
0423         - Why?
0424     /* not using sub-type - on tpreturn/forward for thread it will be auto-free */
0425     thread_data->buffer =  tpalloc(btype, NULL, size);
0426     
0427     if (NULL==thread_data->buffer)
0428     {
0429         NDRX_LOG(log_error, "tpalloc failed of type %s size %ld", btype, size);
0430         EXFAIL_OUT(ret);
0431     }
0432     
0433     /* copy off the data */
0434     memcpy(thread_data->buffer, p_svc->data, size);
0435 #endif
0436     thread_data->buffer=p_svc->data;
0437     thread_data->cd = p_svc->cd;
0438     thread_data->context_data = tpsrvgetctxdata();
0439     
0440     /* submit the job to thread pool: */
0441     ndrx_thpool_add_work(G_tmsrv_cfg.thpool, (void*)TPTMSRV_TH, (void *)thread_data);
0442     
0443 out:
0444     if (EXSUCCEED==ret)
0445     {
0446         ndrx_thpool_wait_one(G_tmsrv_cfg.thpool);
0447         
0448         tpcontinue();
0449     }
0450     else
0451     {
0452         /* return error back */
0453         tpreturn(  TPFAIL,
0454                 0L,
0455                 (char *)p_ub,
0456                 0L,
0457                 0L);
0458     }
0459 
0460 }
0461 
0462 /*
0463  * Do initialization
0464  */
0465 int tpsvrinit(int argc, char **argv)
0466 {
0467     int ret=EXSUCCEED;
0468     signed char c;
0469     char *p;
0470     char svcnm[MAXTIDENT+1];
0471     NDRX_LOG(log_debug, "tpsvrinit called");
0472     
0473     memset(&G_tmsrv_cfg, 0, sizeof(G_tmsrv_cfg));
0474     
0475     G_tmsrv_cfg.ping_mode_jointran = EXTRUE;
0476     G_tmsrv_cfg.housekeeptime = TMSRV_HOUSEKEEP_DEFAULT;
0477     G_tmsrv_cfg.vnodeid=tpgetnodeid();
0478     
0479     /* Parse command line  */
0480     while ((c = getopt(argc, argv, "n:P:t:s:l:c:m:p:r:Rh:X:")) != -1)
0481     {
0482 
0483         if (optarg)
0484         {
0485             NDRX_LOG(log_debug, "%c = [%s]", c, optarg);
0486         }
0487         else
0488         {
0489             NDRX_LOG(log_debug, "got %c", c);
0490         }
0491 
0492         switch(c)
0493         {
0494             case 'X':
0495                 G_tmsrv_cfg.chkdisk_time=atoi(optarg);
0496 
0497                 if (G_tmsrv_cfg.chkdisk_time)
0498                 {
0499                     NDRX_LOG(log_info, "Check disk logs set to %d sec",
0500                                 G_tmsrv_cfg.chkdisk_time);
0501                 }
0502                 break;
0503             case 'n':
0504                 G_tmsrv_cfg.vnodeid = atol(optarg);
0505                 NDRX_LOG(log_info, "Virtual Enduro/X Cluster Node ID set to %ld",
0506                             G_tmsrv_cfg.vnodeid);
0507                 break;
0508             case 't': 
0509                 G_tmsrv_cfg.dflt_timeout = atol(optarg);
0510                 NDRX_LOG(log_debug, "Default transaction time-out "
0511                             "set to: [%ld]", G_tmsrv_cfg.dflt_timeout);
0512                 break;
0513                 /* status directory: */
0514             case 'l': 
0515                 NDRX_STRCPY_SAFE(G_tmsrv_cfg.tlog_dir, optarg);
0516                 NDRX_LOG(log_debug, "Status directory "
0517                             "set to: [%s]", G_tmsrv_cfg.tlog_dir);
0518                 break;
0519             case 's': 
0520                 G_tmsrv_cfg.scan_time = atoi(optarg);
0521                 break;
0522             case 'c': 
0523                 /* Time for time-out checking... */
0524                 G_tmsrv_cfg.tout_check_time = atoi(optarg);
0525                 break;
0526             case 'm': 
0527                 G_tmsrv_cfg.max_tries = atol(optarg);
0528                 break;
0529             case 'p': 
0530                 G_tmsrv_cfg.threadpoolsize = atol(optarg);
0531                 break;
0532             case 'r': 
0533                 G_tmsrv_cfg.xa_retries = atoi(optarg);
0534                 break;
0535             case 'R':
0536                 /* in this case use tran listing (xa_recover)*/
0537                 G_tmsrv_cfg.ping_mode_jointran = EXFALSE;
0538                 break;
0539             case 'h':
0540                 G_tmsrv_cfg.housekeeptime = atoi(optarg);
0541                 break;
0542             case 'P':
0543                 /* Ping will run with timeout timer interval...
0544                  * will work with RECON flags (which must be set for this case)
0545                  */
0546                 G_tmsrv_cfg.ping_time = atoi(optarg);
0547 
0548                 if (G_tmsrv_cfg.ping_time < 0)
0549                 {
0550                     NDRX_LOG(log_error, "ERROR ! invalid value %d for -P. Must be >=0",
0551                             G_tmsrv_cfg.ping_time);
0552                     EXFAIL_OUT(ret);
0553                 }
0554                 
0555                 break;
0556             default:
0557                 /*return FAIL;*/
0558                 break;
0559         }
0560     }
0561     
0562     /* Check the parameters & default them if needed */
0563     if (0>=G_tmsrv_cfg.scan_time)
0564     {
0565         G_tmsrv_cfg.scan_time = SCAN_TIME_DFLT;
0566     }
0567     
0568     if (0>=G_tmsrv_cfg.max_tries)
0569     {
0570         G_tmsrv_cfg.max_tries = MAX_TRIES_DFTL;
0571     }
0572     
0573     if (0>=G_tmsrv_cfg.tout_check_time)
0574     {
0575         G_tmsrv_cfg.tout_check_time = TOUT_CHECK_TIME;
0576     }
0577     
0578     if (0>=G_tmsrv_cfg.threadpoolsize)
0579     {
0580         G_tmsrv_cfg.threadpoolsize = THREADPOOL_DFLT;
0581     }
0582     
0583     if (0>=G_tmsrv_cfg.xa_retries)
0584     {
0585         G_tmsrv_cfg.xa_retries = XA_RETRIES_DFLT;
0586     }
0587     
0588     if (EXEOS==G_tmsrv_cfg.tlog_dir[0])
0589     {
0590         userlog("TMS log dir not set!");
0591         NDRX_LOG(log_error, "TMS log dir not set!");
0592         EXFAIL_OUT(ret);
0593     }
0594     NDRX_LOG(log_debug, "Recovery scan time set to [%d]",
0595                             G_tmsrv_cfg.scan_time);
0596     
0597     NDRX_LOG(log_debug, "Tx max tries set to [%d]",
0598                             G_tmsrv_cfg.max_tries);
0599     
0600     NDRX_LOG(log_debug, "Worker pool size [%d] threads",
0601                             G_tmsrv_cfg.threadpoolsize);
0602     
0603     NDRX_LOG(log_debug, "Foreground retries in stage [%d]",
0604                             G_tmsrv_cfg.xa_retries);
0605     
0606     NDRX_LOG(log_debug, "Housekeep time for corrupted logs: [%d] (sec)",
0607                             G_tmsrv_cfg.housekeeptime);
0608 
0609     NDRX_LOG(log_debug, "About to initialize XA!");
0610     
0611     if (EXSUCCEED!=atmi_xa_init()) /* will open next... */
0612     {
0613         NDRX_LOG(log_error, "Failed to initialize XA driver!");
0614         EXFAIL_OUT(ret);
0615     }
0616     
0617     if (G_tmsrv_cfg.ping_time > 0)
0618     {
0619 
0620         NDRX_LOG(log_info, "DB PING & connection recovery enabled, interval: %d "
0621                 "(same as -c number)", 
0622                 G_tmsrv_cfg.tout_check_time);
0623         
0624         if (G_tmsrv_cfg.ping_mode_jointran)
0625         {
0626             NDRX_LOG(log_warn, "PING by JOIN to non existent transaction");
0627         }
0628         else
0629         {
0630             NDRX_LOG(log_warn, "PING by RECOVER transaction");
0631         }
0632         
0633         if (G_atmi_env.xa_recon_times <=1)
0634         {
0635             NDRX_LOG(log_always, "ERROR ! Using -P (ping) to be effective, please "
0636                     "ensure that NDRX_XA_FLAGS=RECON... is set to tries count > 1!");
0637             EXFAIL_OUT(ret);
0638         }
0639     }
0640     else
0641     {
0642         NDRX_LOG(log_info, "DB PING disabled (-P not set)");
0643     }
0644 
0645     /* we should open the XA  */
0646     
0647     NDRX_LOG(log_debug, "About to Open XA Entry!");
0648     ret = atmi_xa_open_entry();
0649     if( XA_OK != ret )
0650     {
0651         userlog("xa_open failed error %d", ret);
0652         NDRX_LOG(log_error, "xa_open failed");
0653     }
0654     else
0655     {
0656         NDRX_LOG(log_error, "xa_open ok");
0657         ret = EXSUCCEED;
0658     }
0659                 
0660     /* very generic version/only Resource ID known */
0661     
0662     snprintf(svcnm, sizeof(svcnm), NDRX_SVC_RM, G_atmi_env.xa_rmid);
0663     
0664     if (EXSUCCEED!=tpadvertise(svcnm, TPTMSRV))
0665     {
0666         NDRX_LOG(log_error, "Failed to advertise %s service!", svcnm);
0667         EXFAIL_OUT(ret);
0668     }
0669     
0670     /* generic instance: */
0671     snprintf(svcnm, sizeof(svcnm), NDRX_SVC_TM, (int)G_tmsrv_cfg.vnodeid, G_atmi_env.xa_rmid);
0672     
0673     if (EXSUCCEED!=tpadvertise(svcnm, TPTMSRV))
0674     {
0675         NDRX_LOG(log_error, "Failed to advertise %s service!", svcnm);
0676         EXFAIL_OUT(ret);
0677     }
0678     
0679     /* specific instance */
0680     snprintf(svcnm, sizeof(svcnm), NDRX_SVC_TM_I, (int)G_tmsrv_cfg.vnodeid, G_atmi_env.xa_rmid, 
0681             G_server_conf.srv_id);
0682     
0683     if (EXSUCCEED!=tpadvertise(svcnm, TPTMSRV))
0684     {
0685         NDRX_LOG(log_error, "Failed to advertise %s service!", svcnm);
0686         EXFAIL_OUT(ret);
0687     }
0688     
0689     if (NULL==(G_tmsrv_cfg.thpool = ndrx_thpool_init(G_tmsrv_cfg.threadpoolsize,
0690             NULL, NULL, NULL, 0, NULL)))
0691     {
0692         NDRX_LOG(log_error, "Failed to initialize thread pool (cnt: %d)!", 
0693                 G_tmsrv_cfg.threadpoolsize);
0694         EXFAIL_OUT(ret);
0695     }
0696     
0697     /* Start the background processing */
0698     if (EXSUCCEED!=background_process_init())
0699     {
0700         NDRX_LOG(log_error, "Failed to creat background txn tout/completion thread");
0701         EXFAIL_OUT(ret);
0702     }
0703     
0704     /* Register timer check (needed for time-out detection) */
0705     if (EXSUCCEED!=tpext_addperiodcb(G_tmsrv_cfg.tout_check_time, tm_tout_check))
0706     {
0707         NDRX_LOG(log_error, "tpext_addperiodcb failed: %s",
0708                         tpstrerror(tperrno));
0709         EXFAIL_OUT(ret);
0710     }
0711     
0712     M_init_ok = EXTRUE;
0713     
0714 out:
0715     return ret;
0716 }
0717 
0718 /**
0719  * Do de-initialization
0720  */
0721 void tpsvrdone(void)
0722 {
0723     int i;
0724     NDRX_LOG(log_debug, "tpsvrdone called - requesting "
0725             "background thread shutdown...");
0726     
0727     G_bacground_req_shutdown = EXTRUE;
0728     
0729     if (M_init_ok)
0730     {
0731         background_wakeup();
0732 
0733         /* Terminate the threads */
0734         for (i=0; i<G_tmsrv_cfg.threadpoolsize; i++)
0735         {
0736             ndrx_thpool_add_work(G_tmsrv_cfg.thpool, (void *)tm_thread_shutdown, NULL);
0737         }
0738         
0739         /* Wait to complete */
0740         pthread_join(G_bacground_thread, NULL);
0741 
0742         /* Wait for threads to finish */
0743         ndrx_thpool_wait(G_tmsrv_cfg.thpool);
0744         ndrx_thpool_destroy(G_tmsrv_cfg.thpool);
0745     }
0746     atmi_xa_close_entry(EXFALSE);
0747     
0748 }
0749 
0750 /**
0751  * Periodic main thread callback for 
0752  * (will be done by threadpoll)
0753  * @return 
0754  */
0755 exprivate void tx_tout_check_th(void *ptr)
0756 {
0757     long tspent;
0758     atmi_xa_log_list_t *tx_list;
0759     atmi_xa_log_list_t *el, *tmp;
0760     atmi_xa_tx_info_t xai;
0761     atmi_xa_log_t *p_tl;
0762     int in_progress;
0763     
0764     MUTEX_LOCK_V(M_into_toutchk_lock);
0765     
0766     in_progress=M_into_toutchk;
0767     
0768     /* do lock if was free */
0769     if (!in_progress)
0770     {
0771         M_into_toutchk=EXTRUE;
0772     }
0773             
0774     MUTEX_UNLOCK_V(M_into_toutchk_lock);
0775     
0776     if (in_progress)
0777     {
0778         /* nothing todo... */
0779         goto out;
0780     }
0781     
0782     /* Create a copy of hash, iterate and check each tx for timeout condition
0783      * If so then initiate internal abort call
0784      */
0785     NDRX_LOG(log_dump, "Timeout check (processing...)");
0786     
0787     /* Do the ATMI init, if needed 
0788      */
0789     if (M_thread_first)
0790     {
0791         tm_thread_init();
0792         M_thread_first = EXFALSE;
0793     }
0794     
0795     tx_list = tms_copy_hash2list(COPY_MODE_FOREGROUND | COPY_MODE_ACQLOCK);
0796         
0797     LL_FOREACH_SAFE(tx_list,el,tmp)
0798     {
0799         NDRX_LOG(log_debug, "Checking [%s]...", el->p_tl.tmxid);
0800         if ((tspent = ndrx_stopwatch_get_delta_sec(&el->p_tl.ttimer)) > 
0801                 el->p_tl.txtout && XA_TX_STAGE_ACTIVE==el->p_tl.txstage)
0802         {
0803             
0804             if (NULL!=(p_tl = tms_log_get_entry(el->p_tl.tmxid, 0, NULL)))
0805             {
0806                 if (XA_TX_STAGE_ACTIVE==p_tl->txstage)
0807                 {
0808                     XA_TX_COPY((&xai), p_tl);
0809                     
0810                     NDRX_LOG(log_error, "XID [%s] timed out "
0811                         "(spent %ld, limit: %ld sec) - aborting...!", 
0812                         el->p_tl.tmxid, tspent, 
0813                         el->p_tl.txtout);
0814             
0815                     userlog("XID [%s] timed out "
0816                             "(spent %ld, limit: %ld sec) - aborting...!", 
0817                             el->p_tl.tmxid, tspent, 
0818                             el->p_tl.txtout);
0819 
0820                     tms_log_stage(p_tl, XA_TX_STAGE_ABORTING, EXTRUE);
0821                     
0822                     /* NOTE: We might want to move this to background processing
0823                      * because for example, oracle in some cases does long aborts...
0824                      * thus it slows down general processing
0825                      * BUT: if we want to move it to background, we should protect
0826                      * transaction log from concurrent access, e.g.
0827                      * - background does the abort()
0828                      * - meanwhile foreground calls commit()
0829                      * This can be reached with per transaction locking...
0830                      */
0831                     tm_drive(&xai, p_tl, XA_OP_ROLLBACK, EXFAIL, 0L);
0832                 }
0833                 else
0834                 {
0835                     NDRX_LOG(log_error, "XID [%s] was-tout but found in progress "
0836                         "(txstage %hd spent %ld, limit: %ld sec) - aborting...!", 
0837                         p_tl->tmxid, p_tl->txstage, tspent, p_tl->txtout);
0838                 }
0839             }
0840         }
0841         LL_DELETE(tx_list,el);
0842         NDRX_FREE(el);
0843     }
0844 out:    
0845                 
0846     /* if was not in progress then we locked  */
0847     MUTEX_LOCK_V(M_into_toutchk_lock);
0848 
0849     if (!in_progress)
0850     {
0851         M_into_toutchk=EXFALSE;
0852     }   
0853 
0854     MUTEX_UNLOCK_V(M_into_toutchk_lock);
0855     
0856     return;
0857 }
0858 
0859 /**
0860  * Run the DB ping...
0861  * It will try to list the transactions from DB. Non invasive method.
0862  * The recover entry will automatically reconnect to DB if connection failed
0863  * We will try to join non existent transaction...
0864  */
0865 expublic void tm_ping_db(void *ptr, int *p_finish_off)
0866 {
0867     int delta = ndrx_stopwatch_get_delta_sec(&M_ping_stopwatch);
0868     int ret;
0869     unsigned long tid = (unsigned long)ndrx_gettid();
0870     int is_ping_ok;
0871     
0872     /* Do the ATMI init, if needed */
0873     if (M_thread_first)
0874     {
0875         tm_thread_init();
0876         M_thread_first = EXFALSE;
0877     }
0878     
0879     if (delta >= G_tmsrv_cfg.ping_time)
0880     {
0881         ndrx_stopwatch_reset(&M_ping_stopwatch);
0882         NDRX_LOG(log_debug, "RMID: %hd TID: %lu: Running ping", 
0883                 G_atmi_env.xa_rmid, tid);
0884         
0885         if (G_tmsrv_cfg.ping_mode_jointran)
0886         {
0887             if (EXSUCCEED==(ret = atmi_xa_start_entry(&M_ping_xid, TMJOIN, EXTRUE)) || 
0888                 atmi_xa_get_reason()!=XAER_NOTA)
0889             {
0890                 is_ping_ok = EXFALSE;
0891             }
0892             else
0893             {
0894                 is_ping_ok = EXTRUE;
0895             }
0896         }
0897         else
0898         {
0899             if (0>(ret = atmi_xa_recover_entry(&M_ping_xid, 1, G_atmi_env.xa_rmid, 
0900                 TMSTARTRSCAN|TMENDRSCAN)))
0901             {
0902                 is_ping_ok = EXFALSE;
0903             }
0904             else
0905             {
0906                 is_ping_ok = EXTRUE;
0907             }
0908         }
0909         
0910         if (!is_ping_ok)
0911         {
0912             /* Ping error/ulog */
0913             NDRX_LOG(log_error, "RMID: %hd TID: %lu ERROR ! DB PING FAILED: %s", 
0914                     G_atmi_env.xa_rmid, tid, tpstrerror(tperrno));
0915             userlog("RMID: %hd TID: %lu ERROR ! DB PING FAILED: %s", 
0916                     G_atmi_env.xa_rmid, tid, tpstrerror(tperrno));
0917         }
0918         else
0919         {
0920             /* for tests needs higher debug level to reduce space */
0921             NDRX_LOG(NDRX_SYSTEST_ENBLD?log_error:log_debug,
0922         "RMID %hd TID: %lu: PING OK %d", 
0923                 G_atmi_env.xa_rmid, tid, ret);
0924         }
0925     }
0926 
0927 }
0928 
0929 /**
0930  * Callback routine for scheduled timeout checks.
0931  * @return 
0932  */
0933 exprivate int tm_tout_check(void)
0934 {
0935     int i;
0936     NDRX_LOG(log_dump, "Timeout check (submit job...)");
0937     
0938     ndrx_thpool_add_work(G_tmsrv_cfg.thpool, (void*)tx_tout_check_th, NULL);
0939     
0940     /* RUN PINGs... over the all threads... */
0941     if (G_tmsrv_cfg.ping_time > 0)
0942     {
0943         for (i=0; i<G_tmsrv_cfg.threadpoolsize; i++)
0944         {
0945             ndrx_thpool_add_work(G_tmsrv_cfg.thpool, (void*)tm_ping_db, NULL);
0946         }
0947     }
0948     
0949     return EXSUCCEED;
0950 }
0951 
0952 /**
0953  * Shutdown the thread
0954  * @param arg
0955  * @param p_finish_off
0956  */
0957 expublic void tm_thread_shutdown(void *ptr, int *p_finish_off)
0958 {
0959     tm_thread_uninit();
0960     
0961     *p_finish_off = EXTRUE;
0962 }
0963 
0964 /* vim: set ts=4 sw=4 et smartindent: */