Back to home page

Enduro/X

 
 

    


0001 /**
0002  * @brief Background transaction completion & recovery
0003  *   For tlog background scanning we must:
0004  *   - Just lock the hash for writing (while iterating over it) & prepare console results
0005  *
0006  * @file background.c
0007  */
0008 /* -----------------------------------------------------------------------------
0009  * Enduro/X Middleware Platform for Distributed Transaction Processing
0010  * Copyright (C) 2009-2016, ATR Baltic, Ltd. All Rights Reserved.
0011  * Copyright (C) 2017-2023, Mavimax, Ltd. All Rights Reserved.
0012  * This software is released under one of the following licenses:
0013  * AGPL (with Java and Go exceptions) or Mavimax's license for commercial use.
0014  * See LICENSE file for full text.
0015  * -----------------------------------------------------------------------------
0016  * AGPL license:
0017  *
0018  * This program is free software; you can redistribute it and/or modify it under
0019  * the terms of the GNU Affero General Public License, version 3 as published
0020  * by the Free Software Foundation;
0021  *
0022  * This program is distributed in the hope that it will be useful, but WITHOUT ANY
0023  * WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR A
0024  * PARTICULAR PURPOSE. See the GNU Affero General Public License, version 3
0025  * for more details.
0026  *
0027  * You should have received a copy of the GNU Affero General Public License along 
0028  * with this program; if not, write to the Free Software Foundation, Inc.,
0029  * 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
0030  *
0031  * -----------------------------------------------------------------------------
0032  * A commercial use license is available from Mavimax, Ltd
0033  * contact@mavimax.com
0034  * -----------------------------------------------------------------------------
0035  */
0036 #include <stdio.h>
0037 #include <stdlib.h>
0038 #include <string.h>
0039 #include <errno.h>
0040 #include <regex.h>
0041 #include <utlist.h>
0042 #include <dirent.h>
0043 #include <pthread.h>
0044 #include <signal.h>
0045 
0046 #include <ndebug.h>
0047 #include <atmi.h>
0048 #include <atmi_int.h>
0049 #include <typed_buf.h>
0050 #include <ndrstandard.h>
0051 #include <ubf.h>
0052 #include <Exfields.h>
0053 #include <tperror.h>
0054 #include <exnet.h>
0055 #include <ndrxdcmn.h>
0056 
0057 #include "tmsrv.h"
0058 #include "../libatmisrv/srv_int.h"
0059 #include <xa_cmn.h>
0060 #include <atmi_int.h>
0061 #include <ndrxdiag.h>
0062 /*---------------------------Externs------------------------------------*/
0063 /*---------------------------Macros-------------------------------------*/
0064 /*---------------------------Enums--------------------------------------*/
0065 /*---------------------------Typedefs-----------------------------------*/
0066 /*---------------------------Globals------------------------------------*/
/** Handle of the background worker thread, filled in by background_process_init() */
expublic pthread_t G_bacground_thread;
/** Shutdown request flag; background_loop() keeps running while this is false */
expublic int G_bacground_req_shutdown = EXFALSE;    /* Is shutdown request? */

/* Mutex + condition variable pair used by thread_sleep() and
 * background_wakeup() to implement an interruptible sleep.
 */
exprivate MUTEX_LOCKDECL(M_wait_mutex);
exprivate pthread_cond_t M_wait_cond = PTHREAD_COND_INITIALIZER;

/* Taken via background_lock() / released via background_unlock() */
exprivate MUTEX_LOCKDECL(M_background_lock); /* Background operations sync        */

/* Reset and polled by background_loop() to pace background_chkdisk() runs */
exprivate ndrx_stopwatch_t M_chkdisk_stopwatch;    /**< Check disk logs watch */

/** Hash of broken tmxids, which will not be loaded
 * (entries are moved here by background_load_attempts() once the
 * configured number of parse attempts is used up)
 */
exprivate ndrx_tms_file_registry_t *M_broken_tmxids=NULL;

/** Still try to load list of xid, if OS has not yet flushed contents
 * (filled by background_read_log() and background_chkdisk(), drained by
 * background_load_attempts())
 */
