Back to home page

Enduro/X

 
 

    


0001 /**
0002  * @brief Background transaction completion & recovery
0003  *   For tlog background scanning we must:
0004  *   - Just lock the hash for writing (while iterating over it) & prepare console results
0005  *
0006  * @file background.c
0007  */
0008 /* -----------------------------------------------------------------------------
0009  * Enduro/X Middleware Platform for Distributed Transaction Processing
0010  * Copyright (C) 2009-2016, ATR Baltic, Ltd. All Rights Reserved.
0011  * Copyright (C) 2017-2023, Mavimax, Ltd. All Rights Reserved.
0012  * This software is released under one of the following licenses:
0013  * AGPL (with Java and Go exceptions) or Mavimax's license for commercial use.
0014  * See LICENSE file for full text.
0015  * -----------------------------------------------------------------------------
0016  * AGPL license:
0017  *
0018  * This program is free software; you can redistribute it and/or modify it under
0019  * the terms of the GNU Affero General Public License, version 3 as published
0020  * by the Free Software Foundation;
0021  *
0022  * This program is distributed in the hope that it will be useful, but WITHOUT ANY
0023  * WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR A
0024  * PARTICULAR PURPOSE. See the GNU Affero General Public License, version 3
0025  * for more details.
0026  *
0027  * You should have received a copy of the GNU Affero General Public License along 
0028  * with this program; if not, write to the Free Software Foundation, Inc.,
0029  * 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
0030  *
0031  * -----------------------------------------------------------------------------
0032  * A commercial use license is available from Mavimax, Ltd
0033  * contact@mavimax.com
0034  * -----------------------------------------------------------------------------
0035  */
0036 #include <stdio.h>
0037 #include <stdlib.h>
0038 #include <string.h>
0039 #include <errno.h>
0040 #include <regex.h>
0041 #include <utlist.h>
0042 #include <dirent.h>
0043 #include <pthread.h>
0044 #include <signal.h>
0045 
0046 #include <ndebug.h>
0047 #include <atmi.h>
0048 #include <atmi_int.h>
0049 #include <typed_buf.h>
0050 #include <ndrstandard.h>
0051 #include <ubf.h>
0052 #include <Exfields.h>
0053 #include <tperror.h>
0054 #include <exnet.h>
0055 #include <ndrxdcmn.h>
0056 
0057 #include "tmsrv.h"
0058 #include "../libatmisrv/srv_int.h"
0059 #include <xa_cmn.h>
0060 #include <atmi_int.h>
0061 #include <ndrxdiag.h>
0062 /*---------------------------Externs------------------------------------*/
0063 /*---------------------------Macros-------------------------------------*/
0064 #define NDRX_TMS_FILE_STATE_INITIAL     0 /**< Initial file check: seen on disk, load is attempted on next disk check  */
0065 #define NDRX_TMS_FILE_STATE_IGNORE      1 /**< Broken file, cannot be loaded (wait for housekeep)  */
0066 /*---------------------------Enums--------------------------------------*/
0067 /*---------------------------Typedefs-----------------------------------*/
0068 /*---------------------------Globals------------------------------------*/
0069 expublic pthread_t G_bacground_thread; /**< Thread started by background_process_init() */
0070 expublic int G_bacground_req_shutdown = EXFALSE;    /* Is shutdown request? (ends background_loop()) */
0071 
0072 exprivate MUTEX_LOCKDECL(M_wait_mutex); /**< Guards M_wait_cond (thread_sleep() / background_wakeup()) */
0073 exprivate pthread_cond_t M_wait_cond = PTHREAD_COND_INITIALIZER; /**< Signalled to interrupt thread_sleep() */
0074 
0075 exprivate MUTEX_LOCKDECL(M_background_lock); /* Background operations sync        */
0076 
0077 exprivate ndrx_stopwatch_t M_chkdisk_stopwatch;    /**< Check disk logs watch */
0078 
0079 exprivate ndrx_tms_file_registry_t *M_broken_tmxids=NULL; /**< Hash of log file registry entries, keyed by tmxid */
0080 
0081 /*---------------------------Statics------------------------------------*/
0082 /*---------------------------Prototypes---------------------------------*/
0083 
0084 /**
0085  * Check if file is already monitored
0086  * @param tmxid transaction id
0087  * @return NULL or entry
0088  */
0089 expublic ndrx_tms_file_registry_t *ndrx_tms_file_registry_get(const char *tmxid)
0090 {
0091     ndrx_tms_file_registry_t *p_ret = NULL;
0092     
0093     EXHASH_FIND_STR(M_broken_tmxids, tmxid, p_ret);
0094     
0095     return p_ret;
0096 }
0097 
0098 /**
0099  * Add file to registry (just a tmxid)
0100  * @tmxid transaction id
0101  * @state state of the file
0102  * @return EXSUCCEED/EXFAIL
0103  */
0104 expublic int ndrx_tms_file_registry_add(const char *tmxid, int state)
0105 {
0106     int ret = EXSUCCEED;
0107     ndrx_tms_file_registry_t *p_ret = NULL;
0108     
0109     p_ret = ndrx_tms_file_registry_get(tmxid);
0110     
0111     if (NULL==p_ret)
0112     {
0113         p_ret = (ndrx_tms_file_registry_t *)NDRX_FPMALLOC(sizeof(ndrx_tms_file_registry_t), 0);
0114 
0115         if (NULL==p_ret)
0116         {
0117             NDRX_LOG(log_error, "Failed to allocate memory for tmxid: [%s] monitoring", 
0118                     tmxid);
0119             EXFAIL_OUT(ret);
0120         }
0121         
0122         NDRX_STRCPY_SAFE(p_ret->tmxid, tmxid);
0123         p_ret->state = state;
0124 
0125         EXHASH_ADD_STR(M_broken_tmxids, tmxid, p_ret);
0126     }
0127 out:
0128     return ret;
0129 }
0130 
0131 expublic int ndrx_tms_file_registry_del(ndrx_tms_file_registry_t *ent)
0132 {
0133     int ret = EXSUCCEED;
0134     
0135     EXHASH_DEL(M_broken_tmxids, ent);
0136     NDRX_FPFREE(ent);
0137     
0138     return ret;
0139 }
0140 
0141 /**
0142  * Free up the registry
0143  */
0144 expublic void ndrx_tms_file_registry_free(void)
0145 {
0146     ndrx_tms_file_registry_t *el, *tmp;
0147     
0148     EXHASH_ITER(hh, M_broken_tmxids, el, tmp)
0149     {
0150         ndrx_tms_file_registry_del(el);
0151     }
0152 }
0153 
0154 /**
0155  * Lock background operations
0156  */
0157 expublic void background_lock(void)
0158 {
0159     MUTEX_LOCK_V(M_background_lock);
0160 }
0161 
0162 /**
0163  * Un-lock background operations
0164  */
0165 expublic void background_unlock(void)
0166 {
0167     MUTEX_UNLOCK_V(M_background_lock);
0168 }
0169 
0170 /**
0171  * Read the logfiles from the disk (if any we have there...)
0172  * MUST be during the startup, otherwise if front services start to
0173  * work, we might re-parse online logs, and for example we might switch the
0174  * status to ABORTING... if one was preparing...
0175  * @return 
0176  */
0177 expublic int background_read_log(void)
0178 {
0179     int ret=EXSUCCEED;
0180     struct dirent **namelist = NULL;
0181     int n, cnt;
0182     int len;
0183     char tranmask[256];
0184     char fnamefull[PATH_MAX+1];
0185     atmi_xa_log_t *pp_tl = NULL;
0186     
0187     snprintf(tranmask, sizeof(tranmask), "TRN-%ld-%hd-%d-", G_tmsrv_cfg.vnodeid, 
0188             G_atmi_env.xa_rmid, G_server_conf.srv_id);
0189     len = strlen(tranmask);
0190     /* List the files here. */
0191     cnt = scandir(G_tmsrv_cfg.tlog_dir, &namelist, 0, alphasort);
0192     if (cnt < 0)
0193     {
0194        NDRX_LOG(log_error, "Failed to scan [%s]: %s", 
0195                G_tmsrv_cfg.tlog_dir, strerror(errno));
0196        ret=EXFAIL;
0197        goto out;
0198     }
0199     else 
0200     {
0201        for (n=0; n<cnt; n++)
0202        {
0203            if (0==strcmp(namelist[n]->d_name, ".") || 
0204                        0==strcmp(namelist[n]->d_name, ".."))
0205            {
0206                /* memory leak fixes... */
0207                NDRX_FREE(namelist[n]);
0208                continue;
0209            }
0210 
0211            /* If it is transaction then parse & load */
0212            
0213            /*
0214            NDRX_LOG(log_debug, "[%s] vs [%s] %d", 
0215                        namelist[n]->d_name, tranmask, len);
0216            */
0217            
0218            if (0==strncmp(namelist[n]->d_name, tranmask, len))
0219            {
0220                snprintf(fnamefull, sizeof(fnamefull), "%s/%s", G_tmsrv_cfg.tlog_dir, 
0221                        namelist[n]->d_name);
0222                NDRX_LOG(log_warn, "Resuming transaction: [%s]", 
0223                        fnamefull);
0224                
0225                if (EXSUCCEED!=tms_load_logfile(fnamefull, 
0226                        namelist[n]->d_name+len, &pp_tl))
0227                {
0228                    NDRX_LOG(log_error, "Failed to resume transaction: [%s]", 
0229                        fnamefull);
0230                    /* ret=EXFAIL; ??? */
0231 
0232                     /* Do not pick up this anymore... */
0233 
0234                     ret=ndrx_tms_file_registry_add(namelist[n]->d_name+len,
0235                             NDRX_TMS_FILE_STATE_IGNORE);
0236 
0237                     NDRX_FREE(namelist[n]); /* mem leak fixes */
0238 
0239                     if (EXSUCCEED!=ret)
0240                     {
0241                         NDRX_LOG(log_error, "Failed to add tmxid: [%s] to registry (malloc err?)", 
0242                             namelist[n]->d_name+len);
0243                         EXFAIL_OUT(ret);
0244                     }
0245                    continue;
0246                }
0247                
0248            }
0249            
0250            NDRX_FREE(namelist[n]);
0251        }
0252        
0253        NDRX_FREE(namelist);
0254        namelist = NULL;
0255     }
0256     
0257 out:
0258     if (NULL!=namelist)
0259     {
0260        NDRX_FREE(namelist);
0261     }
0262     return ret;
0263 }
0264 
0265 /**
0266  * Sleep the thread, with option to wake up (by shutdown).
0267  * @param sleep_sec
0268  */
0269 exprivate void thread_sleep(int sleep_sec)
0270 {
0271     struct timespec wait_time;
0272     struct timeval now;
0273     int rt;
0274 
0275     gettimeofday(&now,NULL);
0276 
0277     wait_time.tv_sec = now.tv_sec+sleep_sec;
0278     wait_time.tv_nsec = now.tv_usec*1000;
0279 
0280     MUTEX_LOCK_V(M_wait_mutex);
0281     rt = pthread_cond_timedwait(&M_wait_cond, &M_wait_mutex, &wait_time);
0282     MUTEX_UNLOCK_V(M_wait_mutex);
0283 }
0284 
0285 /**
0286  * Wake up the sleeping thread.
0287  */
0288 expublic void background_wakeup(void)
0289 {
0290     MUTEX_LOCK_V(M_wait_mutex);
0291     pthread_cond_signal(&M_wait_cond);
0292     MUTEX_UNLOCK_V(M_wait_mutex);
0293 }
0294 
0295 /**
0296  * Read the logs directory and verify
0297  * that we have all the logs in the memory.
0298  */
0299 expublic int background_chkdisk(void)
0300 {
0301     char tmp[PATH_MAX+1];
0302     int i;
0303     int ret=EXSUCCEED;
0304     DIR *dir=NULL;
0305     struct dirent *entry;
0306     int len;
0307     char tranmask[256];
0308     atmi_xa_log_t *pp_tl = NULL;
0309 
0310     snprintf(tranmask, sizeof(tranmask), "TRN-%ld-%hd-%d-", G_tmsrv_cfg.vnodeid, 
0311             G_atmi_env.xa_rmid, G_server_conf.srv_id);
0312     len = strlen(tranmask);
0313 
0314     dir = opendir(G_tmsrv_cfg.tlog_dir);
0315 
0316     if (dir == NULL) {
0317 
0318         NDRX_LOG(log_error, "opendir [%s] failed: %s", 
0319             G_tmsrv_cfg.tlog_dir, strerror(errno));
0320         EXFAIL_OUT(ret);
0321     }
0322 
0323     while ((entry = readdir(dir)) != NULL)
0324     {
0325         if (0==strncmp(entry->d_name, tranmask, len))
0326         {
0327             /* extract transaction id  */
0328             NDRX_STRCPY_SAFE(tmp, entry->d_name+len);
0329 
0330             if (!tms_log_exists_entry(tmp))
0331             {
0332                 ndrx_tms_file_registry_t *p_reg = NULL;
0333                 snprintf(tmp, sizeof(tmp), "%s/%s", G_tmsrv_cfg.tlog_dir, 
0334                         entry->d_name);
0335                 if (ndrx_file_exists(tmp))
0336                 {
0337                     p_reg=ndrx_tms_file_registry_get(entry->d_name+len);
0338 
0339                     if (NULL==p_reg)
0340                     {
0341                         NDRX_LOG(log_error, "ERROR: Unkown transaction log file "
0342                                 "exists [%s] (duplicate processes?) - enqueue for load", 
0343                             tmp);
0344 
0345                         userlog("ERROR: Unkown transaction log file exists"
0346                                 " [%s] (duplicate processes?) - enqueue for load",
0347                             tmp);
0348 
0349                         if (EXSUCCEED!=ndrx_tms_file_registry_add(entry->d_name+len, 
0350                                 NDRX_TMS_FILE_STATE_INITIAL))
0351                         {
0352                             NDRX_LOG(log_error, "Failed to add tmxid: [%s] to registry (malloc err?)", 
0353                                 entry->d_name+len);
0354                             EXFAIL_OUT(ret);
0355                         }
0356                     }
0357                     else if (NDRX_TMS_FILE_STATE_INITIAL==p_reg->state)
0358                     {
0359                         NDRX_LOG(log_warn, "Loading transaction log [%s]", tmp);
0360                         userlog("Loading transaction log [%s]", tmp);
0361 
0362                         if (EXSUCCEED!=tms_load_logfile(tmp, entry->d_name+len, &pp_tl))
0363                         {
0364                             NDRX_LOG(log_error, "Failed to load transaction log [%s] - ignore log", tmp);
0365                             userlog("Failed to load transaction log [%s] - ignore log", tmp);
0366                             /* change state to ignore */
0367                             p_reg->state = NDRX_TMS_FILE_STATE_IGNORE;
0368                         }
0369                         else
0370                         {
0371                             NDRX_LOG(log_info, "Transaction log [%s] loaded", tmp);
0372                             /* remove from hash */
0373                             ndrx_tms_file_registry_del(p_reg);
0374                         }
0375                     }
0376                 } /* still file exists */
0377             } /* log not found */
0378         } /* mask matched */
0379     }
0380 
0381 out:
0382     if (NULL!=dir)
0383     {
0384         closedir(dir);
0385     }
0386 
0387     return ret;
0388 
0389 }
0390 
0391 /**
0392  * Continues transaction background loop..
0393  * Try to complete the transactions.
0394  * @return  SUCCEED/FAIL
0395  */
0396 expublic int background_loop(void)
0397 {
0398     int ret = EXSUCCEED;
0399     atmi_xa_log_list_t *tx_list;
0400     atmi_xa_log_list_t *el, *tmp;
0401     atmi_xa_tx_info_t xai;
0402     atmi_xa_log_t *p_tl;
0403     
0404     memset(&xai, 0, sizeof(xai));
0405 
0406     ndrx_stopwatch_reset(&M_chkdisk_stopwatch);
0407     
0408     while(!G_bacground_req_shutdown)
0409     {
0410         /* run ping... */
0411         if (G_tmsrv_cfg.ping_time > 0)
0412         {
0413             tm_ping_db(NULL, NULL);
0414         }
0415 
0416         /* Check against any logs that we are not aware of
0417          * in that case process reloads (restart) and perform fresh start actions.
0418          * so that we protect us from duplicate runs.
0419          */
0420         if (G_tmsrv_cfg.chkdisk_time > 0 && 
0421                 ndrx_stopwatch_get_delta_sec(&M_chkdisk_stopwatch) > G_tmsrv_cfg.chkdisk_time)
0422         {
0423             /* reset is initiated by the func (if required) */
0424             background_chkdisk();
0425             ndrx_stopwatch_reset(&M_chkdisk_stopwatch);
0426         }
0427         
0428         /* Check the list of transactions (iterate over...) 
0429          * Seems anyway, we need a list of background ops here...
0430          */
0431         
0432         /* Lock for processing... (cose xadmin might want to do some stuff in middle
0433          * Might want to think something better (so that it does not lock all process)
0434          */
0435         background_lock();
0436         tx_list = tms_copy_hash2list(COPY_MODE_BACKGROUND | COPY_MODE_ACQLOCK);
0437         
0438         LL_FOREACH_SAFE(tx_list,el,tmp)
0439         {
0440             /* el->p_tl.trycount++; moved to COPY_MODE_INCCOUNTER */
0441             NDRX_LOG(log_info, "XID: [%s] stage: [%hd]. Try: %ld, max: %d.", 
0442                     el->p_tl.tmxid, el->p_tl.txstage, el->p_tl.trycount, 
0443                     G_tmsrv_cfg.max_tries);
0444             
0445             if (el->p_tl.trycount>=G_tmsrv_cfg.max_tries)
0446             {
0447                 NDRX_LOG(log_warn, "Skipping try %ld of %ld...", 
0448                         el->p_tl.trycount,  G_tmsrv_cfg.max_tries);
0449                 /* Have some housekeep. */
0450                 LL_DELETE(tx_list, el);
0451                 NDRX_FREE(el);
0452                 continue;
0453             }
0454             
0455             /* Now try to get transaction for real (with a lock!) */
0456             if (NULL!=(p_tl = tms_log_get_entry(el->p_tl.tmxid, 0, NULL)))
0457             {
0458                 p_tl->trycount++;
0459                 
0460                 NDRX_LOG(log_info, "XID: [%s] try counter increased to: %d",
0461                         el->p_tl.tmxid, p_tl->trycount);
0462                 
0463                 /* run checkpoint here on the log... */
0464                 if (EXSUCCEED!=tms_log_checkpointseq(p_tl))
0465                 {
0466                     EXFAIL_OUT(ret);
0467                 }
0468                 XA_TX_COPY((&xai), p_tl);
0469 
0470                 /* If we have transaction in background, then do something with it
0471                  * The master_op does not matter, as we ignore the error code.
0472                  */
0473                 tm_drive(&xai, p_tl, XA_OP_COMMIT, EXFAIL, 0L);
0474             }
0475             else
0476             {
0477                 NDRX_LOG(log_debug, "Transaction locked or already "
0478                         "processed by foreground...");
0479             }
0480             /* Have some housekeep. */
0481             LL_DELETE(tx_list, el);
0482             NDRX_FREE(el);
0483         }
0484         
0485         background_unlock();
0486         NDRX_LOG(log_debug, "background - sleep %d", 
0487                 G_tmsrv_cfg.scan_time);
0488         
0489         if (!G_bacground_req_shutdown)
0490             thread_sleep(G_tmsrv_cfg.scan_time);
0491     }
0492     
0493 out:
0494     return ret;
0495 }
0496 
0497 /**
0498  * Background processing of the transactions (Complete them).
0499  * @return 
0500  */
0501 expublic void * background_process(void *arg)
0502 {
0503     NDRX_LOG(log_error, "***********BACKGROUND PROCESS START ********");
0504     
0505     tm_thread_init();
0506     
0507 /*    background_read_log();*/
0508     
0509     
0510    /* Loop over the transactions and:
0511     * - Check for in-progress timeouts
0512     * - Try to abort abortable
0513     * - Try co commit commitable
0514     * - Use timers counters from the cli params. 
0515     */
0516     
0517     background_loop();
0518     
0519     tm_thread_uninit();
0520     
0521     NDRX_LOG(log_error, "***********BACKGROUND PROCESS END **********");
0522     
0523     return NULL;
0524 }
0525 
0526 /**
0527  * Initialize background process
0528  * @return EXSUCCEED/EXFAIL
0529  */
0530 expublic int background_process_init(void)
0531 {
0532     int ret=EXSUCCEED;
0533     pthread_attr_t pthread_custom_attr;
0534     
0535     /* Read the transaction records from disk
0536      * shall be done before services open
0537      * otherwise we might start to read logs
0538      * from online txns, and if they are preparing,
0539      * we might set here them to aborting..
0540      */ 
0541     if (EXSUCCEED!=background_read_log())
0542     {
0543         NDRX_LOG(log_error, "Failed to recover logs");
0544         userlog("Failed to recover logs");
0545         EXFAIL_OUT(ret);
0546     }
0547     
0548     pthread_attr_init(&pthread_custom_attr);
0549     /* clean up resources after exit.. 
0550     pthread_attr_setdetachstate(&pthread_custom_attr, PTHREAD_CREATE_DETACHED);
0551     */
0552     /* set some small stacks size, 1M should be fine! */
0553     ndrx_platf_stack_set(&pthread_custom_attr);
0554     if (EXSUCCEED!=pthread_create(&G_bacground_thread, &pthread_custom_attr, 
0555             background_process, NULL))
0556     {
0557         NDRX_PLATF_DIAG(NDRX_DIAG_PTHREAD_CREATE, errno, "background_process_init");
0558         EXFAIL_OUT(ret);
0559     }
0560 out:
0561     return ret;
0562       
0563 }
0564 /* vim: set ts=4 sw=4 et smartindent: */