Back to home page

Enduro/X

 
 

    


0001 /**
0002  * @brief tmsrv - transaction logging & accounting
0003  *   for systems which are not support TMJOIN, we shall create additional
0004  *   XIDs for each of the involved process session. These XIDs shall be used
0005  *   as "sub-xids". There will be Master XID involved in process and
0006  *   sub-xids will be logged in tmsrv and will be known by processes locally
0007  *   Thus p_tl->rmstatus needs to be extended with hash list of the local
0008  *   transaction ids. We shall keep the structure universal, and use
0009  *   sub-xids even TMJOIN is supported.
0010  *
0011  * @file log.c
0012  */
0013 /* -----------------------------------------------------------------------------
0014  * Enduro/X Middleware Platform for Distributed Transaction Processing
0015  * Copyright (C) 2009-2016, ATR Baltic, Ltd. All Rights Reserved.
0016  * Copyright (C) 2017-2023, Mavimax, Ltd. All Rights Reserved.
0017  * This software is released under one of the following licenses:
0018  * AGPL (with Java and Go exceptions) or Mavimax's license for commercial use.
0019  * See LICENSE file for full text.
0020  * -----------------------------------------------------------------------------
0021  * AGPL license:
0022  *
0023  * This program is free software; you can redistribute it and/or modify it under
0024  * the terms of the GNU Affero General Public License, version 3 as published
0025  * by the Free Software Foundation;
0026  *
0027  * This program is distributed in the hope that it will be useful, but WITHOUT ANY
0028  * WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR A
0029  * PARTICULAR PURPOSE. See the GNU Affero General Public License, version 3
0030  * for more details.
0031  *
0032  * You should have received a copy of the GNU Affero General Public License along 
0033  * with this program; if not, write to the Free Software Foundation, Inc.,
0034  * 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
0035  *
0036  * -----------------------------------------------------------------------------
0037  * A commercial use license is available from Mavimax, Ltd
0038  * contact@mavimax.com
0039  * -----------------------------------------------------------------------------
0040  */
0041 #include <stdio.h>
0042 #include <stdlib.h>
0043 #include <string.h>
0044 #include <errno.h>
0045 #include <regex.h>
0046 #include <utlist.h>
0047 #include <stdarg.h>
0048 
0049 #include <ndebug.h>
0050 #include <atmi.h>
0051 #include <atmi_int.h>
0052 #include <typed_buf.h>
0053 #include <ndrstandard.h>
0054 #include <ubf.h>
0055 #include <Exfields.h>
0056 #include <nstdutil.h>
0057 
0058 #include <exnet.h>
0059 #include <ndrxdcmn.h>
0060 
0061 #include "tmsrv.h"
0062 #include "../libatmisrv/srv_int.h"
0063 #include "userlog.h"
0064 #include <xa_cmn.h>
0065 #include <exhash.h>
0066 #include <unistd.h>
0067 #include <Exfields.h>
0068 #include <singlegrp.h>
0069 #include <sys_test.h>
0070 /*---------------------------Externs------------------------------------*/
0071 /*---------------------------Macros-------------------------------------*/
0072 #define LOG_MAX         1024
0073 
0074 #define LOG_COMMAND_STAGE           'S' /**< Identify stage of txn           */
0075 #define LOG_COMMAND_I               'I' /**< Info about txn                  */
0076 #define LOG_COMMAND_J               'J' /**< Version 2 format with checksums */
0077 #define LOG_COMMAND_RMSTAT          'R' /**< Log the RM status               */
0078 
0079 #define LOG_VERSION_1                1   /**< Initial version                  */
0080 #define LOG_VERSION_2                2   /**< Version 1, contains crc32 checks */
0081 
0082 #define CHK_THREAD_ACCESS if (ndrx_gettid()!=p_tl->lockthreadid)\
0083     {\
0084         NDRX_LOG(log_error, "Transaction [%s] not locked for thread %" PRIu64 ", but for %" PRIu64,\
0085                 p_tl->tmxid, ndrx_gettid(), p_tl->lockthreadid);\
0086         userlog("Transaction [%s] not locked for thread %" PRIu64 ", but for %" PRIu64,\
0087                 p_tl->tmxid, ndrx_gettid(), p_tl->lockthreadid);\
0088         return EXFAIL;\
0089     }
0090 #define LOG_RS_SEP  ';'            /**< record seperator               */
0091 /*---------------------------Enums--------------------------------------*/
0092 /*---------------------------Typedefs-----------------------------------*/
0093 /*---------------------------Globals------------------------------------*/
0094 exprivate atmi_xa_log_t *M_tx_hash = NULL; 
0095 exprivate MUTEX_LOCKDECL(M_tx_hash_lock); /* Operations with hash list            */
0096 /*---------------------------Statics------------------------------------*/
0097 /*---------------------------Prototypes---------------------------------*/
0098 exprivate int tms_log_write_line(atmi_xa_log_t *p_tl, char command, const char *fmt, ...);
0099 exprivate int tms_parse_info(char *buf, atmi_xa_log_t *p_tl);
0100 exprivate int tms_parse_stage(char *buf, atmi_xa_log_t *p_tl);
0101 exprivate int tms_parse_rmstatus(char *buf, atmi_xa_log_t *p_tl);
0102 exprivate void tms_gen_file_name(char *fname, size_t fnamesz, char *tmxid);
0103 
0104 /**
0105  * Unlock transaction
0106  * @param p_tl
0107  * @return SUCCEED/FAIL
0108  */
0109 expublic int tms_unlock_entry(atmi_xa_log_t *p_tl)
0110 {
0111     CHK_THREAD_ACCESS;
0112     
0113     NDRX_LOG(log_info, "Transaction [%s] unlocked by thread %" PRIu64, p_tl->tmxid,
0114             p_tl->lockthreadid);
0115     
0116     MUTEX_LOCK_V(M_tx_hash_lock);
0117     p_tl->lockthreadid = 0;
0118     MUTEX_UNLOCK_V(M_tx_hash_lock);
0119     
0120     return EXSUCCEED;
0121 }
0122 
0123 /**
0124  * Check that log file actually exists on the disk.
0125  * @param tmxid transaction
0126  * @return EXTRUE/EXFALSE/EXFAIL
0127  */
0128 expublic int tms_log_exists_file(char *tmxid)
0129 {
0130     char fname[PATH_MAX+1];
0131     int err=0;
0132 
0133     tms_gen_file_name(fname, sizeof(fname), tmxid);
0134 
0135     if (EXSUCCEED == access(fname, 0))
0136     {
0137         return EXTRUE;
0138     }
0139 
0140     err=errno;
0141 
0142     if (ENOENT==err)
0143     {
0144         return EXFALSE;
0145     }
0146 
0147     NDRX_LOG(log_error, "Failed to check file [%s] presence: %s",
0148         fname, strerror(err));
0149     userlog("Failed to check file [%s] presence: %s",
0150         fname, strerror(err));
0151 
0152     return EXFAIL;
0153 }
0154 
0155 /**
0156  * Check that memory based log entry exists.
0157  * @param tmxid - serialized XID
0158  * @return NULL or log entry
0159  */
0160 expublic int tms_log_exists_entry(char *tmxid)
0161 {
0162     atmi_xa_log_t *r = NULL;
0163     
0164     MUTEX_LOCK_V(M_tx_hash_lock);
0165     EXHASH_FIND_STR( M_tx_hash, tmxid, r);
0166     MUTEX_UNLOCK_V(M_tx_hash_lock);
0167     
0168     if (NULL!=r)
0169     {
0170         return EXTRUE;
0171     }
0172     else
0173     {
0174         return EXFALSE;
0175     }
0176 }
0177 
0178 /**
0179  * Get the log entry of the transaction
0180  * Now we should lock it for thread.
0181  * TODO: Add option for wait on lock.
0182  * This would be needed for cases for example when Postgres may report
0183  * the status in background, and for some reason TMSRV already processes the
0184  * transaction (ether abort, or new register branch, etc...) and some stalled
0185  * PG process wants to finish the work off. Thus we need to waited lock for
0186  * foreground operations.
0187  * TODO: In future release lock could be based on real mutex within the
0188  * transaction object, so that we do not have to employ the sleep
0189  * @param tmxid - serialized XID
0190  * @param[in] dowait milliseconds to wait for lock, before give up
0191  * @param[out] locke lock error
0192  * @return NULL or log entry
0193  */
0194 expublic atmi_xa_log_t * tms_log_get_entry(char *tmxid, int dowait, int *locke)
0195 {
0196     atmi_xa_log_t *r = NULL;
0197     ndrx_stopwatch_t w;
0198     
0199     if (dowait)
0200     {
0201         ndrx_stopwatch_reset(&w);
0202     }
0203     
0204     if (NULL!=locke)
0205     {
0206         *locke=EXFALSE;
0207     }
0208     
0209 restart:
0210     MUTEX_LOCK_V(M_tx_hash_lock);
0211     EXHASH_FIND_STR( M_tx_hash, tmxid, r);
0212     
0213     if (NULL!=r)
0214     {
0215         if (r->lockthreadid)
0216         {
0217             if (dowait && ndrx_stopwatch_get_delta(&w) < dowait)
0218             {
0219                 MUTEX_UNLOCK_V(M_tx_hash_lock);
0220                 /* sleep 100 msec */
0221                 usleep(100000);
0222                 goto restart;
0223                 
0224             }
0225             
0226             NDRX_LOG(log_error, "Transaction [%s] already locked for thread_id: %"
0227                     PRIu64 " lock time: %d msec",
0228                     tmxid, r->lockthreadid, dowait);
0229             
0230             userlog("tmsrv: Transaction [%s] already locked for thread_id: %" PRIu64
0231                     "lock time: %d msec",
0232                     tmxid, r->lockthreadid, dowait);
0233             r = NULL;
0234             
0235             /* cannot get lock */
0236             if (NULL!=locke)
0237             {
0238                 *locke=EXTRUE;
0239             }
0240             
0241         }
0242         else
0243         {
0244             r->lockthreadid = ndrx_gettid();
0245             NDRX_LOG(log_info, "Transaction [%s] locked for thread_id: %" PRIu64,
0246                     tmxid, r->lockthreadid);
0247         }
0248     }
0249     
0250     MUTEX_UNLOCK_V(M_tx_hash_lock);
0251     
0252     return r;
0253 }
0254 
0255 /**
0256  * set current lock sequence number
0257  * @param p_tl log entry
0258  */
0259 exprivate int tms_log_setseq(atmi_xa_log_t *p_tl)
0260 {
0261     int ret= EXSUCCEED;
0262     long grp_flags=0;
0263     /* if we are in singleton group mode, validate that we still
0264      * own the lock
0265      */
0266     if (G_atmi_env.procgrp_no)
0267     {
0268         p_tl->sg_sequence=tpsgislocked(G_atmi_env.procgrp_no
0269             , TPPG_SGVERIFY|TPPG_NONSGSUCC
0270             , &grp_flags);
0271 
0272         if (EXFAIL==p_tl->sg_sequence)
0273         {
0274             NDRX_LOG(log_error, "tpsgislocked failed %s", tpstrerror(tperrno));
0275             EXFAIL_OUT(ret);
0276         }
0277         
0278         if (grp_flags & TPPG_SINGLETON && p_tl->sg_sequence<=0)
0279         {
0280             NDRX_LOG(log_error, "Singleton group %d lock lost (at start) - exit(-1)",
0281                 G_atmi_env.procgrp_no);
0282             userlog("Singleton group %d lock lost (at start) - exit(-1)",
0283                 G_atmi_env.procgrp_no);
0284             /* !!!! */
0285             exit(EXFAIL);
0286         }
0287     }
0288 
0289 out:
0290     return ret;
0291 }
0292 
0293 /**
0294  * run checkpoint on transaction
0295  * @param p_tl transaction log
0296  */
0297 expublic int tms_log_checkpointseq(atmi_xa_log_t *p_tl)
0298 {
0299     int ret=EXSUCCEED;
0300     long seq;
0301     long grp_flags=0;
0302     /* if we are in singleton group mode, validate that we still
0303      * own the lock
0304      */
0305     if (G_atmi_env.procgrp_no)
0306     {
0307         seq=tpsgislocked(G_atmi_env.procgrp_no, TPPG_SGVERIFY|TPPG_NONSGSUCC, &grp_flags);
0308 
0309         if (seq < 0)
0310         {
0311             NDRX_LOG(log_error, "tpsgislocked returns %s", tpstrerror(tperrno));
0312             EXFAIL_OUT(ret);
0313         }
0314 
0315         if ((grp_flags & TPPG_SINGLETON) && 0==seq)
0316         {
0317             NDRX_LOG(log_error, "Singleton group %d on node %ld lock lost - exit(-1)",
0318                 G_atmi_env.procgrp_no, tpgetnodeid());
0319             userlog("Singleton group %d on node %ld lock lost - exit(-1)",
0320                 G_atmi_env.procgrp_no, tpgetnodeid());
0321             /* !!!! */
0322             exit(EXFAIL);
0323         }
0324 
0325         /* allow simple lock grou pbased checks */
0326         if (NULL!=p_tl)
0327         {
0328             /* failover has happened during transaction processing
0329             * thus cannot proceed with decsion, only after restart
0330             */
0331             if ( (grp_flags & TPPG_SINGLETON) && (seq - p_tl->sg_sequence >= G_atmi_env.sglockinc))
0332             {
0333                 NDRX_LOG(log_error, "Singleton group %d on node %ld lock lost (tl seq %ld, cur seq %ld) - exit(-1), ",
0334                     G_atmi_env.procgrp_no, tpgetnodeid(), p_tl->sg_sequence, seq);
0335                 userlog("Singleton group %d on node %ld lock lost (tl seq %ld, cur seq %ld) - exit(-1), ",
0336                     G_atmi_env.procgrp_no, tpgetnodeid(), p_tl->sg_sequence, seq);
0337                 /* !!!! */
0338                 exit(EXFAIL);
0339             }
0340 
0341             /* we are safe to continue... */
0342             p_tl->sg_sequence=seq;
0343         }
0344     }
0345 out:
0346     return ret;
0347 }
0348 
0349 /**
0350  * Log transaction as started
0351  * @param xai - XA Info struct
0352  * @param txtout - transaction timeout
0353  * @param[out] btid branch transaction id (if not dynamic reg)
0354  * @return SUCCEED/FAIL
0355  */
0356 expublic int tms_log_start(atmi_xa_tx_info_t *xai, int txtout, long tmflags,
0357         long *btid)
0358 {
0359     int ret = EXSUCCEED;
0360     int hash_added = EXFALSE;
0361     atmi_xa_log_t *tmp = NULL;
0362     atmi_xa_rm_status_btid_t *bt;
0363     /* 1. Add stuff to hash list */
0364     if (NULL==(tmp = NDRX_CALLOC(sizeof(atmi_xa_log_t), 1)))
0365     {
0366         NDRX_LOG(log_error, "NDRX_CALLOC() failed: %s", strerror(errno));
0367         ret=EXFAIL;
0368         goto out;
0369     }
0370     tmp->txstage = XA_TX_STAGE_ACTIVE;
0371     
0372     tmp->tmnodeid = xai->tmnodeid;
0373     tmp->tmsrvid = xai->tmsrvid;
0374     tmp->tmrmid = xai->tmrmid;
0375     NDRX_STRCPY_SAFE(tmp->tmxid, xai->tmxid);
0376     
0377     tmp->t_start = ndrx_utc_tstamp();
0378     tmp->t_update = ndrx_utc_tstamp();
0379     tmp->txtout = txtout;
0380     tmp->log_version = LOG_VERSION_2;   /**< Now with CRC32 groups */
0381     ndrx_stopwatch_reset(&tmp->ttimer);
0382 
0383     if (EXSUCCEED!=tms_log_setseq(tmp))
0384     {
0385         EXFAIL_OUT(ret);
0386     }
0387 
0388     /* lock for us, yet it is not shared*/
0389     tmp->lockthreadid = ndrx_gettid();
0390 
0391     /* well... I guess firstly we shall
0392      * add transaction to hashmap, and only then open the file
0393      * otherwise we can for file, they are found and then
0394      * we request to remove the files...
0395      */
0396     MUTEX_LOCK_V(M_tx_hash_lock);
0397     EXHASH_ADD_STR( M_tx_hash, tmxid, tmp);
0398     MUTEX_UNLOCK_V(M_tx_hash_lock);
0399     hash_added = EXTRUE;
0400     
0401     /* TODO: Open the log file & write tms_close_logfile
0402      * Only question, how long 
0403      */
0404     
0405     /* Open log file */
0406     if (EXSUCCEED!=tms_open_logfile(tmp, "a"))
0407     {
0408         NDRX_LOG(log_error, "Failed to create transaction log file");
0409         userlog("Failed to create transaction log file");
0410         EXFAIL_OUT(ret);
0411     }
0412     
0413     /* log the opening infos */
0414     if (EXSUCCEED!=tms_log_info(tmp))
0415     {
0416         NDRX_LOG(log_error, "Failed to log tran info");
0417         userlog("Failed to log tran info");
0418         EXFAIL_OUT(ret);
0419     }
0420     
0421     /* Only for static... */
0422     if (!(tmflags & TMFLAGS_DYNAMIC_REG))
0423     {
0424         /* assign btid? 
0425         tmp->rmstatus[xai->tmrmid-1].rmstatus = XA_RM_STATUS_ACTIVE;
0426         */
0427         char start_stat = XA_RM_STATUS_ACTIVE;
0428         
0429         *btid = tms_btid_gettid(tmp, xai->tmrmid);
0430 
0431         /* Just get the TID, status will be changed with file update  */
0432         if (EXSUCCEED!=tms_btid_add(tmp, xai->tmrmid, *btid, XA_RM_STATUS_NULL, 
0433                 0, 0, &bt))
0434         {
0435             NDRX_LOG(log_error, "Failed to add BTID: %ld", *btid);
0436             EXFAIL_OUT(ret);
0437         }
0438         
0439         if (tmflags & TMFLAGS_TPNOSTARTXID)
0440         {
0441             NDRX_LOG(log_info, "TPNOSTARTXID => starting as %c - prepared", 
0442                     XA_RM_STATUS_PREP);
0443             start_stat = XA_RM_STATUS_UNKOWN;
0444         }
0445         
0446         /* log to transaction file -> Write off RM status */
0447         if (EXSUCCEED!=tms_log_rmstatus(tmp, bt, start_stat, 0, 0))
0448         {
0449             NDRX_LOG(log_error, "Failed to write RM status to file: %ld", *btid);
0450             EXFAIL_OUT(ret);
0451         }
0452     }
0453     
0454 out:
0455     
0456     /* unlock */
0457     if (EXSUCCEED!=ret)
0458     {
0459         tms_remove_logfile(tmp, hash_added);
0460     }
0461     else
0462     {
0463         if (NULL!=tmp)
0464         {
0465             tms_unlock_entry(tmp);
0466         }
0467     }
0468     return ret;
0469 }
0470 
0471 /**
0472  * Add RM to transaction.
0473  * If tid is known, then check and/or add.
0474  * If add, then ensure that we step up the BTID counter, if the number is
0475  * higher than counter have.
0476  * @param xai
0477  * @param rmid - 1 based rmid
0478  * @param[in,out] btid branch tid (if it is known, i.e. 0), if not known, then -1
0479  *  for unknown BTID requests, new BTID is generated
0480  * @return EXSUCCEED/EXFAIL
0481  */
0482 expublic int tms_log_addrm(atmi_xa_tx_info_t *xai, short rmid, int *p_is_already_logged,
0483         long *btid, long flags)
0484 {
0485     int ret = EXSUCCEED;
0486     atmi_xa_log_t *p_tl= NULL;
0487     atmi_xa_rm_status_btid_t *bt = NULL;
0488     
0489     /* atmi_xa_rm_status_t stat; */
0490     /*
0491     memset(&stat, 0, sizeof(stat));
0492     stat.rmstatus = XA_RM_STATUS_ACTIVE;
0493      * 
0494     */
0495     
0496     if (NULL==(p_tl = tms_log_get_entry(xai->tmxid, NDRX_LOCK_WAIT_TIME, NULL)))
0497     {
0498         NDRX_LOG(log_error, "No transaction/lock timeout under xid_str: [%s]", 
0499                 xai->tmxid);
0500         ret=EXFAIL;
0501         goto out_nolock;
0502     }
0503     
0504     if (1 > rmid || rmid>NDRX_MAX_RMS)
0505     {
0506         NDRX_LOG(log_error, "RMID %hd out of bounds!!!");
0507         EXFAIL_OUT(ret);
0508     }
0509     
0510     /* if processing in background (say time-out rollback, the commit shall not be
0511      * accepted)
0512      */
0513     if (p_tl->is_background || XA_TX_STAGE_ACTIVE!=p_tl->txstage)
0514     {
0515         NDRX_LOG(log_error, "cannot register xid [%s] is_background: (%d) stage: (%hd)", 
0516                 xai->tmxid, p_tl->is_background, p_tl->txstage);
0517         EXFAIL_OUT(ret);
0518     }
0519     
0520     /*
0521     if (p_tl->rmstatus[rmid-1].rmstatus && NULL!=p_is_already_logged)
0522     {
0523         *p_is_already_logged = EXTRUE;
0524     }
0525      * */
0526     /*
0527     p_tl->rmstatus[rmid-1] = stat;
0528     */
0529     
0530     ret = tms_btid_addupd(p_tl, rmid, btid, XA_RM_STATUS_NULL, 
0531             0, 0, p_is_already_logged, &bt);
0532     
0533     /* log to transaction file */
0534     if (!(*p_is_already_logged))
0535     {
0536         char start_stat = XA_RM_STATUS_ACTIVE;
0537         
0538         NDRX_LOG(log_info, "RMID %hd/%ld added to xid_str: [%s]", 
0539             rmid, *btid, xai->tmxid);
0540         
0541         if (flags & TMFLAGS_TPNOSTARTXID)
0542         {
0543             NDRX_LOG(log_info, "TPNOSTARTXID => adding as %c - unknown", 
0544                     XA_RM_STATUS_UNKOWN);
0545             start_stat = XA_RM_STATUS_UNKOWN;
0546         }
0547         
0548         if (EXSUCCEED!=tms_log_rmstatus(p_tl, bt, start_stat, 0, 0))
0549         {
0550             NDRX_LOG(log_error, "Failed to write RM status to file: %ld", *btid);
0551             EXFAIL_OUT(ret);
0552         }
0553     }
0554     else
0555     {
0556         NDRX_LOG(log_info, "RMID %hd/%ld already joined to xid_str: [%s]", 
0557             rmid, *btid, xai->tmxid);
0558     }
0559    
0560 out:
0561     /* unlock transaction from thread */
0562     tms_unlock_entry(p_tl);
0563 
0564 out_nolock:
0565     
0566     return ret;
0567 }
0568 
0569 /**
0570  * Change RM status
0571  * @param xai
0572  * @param rmid - 1 based rmid
0573  * @param[in] btid branch tid 
0574  * @param[in] rmstatus RM status to set
0575  * @return EXSUCCEED/EXFAIL
0576  */
0577 expublic int tms_log_chrmstat(atmi_xa_tx_info_t *xai, short rmid, 
0578         long btid, char rmstatus, UBFH *p_ub)
0579 {
0580     int ret = EXSUCCEED;
0581     atmi_xa_log_t *p_tl= NULL;
0582     atmi_xa_rm_status_btid_t *bt = NULL;
0583     int locke;
0584     
0585     NDRX_LOG(log_debug, "xid: [%s] BTID %ld change status to [%c]",
0586             xai->tmxid, btid, rmstatus);
0587     
0588     if (NULL==(p_tl = tms_log_get_entry(xai->tmxid, NDRX_LOCK_WAIT_TIME, &locke)))
0589     {
0590         if (locke)
0591         {
0592             NDRX_LOG(log_error, "Lock acquire timed out for xid_str: [%s] - TPETIME", 
0593                     xai->tmxid);
0594             atmi_xa_set_error_fmt(p_ub, TPETIME, NDRX_XA_ERSN_LOCK, 
0595                         "Failed to acquire locked!");
0596         }
0597         else
0598         {
0599             NDRX_LOG(log_error, "No transaction under xid_str: [%s] - match ", 
0600                     xai->tmxid);
0601 
0602             atmi_xa_set_error_fmt(p_ub, TPEMATCH, NDRX_XA_ERSN_NOTX, 
0603                         "Failed to get transaction or locked for processing!");
0604         }
0605         
0606         ret=EXFAIL;
0607         goto out_nolock;
0608     }
0609     
0610     bt = tms_btid_find(p_tl, rmid, btid);
0611     
0612     if (rmstatus==bt->rmstatus)
0613     {
0614         NDRX_LOG(log_warn, "xid: [%s] BTID %ld already in status [%c]",
0615             xai->tmxid, btid, rmstatus);
0616     }
0617     
0618     if (XA_RM_STATUS_UNKOWN!=bt->rmstatus)
0619     {
0620         NDRX_LOG(log_error, "No transaction under xid_str: [%s] - match ", 
0621                 xai->tmxid);
0622         
0623         atmi_xa_set_error_fmt(p_ub, TPEMATCH, NDRX_XA_ERSN_INVPARAM, 
0624                     "BTID %ld in status %c but want to set to: %c!",
0625                 btid, bt->rmstatus, rmstatus);
0626         
0627         EXFAIL_OUT(ret);
0628     }
0629     
0630     /* Change status to expected one.. */
0631     
0632     if (EXSUCCEED!=tms_log_rmstatus(p_tl, bt, rmstatus, 0, 0))
0633     {
0634         NDRX_LOG(log_error, "Failed to write RM status to file: %ld, "
0635                 "new stat: %c old stat: [%c]", 
0636                 btid, rmstatus, bt->rmstatus);
0637         
0638         atmi_xa_set_error_fmt(p_ub, TPEMATCH, NDRX_XA_ERSN_RMLOGFAIL, 
0639                     "BTID %ld in status %c but want to set to: %c!",
0640                 btid, bt->rmstatus, rmstatus);
0641         
0642         EXFAIL_OUT(ret);
0643     }
0644     
0645     NDRX_LOG(log_debug, "xid: [%s] BTID %ld change status to [%c] OK",
0646             xai->tmxid, btid, rmstatus);
0647    
0648 out:
0649     /* unlock transaction from thread */
0650     tms_unlock_entry(p_tl);
0651 
0652 out_nolock:
0653     
0654     return ret;
0655 }
0656 
0657 /**
0658  * Generate logfile path + name for transaction
0659  * @param fname output buffer
0660  * @param fnamesz output buffer size
0661  * @param tmxid transaction id (str)
0662  */
0663 exprivate void tms_gen_file_name(char *fname, size_t fnamesz, char *tmxid)
0664 {
0665         snprintf(fname, fnamesz, "%s/TRN-%ld-%hd-%d-%s",
0666             G_tmsrv_cfg.tlog_dir, G_tmsrv_cfg.vnodeid, G_atmi_env.xa_rmid,
0667             G_server_conf.srv_id, tmxid);
0668 }
0669 
0670 /**
0671  * Get the log file name for particular transaction
0672  * @param p_tl
0673  * @return 
0674  */
0675 exprivate void tms_get_file_name(atmi_xa_log_t *p_tl)
0676 {
0677     tms_gen_file_name(p_tl->fname, sizeof(p_tl->fname), p_tl->tmxid);
0678 }
0679 
0680 /**
0681  * Get the log file name for particular transaction
0682  * @param p_tl
0683  * @return 
0684  */
0685 expublic int tms_open_logfile(atmi_xa_log_t *p_tl, char *mode)
0686 {
0687     int ret = EXSUCCEED;
0688     
0689     if (EXEOS==p_tl->fname[0])
0690     {
0691         tms_get_file_name(p_tl);
0692     }
0693     
0694     if (NULL!=p_tl->f)
0695     {
0696         /*
0697         NDRX_LOG(log_warn, "Log file [%s] already open", p_tl->fname);
0698          */
0699         goto out;
0700     }
0701     
0702     /* Try to open the file */
0703     if (ndrx_G_systest_lockloss || NULL==(p_tl->f=NDRX_FOPEN(p_tl->fname, mode)))
0704     {
0705         userlog("Failed to open XA transaction log file: [%s]: %s", 
0706                 p_tl->fname, strerror(errno));
0707         NDRX_LOG(log_error, "Failed to open XA transaction "
0708                 "log file: [%s]: %s", 
0709                 p_tl->fname, strerror(errno));
0710         ret=EXFAIL;
0711         goto out;
0712     }
0713 
0714     ndrx_fadvise_donotneed(fileno(p_tl->f), 0);
0715     
0716     NDRX_LOG(log_debug, "XA tx log file [%s] open for [%s]", 
0717             p_tl->fname, mode);
0718 
0719 out:    
0720     return ret;
0721 }
0722 
0723 /**
0724  * Do housekeeping on log file
0725  * @param logfile - path to log file
0726  * @return EXTRUE (did remove) / EXFALSE (did not remove)
0727  */
0728 expublic int tms_housekeep(char *logfile)
0729 {
0730     long diff;
0731     int err;
0732     int ret = EXFALSE;
0733 
0734     /* remove old corrupted logs... */
0735     if ((diff=ndrx_file_age(logfile)) > G_tmsrv_cfg.housekeeptime)
0736     {
0737         NDRX_LOG(log_error, "Corrupted log file [%s] age %ld sec (housekeep %d) - removing",
0738                 logfile, diff, G_tmsrv_cfg.housekeeptime);
0739         userlog("Corrupted log file [%s] age %ld sec (housekeep %d) - removing",
0740                 logfile, diff, G_tmsrv_cfg.housekeeptime);
0741 
0742         if (ndrx_G_systest_lockloss || EXSUCCEED!=unlink(logfile))
0743         {
0744             err = errno;
0745             NDRX_LOG(log_error, "Failed to unlink [%s]: %s", logfile, strerror(err));
0746             userlog("Failed to unlink [%s]: %s", logfile, strerror(err));
0747         }
0748         else
0749         {
0750             ret=EXTRUE;
0751         }
0752     }
0753 
0754     return ret;
0755 }
0756 
0757 /**
0758  * Read the log file from disk
0759  * @param logfile - path to log file
0760  * @param tmxid - transaction ID string
0761  * @param pp_tl - transaction log (double ptr). Returned if all ok.
0762  * @param log_removed did we remove log file (housekeep) executed?
0763  * @param housekeepable is log file housekeepable?
0764  * @return SUCCEED/FAIL
0765  */
0766 expublic int tms_load_logfile(char *logfile, char *tmxid, atmi_xa_log_t **pp_tl,
0767     int *log_removed, int *housekeepable)
0768 {
0769     int ret = EXSUCCEED;
0770     int len;
0771     char buf[1024]; /* Should be enough... */
0772     char *p;
0773     int line = 1;
0774     int got_term_last=EXFALSE, infos_ok=EXFALSE, missing_term_in_file=EXFALSE;
0775     int do_housekeep = EXFALSE;
0776     int wrote, err;
0777     long diff;
0778     *pp_tl = NULL;
0779 
0780     *log_removed = EXFALSE;
0781     *housekeepable = EXFALSE;
0782 
0783     if (NULL==(*pp_tl = NDRX_CALLOC(sizeof(atmi_xa_log_t), 1)))
0784     {
0785         NDRX_LOG(log_error, "NDRX_CALLOC() failed: %s", strerror(errno));
0786         EXFAIL_OUT(ret);
0787     }
0788     /* set file name */
0789     if (strlen(logfile)>PATH_MAX)
0790     {
0791         NDRX_LOG(log_error, "Invalid file name!");
0792         EXFAIL_OUT(ret);
0793     }
0794     
0795     NDRX_STRCPY_SAFE((*pp_tl)->fname, logfile);
0796     
0797     len = strlen(tmxid);
0798     if (len>sizeof((*pp_tl)->tmxid)-1)
0799     {
0800         NDRX_LOG(log_error, "tmxid too long [%s]!", tmxid);
0801         EXFAIL_OUT(ret);
0802     }
0803     
0804     NDRX_STRCPY_SAFE((*pp_tl)->tmxid, tmxid);
0805     /* we start with active... */
0806     (*pp_tl)->txstage = XA_TX_STAGE_ACTIVE;
0807 
0808     /* set lock sequence number, if in singleton group */
0809     if (EXSUCCEED!=tms_log_setseq(*pp_tl))
0810     {
0811         EXFAIL_OUT(ret);
0812     }
0813 
0814     /* Open the file */
0815     if (EXSUCCEED!=tms_open_logfile(*pp_tl, "a+"))
0816     {
0817         NDRX_LOG(log_error, "Failed to open transaction file!");
0818         EXFAIL_OUT(ret);
0819     }
0820 
0821     /* On mac and freebsd seem that reading happens at end of the
0822      * file if file was opened with a+
0823      */
0824     if (EXSUCCEED!=fseek((*pp_tl)->f, 0, SEEK_SET))
0825     {
0826         NDRX_LOG(log_error, "Failed to fseek: %s", strerror(errno));
0827         EXFAIL_OUT(ret);
0828     }
0829 
0830     /* Read line by line & call parsing functions 
0831      * we should also parse start of the transaction (date/time)
0832      */
0833     while (fgets(buf, sizeof(buf), (*pp_tl)->f))
0834     {
0835         got_term_last = EXFALSE;
0836         len = strlen(buf);
0837         
0838         if (1 < len && '\n'==buf[len-1])
0839         {
0840             buf[len-1]=EXEOS;
0841             len--;
0842             got_term_last = EXTRUE;
0843         }
0844 
0845         /* if somebody operated with Windows... */
0846         if (1 < len && '\r'==buf[len-1])
0847         {
0848             buf[len-1]=EXEOS;
0849             len--;
0850         }
0851         
0852         NDRX_LOG(log_debug, "Parsing line: [%s]", buf);
0853         
0854         if (!got_term_last)
0855         {
0856             /* Corrupted file, terminator expected 
0857              * - mark record as corrupted.
0858              * - Also we will try to repair the situation, if last line was corrupted
0859              * then we mark with with NAK.
0860              */
0861             missing_term_in_file = EXTRUE;
0862             continue;
0863         }
0864         
0865         /* If there was power off, then we might get corrupted lines...
0866          */
0867         p = strchr(buf, ':');
0868         
0869         if (NULL==p)
0870         {
0871             NDRX_LOG(log_error, "Symbol ':' not found on line [%d]!", 
0872                     line);
0873             EXFAIL_OUT(ret);
0874         }
0875         p++;
0876         
0877         /* Classify & parse record */
0878         NDRX_LOG(log_error, "Record: %c log_version: %d", *p, (*pp_tl)->log_version);
0879         
0880         /* If this is initial line && record J
0881          * or log_version > 0, perform checksuming.
0882          */
0883         if ((!infos_ok && LOG_COMMAND_J==*p) || ((*pp_tl)->log_version > LOG_VERSION_1))
0884         {
0885             char *rs = strchr(buf, LOG_RS_SEP);
0886             unsigned long crc32_calc=0, crc32_got=0;
0887             
0888             if (NULL==rs)
0889             {
0890                 NDRX_LOG(log_error, "TMSRV log file [%s] missing %c sep on line [%s] - ignore line", 
0891                             logfile, LOG_RS_SEP, buf);
0892                 userlog("TMSRV log file [%s] missing %c sep on line [%s] - ignore line", 
0893                             logfile, LOG_RS_SEP, buf);
0894                 continue;
0895             }
0896             
0897             *rs = EXEOS;
0898             rs++;
0899             
0900             /* Now we get crc32 */
0901             crc32_calc = ndrx_Crc32_ComputeBuf(0, buf, strlen(buf));
0902             sscanf(rs, "%lx", &crc32_got);
0903             
0904             /* verify the log... */
0905             if (crc32_calc!=crc32_got)
0906             {
0907                 NDRX_LOG(log_error, "TMSRV log file [%s] invalid log [%s] line "
0908                         "crc32 calc: [%lx] vs rec: [%lx] - ignore line", 
0909                         logfile, buf, crc32_calc, crc32_got);
0910                 userlog("TMSRV log file [%s] invalid log [%s] line "
0911                         "crc32 calc: [%lx] vs rec: [%lx] - ignore line", 
0912                          logfile, buf, crc32_calc, crc32_got);
0913                 continue;
0914             }
0915         }
0916         
0917         switch (*p)
0918         {
0919             /* Version 0: */
0920             case LOG_COMMAND_I:
0921             /* Version 1: */
0922             case LOG_COMMAND_J:
0923                 
0924                 if (infos_ok)
0925                 {
0926                     NDRX_LOG(log_error, "TMSRV log file [%s] duplicate info block", 
0927                             logfile);
0928                     userlog("TMSRV log file [%s] duplicate info block", 
0929                             logfile);
0930                     do_housekeep=EXTRUE;
0931                     EXFAIL_OUT(ret);
0932                 }
0933                 
0934                 /* set the version number */
0935                 (*pp_tl)->log_version = *p - LOG_COMMAND_I + 1;
0936                 
0937                 if (EXSUCCEED!=tms_parse_info(buf, *pp_tl))
0938                 {
0939                     EXFAIL_OUT(ret);
0940                 }
0941                 
0942                 infos_ok = EXTRUE;
0943                 break;
0944             case LOG_COMMAND_STAGE: 
0945                 
0946                 if (!infos_ok)
0947                 {
0948                     NDRX_LOG(log_error, "TMSRV log file [%s] Info record required first", 
0949                             logfile);
0950                     userlog("TMSRV log file [%s] Info record required first", 
0951                             logfile);
0952                     do_housekeep=EXTRUE;
0953                     EXFAIL_OUT(ret);
0954                 }
0955                 
0956                 if (EXSUCCEED!=tms_parse_stage(buf, *pp_tl))
0957                 {
0958                     EXFAIL_OUT(ret);
0959                 }
0960                 
0961                 break;
0962             case LOG_COMMAND_RMSTAT:
0963                 
0964                 if (!infos_ok)
0965                 {
0966                     NDRX_LOG(log_error, "TMSRV log file [%s] Info record required first", 
0967                             logfile);
0968                     userlog("TMSRV log file [%s] Info record required first", 
0969                             logfile);
0970                     do_housekeep=EXTRUE;
0971                     EXFAIL_OUT(ret);
0972                 }
0973                 
0974                 if (EXSUCCEED!=tms_parse_rmstatus(buf, *pp_tl))
0975                 {
0976                     EXFAIL_OUT(ret);
0977                 }
0978                 
0979                 break;
0980             default:
0981                 NDRX_LOG(log_warn, "Unknown record %c on line %d", *p, line);
0982                 break;
0983         }
0984         
0985         line++;
0986     }
0987     
0988     /* check was last read OK */
0989     if (!feof((*pp_tl)->f))
0990     {
0991         err = ferror((*pp_tl)->f);
0992         
0993         NDRX_LOG(log_error, "TMSRV log file [%s] failed to read: %s", 
0994                     logfile, strerror(err));
0995         userlog("TMSRV log file [%s] failed to read: %s", 
0996                     logfile, strerror(err));
0997         EXFAIL_OUT(ret);
0998     }
0999     
1000     if (!infos_ok)
1001     {
1002         NDRX_LOG(log_error, "TMSRV log file [%s] no [%c] info record - not loading", 
1003                     logfile, LOG_COMMAND_J);
1004         userlog("TMSRV log file [%s] no [%c] info record - not loading", 
1005                     logfile, LOG_COMMAND_J);
1006         do_housekeep=EXTRUE;
1007         EXFAIL_OUT(ret);
1008     }
1009     
1010     /* so that next time we can parse OK */
1011     if (missing_term_in_file)
1012     {
1013         /* so last line bad, lets fix... */
1014         if (got_term_last)
1015         {
1016             NDRX_LOG(log_error, "TMSRV log file [%s] - invalid read, "
1017                     "previous lines seen without \\n", 
1018                     logfile);
1019             userlog("TMSRV log file [%s] - invalid read, "
1020                     "previous lines without \\n", 
1021                     logfile);
1022             EXFAIL_OUT(ret);
1023         }
1024         else
1025         {
1026             /* append with \n */
1027             NDRX_LOG(log_error, "Terminating last line (with out checksum)");
1028 
1029         if (ndrx_G_systest_lockloss)
1030         {
1031                 wrote=EXFAIL;
1032         }
1033         else
1034         {
1035                 wrote=fprintf((*pp_tl)->f, "\n");
1036         }
1037 
1038             if (wrote!=1)
1039             {
1040                 err = ferror((*pp_tl)->f);
1041                 NDRX_LOG(log_error, "TMSRV log file [%s] failed to terminate line: %s", 
1042                         logfile, strerror(err));
1043                 userlog("TMSRV log file [%s] failed to terminate line: %s", 
1044                         logfile, strerror(err));
1045                 EXFAIL_OUT(ret);
1046             }
1047         }
1048     }
1049     
1050     /* thus it will start to drive it by background thread
1051      * Any transaction will be completed in background.
1052      */
1053     (*pp_tl)->is_background = EXTRUE;
1054     
1055     /* 
1056      * If we keep in active state, then timeout will kill the transaction (or it will be finished by in progress 
1057      *  binary, as maybe transaction is not yet completed). Thought we might loss the infos
1058      *  of some registered process, but then it would be aborted transaction anyway.
1059      *  but then record might be idling in the resource.
1060      * 
1061      * If we had stage logged, then transaction would be completed accordingly,
1062      * and would complete with those resources for which states is not yet
1063      * finalized (according to logs)
1064      */
1065     
1066     /* Add transaction to the hash. We need locking here. 
1067      * 
1068      * If transaction was in prepare stage XA_TX_STAGE_PREPARING (40)
1069      * We need to abort it because, there is no chance that caller will get
1070      * response back.
1071      */
1072     if (XA_TX_STAGE_PREPARING == (*pp_tl)->txstage 
1073             || XA_TX_STAGE_ACTIVE == (*pp_tl)->txstage)
1074     {
1075         NDRX_LOG(log_error, "XA Transaction [%s] was in active or preparing stage and "
1076                 "tmsrv is restarted - ABORTING", (*pp_tl)->tmxid);
1077         
1078         userlog("XA Transaction [%s] was in  active or preparing stage and "
1079                 "tmsrv is restarted - ABORTING", (*pp_tl)->tmxid);
1080         
1081         /* change the status (+ log) */
1082         (*pp_tl)->lockthreadid = ndrx_gettid();
1083         tms_log_stage(*pp_tl, XA_TX_STAGE_ABORTING, EXTRUE);
1084         (*pp_tl)->lockthreadid = 0;
1085     }
1086     
1087     MUTEX_LOCK_V(M_tx_hash_lock);
1088     EXHASH_ADD_STR( M_tx_hash, tmxid, (*pp_tl));
1089     MUTEX_UNLOCK_V(M_tx_hash_lock);
1090     
1091     NDRX_LOG(log_debug, "TX [%s] loaded OK", tmxid);
1092 out:
1093 
1094     /* Clean up if error. */
1095     if (EXSUCCEED!=ret)
1096     {
1097         userlog("Failed to load/recover transaction [%s]", logfile);
1098         if (NULL!=*pp_tl)
1099         {
1100             /* remove any stuff added... */
1101             
1102             if (tms_is_logfile_open(*pp_tl))
1103             {
1104                 tms_close_logfile(*pp_tl);
1105             }
1106 
1107             tms_remove_logfree(*pp_tl, EXFALSE);
1108             *pp_tl = NULL;
1109         }
1110     }
1111 
1112     /* clean up corrupted files */
1113     if (do_housekeep)
1114     {
1115         *housekeepable=EXTRUE;
1116 
1117         /* so that hashmaps can be cleaned up... */
1118         if (tms_housekeep(logfile))
1119         {
1120             *log_removed=EXTRUE;
1121         }
1122     }
1123 
1124     return ret;
1125 }
1126 
1127 /**
1128  * Test to see is log file open
1129  * @param p_tl
1130  * @return TRUE/FALSE
1131  */
1132 expublic int tms_is_logfile_open(atmi_xa_log_t *p_tl)
1133 {
1134     if (p_tl->f)
1135         return EXTRUE;
1136     else
1137         return EXFALSE;
1138 }
1139 
1140 /**
1141  * Close the log file
1142  * @param p_tl
1143  */
1144 expublic void tms_close_logfile(atmi_xa_log_t *p_tl)
1145 {
1146     if (NULL!=p_tl->f)
1147     {
1148         NDRX_FCLOSE(p_tl->f);
1149         p_tl->f = NULL;
1150     }
1151 }
1152 
1153 /**
1154  * Free up log file (just memory)
1155  * @param p_tl log handle
1156  * @param hash_rm remove log entry from tran hash
1157  */
1158 expublic void tms_remove_logfree(atmi_xa_log_t *p_tl, int hash_rm)
1159 {
1160     int i;
1161     atmi_xa_rm_status_btid_t *el, *elt;
1162     
1163     if (hash_rm)
1164     {
1165         MUTEX_LOCK_V(M_tx_hash_lock);
1166         EXHASH_DEL(M_tx_hash, p_tl); 
1167         MUTEX_UNLOCK_V(M_tx_hash_lock);
1168     }
1169 
1170     /* Remove branch TIDs */
1171     
1172     for (i=0; i<NDRX_MAX_RMS; i++)
1173     {
1174         EXHASH_ITER(hh, p_tl->rmstatus[i].btid_hash, el, elt)
1175         {
1176             EXHASH_DEL(p_tl->rmstatus[i].btid_hash, el);
1177             NDRX_FREE(el);
1178         }
1179     }
1180     
1181     NDRX_FREE(p_tl);
1182 }
1183 
1184 /**
1185  * Removes also transaction from hash. Either fully
1186  * committed or fully aborted...
1187  * @param p_tl - becomes invalid after removal!
1188  * @param hash_rm remove from hash lists
1189  */
1190 expublic void tms_remove_logfile(atmi_xa_log_t *p_tl, int hash_rm)
1191 {
1192     int have_file = EXFALSE;
1193     
1194     if (tms_is_logfile_open(p_tl))
1195     {
1196         have_file = EXTRUE;
1197         tms_close_logfile(p_tl);
1198     }/* Check for file existance, if was not open */
1199     else if (0 == access(p_tl->fname, 0))
1200     {
1201         have_file = EXTRUE;
1202     }
1203         
1204     if (have_file)
1205     {
1206         if (ndrx_G_systest_lockloss || EXSUCCEED!=unlink(p_tl->fname))
1207         {
1208             int err = errno;
1209             NDRX_LOG(log_debug, "Failed to remove tx log file [%s]: %d (%s)", 
1210                     p_tl->fname, err, strerror(err));
1211             userlog("Failed to remove tx log file [%s]: %d (%s)", 
1212                     p_tl->fname, err, strerror(err));
1213         }
1214     }
1215     
1216     /* Remove the log for hash! */
1217     tms_remove_logfree(p_tl, hash_rm);
1218 }
1219 
1220 /**
1221  * We should have universal log writter
1222  */
1223 exprivate int tms_log_write_line(atmi_xa_log_t *p_tl, char command, const char *fmt, ...)
1224 {
1225     int ret = EXSUCCEED;
1226     char msg[LOG_MAX+1] = {EXEOS};
1227     char msg2[LOG_MAX+1] = {EXEOS};
1228     int len, wrote, exp;
1229     int make_error = EXFALSE;
1230     unsigned long crc32;
1231     va_list ap;
1232     
1233     CHK_THREAD_ACCESS;
1234     
1235     /* If log not open - just skip... */
1236     if (NULL==p_tl->f)
1237     {
1238         return EXSUCCEED;
1239     }
1240      
1241     va_start(ap, fmt);
1242     (void) vsnprintf(msg, sizeof(msg), fmt, ap);
1243     va_end(ap);
1244     
1245     snprintf(msg2, sizeof(msg2), "%lld:%c:%s", ndrx_utc_tstamp(), command, msg);
1246     len = strlen(msg2);
1247     
1248     /* check exactly how much bytes was written */
1249     
1250     if (1==G_atmi_env.test_tmsrv_write_fail)
1251     {
1252         make_error = EXTRUE;
1253     }
1254     else if (2==G_atmi_env.test_tmsrv_write_fail)
1255     {
1256         /* generate 25% random errors */
1257         if (ndrx_rand() % 4 == 0)
1258         {
1259             make_error = EXTRUE;
1260         }
1261         
1262     }
1263     
1264     crc32 = ndrx_Crc32_ComputeBuf(0, msg2, len);
1265     
1266     /* Simulate that only partial message is written in case
1267      * if disk is full or crash happens
1268      */
1269     wrote=0;
1270     
1271     /* if write file test case - write partially, also omit EOL terminator 
1272      * Also... needs flag for commit crash simulation... i.e. 
1273      * So that if commit crash is set, we partially write the command (stage change)
1274      * and afterwards tmsrv exits.
1275      * And then when flag is removed, transaction shall commit OK
1276      */
1277     if (p_tl->log_version == LOG_VERSION_1)
1278     {
1279         NDRX_LOG(log_debug, "Log format: v%d", p_tl->log_version);
1280         
1281         exp = len+1;
1282         if (make_error)
1283         {
1284             exp++;
1285         }
1286 
1287     if (ndrx_G_systest_lockloss)
1288     {
1289             wrote=EXFAIL;
1290     }
1291     else
1292     {
1293             wrote=fprintf(p_tl->f, "%s\n", msg2);
1294     }
1295 
1296     }
1297     else
1298     {
1299         /* version 2+ */
1300         exp = len+8+1+1;
1301         
1302         if (make_error)
1303         {
1304             crc32+=1;
1305             exp++;
1306         }
1307         
1308     if (ndrx_G_systest_lockloss)
1309     {
1310             wrote=EXFAIL;
1311     }
1312     else
1313     {
1314             wrote=fprintf(p_tl->f, "%s%c%08lx\n", msg2, LOG_RS_SEP, crc32);
1315     }
1316     }
1317     
1318     if (wrote != exp)
1319     {
1320         int err = errno;
1321         
1322         /* For Q/A purposes - simulate no space error, if requested */
1323         if (make_error)
1324         {
1325             NDRX_LOG(log_error, "QA point: make_error TRUE");
1326             err = ENOSPC;
1327         }
1328         
1329         NDRX_LOG(log_error, "ERROR! Failed to write transaction log file: req_len=%d, written=%d: %s",
1330                 exp, wrote, strerror(err));
1331         userlog("ERROR! Failed to write transaction log file: req_len=%d, written=%d: %s",
1332                 exp, wrote, strerror(err));
1333         
1334         ret=EXFAIL;
1335         goto out;
1336     }
1337     
1338     
1339 out:
1340     /* flush what ever we have */
1341     /* in case of ndrx_G_systest_lockloss -> IO fence used */
1342     if (ndrx_G_systest_lockloss || EXSUCCEED!=fflush(p_tl->f))
1343     {
1344         int err=errno;
1345         userlog("ERROR! Failed to fflush(): %s", strerror(err));
1346         NDRX_LOG(log_error, "ERROR! Failed to fflush(): %s", strerror(err));
1347     ret=EXFAIL;
1348     }
1349     /*fsync(fileno(p_tl->f));*/
1350     return ret;
1351 }
1352 
1353 /**
1354  * Write general info about transaction
1355  * FORMAT: <RM_ID>:<NODE_ID>:<SRV_ID>
1356  * @param p_tl
1357  * @param stage
1358  * @return 
1359  */
1360 expublic int tms_log_info(atmi_xa_log_t *p_tl)
1361 {
1362     int ret = EXSUCCEED;
1363     char rmsbuf[NDRX_MAX_RMS*3+1]={EXEOS};
1364     int i;
1365     int len;
1366     
1367     CHK_THREAD_ACCESS;
1368     
1369     /* log the RMs participating in transaction 
1370     for (i=0; i<NDRX_MAX_RMS; i++)
1371     {
1372         if (p_tl->rmstatus[i].rmstatus)
1373         {
1374             len = strlen(rmsbuf);
1375             if (len>0)
1376             {
1377                 rmsbuf[len]=',';
1378                 rmsbuf[len+1]=',';
1379                 len++;
1380             }
1381             sprintf(rmsbuf+len, "%d", i+1);
1382         }
1383     }
1384      *
1385      */
1386     
1387     if (EXSUCCEED!=tms_log_write_line(p_tl, LOG_COMMAND_J, "%hd:%hd:%hd:%ld:%s", 
1388             p_tl->tmrmid, p_tl->tmnodeid, p_tl->tmsrvid, p_tl->txtout, rmsbuf))
1389     {
1390         ret=EXFAIL;
1391         goto out;
1392     }
1393     
1394 out:
1395     return ret;
1396 }
1397 /* Tokenizer variables */
1398 #define TOKEN_READ_VARS \
1399             int first = EXTRUE;\
1400             char *p;
1401 
1402 /* Read token for parsing */
1403 #define TOKEN_READ(X, Y)\
1404     if (NULL==(p = strtok(first?buf:NULL, ":")))\
1405     {\
1406         NDRX_LOG(log_error, "Missing token: %s.%s!", X, Y);\
1407         EXFAIL_OUT(ret);\
1408     }\
1409     if (first)\
1410         first = EXFALSE;\
1411         
1412 #define TOKEN_READ_OPT(X, Y)\
1413     if (NULL==(p = strtok(first?buf:NULL, ":")))\
1414     {\
1415         NDRX_LOG(log_warn, "Missing token: %s.%s - optional, ignore!", X, Y);\
1416     }\
1417     if (first)\
1418         first = EXFALSE;\
1419 
1420 /**
1421  * Parse & rebuild the info from file record
1422  * @param buf - record from transaction file
1423  * @param p_tl - transaction record
1424  * @return SUCCEED/FAIL
1425  */
1426 exprivate int tms_parse_info(char *buf, atmi_xa_log_t *p_tl)
1427 {
1428     int ret = EXSUCCEED;
1429     TOKEN_READ_VARS;
1430     char *p2;
1431     char *saveptr1 = NULL;
1432     int rmid;
1433     long long tdiff;
1434     
1435     /* Read first time stamp */
1436     TOKEN_READ("info", "tstamp");
1437     p_tl->t_start = strtoull (p, NULL, 10);
1438     
1439     tdiff = ndrx_utc_tstamp() - p_tl->t_start;
1440     /* adjust the timer...  */
1441     ndrx_stopwatch_reset(&p_tl->ttimer);
1442     ndrx_stopwatch_minus(&p_tl->ttimer, tdiff);
1443     NDRX_LOG(log_debug, "Transaction age: %ld ms (t_start: %llu tdiff: %lld)",
1444         ndrx_stopwatch_get_delta(&p_tl->ttimer), p_tl->t_start, tdiff);
1445              
1446     /* read tmrmid, tmnodeid, tmsrvid = ignore, but they must be in place! */
1447     TOKEN_READ("info", "cmdid");
1448     /* Basically we ignore these values, and read from current process!!!: */
1449     TOKEN_READ("info", "tmrmid");
1450     TOKEN_READ("info", "tmnodeid");
1451     TOKEN_READ("info", "tmsrvid");
1452     p_tl->tmrmid = G_atmi_env.xa_rmid;
1453     p_tl->tmnodeid = G_tmsrv_cfg.vnodeid;
1454     p_tl->tmsrvid = G_server_conf.srv_id;
1455     
1456     TOKEN_READ("info", "txtout");
1457     p_tl->txtout = atol(p);
1458     
1459     /* Read involved resrouce managers - put them in join state */
1460 #if 0
1461     TOKEN_READ("info", "rmsbuf");
1462     
1463     - not used anymore. All active branches are logged as RM status
1464     p2 = strtok_r (p, ",", &saveptr1);
1465     while (p2 != NULL)
1466     {
1467         /* Put the p2 in join state */
1468         rmid = atoi(p2);
1469         if (rmid<1 || rmid>NDRX_MAX_RMS)
1470         {
1471             NDRX_LOG(log_error, "Invalid RMID: %d!", rmid);
1472             EXFAIL_OUT(ret);
1473         }
1474         
1475         p_tl->rmstatus[rmid-1].rmstatus =XA_RM_STATUS_ACTIVE;
1476         
1477         p2 = strtok_r (NULL, ",", &saveptr1);
1478     }
1479 #endif
1480     
1481 out:                
1482     return ret;
1483 }
1484 
1485 #define CRASH_CLASS_EXIT            0 /**< exit at crash                  */
1486 #define CRASH_CLASS_NO_WRITE        1 /**< Do not write and report error  */
1487 
1488 /**
1489  * Change tx state + write transaction stage
1490  * FORMAT: <STAGE_CODE>
1491  * @param p_tl
1492  * @param forced is decision forced? I.e. no restore on error.
1493  * @return EXSUCCEED/EXFAIL
1494  */
1495 expublic int tms_log_stage(atmi_xa_log_t *p_tl, short stage, int forced)
1496 {
1497     int ret = EXSUCCEED;
1498     short stage_org=EXFAIL;
1499     /* <Crash testing> */
1500     int make_crash = EXFALSE; /**< Crash simulation */
1501     int crash_stage, crash_class;
1502     /* </Crash testing> */
1503     
1504     CHK_THREAD_ACCESS;
1505     
1506     if (p_tl->txstage!=stage)
1507     {
1508         stage_org = p_tl->txstage;
1509         p_tl->txstage = stage;
1510 
1511         NDRX_LOG(log_debug, "tms_log_stage: new stage - %hd (cc %d)", 
1512                 p_tl->txstage, G_atmi_env.test_tmsrv_commit_crash);
1513         
1514         
1515         /* <Crash testing> */
1516         
1517         /* QA: commit crash test point... 
1518          * Once crash flag is disabled, commit shall be finished by background
1519          * process.
1520          */
1521         crash_stage = G_atmi_env.test_tmsrv_commit_crash % 100;
1522         crash_class = G_atmi_env.test_tmsrv_commit_crash / 100;
1523         
1524         /* let write && exit */
1525         if (stage > 0 && crash_class==CRASH_CLASS_EXIT && stage == crash_stage)
1526         {
1527             NDRX_LOG(log_debug, "QA commit crash...");
1528             G_atmi_env.test_tmsrv_write_fail=EXTRUE;
1529             make_crash=EXTRUE;
1530         }
1531 
1532         /* no write & report error */
1533         if (stage > 0 && crash_class==CRASH_CLASS_NO_WRITE && stage == crash_stage)
1534         {
1535             NDRX_LOG(log_debug, "QA no write crash");
1536             ret=EXFAIL;
1537             goto out;
1538         }
1539         /* </Crash testing> */
1540         else
1541         {
1542             /* in case if ... there was failover and concurrent write of commit
1543              * which might overwrite the given abort position, lets write abort
1544              * states few times more, so that recovery processes would finally
1545              * see that abort has started...
1546              * Note that other tmsrv will not go further than last commit entry
1547              * as after that there is check is that tmsrv still on locked node.
1548              * ------
1549              * Also... this might not be 100% safe, as shared FS might become 
1550              * corrutped, if stale node starts to write somethink after the
1551              * other node took over. Thus hardware based fencing is mandatory
1552              * for production failover modes.
1553              */
1554             int w_cnt=1, i;
1555 
1556             if (XA_TX_STAGE_ABORTING==stage)
1557             {
1558                 w_cnt=3;
1559             }
1560             else if (XA_TX_STAGE_COMMITTING==stage)
1561             {
1562                 /* ensure that we are still locked, as we are going to write
1563                  * commit message down. If the abort was in progress,
1564                  * then our previous prepares might have overwritten  their
1565                  * abort state. Thus if we are ok here, then there will be
1566                  * 1x commit at given position, but if after then or before
1567                  * the actual disk write take over, they will put 3x aborts
1568                  * and then recovery process shall pick that up
1569                  */
1570                 if (EXSUCCEED!=tms_log_checkpointseq(p_tl))
1571                 {
1572                     EXFAIL_OUT(ret);
1573                 }
1574             }
1575 
1576             for (i=0; i<w_cnt; i++)
1577             {
1578                 if (EXSUCCEED!=tms_log_write_line(p_tl, LOG_COMMAND_STAGE, "%hd", stage))
1579                 {
1580                     ret=EXFAIL;
1581                     goto out;
1582                 }
1583             }
1584         }
1585 
1586         /* in case if switching to committing, we must sync the log & directory */
1587         if ((XA_TX_STAGE_COMMITTING==stage) || (XA_TX_STAGE_ABORTING==stage))
1588         {
1589         if (ndrx_G_systest_lockloss)
1590         {
1591         /*IO fence test */
1592             EXFAIL_OUT(ret);
1593         }
1594         else if (EXSUCCEED!=ndrx_fsync_fsync(p_tl->f, G_atmi_env.xa_fsync_flags) ||
1595                 EXSUCCEED!=ndrx_fsync_dsync(G_tmsrv_cfg.tlog_dir, G_atmi_env.xa_fsync_flags))
1596             {
1597                 EXFAIL_OUT(ret);
1598             }
1599 
1600             /* if we are in singleton group mode, validate that we still 
1601              * own the lock
1602              */
1603             if (EXSUCCEED!=tms_log_checkpointseq(p_tl))
1604             {
1605                 EXFAIL_OUT(ret);
1606             }
1607 
1608         } /* new stage commit or abort */
1609     }
1610     
1611 out:
1612                 
1613     /* <Crash testing> */
1614     if (make_crash)
1615     {
1616         exit(1);
1617     }
1618     /* </Crash testing> */
1619 
1620     /* If failed to log the stage switch, restore original transaction
1621      * stage
1622      */
1623 
1624     if (forced)
1625     {
1626         return EXSUCCEED;
1627     }
1628     else if (EXSUCCEED!=ret && EXFAIL!=stage_org)
1629     {
1630         p_tl->txstage = stage_org;
1631     }
1632 
1633     /* if not forced, get the real result */
1634     return ret;
1635 }
1636 
1637 
1638 /**
1639  * Parse stage info from buffer read
1640  * @param buf - buffer read
1641  * @param p_tl - transaction record
1642  * @return SUCCEED/FAIL
1643  */
1644 exprivate int tms_parse_stage(char *buf, atmi_xa_log_t *p_tl)
1645 {
1646     int ret = EXSUCCEED;
1647     short stage;
1648     TOKEN_READ_VARS;
1649 
1650     TOKEN_READ("stage", "tstamp");
1651     p_tl->t_update = atol(p);
1652     TOKEN_READ("info", "cmdid");
1653 
1654     TOKEN_READ("stage", "txstage");
1655 
1656     /* In case ... if we are committing, there will be no abort.
1657     * and vice versa. that's in case if doing failover
1658     * the first state shall live on...
1659     * --- not true...
1660     * we could go to commit, but just right before logging
1661     * other node took over... starts to abort.
1662     * and first one wakes up and tries to write the commit stage...
1663     * thus we shall allow to go down from commit to abort.
1664     */
1665     stage=(short)atoi(p);
1666 
1667     if (stage < p_tl->txstage)
1668     {
1669         NDRX_LOG(log_error, "Transaction logs recover: stage for [%s] downgrade was %hd, read %hd",
1670                 p_tl->tmxid, p_tl->txstage, stage);
1671         userlog("Transaction logs recover: stage for [%s] downgrade was %hd, read %hd",
1672                 p_tl->tmxid, p_tl->txstage, stage);
1673     }
1674 
1675     if (p_tl->txstage>=XA_TX_STAGE_ABORTING &&
1676         p_tl->txstage<=XA_TX_STAGE_ABFORGOT_HEU)
1677     {
1678         if (stage>=XA_TX_STAGE_ABORTING &&
1679             stage<=XA_TX_STAGE_ABFORGOT_HEU)
1680         {
1681             p_tl->txstage = stage;
1682         }
1683         else
1684         {
1685             NDRX_LOG(log_error, "Invalid stage for [%s] was %hd (abort range) read %hd - IGNORE",
1686                 p_tl->tmxid, p_tl->txstage, stage);
1687             userlog("Invalid stage for [%s] was %hd (abort range) read %hd - IGNORE",
1688                 p_tl->tmxid, p_tl->txstage, stage);
1689         }
1690     }
1691     else
1692     {
1693         p_tl->txstage = stage;
1694     }
1695    
1696 out:
1697     return ret;
1698 }
1699 
1700 /**
1701  * Change + write RM status to file
1702  * FORMAT: <rmid>:<rmstatus>:<rmerrorcode>:<rmreason>
1703  * @param p_tl transaction
1704  * @param[in] bt transaction branch
1705  * @param[in] rmstatus Resource manager status
1706  * @param[in] rmerrorcode Resource manager error code
1707  * @param[in] rmreason reason code
1708  * @return SUCCEED/FAIL
1709  */
1710 expublic int tms_log_rmstatus(atmi_xa_log_t *p_tl, atmi_xa_rm_status_btid_t *bt, 
1711         char rmstatus, int rmerrorcode, short rmreason)
1712 {
1713     int ret = EXSUCCEED;
1714     int do_log = EXFALSE;
1715     
1716     CHK_THREAD_ACCESS;
1717 
1718     if (bt->rmstatus != rmstatus)
1719     {
1720         do_log = EXTRUE;
1721     }
1722     
1723     bt->rmstatus = rmstatus;
1724     bt->rmerrorcode = rmerrorcode;
1725     bt->rmreason = rmreason;
1726     
1727     if (do_log)
1728     {
1729         if (EXSUCCEED!=tms_log_write_line(p_tl, LOG_COMMAND_RMSTAT, "%hd:%c:%d:%hd:%ld",
1730                 bt->rmid, rmstatus, rmerrorcode, rmreason, bt->btid))
1731         {
1732             ret=EXFAIL;
1733             goto out;
1734         }
1735     }
1736     
1737 out:
1738     return ret;
1739 }
1740 
1741 /**
1742  * Parse RM's status record from transaction file
1743  * @param buf - transaction record's line
1744  * @param p_tl - internal transaction record
1745  * @return SUCCEED/FAIL;
1746  */
1747 exprivate int tms_parse_rmstatus(char *buf, atmi_xa_log_t *p_tl)
1748 {
1749     int ret = EXSUCCEED;
1750     char rmstatus;
1751     int rmerrorcode;
1752     short rmreason;
1753     int rmid;
1754     long btid;
1755     atmi_xa_rm_status_btid_t *bt = NULL;
1756     
1757     TOKEN_READ_VARS;
1758    
1759     TOKEN_READ("rmstat", "tstamp");
1760     p_tl->t_update = atol(p);
1761     TOKEN_READ("info", "cmdid");
1762     
1763     TOKEN_READ("rmstat", "rmid");
1764    
1765     rmid = atoi(p);
1766     if (rmid<1 || rmid>NDRX_MAX_RMS)
1767     {
1768         NDRX_LOG(log_error, "Invalid RMID: %d!", rmid);
1769         EXFAIL_OUT(ret);
1770     }
1771    
1772     TOKEN_READ("rmstat", "rmstatus");
1773     rmstatus = *p;
1774 
1775     TOKEN_READ("rmstat", "rmerrorcode");
1776     rmerrorcode = atoi(p);
1777 
1778     TOKEN_READ("rmstat", "rmreason");
1779     rmreason = (short)atoi(p);
1780 
1781     TOKEN_READ_OPT("rmstat", "btid");
1782     
1783     if (NULL!=p)
1784     {
1785         btid = atol(p);
1786     }
1787     else
1788     {
1789         btid=0;
1790     }
1791     
1792     /*
1793     p_tl->rmstatus[rmid-1].rmstatus = rmstatus;
1794     p_tl->rmstatus[rmid-1].rmerrorcode = rmerrorcode;
1795     p_tl->rmstatus[rmid-1].rmreason = rmreason;
1796      * */
1797     
1798     ret = tms_btid_addupd(p_tl, rmid, 
1799             &btid, rmstatus, rmerrorcode, rmreason, NULL, &bt);
1800    
1801 out:
1802     return ret;    
1803 }
1804 
1805 /**
1806  * Copy the background items to the linked list.
1807  * The idea is that this is processed by background. During that time, it does not
1808  * remove any items from hash. Thus pointers should be still valid. 
1809  * TODO: We should copy here transaction info too....
1810  * 
1811  * @param p_tl
1812  * @return 
1813  */
1814 expublic atmi_xa_log_list_t* tms_copy_hash2list(int copy_mode)
1815 {
1816     atmi_xa_log_list_t *ret = NULL;
1817     atmi_xa_log_t * r, *rt;
1818     atmi_xa_log_list_t *tmp;
1819     
1820     if (copy_mode & COPY_MODE_ACQLOCK)
1821     {
1822         MUTEX_LOCK_V(M_tx_hash_lock);
1823     }
1824     
1825     /* No changes to hash list during the lock. */    
1826     
1827     EXHASH_ITER(hh, M_tx_hash, r, rt)
1828     {
1829         /* Only background items... */
1830         if (r->is_background && !(copy_mode & COPY_MODE_BACKGROUND))
1831             continue;
1832         
1833         if (!r->is_background && !(copy_mode & COPY_MODE_FOREGROUND))
1834             continue;
1835                 
1836         if (NULL==(tmp = NDRX_CALLOC(1, sizeof(atmi_xa_log_list_t))))
1837         {
1838             NDRX_LOG(log_error, "Failed to malloc %d: %s", 
1839                     sizeof(atmi_xa_log_list_t), strerror(errno));
1840             goto out;
1841         }
1842         
1843         /* we should copy full TL structure, because some other thread might
1844          * will use it.
1845          * Having some invalid pointers inside does not worry us, because
1846          * we just need a list for a printing or xids for background txn lookup
1847          */
1848         memcpy(&tmp->p_tl, r, sizeof(*r));
1849         
1850         LL_APPEND(ret, tmp);
1851     }
1852     
1853 out:
1854     if (copy_mode & COPY_MODE_ACQLOCK)
1855     {
1856         MUTEX_UNLOCK_V(M_tx_hash_lock);
1857     }
1858 
1859     return ret;
1860 }
1861 
1862 /**
1863  * Lock the transaction log hash
1864  */
1865 expublic void tms_tx_hash_lock(void)
1866 {
1867     MUTEX_LOCK_V(M_tx_hash_lock);
1868 }
1869 
1870 /**
1871  * Unlock the transaction log hash
1872  */
1873 expublic void tms_tx_hash_unlock(void)
1874 {
1875     MUTEX_UNLOCK_V(M_tx_hash_lock);
1876 }
1877 
1878 /**
1879  * Copy TLOG info to FB (for display purposes)
1880  * The function has mandatory task to fill in the general fields about
1881  * transaction.
1882  * The actual RM/Branch tids are fill up till we get UBF buffer error (full)
1883  * @param p_ub - Dest UBF
1884  * @param p_tl - Transaction log
1885  * @param incl_rm_stat include RM status in response buffer
1886  * @return EXSUCCEED/EXFAIL
1887  */
1888 expublic int tms_log_cpy_info_to_fb(UBFH *p_ub, atmi_xa_log_t *p_tl, int incl_rm_stat)
1889 {
1890     int ret = EXSUCCEED;
1891     long tspent;
1892     short i;
1893     tspent = p_tl->txtout - ndrx_stopwatch_get_delta_sec(&p_tl->ttimer);    
1894     atmi_xa_rm_status_btid_t *el, *elt;
1895     
1896     if (tspent<0)
1897     {
1898         tspent = 0;
1899     }
1900     
1901     if (
1902             EXSUCCEED!=Bchg(p_ub, TMXID, 0, (char *)p_tl->tmxid, 0L) ||
1903             EXSUCCEED!=Bchg(p_ub, TMRMID, 0, (char *)&(p_tl->tmrmid), 0L) ||
1904             EXSUCCEED!=Bchg(p_ub, TMNODEID, 0, (char *)&(p_tl->tmnodeid), 0L) ||
1905             EXSUCCEED!=Bchg(p_ub, TMSRVID, 0, (char *)&(p_tl->tmsrvid), 0L) ||
1906             EXSUCCEED!=Bchg(p_ub, TMTXTOUT, 0, (char *)&(p_tl->txtout), 0L) ||
1907             EXSUCCEED!=Bchg(p_ub, TMTXTOUT_LEFT, 0, (char *)&tspent, 0L) ||
1908             EXSUCCEED!=Bchg(p_ub, TMTXSTAGE, 0, (char *)&(p_tl->txstage), 0L) ||
1909             EXSUCCEED!=Bchg(p_ub, TMTXTRYCNT, 0, (char *)&(p_tl->trycount), 0L) ||
1910             EXSUCCEED!=Bchg(p_ub, TMTXTRYMAXCNT, 0, (char *)&(G_tmsrv_cfg.max_tries), 0L)
1911         )
1912     {
1913         NDRX_LOG(log_error, "Failed to return fields: [%s]", 
1914                 Bstrerror(Berror));
1915         EXFAIL_OUT(ret);
1916     }
1917     
1918     /* return the info about RMs: */
1919     for (i=0; incl_rm_stat && i<NDRX_MAX_RMS; i++)
1920     {
1921         /* loop over the Branch TIDs  */
1922         EXHASH_ITER(hh, p_tl->rmstatus[i].btid_hash, el, elt)
1923         {
1924             /* cast to long... */
1925             long rmerrorcode =  (long)el->rmerrorcode;
1926             
1927             if (
1928                 EXSUCCEED!=Badd(p_ub, TMTXRMID, (char *)&el->rmid, 0L) ||
1929                 EXSUCCEED!=Badd(p_ub, TMTXBTID, (char *)&el->btid, 0L) ||
1930                 EXSUCCEED!=Badd(p_ub, TMTXRMSTATUS,
1931                     (char *)&(el->rmstatus), 0L) ||
1932                 EXSUCCEED!=Badd(p_ub, TMTXRMERRCODE,
1933                     (char *)&rmerrorcode, 0L) ||
1934                 EXSUCCEED!=Badd(p_ub, TMTXRMREASON,
1935                     (char *)&(el->rmreason), 0L)
1936                 )
1937             {
1938                 /* there could be tons of these branches for 
1939                  * mysql/mariadb or posgresql 
1940                  */
1941                 NDRX_LOG(log_error, "Failed to return fields: [%s] - ignore", 
1942                             Bstrerror(Berror));
1943                 
1944                 /* finish off. */
1945                 goto out;
1946             }
1947         }
1948     } /* for */
1949     
1950 out:
1951     return ret;
1952 }
1953 
1954 /* vim: set ts=4 sw=4 et smartindent: */