exprivate ndrx_tms_file_registry_t *M_attempts_tmxids=NULL;
0082 
0083 /*---------------------------Statics------------------------------------*/
0084 /*---------------------------Prototypes---------------------------------*/
0085 exprivate int background_load_attempts(void);
0086 
0087 /**
0088  * Check if file is already monitored
0089  * @param hash hashmap into which to look
0090  * @param tmxid transaction id
0091  * @return NULL or entry
0092  */
0093 expublic ndrx_tms_file_registry_t *ndrx_tms_file_registry_get(ndrx_tms_file_registry_t ** hash, 
0094     const char *tmxid)
0095 {
0096     ndrx_tms_file_registry_t *p_ret = NULL;
0097     
0098     EXHASH_FIND_STR((*hash), tmxid, p_ret);
0099     
0100     return p_ret;
0101 }
0102 
0103 /**
0104  * Add file to registry (just a tmxid)
0105  * @param hash hashmap to which add entry
0106  * @param tmxid transaction id
0107  * @param housekeepable can we do houskeep on this entry?
0108  * @return EXSUCCEED/EXFAIL
0109  */
0110 expublic int ndrx_tms_file_registry_add(ndrx_tms_file_registry_t ** hash, 
0111     const char *tmxid, int housekeepable)
0112 {
0113     int ret = EXSUCCEED;
0114     ndrx_tms_file_registry_t *p_ret = NULL;
0115     
0116     p_ret = ndrx_tms_file_registry_get(hash, tmxid);
0117     
0118     if (NULL==p_ret)
0119     {
0120         p_ret = (ndrx_tms_file_registry_t *)NDRX_FPMALLOC(sizeof(ndrx_tms_file_registry_t), 0);
0121 
0122         if (NULL==p_ret)
0123         {
0124             NDRX_LOG(log_error, "Failed to allocate memory for tmxid: [%s] monitoring", 
0125                     tmxid);
0126             EXFAIL_OUT(ret);
0127         }
0128         
0129         NDRX_STRCPY_SAFE(p_ret->tmxid, tmxid);
0130         p_ret->housekeepable=housekeepable;
0131         p_ret->attempts=0;
0132         EXHASH_ADD_STR(*hash, tmxid, p_ret);
0133     }
0134 out:
0135     return ret;
0136 }
0137 
0138 expublic int ndrx_tms_file_registry_del(ndrx_tms_file_registry_t ** hash, ndrx_tms_file_registry_t *ent)
0139 {
0140     int ret = EXSUCCEED;
0141     
0142     EXHASH_DEL(*hash, ent);
0143     NDRX_FPFREE(ent);
0144     
0145     return ret;
0146 }
0147 
0148 /**
0149  * Free up the registry
0150  */
0151 expublic void ndrx_tms_file_registry_free(ndrx_tms_file_registry_t ** hash)
0152 {
0153     ndrx_tms_file_registry_t *el, *tmp;
0154     
0155     EXHASH_ITER(hh, (*hash), el, tmp)
0156     {
0157         ndrx_tms_file_registry_del(hash, el);
0158     }
0159 }
0160 
0161 /**
0162  * Lock background operations
0163  */
0164 expublic void background_lock(void)
0165 {
0166     MUTEX_LOCK_V(M_background_lock);
0167 }
0168 
0169 /**
0170  * Un-lock background operations
0171  */
0172 expublic void background_unlock(void)
0173 {
0174     MUTEX_UNLOCK_V(M_background_lock);
0175 }
0176 
0177 /**
0178  * Read the logfiles from the disk (if any we have there...)
0179  * MUST be during the startup, otherwise if front services start to
0180  * work, we might re-parse online logs, and for example we might switch the
0181  * status to ABORTING... if one was preparing...
0182  * @return 
0183  */
0184 expublic int background_read_log(void)
0185 {
0186     int ret=EXSUCCEED;
0187     struct dirent **namelist = NULL;
0188     int n, cnt;
0189     int len;
0190     char tranmask[256];
0191     char fnamefull[PATH_MAX+1];
0192     atmi_xa_log_t *pp_tl = NULL;
0193 
0194     snprintf(tranmask, sizeof(tranmask), "TRN-%ld-%hd-%d-", G_tmsrv_cfg.vnodeid, 
0195             G_atmi_env.xa_rmid, G_server_conf.srv_id);
0196     len = strlen(tranmask);
0197     /* List the files here. */
0198     cnt = scandir(G_tmsrv_cfg.tlog_dir, &namelist, 0, alphasort);
0199     if (cnt < 0)
0200     {
0201        NDRX_LOG(log_error, "Failed to scan [%s]: %s", 
0202                G_tmsrv_cfg.tlog_dir, strerror(errno));
0203        ret=EXFAIL;
0204        goto out;
0205     }
0206     else 
0207     {
0208        for (n=0; n<cnt; n++)
0209        {
0210            if (0==strcmp(namelist[n]->d_name, ".") || 
0211                        0==strcmp(namelist[n]->d_name, ".."))
0212            {
0213                /* memory leak fixes... */
0214                NDRX_FREE(namelist[n]);
0215                continue;
0216            }
0217 
0218            /* If it is transaction then parse & load */
0219             if (0==strncmp(namelist[n]->d_name, tranmask, len))
0220             {
0221                 snprintf(fnamefull, sizeof(fnamefull), "%s/%s", G_tmsrv_cfg.tlog_dir, 
0222                         namelist[n]->d_name);
0223                         
0224                 NDRX_LOG(log_warn, "Resuming transaction: [%s] (enqueue)", 
0225                         fnamefull);
0226 
0227                 if (EXSUCCEED!=ndrx_tms_file_registry_add(&M_attempts_tmxids, 
0228                     namelist[n]->d_name+len, EXFALSE))
0229                 {
0230                     NDRX_LOG(log_error, "Failed to enqueue tmxid: [%s] to registry (malloc err?)", 
0231                         namelist[n]->d_name+len);
0232                     EXFAIL_OUT(ret);
0233                 }
0234            }
0235            
0236            NDRX_FREE(namelist[n]);
0237        }
0238        
0239        NDRX_FREE(namelist);
0240        namelist = NULL;
0241     }
0242 
0243     /* do the attempts too (i.e. proceed only when logs are loaded... 
0244      * as for normal cases then race with background thread, might
0245      * give chance for tmrecover to remove good transactions
0246      */
0247     if (EXSUCCEED!=background_load_attempts())
0248     {
0249         EXFAIL_OUT(ret);
0250     }
0251     
0252 out:
0253     if (NULL!=namelist)
0254     {
0255        NDRX_FREE(namelist);
0256     }
0257     return ret;
0258 }
0259 
0260 /**
0261  * Sleep the thread, with option to wake up (by shutdown).
0262  * @param sleep_sec
0263  */
0264 exprivate void thread_sleep(int sleep_sec)
0265 {
0266     struct timespec wait_time;
0267     struct timeval now;
0268     int rt;
0269 
0270     gettimeofday(&now,NULL);
0271 
0272     wait_time.tv_sec = now.tv_sec+sleep_sec;
0273     wait_time.tv_nsec = now.tv_usec*1000;
0274 
0275     MUTEX_LOCK_V(M_wait_mutex);
0276     rt = pthread_cond_timedwait(&M_wait_cond, &M_wait_mutex, &wait_time);
0277     MUTEX_UNLOCK_V(M_wait_mutex);
0278 }
0279 
0280 /**
0281  * Wake up the sleeping thread.
0282  */
0283 expublic void background_wakeup(void)
0284 {
0285     MUTEX_LOCK_V(M_wait_mutex);
0286     pthread_cond_signal(&M_wait_cond);
0287     MUTEX_UNLOCK_V(M_wait_mutex);
0288 }
0289 
0290 /**
0291  * Read the logs directory and verify
0292  * that we have all the logs in the memory.
0293  */
0294 expublic int background_chkdisk(void)
0295 {
0296     char tmp[PATH_MAX+1];
0297     int i;
0298     int ret=EXSUCCEED;
0299     DIR *dir=NULL;
0300     struct dirent *entry;
0301     int len;
0302     char tranmask[256];
0303     atmi_xa_log_t *pp_tl = NULL;
0304     char *p_name;
0305 
0306     snprintf(tranmask, sizeof(tranmask), "TRN-%ld-%hd-%d-", G_tmsrv_cfg.vnodeid, 
0307             G_atmi_env.xa_rmid, G_server_conf.srv_id);
0308     len = strlen(tranmask);
0309 
0310     dir = opendir(G_tmsrv_cfg.tlog_dir);
0311 
0312     if (dir == NULL)
0313     {
0314 
0315         NDRX_LOG(log_error, "opendir [%s] failed: %s", 
0316             G_tmsrv_cfg.tlog_dir, strerror(errno));
0317         EXFAIL_OUT(ret);
0318     }
0319 
0320     while ((entry = readdir(dir)) != NULL)
0321     {
0322         if (0==strncmp(entry->d_name, tranmask, len))
0323         {
0324             /* extract transaction id  */
0325             p_name = entry->d_name+len;
0326             NDRX_STRCPY_SAFE(tmp, p_name);
0327 
0328             if (!tms_log_exists_entry(tmp))
0329             {
0330                 ndrx_tms_file_registry_t *p_reg = NULL;
0331 
0332                 snprintf(tmp, sizeof(tmp), "%s/%s", G_tmsrv_cfg.tlog_dir, 
0333                         entry->d_name);
0334 
0335                 if (ndrx_file_exists(tmp))
0336                 {
0337                     if (NULL!=(p_reg=ndrx_tms_file_registry_get(&M_broken_tmxids, p_name)))
0338                     {
0339                         if (tms_housekeep(tmp))
0340                         {
0341                             /* remove file from the M_broken_tmxids */
0342                             ndrx_tms_file_registry_del(&M_broken_tmxids, p_reg);
0343                         }
0344                     }
0345                     else if (NULL==ndrx_tms_file_registry_get(&M_attempts_tmxids, p_name))
0346                     {
0347                         /* enqueue for attempts of loading... */
0348                         if (EXSUCCEED!=ndrx_tms_file_registry_add(&M_attempts_tmxids, p_name,
0349                             EXFALSE))
0350                         {
0351                             NDRX_LOG(log_error, "Failed to enqueue tmxid: [%s] to "
0352                                 "registry (malloc err?)", p_name);
0353                             EXFAIL_OUT(ret);
0354                         }
0355                     } /* not yet registered */
0356                 }   
0357             } /* log not found */
0358         } /* mask matched */
0359     } /* while readdir() */
0360 
0361 out:
0362     if (NULL!=dir)
0363     {
0364         closedir(dir);
0365     }
0366 
0367     return ret;
0368 
0369 }
0370 
0371 /**
0372  * Iterate over the M_attempts_tmxids and try to load them.
0373  * increment counter for them so that we can move those to failed list
0374  */
0375 exprivate int background_load_attempts(void)
0376 {
0377     int ret = EXSUCCEED;
0378     ndrx_tms_file_registry_t *el, *tmp;
0379     int is_tout;
0380     atmi_xa_log_t *p_tl;
0381     atmi_xa_tx_info_t xai;
0382     char filename[PATH_MAX+1];
0383     int log_removed, housekeepable;
0384     
0385     memset(&xai, 0, sizeof(xai));
0386     
0387     EXHASH_ITER(hh, M_attempts_tmxids, el, tmp)
0388     {
0389         snprintf(filename, sizeof(filename), "%s/TRN-%ld-%hd-%d-%s", G_tmsrv_cfg.tlog_dir,
0390             G_tmsrv_cfg.vnodeid, G_atmi_env.xa_rmid, G_server_conf.srv_id, el->tmxid);
0391 
0392         /* try to load the transaction */
0393 
0394         /* check if file still exits, if not, just remove from the
0395          * hashmap as no longer need to load it.
0396          * otherwise the a+ would create the file if missing.
0397          * Also remember that files may be removed by houskeeping processes.
0398          */
0399         if (0!=access(filename, 0) && ENOENT==errno)
0400         {
0401             NDRX_LOG(log_debug, "Transaction: [%s] does not exist anymore", 
0402                     el->tmxid);
0403             ndrx_tms_file_registry_del(&M_attempts_tmxids, el);
0404             continue;
0405         } /* let other errors to handle standard log loader */
0406         else if (EXSUCCEED==tms_load_logfile(filename, el->tmxid, &p_tl, 
0407             &log_removed, &housekeepable))
0408         {
0409             /* remove from the list */
0410             NDRX_LOG(log_info, "Transaction: [%s] loaded successfully", 
0411                     el->tmxid);
0412             ndrx_tms_file_registry_del(&M_attempts_tmxids, el);
0413         }
0414         else if (log_removed)
0415         {
0416             /* remove from active lists */
0417             ndrx_tms_file_registry_del(&M_attempts_tmxids, el);
0418         }
0419         else
0420         {
0421             NDRX_LOG(log_error, "Failed to load transaction: [%s]", 
0422                     el->tmxid);
0423             /* increment the counter */
0424             el->attempts++;
0425             if (el->attempts>=G_tmsrv_cfg.logparse_attempts)
0426             {
0427                 NDRX_LOG(log_error, "Transaction: [%s] failed to load after %d tries "
0428                         "- not loading any more", 
0429                         el->tmxid, el->attempts);
0430                 /* 
0431                     * Move to ignore hash, so that we do not attempt to load this 
0432                     * again. However needs to think about housekeeping.
0433                     * However normally the number shall be low.
0434                     */
0435                 if (EXSUCCEED!=ndrx_tms_file_registry_add(&M_broken_tmxids, 
0436                     el->tmxid, housekeepable))
0437                 {
0438                     NDRX_LOG(log_error, "Failed to enqueue tmxid: [%s] to "
0439                         "registry (malloc err?)", el->tmxid);
0440                     EXFAIL_OUT(ret);
0441                 }
0442 
0443                 ndrx_tms_file_registry_del(&M_attempts_tmxids, el);
0444             }
0445         } /* did not remove */
0446     } /* for each active file */
0447 
0448 out:
0449     return ret;
0450 }
0451 
0452 /**
0453  * Continues transaction background loop..
0454  * Try to complete the transactions.
0455  * @return  SUCCEED/FAIL
0456  */
0457 expublic int background_loop(void)
0458 {
0459     int ret = EXSUCCEED;
0460     atmi_xa_log_list_t *tx_list;
0461     atmi_xa_log_list_t *el, *tmp;
0462     atmi_xa_tx_info_t xai;
0463     atmi_xa_log_t *p_tl;
0464     
0465     memset(&xai, 0, sizeof(xai));
0466 
0467     ndrx_stopwatch_reset(&M_chkdisk_stopwatch);
0468     
0469     while(!G_bacground_req_shutdown)
0470     {
0471         /* run ping... */
0472         if (G_tmsrv_cfg.ping_time > 0)
0473         {
0474             tm_ping_db(NULL, NULL);
0475         }
0476 
0477         /* Check against any logs that we are not aware of
0478          * in that case process reloads (restart) and perform fresh start actions.
0479          * so that we protect us from duplicate runs.
0480          */
0481         if (G_tmsrv_cfg.chkdisk_time > 0 && 
0482                 ndrx_stopwatch_get_delta_sec(&M_chkdisk_stopwatch) > G_tmsrv_cfg.chkdisk_time)
0483         {
0484             /* reset is initiated by the func (if required) */
0485             background_chkdisk();
0486             ndrx_stopwatch_reset(&M_chkdisk_stopwatch);
0487         }
0488         
0489         /* Lock for processing... (cose xadmin might want to do some stuff in middle
0490          * Might want to think something better (so that it does not lock all process)
0491          */
0492         background_lock();
0493         tx_list = tms_copy_hash2list(COPY_MODE_BACKGROUND | COPY_MODE_ACQLOCK);
0494         
0495         LL_FOREACH_SAFE(tx_list,el,tmp)
0496         {
0497             /* el->p_tl.trycount++; moved to COPY_MODE_INCCOUNTER */
0498             NDRX_LOG(log_info, "XID: [%s] stage: [%hd]. Try: %ld, max: %d.", 
0499                     el->p_tl.tmxid, el->p_tl.txstage, el->p_tl.trycount, 
0500                     G_tmsrv_cfg.max_tries);
0501             
0502             if (el->p_tl.trycount>=G_tmsrv_cfg.max_tries)
0503             {
0504                 NDRX_LOG(log_warn, "Skipping try %ld of %ld...", 
0505                         el->p_tl.trycount,  G_tmsrv_cfg.max_tries);
0506                 /* Have some housekeep. */
0507                 LL_DELETE(tx_list, el);
0508                 NDRX_FREE(el);
0509                 continue;
0510             }
0511             
0512             /* Now try to get transaction for real (with a lock!) */
0513             if (NULL!=(p_tl = tms_log_get_entry(el->p_tl.tmxid, 0, NULL)))
0514             {
0515                 p_tl->trycount++;
0516                 
0517                 NDRX_LOG(log_info, "XID: [%s] try counter increased to: %d",
0518                         el->p_tl.tmxid, p_tl->trycount);
0519                 
0520                 /* run checkpoint here on the log... */
0521                 if (EXSUCCEED!=tms_log_checkpointseq(p_tl))
0522                 {
0523                     EXFAIL_OUT(ret);
0524                 }
0525                 XA_TX_COPY((&xai), p_tl);
0526 
0527                 /* If we have transaction in background, then do something with it
0528                  * The master_op does not matter, as we ignore the error code.
0529                  */
0530                 tm_drive(&xai, p_tl, XA_OP_COMMIT, EXFAIL, 0L);
0531             }
0532             else
0533             {
0534                 NDRX_LOG(log_debug, "Transaction locked or already "
0535                         "processed by foreground...");
0536             }
0537             /* Have some housekeep. */
0538             LL_DELETE(tx_list, el);
0539             NDRX_FREE(el);
0540         }
0541         
0542         background_unlock();
0543         NDRX_LOG(log_debug, "background - sleep %d", 
0544                 G_tmsrv_cfg.scan_time);
0545         
0546         if (!G_bacground_req_shutdown)
0547             thread_sleep(G_tmsrv_cfg.scan_time);
0548 
0549         /* process any non-loaded logs... 
0550          * (only after the sleep as initial load already did this)
0551          */
0552         background_load_attempts();
0553     }
0554     
0555 out:
0556     return ret;
0557 }
0558 
0559 /**
0560  * Background processing of the transactions (Complete them).
0561  * @return 
0562  */
0563 expublic void * background_process(void *arg)
0564 {
0565     NDRX_LOG(log_error, "***********BACKGROUND PROCESS START ********");
0566     
0567     tm_thread_init();
0568     
0569 /*    background_read_log();*/
0570     
0571     
0572    /* Loop over the transactions and:
0573     * - Check for in-progress timeouts
0574     * - Try to abort abortable
0575     * - Try co commit commitable
0576     * - Use timers counters from the cli params. 
0577     */
0578     
0579     background_loop();
0580     
0581     tm_thread_uninit();
0582     
0583     NDRX_LOG(log_error, "***********BACKGROUND PROCESS END **********");
0584     
0585     return NULL;
0586 }
0587 
0588 /**
0589  * Initialize background process
0590  * @return EXSUCCEED/EXFAIL
0591  */
0592 expublic int background_process_init(void)
0593 {
0594     int ret=EXSUCCEED;
0595     pthread_attr_t pthread_custom_attr;
0596     
0597     /* Read the transaction records from disk
0598      * shall be done before services open
0599      * otherwise we might start to read logs
0600      * from online txns, and if they are preparing,
0601      * we might set here them to aborting..
0602      */ 
0603     if (EXSUCCEED!=background_read_log())
0604     {
0605         NDRX_LOG(log_error, "Failed to recover logs");
0606         userlog("Failed to recover logs");
0607         EXFAIL_OUT(ret);
0608     }
0609     
0610     pthread_attr_init(&pthread_custom_attr);
0611     /* clean up resources after exit.. 
0612     pthread_attr_setdetachstate(&pthread_custom_attr, PTHREAD_CREATE_DETACHED);
0613     */
0614     /* set some small stacks size, 1M should be fine! */
0615     ndrx_platf_stack_set(&pthread_custom_attr);
0616     if (EXSUCCEED!=pthread_create(&G_bacground_thread, &pthread_custom_attr, 
0617             background_process, NULL))
0618     {
0619         NDRX_PLATF_DIAG(NDRX_DIAG_PTHREAD_CREATE, errno, "background_process_init");
0620         EXFAIL_OUT(ret);
0621     }
0622 out:
0623     return ret;
0624       
0625 }
0626 /* vim: set ts=4 sw=4 et smartindent: */