Back to home page

Enduro/X

 
 

    


0001 /**
0002  * @brief tmsrv - transaction logging & accounting
0003  *   for systems which are not support TMJOIN, we shall create additional
0004  *   XIDs for each of the involved process session. These XIDs shall be used
0005  *   as "sub-xids". There will be Master XID involved in process and
0006  *   sub-xids will be logged in tmsrv and will be known by processes locally
0007  *   Thus p_tl->rmstatus needs to be extended with hash list of the local
0008  *   transaction ids. We shall keep the structure universal, and use
0009  *   sub-xids even TMJOIN is supported.
0010  *
0011  * @file log.c
0012  */
0013 /* -----------------------------------------------------------------------------
0014  * Enduro/X Middleware Platform for Distributed Transaction Processing
0015  * Copyright (C) 2009-2016, ATR Baltic, Ltd. All Rights Reserved.
0016  * Copyright (C) 2017-2023, Mavimax, Ltd. All Rights Reserved.
0017  * This software is released under one of the following licenses:
0018  * AGPL (with Java and Go exceptions) or Mavimax's license for commercial use.
0019  * See LICENSE file for full text.
0020  * -----------------------------------------------------------------------------
0021  * AGPL license:
0022  *
0023  * This program is free software; you can redistribute it and/or modify it under
0024  * the terms of the GNU Affero General Public License, version 3 as published
0025  * by the Free Software Foundation;
0026  *
0027  * This program is distributed in the hope that it will be useful, but WITHOUT ANY
0028  * WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR A
0029  * PARTICULAR PURPOSE. See the GNU Affero General Public License, version 3
0030  * for more details.
0031  *
0032  * You should have received a copy of the GNU Affero General Public License along 
0033  * with this program; if not, write to the Free Software Foundation, Inc.,
0034  * 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
0035  *
0036  * -----------------------------------------------------------------------------
0037  * A commercial use license is available from Mavimax, Ltd
0038  * contact@mavimax.com
0039  * -----------------------------------------------------------------------------
0040  */
0041 #include <stdio.h>
0042 #include <stdlib.h>
0043 #include <string.h>
0044 #include <errno.h>
0045 #include <regex.h>
0046 #include <utlist.h>
0047 #include <stdarg.h>
0048 
0049 #include <ndebug.h>
0050 #include <atmi.h>
0051 #include <atmi_int.h>
0052 #include <typed_buf.h>
0053 #include <ndrstandard.h>
0054 #include <ubf.h>
0055 #include <Exfields.h>
0056 #include <nstdutil.h>
0057 
0058 #include <exnet.h>
0059 #include <ndrxdcmn.h>
0060 
0061 #include "tmsrv.h"
0062 #include "../libatmisrv/srv_int.h"
0063 #include "userlog.h"
0064 #include <xa_cmn.h>
0065 #include <exhash.h>
0066 #include <unistd.h>
0067 #include <Exfields.h>
0068 #include <singlegrp.h>
0069 /*---------------------------Externs------------------------------------*/
0070 /*---------------------------Macros-------------------------------------*/
0071 #define LOG_MAX         1024
0072 
0073 #define LOG_COMMAND_STAGE           'S' /**< Identify stage of txn           */
0074 #define LOG_COMMAND_I               'I' /**< Info about txn                  */
0075 #define LOG_COMMAND_J               'J' /**< Version 2 format with checksums */
0076 #define LOG_COMMAND_RMSTAT          'R' /**< Log the RM status               */
0077 
0078 #define LOG_VERSION_1                1   /**< Initial version                  */
0079 #define LOG_VERSION_2                2   /**< Version 1, contains crc32 checks */
0080 
0081 #define CHK_THREAD_ACCESS if (ndrx_gettid()!=p_tl->lockthreadid)\
0082     {\
0083         NDRX_LOG(log_error, "Transaction [%s] not locked for thread %" PRIu64 ", but for %" PRIu64,\
0084                 p_tl->tmxid, ndrx_gettid(), p_tl->lockthreadid);\
0085         userlog("Transaction [%s] not locked for thread %" PRIu64 ", but for %" PRIu64,\
0086                 p_tl->tmxid, ndrx_gettid(), p_tl->lockthreadid);\
0087         return EXFAIL;\
0088     }
0089 #define LOG_RS_SEP  ';'            /**< record seperator               */
0090 /*---------------------------Enums--------------------------------------*/
0091 /*---------------------------Typedefs-----------------------------------*/
0092 /*---------------------------Globals------------------------------------*/
0093 exprivate atmi_xa_log_t *M_tx_hash = NULL; 
0094 exprivate MUTEX_LOCKDECL(M_tx_hash_lock); /* Operations with hash list            */
0095 /*---------------------------Statics------------------------------------*/
0096 /*---------------------------Prototypes---------------------------------*/
0097 exprivate int tms_log_write_line(atmi_xa_log_t *p_tl, char command, const char *fmt, ...);
0098 exprivate int tms_parse_info(char *buf, atmi_xa_log_t *p_tl);
0099 exprivate int tms_parse_stage(char *buf, atmi_xa_log_t *p_tl);
0100 exprivate int tms_parse_rmstatus(char *buf, atmi_xa_log_t *p_tl);
0101 exprivate void tms_gen_file_name(char *fname, size_t fnamesz, char *tmxid);
0102 
0103 /**
0104  * Unlock transaction
0105  * @param p_tl
0106  * @return SUCCEED/FAIL
0107  */
0108 expublic int tms_unlock_entry(atmi_xa_log_t *p_tl)
0109 {
0110     CHK_THREAD_ACCESS;
0111     
0112     NDRX_LOG(log_info, "Transaction [%s] unlocked by thread %" PRIu64, p_tl->tmxid,
0113             p_tl->lockthreadid);
0114     
0115     MUTEX_LOCK_V(M_tx_hash_lock);
0116     p_tl->lockthreadid = 0;
0117     MUTEX_UNLOCK_V(M_tx_hash_lock);
0118     
0119     return EXSUCCEED;
0120 }
0121 
0122 /**
0123  * Check that log file actually exists on the disk.
0124  * @param tmxid transaction
0125  * @return EXTRUE/EXFALSE/EXFAIL
0126  */
0127 expublic int tms_log_exists_file(char *tmxid)
0128 {
0129     char fname[PATH_MAX+1];
0130     int err=0;
0131 
0132     tms_gen_file_name(fname, sizeof(fname), tmxid);
0133 
0134     if (EXSUCCEED == access(fname, 0))
0135     {
0136         return EXTRUE;
0137     }
0138 
0139     err=errno;
0140 
0141     if (ENOENT==err)
0142     {
0143         return EXFALSE;
0144     }
0145 
0146     NDRX_LOG(log_error, "Failed to check file [%s] presence: %s",
0147         fname, strerror(err));
0148     userlog("Failed to check file [%s] presence: %s",
0149         fname, strerror(err));
0150 
0151     return EXFAIL;
0152 }
0153 
0154 /**
0155  * Check that memory based log entry exists.
0156  * @param tmxid - serialized XID
0157  * @return NULL or log entry
0158  */
0159 expublic int tms_log_exists_entry(char *tmxid)
0160 {
0161     atmi_xa_log_t *r = NULL;
0162     
0163     MUTEX_LOCK_V(M_tx_hash_lock);
0164     EXHASH_FIND_STR( M_tx_hash, tmxid, r);
0165     MUTEX_UNLOCK_V(M_tx_hash_lock);
0166     
0167     if (NULL!=r)
0168     {
0169         return EXTRUE;
0170     }
0171     else
0172     {
0173         return EXFALSE;
0174     }
0175 }
0176 
0177 /**
0178  * Get the log entry of the transaction
0179  * Now we should lock it for thread.
0180  * TODO: Add option for wait on lock.
0181  * This would be needed for cases for example when Postgres may report
0182  * the status in background, and for some reason TMSRV already processes the
0183  * transaction (ether abort, or new register branch, etc...) and some stalled
0184  * PG process wants to finish the work off. Thus we need to waited lock for
0185  * foreground operations.
0186  * TODO: In future release lock could be based on real mutex within the
0187  * transaction object, so that we do not have to employ the sleep
0188  * @param tmxid - serialized XID
0189  * @param[in] dowait milliseconds to wait for lock, before give up
0190  * @param[out] locke lock error
0191  * @return NULL or log entry
0192  */
0193 expublic atmi_xa_log_t * tms_log_get_entry(char *tmxid, int dowait, int *locke)
0194 {
0195     atmi_xa_log_t *r = NULL;
0196     ndrx_stopwatch_t w;
0197     
0198     if (dowait)
0199     {
0200         ndrx_stopwatch_reset(&w);
0201     }
0202     
0203     if (NULL!=locke)
0204     {
0205         *locke=EXFALSE;
0206     }
0207     
0208 restart:
0209     MUTEX_LOCK_V(M_tx_hash_lock);
0210     EXHASH_FIND_STR( M_tx_hash, tmxid, r);
0211     
0212     if (NULL!=r)
0213     {
0214         if (r->lockthreadid)
0215         {
0216             if (dowait && ndrx_stopwatch_get_delta(&w) < dowait)
0217             {
0218                 MUTEX_UNLOCK_V(M_tx_hash_lock);
0219                 /* sleep 100 msec */
0220                 usleep(100000);
0221                 goto restart;
0222                 
0223             }
0224             
0225             NDRX_LOG(log_error, "Transaction [%s] already locked for thread_id: %"
0226                     PRIu64 " lock time: %d msec",
0227                     tmxid, r->lockthreadid, dowait);
0228             
0229             userlog("tmsrv: Transaction [%s] already locked for thread_id: %" PRIu64
0230                     "lock time: %d msec",
0231                     tmxid, r->lockthreadid, dowait);
0232             r = NULL;
0233             
0234             /* cannot get lock */
0235             if (NULL!=locke)
0236             {
0237                 *locke=EXTRUE;
0238             }
0239             
0240         }
0241         else
0242         {
0243             r->lockthreadid = ndrx_gettid();
0244             NDRX_LOG(log_info, "Transaction [%s] locked for thread_id: %" PRIu64,
0245                     tmxid, r->lockthreadid);
0246         }
0247     }
0248     
0249     MUTEX_UNLOCK_V(M_tx_hash_lock);
0250     
0251     return r;
0252 }
0253 
0254 /**
0255  * set current lock sequence number
0256  * @param p_tl log entry
0257  */
0258 exprivate int tms_log_setseq(atmi_xa_log_t *p_tl)
0259 {
0260     int ret= EXSUCCEED;
0261     long grp_flags=0;
0262     /* if we are in singleton group mode, validate that we still
0263      * own the lock
0264      */
0265     if (G_atmi_env.procgrp_no)
0266     {
0267         p_tl->sg_sequence=tpsgislocked(G_atmi_env.procgrp_no
0268             , TPPG_SGVERIFY|TPPG_NONSGSUCC
0269             , &grp_flags);
0270 
0271         if (EXFAIL==p_tl->sg_sequence)
0272         {
0273             NDRX_LOG(log_error, "tpsgislocked failed %s", tpstrerror(tperrno));
0274             EXFAIL_OUT(ret);
0275         }
0276         
0277         if (grp_flags & TPPG_SINGLETON && p_tl->sg_sequence<=0)
0278         {
0279             NDRX_LOG(log_error, "Singleton group %d lock lost (at start) - exit(-1)",
0280                 G_atmi_env.procgrp_no);
0281             userlog("Singleton group %d lock lost (at start) - exit(-1)",
0282                 G_atmi_env.procgrp_no);
0283             /* !!!! */
0284             exit(EXFAIL);
0285         }
0286     }
0287 
0288 out:
0289     return ret;
0290 }
0291 
0292 /**
0293  * run checkpoint on transaction
0294  * @param p_tl transaction log
0295  */
0296 expublic int tms_log_checkpointseq(atmi_xa_log_t *p_tl)
0297 {
0298     int ret=EXSUCCEED;
0299     long seq;
0300     long grp_flags=0;
0301     /* if we are in singleton group mode, validate that we still
0302      * own the lock
0303      */
0304     if (G_atmi_env.procgrp_no)
0305     {
0306         seq=tpsgislocked(G_atmi_env.procgrp_no, TPPG_SGVERIFY|TPPG_NONSGSUCC, &grp_flags);
0307 
0308         if (seq < 0)
0309         {
0310             NDRX_LOG(log_error, "tpsgislocked returns %s", tpstrerror(tperrno));
0311             EXFAIL_OUT(ret);
0312         }
0313 
0314         if ((grp_flags & TPPG_SINGLETON) && 0==seq)
0315         {
0316             NDRX_LOG(log_error, "Singleton group %d on node %ld lock lost - exit(-1)",
0317                 G_atmi_env.procgrp_no, tpgetnodeid());
0318             userlog("Singleton group %d on node %ld lock lost - exit(-1)",
0319                 G_atmi_env.procgrp_no, tpgetnodeid());
0320             /* !!!! */
0321             exit(EXFAIL);
0322         }
0323 
0324         /* failover has happened during transaction processing
0325          * thus cannot proceed with decsion, only after restart
0326          */
0327         if ( (grp_flags & TPPG_SINGLETON) && (seq - p_tl->sg_sequence >= G_atmi_env.sglockinc))
0328         {
0329             NDRX_LOG(log_error, "Singleton group %d on node %ld lock lost (tl seq %ld, cur seq %ld) - exit(-1), ",
0330                 G_atmi_env.procgrp_no, tpgetnodeid(), p_tl->sg_sequence, seq);
0331             userlog("Singleton group %d on node %ld lock lost (tl seq %ld, cur seq %ld) - exit(-1), ",
0332                 G_atmi_env.procgrp_no, tpgetnodeid(), p_tl->sg_sequence, seq);
0333             /* !!!! */
0334             exit(EXFAIL);
0335         }
0336 
0337         /* we are safe to continue... */
0338         p_tl->sg_sequence=seq;
0339     }
0340 out:
0341     return ret;
0342 }
0343 
0344 /**
0345  * Log transaction as started
0346  * @param xai - XA Info struct
0347  * @param txtout - transaction timeout
0348  * @param[out] btid branch transaction id (if not dynamic reg)
0349  * @return SUCCEED/FAIL
0350  */
0351 expublic int tms_log_start(atmi_xa_tx_info_t *xai, int txtout, long tmflags,
0352         long *btid)
0353 {
0354     int ret = EXSUCCEED;
0355     int hash_added = EXFALSE;
0356     atmi_xa_log_t *tmp = NULL;
0357     atmi_xa_rm_status_btid_t *bt;
0358     /* 1. Add stuff to hash list */
0359     if (NULL==(tmp = NDRX_CALLOC(sizeof(atmi_xa_log_t), 1)))
0360     {
0361         NDRX_LOG(log_error, "NDRX_CALLOC() failed: %s", strerror(errno));
0362         ret=EXFAIL;
0363         goto out;
0364     }
0365     tmp->txstage = XA_TX_STAGE_ACTIVE;
0366     
0367     tmp->tmnodeid = xai->tmnodeid;
0368     tmp->tmsrvid = xai->tmsrvid;
0369     tmp->tmrmid = xai->tmrmid;
0370     NDRX_STRCPY_SAFE(tmp->tmxid, xai->tmxid);
0371     
0372     tmp->t_start = ndrx_utc_tstamp();
0373     tmp->t_update = ndrx_utc_tstamp();
0374     tmp->txtout = txtout;
0375     tmp->log_version = LOG_VERSION_2;   /**< Now with CRC32 groups */
0376     ndrx_stopwatch_reset(&tmp->ttimer);
0377 
0378     if (EXSUCCEED!=tms_log_setseq(tmp))
0379     {
0380         EXFAIL_OUT(ret);
0381     }
0382 
0383     /* lock for us, yet it is not shared*/
0384     tmp->lockthreadid = ndrx_gettid();
0385     
0386     /* TODO: Open the log file & write tms_close_logfile
0387      * Only question, how long 
0388      */
0389     
0390     /* Open log file */
0391     if (EXSUCCEED!=tms_open_logfile(tmp, "a"))
0392     {
0393         NDRX_LOG(log_error, "Failed to create transaction log file");
0394         userlog("Failed to create transaction log file");
0395         EXFAIL_OUT(ret);
0396     }
0397     
0398     /* log the opening infos */
0399     if (EXSUCCEED!=tms_log_info(tmp))
0400     {
0401         NDRX_LOG(log_error, "Failed to log tran info");
0402         userlog("Failed to log tran info");
0403         EXFAIL_OUT(ret);
0404     }
0405     
0406     /* Only for static... */
0407     if (!(tmflags & TMFLAGS_DYNAMIC_REG))
0408     {
0409         /* assign btid? 
0410         tmp->rmstatus[xai->tmrmid-1].rmstatus = XA_RM_STATUS_ACTIVE;
0411         */
0412         char start_stat = XA_RM_STATUS_ACTIVE;
0413         
0414         *btid = tms_btid_gettid(tmp, xai->tmrmid);
0415 
0416         /* Just get the TID, status will be changed with file update  */
0417         if (EXSUCCEED!=tms_btid_add(tmp, xai->tmrmid, *btid, XA_RM_STATUS_NULL, 
0418                 0, 0, &bt))
0419         {
0420             NDRX_LOG(log_error, "Failed to add BTID: %ld", *btid);
0421             EXFAIL_OUT(ret);
0422         }
0423         
0424         if (tmflags & TMFLAGS_TPNOSTARTXID)
0425         {
0426             NDRX_LOG(log_info, "TPNOSTARTXID => starting as %c - prepared", 
0427                     XA_RM_STATUS_PREP);
0428             start_stat = XA_RM_STATUS_UNKOWN;
0429         }
0430         
0431         /* log to transaction file -> Write off RM status */
0432         if (EXSUCCEED!=tms_log_rmstatus(tmp, bt, start_stat, 0, 0))
0433         {
0434             NDRX_LOG(log_error, "Failed to write RM status to file: %ld", *btid);
0435             EXFAIL_OUT(ret);
0436         }
0437 
0438     }
0439     
0440     MUTEX_LOCK_V(M_tx_hash_lock);
0441     EXHASH_ADD_STR( M_tx_hash, tmxid, tmp);
0442     MUTEX_UNLOCK_V(M_tx_hash_lock);
0443     
0444     hash_added = EXTRUE;
0445     
0446 out:
0447     
0448     /* unlock */
0449     if (EXSUCCEED!=ret)
0450     {
0451         tms_remove_logfile(tmp, hash_added);
0452     }
0453     else
0454     {
0455         if (NULL!=tmp)
0456         {
0457             tms_unlock_entry(tmp);
0458         }
0459     }
0460     return ret;
0461 }
0462 
0463 /**
0464  * Add RM to transaction.
0465  * If tid is known, then check and/or add.
0466  * If add, then ensure that we step up the BTID counter, if the number is
0467  * higher than counter have.
0468  * @param xai
0469  * @param rmid - 1 based rmid
0470  * @param[in,out] btid branch tid (if it is known, i.e. 0), if not known, then -1
0471  *  for unknown BTID requests, new BTID is generated
0472  * @return EXSUCCEED/EXFAIL
0473  */
0474 expublic int tms_log_addrm(atmi_xa_tx_info_t *xai, short rmid, int *p_is_already_logged,
0475         long *btid, long flags)
0476 {
0477     int ret = EXSUCCEED;
0478     atmi_xa_log_t *p_tl= NULL;
0479     atmi_xa_rm_status_btid_t *bt = NULL;
0480     
0481     /* atmi_xa_rm_status_t stat; */
0482     /*
0483     memset(&stat, 0, sizeof(stat));
0484     stat.rmstatus = XA_RM_STATUS_ACTIVE;
0485      * 
0486     */
0487     
0488     if (NULL==(p_tl = tms_log_get_entry(xai->tmxid, NDRX_LOCK_WAIT_TIME, NULL)))
0489     {
0490         NDRX_LOG(log_error, "No transaction/lock timeout under xid_str: [%s]", 
0491                 xai->tmxid);
0492         ret=EXFAIL;
0493         goto out_nolock;
0494     }
0495     
0496     if (1 > rmid || rmid>NDRX_MAX_RMS)
0497     {
0498         NDRX_LOG(log_error, "RMID %hd out of bounds!!!");
0499         EXFAIL_OUT(ret);
0500     }
0501     
0502     /* if processing in background (say time-out rollback, the commit shall not be
0503      * accepted)
0504      */
0505     if (p_tl->is_background || XA_TX_STAGE_ACTIVE!=p_tl->txstage)
0506     {
0507         NDRX_LOG(log_error, "cannot register xid [%s] is_background: (%d) stage: (%hd)", 
0508                 xai->tmxid, p_tl->is_background, p_tl->txstage);
0509         EXFAIL_OUT(ret);
0510     }
0511     
0512     /*
0513     if (p_tl->rmstatus[rmid-1].rmstatus && NULL!=p_is_already_logged)
0514     {
0515         *p_is_already_logged = EXTRUE;
0516     }
0517      * */
0518     /*
0519     p_tl->rmstatus[rmid-1] = stat;
0520     */
0521     
0522     ret = tms_btid_addupd(p_tl, rmid, btid, XA_RM_STATUS_NULL, 
0523             0, 0, p_is_already_logged, &bt);
0524     
0525     /* log to transaction file */
0526     if (!(*p_is_already_logged))
0527     {
0528         char start_stat = XA_RM_STATUS_ACTIVE;
0529         
0530         NDRX_LOG(log_info, "RMID %hd/%ld added to xid_str: [%s]", 
0531             rmid, *btid, xai->tmxid);
0532         
0533         if (flags & TMFLAGS_TPNOSTARTXID)
0534         {
0535             NDRX_LOG(log_info, "TPNOSTARTXID => adding as %c - unknown", 
0536                     XA_RM_STATUS_UNKOWN);
0537             start_stat = XA_RM_STATUS_UNKOWN;
0538         }
0539         
0540         if (EXSUCCEED!=tms_log_rmstatus(p_tl, bt, start_stat, 0, 0))
0541         {
0542             NDRX_LOG(log_error, "Failed to write RM status to file: %ld", *btid);
0543             EXFAIL_OUT(ret);
0544         }
0545     }
0546     else
0547     {
0548         NDRX_LOG(log_info, "RMID %hd/%ld already joined to xid_str: [%s]", 
0549             rmid, *btid, xai->tmxid);
0550     }
0551    
0552 out:
0553     /* unlock transaction from thread */
0554     tms_unlock_entry(p_tl);
0555 
0556 out_nolock:
0557     
0558     return ret;
0559 }
0560 
0561 /**
0562  * Change RM status
0563  * @param xai
0564  * @param rmid - 1 based rmid
0565  * @param[in] btid branch tid 
0566  * @param[in] rmstatus RM status to set
0567  * @return EXSUCCEED/EXFAIL
0568  */
0569 expublic int tms_log_chrmstat(atmi_xa_tx_info_t *xai, short rmid, 
0570         long btid, char rmstatus, UBFH *p_ub)
0571 {
0572     int ret = EXSUCCEED;
0573     atmi_xa_log_t *p_tl= NULL;
0574     atmi_xa_rm_status_btid_t *bt = NULL;
0575     int locke;
0576     
0577     NDRX_LOG(log_debug, "xid: [%s] BTID %ld change status to [%c]",
0578             xai->tmxid, btid, rmstatus);
0579     
0580     if (NULL==(p_tl = tms_log_get_entry(xai->tmxid, NDRX_LOCK_WAIT_TIME, &locke)))
0581     {
0582         if (locke)
0583         {
0584             NDRX_LOG(log_error, "Lock acquire timed out for xid_str: [%s] - TPETIME", 
0585                     xai->tmxid);
0586             atmi_xa_set_error_fmt(p_ub, TPETIME, NDRX_XA_ERSN_LOCK, 
0587                         "Failed to acquire locked!");
0588         }
0589         else
0590         {
0591             NDRX_LOG(log_error, "No transaction under xid_str: [%s] - match ", 
0592                     xai->tmxid);
0593 
0594             atmi_xa_set_error_fmt(p_ub, TPEMATCH, NDRX_XA_ERSN_NOTX, 
0595                         "Failed to get transaction or locked for processing!");
0596         }
0597         
0598         ret=EXFAIL;
0599         goto out_nolock;
0600     }
0601     
0602     bt = tms_btid_find(p_tl, rmid, btid);
0603     
0604     if (rmstatus==bt->rmstatus)
0605     {
0606         NDRX_LOG(log_warn, "xid: [%s] BTID %ld already in status [%c]",
0607             xai->tmxid, btid, rmstatus);
0608     }
0609     
0610     if (XA_RM_STATUS_UNKOWN!=bt->rmstatus)
0611     {
0612         NDRX_LOG(log_error, "No transaction under xid_str: [%s] - match ", 
0613                 xai->tmxid);
0614         
0615         atmi_xa_set_error_fmt(p_ub, TPEMATCH, NDRX_XA_ERSN_INVPARAM, 
0616                     "BTID %ld in status %c but want to set to: %c!",
0617                 btid, bt->rmstatus, rmstatus);
0618         
0619         EXFAIL_OUT(ret);
0620     }
0621     
0622     /* Change status to expected one.. */
0623     
0624     if (EXSUCCEED!=tms_log_rmstatus(p_tl, bt, rmstatus, 0, 0))
0625     {
0626         NDRX_LOG(log_error, "Failed to write RM status to file: %ld, "
0627                 "new stat: %c old stat: [%c]", 
0628                 btid, rmstatus, bt->rmstatus);
0629         
0630         atmi_xa_set_error_fmt(p_ub, TPEMATCH, NDRX_XA_ERSN_RMLOGFAIL, 
0631                     "BTID %ld in status %c but want to set to: %c!",
0632                 btid, bt->rmstatus, rmstatus);
0633         
0634         EXFAIL_OUT(ret);
0635     }
0636     
0637     NDRX_LOG(log_debug, "xid: [%s] BTID %ld change status to [%c] OK",
0638             xai->tmxid, btid, rmstatus);
0639    
0640 out:
0641     /* unlock transaction from thread */
0642     tms_unlock_entry(p_tl);
0643 
0644 out_nolock:
0645     
0646     return ret;
0647 }
0648 
0649 /**
0650  * Generate logfile path + name for transaction
0651  * @param fname output buffer
0652  * @param fnamesz output buffer size
0653  * @param tmxid transaction id (str)
0654  */
0655 exprivate void tms_gen_file_name(char *fname, size_t fnamesz, char *tmxid)
0656 {
0657         snprintf(fname, fnamesz, "%s/TRN-%ld-%hd-%d-%s",
0658             G_tmsrv_cfg.tlog_dir, G_tmsrv_cfg.vnodeid, G_atmi_env.xa_rmid,
0659             G_server_conf.srv_id, tmxid);
0660 }
0661 
0662 /**
0663  * Get the log file name for particular transaction
0664  * @param p_tl
0665  * @return 
0666  */
0667 exprivate void tms_get_file_name(atmi_xa_log_t *p_tl)
0668 {
0669     tms_gen_file_name(p_tl->fname, sizeof(p_tl->fname), p_tl->tmxid);
0670 }
0671 
0672 /**
0673  * Get the log file name for particular transaction
0674  * @param p_tl
0675  * @return 
0676  */
0677 expublic int tms_open_logfile(atmi_xa_log_t *p_tl, char *mode)
0678 {
0679     int ret = EXSUCCEED;
0680     
0681     if (EXEOS==p_tl->fname[0])
0682     {
0683         tms_get_file_name(p_tl);
0684     }
0685     
0686     if (NULL!=p_tl->f)
0687     {
0688         /*
0689         NDRX_LOG(log_warn, "Log file [%s] already open", p_tl->fname);
0690          */
0691         goto out;
0692     }
0693     
0694     /* Try to open the file */
0695     if (NULL==(p_tl->f=NDRX_FOPEN(p_tl->fname, mode)))
0696     {
0697         userlog("Failed to open XA transaction log file: [%s]: %s", 
0698                 p_tl->fname, strerror(errno));
0699         NDRX_LOG(log_error, "Failed to open XA transaction "
0700                 "log file: [%s]: %s", 
0701                 p_tl->fname, strerror(errno));
0702         ret=EXFAIL;
0703         goto out;
0704     }
0705     
0706     NDRX_LOG(log_debug, "XA tx log file [%s] open for [%s]", 
0707             p_tl->fname, mode);
0708 
0709 out:    
0710     return ret;
0711 }
0712 
0713 /**
0714  * Read the log file from disk
0715  * @param logfile - path to log file
0716  * @param tmxid - transaction ID string
0717  * @param pp_tl - transaction log (double ptr). Returned if all ok.
0718  * @return SUCCEED/FAIL
0719  */
0720 expublic int tms_load_logfile(char *logfile, char *tmxid, atmi_xa_log_t **pp_tl)
0721 {
0722     int ret = EXSUCCEED;
0723     int len;
0724     char buf[1024]; /* Should be enough... */
0725     char *p;
0726     int line = 1;
0727     int got_term_last=EXFALSE, infos_ok=EXFALSE, missing_term_in_file=EXFALSE;
0728     int do_housekeep = EXFALSE;
0729     int wrote, err;
0730     long diff;
0731     *pp_tl = NULL;
0732     
0733     if (NULL==(*pp_tl = NDRX_CALLOC(sizeof(atmi_xa_log_t), 1)))
0734     {
0735         NDRX_LOG(log_error, "NDRX_CALLOC() failed: %s", strerror(errno));
0736         EXFAIL_OUT(ret);
0737     }
0738     /* set file name */
0739     if (strlen(logfile)>PATH_MAX)
0740     {
0741         NDRX_LOG(log_error, "Invalid file name!");
0742         EXFAIL_OUT(ret);
0743     }
0744     
0745     NDRX_STRCPY_SAFE((*pp_tl)->fname, logfile);
0746     
0747     len = strlen(tmxid);
0748     if (len>sizeof((*pp_tl)->tmxid)-1)
0749     {
0750         NDRX_LOG(log_error, "tmxid too long [%s]!", tmxid);
0751         EXFAIL_OUT(ret);
0752     }
0753     
0754     NDRX_STRCPY_SAFE((*pp_tl)->tmxid, tmxid);
0755     /* we start with active... */
0756     (*pp_tl)->txstage = XA_TX_STAGE_ACTIVE;
0757 
0758     /* set lock sequence number, if in singleton group */
0759     if (EXSUCCEED!=tms_log_setseq(*pp_tl))
0760     {
0761         EXFAIL_OUT(ret);
0762     }
0763 
0764     /* Open the file */
0765     if (EXSUCCEED!=tms_open_logfile(*pp_tl, "a+"))
0766     {
0767         NDRX_LOG(log_error, "Failed to open transaction file!");
0768         EXFAIL_OUT(ret);
0769     }
0770 
0771     /* Read line by line & call parsing functions 
0772      * we should also parse start of the transaction (date/time)
0773      */
0774     while (fgets(buf, sizeof(buf), (*pp_tl)->f))
0775     {
0776         got_term_last = EXFALSE;
0777         len = strlen(buf);
0778         
0779         if (1 < len && '\n'==buf[len-1])
0780         {
0781             buf[len-1]=EXEOS;
0782             len--;
0783             got_term_last = EXTRUE;
0784         }
0785 
0786         /* if somebody operated with Windows... */
0787         if (1 < len && '\r'==buf[len-1])
0788         {
0789             buf[len-1]=EXEOS;
0790             len--;
0791         }
0792         
0793         NDRX_LOG(log_debug, "Parsing line: [%s]", buf);
0794         
0795         if (!got_term_last)
0796         {
0797             /* Corrupted file, terminator expected 
0798              * - mark record as corrupted.
0799              * - Also we will try to repair the situation, if last line was corrupted
0800              * then we mark with with NAK.
0801              */
0802             missing_term_in_file = EXTRUE;
0803             continue;
0804         }
0805         
0806         /* If there was power off, then we might get corrupted lines...
0807          */
0808         p = strchr(buf, ':');
0809         
0810         if (NULL==p)
0811         {
0812             NDRX_LOG(log_error, "Symbol ':' not found on line [%d]!", 
0813                     line);
0814             EXFAIL_OUT(ret);
0815         }
0816         p++;
0817         
0818         /* Classify & parse record */
0819         NDRX_LOG(log_error, "Record: %c log_version: %d", *p, (*pp_tl)->log_version);
0820         
0821         /* If this is initial line && record J
0822          * or log_version > 0, perform checksuming.
0823          */
0824         if ((!infos_ok && LOG_COMMAND_J==*p) || ((*pp_tl)->log_version > LOG_VERSION_1))
0825         {
0826             char *rs = strchr(buf, LOG_RS_SEP);
0827             unsigned long crc32_calc=0, crc32_got=0;
0828             
0829             if (NULL==rs)
0830             {
0831                 NDRX_LOG(log_error, "TMSRV log file [%s] missing %c sep on line [%s] - ignore line", 
0832                             logfile, LOG_RS_SEP, buf);
0833                 userlog("TMSRV log file [%s] missing %c sep on line [%s] - ignore line", 
0834                             logfile, LOG_RS_SEP, buf);
0835                 continue;
0836             }
0837             
0838             *rs = EXEOS;
0839             rs++;
0840             
0841             /* Now we get crc32 */
0842             crc32_calc = ndrx_Crc32_ComputeBuf(0, buf, strlen(buf));
0843             sscanf(rs, "%lx", &crc32_got);
0844             
0845             /* verify the log... */
0846             if (crc32_calc!=crc32_got)
0847             {
0848                 NDRX_LOG(log_error, "TMSRV log file [%s] invalid log [%s] line "
0849                         "crc32 calc: [%lx] vs rec: [%lx] - ignore line", 
0850                         logfile, buf, crc32_calc, crc32_got);
0851                 userlog("TMSRV log file [%s] invalid log [%s] line "
0852                         "crc32 calc: [%lx] vs rec: [%lx] - ignore line", 
0853                          logfile, buf, crc32_calc, crc32_got);
0854                 continue;
0855             }
0856             
0857         }
0858         
0859         switch (*p)
0860         {
0861             /* Version 0: */
0862             case LOG_COMMAND_I:
0863             /* Version 1: */
0864             case LOG_COMMAND_J:
0865                 
0866                 if (infos_ok)
0867                 {
0868                     NDRX_LOG(log_error, "TMSRV log file [%s] duplicate info block", 
0869                             logfile);
0870                     userlog("TMSRV log file [%s] duplicate info block", 
0871                             logfile);
0872                     do_housekeep=EXTRUE;
0873                     EXFAIL_OUT(ret);
0874                 }
0875                 
0876                 /* set the version number */
0877                 (*pp_tl)->log_version = *p - LOG_COMMAND_I + 1;
0878                 
0879                 if (EXSUCCEED!=tms_parse_info(buf, *pp_tl))
0880                 {
0881                     EXFAIL_OUT(ret);
0882                 }
0883                 
0884                 infos_ok = EXTRUE;
0885                 break;
0886             case LOG_COMMAND_STAGE: 
0887                 
0888                 if (!infos_ok)
0889                 {
0890                     NDRX_LOG(log_error, "TMSRV log file [%s] Info record required first", 
0891                             logfile);
0892                     userlog("TMSRV log file [%s] Info record required first", 
0893                             logfile);
0894                     do_housekeep=EXTRUE;
0895                     EXFAIL_OUT(ret);
0896                 }
0897                 
0898                 if (EXSUCCEED!=tms_parse_stage(buf, *pp_tl))
0899                 {
0900                     EXFAIL_OUT(ret);
0901                 }
0902                 
0903                 break;
0904             case LOG_COMMAND_RMSTAT:
0905                 
0906                 if (!infos_ok)
0907                 {
0908                     NDRX_LOG(log_error, "TMSRV log file [%s] Info record required first", 
0909                             logfile);
0910                     userlog("TMSRV log file [%s] Info record required first", 
0911                             logfile);
0912                     do_housekeep=EXTRUE;
0913                     EXFAIL_OUT(ret);
0914                 }
0915                 
0916                 if (EXSUCCEED!=tms_parse_rmstatus(buf, *pp_tl))
0917                 {
0918                     EXFAIL_OUT(ret);
0919                 }
0920                 
0921                 break;
0922             default:
0923                 NDRX_LOG(log_warn, "Unknown record %c on line %d", *p, line);
0924                 break;
0925         }
0926         
0927         line++;
0928     }
0929     
0930     /* check was last read OK */
0931     if (!feof((*pp_tl)->f))
0932     {
0933         err = ferror((*pp_tl)->f);
0934         
0935         NDRX_LOG(log_error, "TMSRV log file [%s] failed to read: %s", 
0936                     logfile, strerror(err));
0937         userlog("TMSRV log file [%s] failed to read: %s", 
0938                     logfile, strerror(err));
0939         EXFAIL_OUT(ret);
0940     }
0941     
0942     if (!infos_ok)
0943     {
0944         NDRX_LOG(log_error, "TMSRV log file [%s] no [%c] info record - not loading", 
0945                     logfile, LOG_COMMAND_J);
0946         userlog("TMSRV log file [%s] no [%c] info record - not loading", 
0947                     logfile, LOG_COMMAND_J);
0948         do_housekeep=EXTRUE;
0949         EXFAIL_OUT(ret);
0950     }
0951     
0952     /* so that next time we can parse OK */
0953     if (missing_term_in_file)
0954     {
0955         /* so last line bad, lets fix... */
0956         if (got_term_last)
0957         {
0958             NDRX_LOG(log_error, "TMSRV log file [%s] - invalid read, "
0959                     "previous lines seen without \\n", 
0960                     logfile);
0961             userlog("TMSRV log file [%s] - invalid read, "
0962                     "previous lines without \\n", 
0963                     logfile);
0964             EXFAIL_OUT(ret);
0965         }
0966         else
0967         {
0968             /* append with \n */
0969             NDRX_LOG(log_error, "Terminating last line (with out checksum)");
0970 
0971             wrote=fprintf((*pp_tl)->f, "\n");
0972 
0973             if (wrote!=1)
0974             {
0975                 err = ferror((*pp_tl)->f);
0976                 NDRX_LOG(log_error, "TMSRV log file [%s] failed to terminate line: %s", 
0977                         logfile, strerror(err));
0978                 userlog("TMSRV log file [%s] failed to terminate line: %s", 
0979                         logfile, strerror(err));
0980                 EXFAIL_OUT(ret);
0981             }
0982         }
0983     }
0984     
0985     /* thus it will start to drive it by background thread
0986      * Any transaction will be completed in background.
0987      */
0988     (*pp_tl)->is_background = EXTRUE;
0989     
0990     
0991     /* 
0992      * If we keep in active state, then timeout will kill the transaction (or it will be finished by in progress 
0993      *  binary, as maybe transaction is not yet completed). Thought we might loss the infos
0994      *  of some registered process, but then it would be aborted transaction anyway.
0995      *  but then record might be idling in the resource.
0996      * 
0997      * If we had stage logged, then transaction would be completed accordingly,
0998      * and would complete with those resources for which states is not yet
0999      * finalized (according to logs)
1000      */
1001     
1002     /* Add transaction to the hash. We need locking here. 
1003      * 
1004      * If transaction was in prepare stage XA_TX_STAGE_PREPARING (40)
1005      * We need to abort it because, there is no chance that caller will get
1006      * response back.
1007      */
1008     if (XA_TX_STAGE_PREPARING == (*pp_tl)->txstage 
1009             || XA_TX_STAGE_ACTIVE == (*pp_tl)->txstage)
1010     {
1011         NDRX_LOG(log_error, "XA Transaction [%s] was in active or preparing stage and "
1012                 "tmsrv is restarted - ABORTING", (*pp_tl)->tmxid);
1013         
1014         userlog("XA Transaction [%s] was in  active or preparing stage and "
1015                 "tmsrv is restarted - ABORTING", (*pp_tl)->tmxid);
1016         
1017         /* change the status (+ log) */
1018         (*pp_tl)->lockthreadid = ndrx_gettid();
1019         tms_log_stage(*pp_tl, XA_TX_STAGE_ABORTING, EXTRUE);
1020         (*pp_tl)->lockthreadid = 0;
1021     }
1022     
1023     MUTEX_LOCK_V(M_tx_hash_lock);
1024     EXHASH_ADD_STR( M_tx_hash, tmxid, (*pp_tl));
1025     MUTEX_UNLOCK_V(M_tx_hash_lock);
1026     
1027     NDRX_LOG(log_debug, "TX [%s] loaded OK", tmxid);
1028 out:
1029 
1030     /* Clean up if error. */
1031     if (EXSUCCEED!=ret)
1032     {
1033         userlog("Failed to load/recover transaction [%s]", logfile);
1034         if (NULL!=*pp_tl)
1035         {
1036             /* remove any stuff added... */
1037             
1038             if (tms_is_logfile_open(*pp_tl))
1039             {
1040                 tms_close_logfile(*pp_tl);
1041             }
1042 
1043             tms_remove_logfree(*pp_tl, EXFALSE);
1044             *pp_tl = NULL;
1045         }
1046     }
1047 
1048     /* clean up corrupted files */
1049     if (do_housekeep)
1050     {
1051         /* remove old corrupted logs... */
1052         if ((diff=ndrx_file_age(logfile)) > G_tmsrv_cfg.housekeeptime)
1053         {
1054             NDRX_LOG(log_error, "Corrupted log file [%s] age %ld sec (housekeep %d) - removing",
1055                     logfile, diff, G_tmsrv_cfg.housekeeptime);
1056             userlog("Corrupted log file [%s] age %ld sec (housekeep %d) - removing",
1057                     logfile, diff, G_tmsrv_cfg.housekeeptime);
1058 
1059             if (EXSUCCEED!=unlink(logfile))
1060             {
1061                 err = errno;
1062                 NDRX_LOG(log_error, "Failed to unlink [%s]: %s", strerror(err));
1063                 userlog("Failed to unlink [%s]: %s", strerror(err));
1064             }
1065         }
1066     }
1067 
1068     return ret;
1069 }
1070 
1071 /**
1072  * Test to see is log file open
1073  * @param p_tl
1074  * @return TRUE/FALSE
1075  */
1076 expublic int tms_is_logfile_open(atmi_xa_log_t *p_tl)
1077 {
1078     if (p_tl->f)
1079         return EXTRUE;
1080     else
1081         return EXFALSE;
1082 }
1083 
1084 /**
1085  * Close the log file
1086  * @param p_tl
1087  */
1088 expublic void tms_close_logfile(atmi_xa_log_t *p_tl)
1089 {
1090     if (NULL!=p_tl->f)
1091     {
1092         NDRX_FCLOSE(p_tl->f);
1093         p_tl->f = NULL;
1094     }
1095 }
1096 
1097 /**
1098  * Free up log file (just memory)
1099  * @param p_tl log handle
1100  * @param hash_rm remove log entry from tran hash
1101  */
1102 expublic void tms_remove_logfree(atmi_xa_log_t *p_tl, int hash_rm)
1103 {
1104     int i;
1105     atmi_xa_rm_status_btid_t *el, *elt;
1106     
1107     if (hash_rm)
1108     {
1109         MUTEX_LOCK_V(M_tx_hash_lock);
1110         EXHASH_DEL(M_tx_hash, p_tl); 
1111         MUTEX_UNLOCK_V(M_tx_hash_lock);
1112     }
1113 
1114     /* Remove branch TIDs */
1115     
1116     for (i=0; i<NDRX_MAX_RMS; i++)
1117     {
1118         EXHASH_ITER(hh, p_tl->rmstatus[i].btid_hash, el, elt)
1119         {
1120             EXHASH_DEL(p_tl->rmstatus[i].btid_hash, el);
1121             NDRX_FREE(el);
1122         }
1123     }
1124     
1125     NDRX_FREE(p_tl);
1126 }
1127 
1128 /**
1129  * Removes also transaction from hash. Either fully
1130  * committed or fully aborted...
1131  * @param p_tl - becomes invalid after removal!
1132  * @param hash_rm remove from hash lists
1133  */
1134 expublic void tms_remove_logfile(atmi_xa_log_t *p_tl, int hash_rm)
1135 {
1136     int have_file = EXFALSE;
1137     
1138     if (tms_is_logfile_open(p_tl))
1139     {
1140         have_file = EXTRUE;
1141         tms_close_logfile(p_tl);
1142     }/* Check for file existance, if was not open */
1143     else if (0 == access(p_tl->fname, 0))
1144     {
1145         have_file = EXTRUE;
1146     }
1147         
1148     if (have_file)
1149     {
1150         if (EXSUCCEED!=unlink(p_tl->fname))
1151         {
1152             int err = errno;
1153             NDRX_LOG(log_debug, "Failed to remove tx log file: %d (%s)", 
1154                     err, strerror(err));
1155             userlog("Failed to remove tx log file: %d (%s)", 
1156                     err, strerror(err));
1157         }
1158     }
1159     
1160     /* Remove the log for hash! */
1161     tms_remove_logfree(p_tl, hash_rm);
1162 
1163 }
1164 
1165 /**
1166  * We should have universal log writter
1167  */
1168 exprivate int tms_log_write_line(atmi_xa_log_t *p_tl, char command, const char *fmt, ...)
1169 {
1170     int ret = EXSUCCEED;
1171     char msg[LOG_MAX+1] = {EXEOS};
1172     char msg2[LOG_MAX+1] = {EXEOS};
1173     int len, wrote, exp;
1174     int make_error = EXFALSE;
1175     unsigned long crc32;
1176     va_list ap;
1177     
1178     CHK_THREAD_ACCESS;
1179     
1180     /* If log not open - just skip... */
1181     if (NULL==p_tl->f)
1182     {
1183         return EXSUCCEED;
1184     }
1185      
1186     va_start(ap, fmt);
1187     (void) vsnprintf(msg, sizeof(msg), fmt, ap);
1188     va_end(ap);
1189     
1190     snprintf(msg2, sizeof(msg2), "%lld:%c:%s", ndrx_utc_tstamp(), command, msg);
1191     len = strlen(msg2);
1192     
1193     /* check exactly how much bytes was written */
1194     
1195     if (1==G_atmi_env.test_tmsrv_write_fail)
1196     {
1197         make_error = EXTRUE;
1198     }
1199     else if (2==G_atmi_env.test_tmsrv_write_fail)
1200     {
1201         /* generate 25% random errors */
1202         if (ndrx_rand() % 4 == 0)
1203         {
1204             make_error = EXTRUE;
1205         }
1206         
1207     }
1208     
1209     crc32 = ndrx_Crc32_ComputeBuf(0, msg2, len);
1210     
1211     /* Simulate that only partial message is written in case
1212      * if disk is full or crash happens
1213      */
1214     wrote=0;
1215     
1216     /* if write file test case - write partially, also omit EOL terminator 
1217      * Also... needs flag for commit crash simulation... i.e. 
1218      * So that if commit crash is set, we partially write the command (stage change)
1219      * and afterwards tmsrv exits.
1220      * And then when flag is removed, transaction shall commit OK
1221      */
1222     if (p_tl->log_version == LOG_VERSION_1)
1223     {
1224         NDRX_LOG(log_debug, "Log format: v%d", p_tl->log_version);
1225         
1226         exp = len+1;
1227         if (make_error)
1228         {
1229             exp++;
1230         }
1231         wrote=fprintf(p_tl->f, "%s\n", msg2);
1232     }
1233     else
1234     {
1235         /* version 2+ */
1236         exp = len+8+1+1;
1237         
1238         if (make_error)
1239         {
1240             crc32+=1;
1241             exp++;
1242         }
1243         
1244         wrote=fprintf(p_tl->f, "%s%c%08lx\n", msg2, LOG_RS_SEP, crc32);
1245     }
1246     
1247     if (wrote != exp)
1248     {
1249         int err = errno;
1250         
1251         /* For Q/A purposes - simulate no space error, if requested */
1252         if (make_error)
1253         {
1254             NDRX_LOG(log_error, "QA point: make_error TRUE");
1255             err = ENOSPC;
1256         }
1257         
1258         NDRX_LOG(log_error, "ERROR! Failed to write transaction log file: req_len=%d, written=%d: %s",
1259                 exp, wrote, strerror(err));
1260         userlog("ERROR! Failed to write transaction log file: req_len=%d, written=%d: %s",
1261                 exp, wrote, strerror(err));
1262         
1263         ret=EXFAIL;
1264         goto out;
1265     }
1266     
1267     
1268 out:
1269     /* flush what ever we have */
1270     if (EXSUCCEED!=fflush(p_tl->f))
1271     {
1272         int err=errno;
1273         userlog("ERROR! Failed to fflush(): %s", strerror(err));
1274         NDRX_LOG(log_error, "ERROR! Failed to fflush(): %s", strerror(err));
1275     ret=EXFAIL;
1276     }
1277     /*fsync(fileno(p_tl->f));*/
1278     return ret;
1279 }
1280 
1281 /**
1282  * Write general info about transaction
1283  * FORMAT: <RM_ID>:<NODE_ID>:<SRV_ID>
1284  * @param p_tl
1285  * @param stage
1286  * @return 
1287  */
1288 expublic int tms_log_info(atmi_xa_log_t *p_tl)
1289 {
1290     int ret = EXSUCCEED;
1291     char rmsbuf[NDRX_MAX_RMS*3+1]={EXEOS};
1292     int i;
1293     int len;
1294     
1295     CHK_THREAD_ACCESS;
1296     
1297     /* log the RMs participating in transaction 
1298     for (i=0; i<NDRX_MAX_RMS; i++)
1299     {
1300         if (p_tl->rmstatus[i].rmstatus)
1301         {
1302             len = strlen(rmsbuf);
1303             if (len>0)
1304             {
1305                 rmsbuf[len]=',';
1306                 rmsbuf[len+1]=',';
1307                 len++;
1308             }
1309             sprintf(rmsbuf+len, "%d", i+1);
1310         }
1311     }
1312      *
1313      */
1314     
1315     if (EXSUCCEED!=tms_log_write_line(p_tl, LOG_COMMAND_J, "%hd:%hd:%hd:%ld:%s", 
1316             p_tl->tmrmid, p_tl->tmnodeid, p_tl->tmsrvid, p_tl->txtout, rmsbuf))
1317     {
1318         ret=EXFAIL;
1319         goto out;
1320     }
1321     
1322 out:
1323     return ret;
1324 }
1325 /* Tokenizer variables */
1326 #define TOKEN_READ_VARS \
1327             int first = EXTRUE;\
1328             char *p;
1329 
1330 /* Read token for parsing */
1331 #define TOKEN_READ(X, Y)\
1332     if (NULL==(p = strtok(first?buf:NULL, ":")))\
1333     {\
1334         NDRX_LOG(log_error, "Missing token: %s.%s!", X, Y);\
1335         EXFAIL_OUT(ret);\
1336     }\
1337     if (first)\
1338         first = EXFALSE;\
1339         
1340 #define TOKEN_READ_OPT(X, Y)\
1341     if (NULL==(p = strtok(first?buf:NULL, ":")))\
1342     {\
1343         NDRX_LOG(log_warn, "Missing token: %s.%s - optional, ignore!", X, Y);\
1344     }\
1345     if (first)\
1346         first = EXFALSE;\
1347 
1348 /**
1349  * Parse & rebuild the info from file record
1350  * @param buf - record from transaction file
1351  * @param p_tl - transaction record
1352  * @return SUCCEED/FAIL
1353  */
1354 exprivate int tms_parse_info(char *buf, atmi_xa_log_t *p_tl)
1355 {
1356     int ret = EXSUCCEED;
1357     TOKEN_READ_VARS;
1358     char *p2;
1359     char *saveptr1 = NULL;
1360     int rmid;
1361     long long tdiff;
1362     
1363     /* Read first time stamp */
1364     TOKEN_READ("info", "tstamp");
1365     p_tl->t_start = strtoull (p, NULL, 10);
1366     
1367     tdiff = ndrx_utc_tstamp() - p_tl->t_start;
1368     /* adjust the timer...  */
1369     ndrx_stopwatch_reset(&p_tl->ttimer);
1370     ndrx_stopwatch_minus(&p_tl->ttimer, tdiff);
1371     NDRX_LOG(log_debug, "Transaction age: %ld ms (t_start: %llu tdiff: %lld)",
1372         ndrx_stopwatch_get_delta(&p_tl->ttimer), p_tl->t_start, tdiff);
1373              
1374     /* read tmrmid, tmnodeid, tmsrvid = ignore, but they must be in place! */
1375     TOKEN_READ("info", "cmdid");
1376     /* Basically we ignore these values, and read from current process!!!: */
1377     TOKEN_READ("info", "tmrmid");
1378     TOKEN_READ("info", "tmnodeid");
1379     TOKEN_READ("info", "tmsrvid");
1380     p_tl->tmrmid = G_atmi_env.xa_rmid;
1381     p_tl->tmnodeid = G_tmsrv_cfg.vnodeid;
1382     p_tl->tmsrvid = G_server_conf.srv_id;
1383     
1384     TOKEN_READ("info", "txtout");
1385     p_tl->txtout = atol(p);
1386     
1387     /* Read involved resrouce managers - put them in join state */
1388 #if 0
1389     TOKEN_READ("info", "rmsbuf");
1390     
1391     - not used anymore. All active branches are logged as RM status
1392     p2 = strtok_r (p, ",", &saveptr1);
1393     while (p2 != NULL)
1394     {
1395         /* Put the p2 in join state */
1396         rmid = atoi(p2);
1397         if (rmid<1 || rmid>NDRX_MAX_RMS)
1398         {
1399             NDRX_LOG(log_error, "Invalid RMID: %d!", rmid);
1400             EXFAIL_OUT(ret);
1401         }
1402         
1403         p_tl->rmstatus[rmid-1].rmstatus =XA_RM_STATUS_ACTIVE;
1404         
1405         p2 = strtok_r (NULL, ",", &saveptr1);
1406     }
1407 #endif
1408     
1409 out:                
1410     return ret;
1411 }
1412 
1413 #define CRASH_CLASS_EXIT            0 /**< exit at crash                  */
1414 #define CRASH_CLASS_NO_WRITE        1 /**< Do not write and report error  */
1415 
1416 /**
1417  * Change tx state + write transaction stage
1418  * FORMAT: <STAGE_CODE>
1419  * @param p_tl
1420  * @param forced is decision forced? I.e. no restore on error.
1421  * @return EXSUCCEED/EXFAIL
1422  */
1423 expublic int tms_log_stage(atmi_xa_log_t *p_tl, short stage, int forced)
1424 {
1425     int ret = EXSUCCEED;
1426     short stage_org=EXFAIL;
1427     /* <Crash testing> */
1428     int make_crash = EXFALSE; /**< Crash simulation */
1429     int crash_stage, crash_class;
1430     /* </Crash testing> */
1431     
1432     CHK_THREAD_ACCESS;
1433     
1434     if (p_tl->txstage!=stage)
1435     {
1436         stage_org = p_tl->txstage;
1437         p_tl->txstage = stage;
1438 
1439         NDRX_LOG(log_debug, "tms_log_stage: new stage - %hd (cc %d)", 
1440                 p_tl->txstage, G_atmi_env.test_tmsrv_commit_crash);
1441         
1442         
1443         /* <Crash testing> */
1444         
1445         /* QA: commit crash test point... 
1446          * Once crash flag is disabled, commit shall be finished by background
1447          * process.
1448          */
1449         crash_stage = G_atmi_env.test_tmsrv_commit_crash % 100;
1450         crash_class = G_atmi_env.test_tmsrv_commit_crash / 100;
1451         
1452         /* let write && exit */
1453         if (stage > 0 && crash_class==CRASH_CLASS_EXIT && stage == crash_stage)
1454         {
1455             NDRX_LOG(log_debug, "QA commit crash...");
1456             G_atmi_env.test_tmsrv_write_fail=EXTRUE;
1457             make_crash=EXTRUE;
1458         }
1459 
1460         /* no write & report error */
1461         if (stage > 0 && crash_class==CRASH_CLASS_NO_WRITE && stage == crash_stage)
1462         {
1463             NDRX_LOG(log_debug, "QA no write crash");
1464             ret=EXFAIL;
1465             goto out;
1466         }
1467         /* </Crash testing> */
1468         else
1469         {
1470             /* in case if ... there was failover and concurrent write of commit
1471              * which might overwrite the given abort position, lets write abort
1472              * states few times more, so that recovery processes would finally
1473              * see that abort has started...
1474              * Note that other tmsrv will not go further than last commit entry
1475              * as after that there is check is that tmsrv still on locked node.
1476              */
1477             int w_cnt=1, i;
1478 
1479             if (XA_TX_STAGE_ABORTING==stage)
1480             {
1481                 w_cnt=3;
1482             }
1483             else if (XA_TX_STAGE_COMMITTING==stage)
1484             {
1485                 /* ensure that we are still locked, as we are going to write
1486                  * commit message down. If the abort was in progress,
1487                  * then our previous prepares might have overwritten  their
1488                  * abort state. Thus if we are ok here, then there will be
1489                  * 1x commit at given position, but if after then or before
1490                  * the actual disk write take over, they will put 3x aborts
1491                  * and then recovery process shall pick that up
1492                  */
1493                 if (EXSUCCEED!=tms_log_checkpointseq(p_tl))
1494                 {
1495                     EXFAIL_OUT(ret);
1496                 }
1497             }
1498 
1499             for (i=0; i<w_cnt; i++)
1500             {
1501                 if (EXSUCCEED!=tms_log_write_line(p_tl, LOG_COMMAND_STAGE, "%hd", stage))
1502                 {
1503                     ret=EXFAIL;
1504                     goto out;
1505                 }
1506             }
1507         }
1508 
1509         /* in case if switching to committing, we must sync the log & directory */
1510         if ((XA_TX_STAGE_COMMITTING==stage) || (XA_TX_STAGE_ABORTING==stage))
1511         {
1512             if (EXSUCCEED!=ndrx_fsync_fsync(p_tl->f, G_atmi_env.xa_fsync_flags) ||
1513                 EXSUCCEED!=ndrx_fsync_dsync(G_tmsrv_cfg.tlog_dir, G_atmi_env.xa_fsync_flags))
1514             {
1515                 EXFAIL_OUT(ret);
1516             }
1517 
1518             /* if we are in singleton group mode, validate that we still 
1519              * own the lock
1520              */
1521             if (EXSUCCEED!=tms_log_checkpointseq(p_tl))
1522             {
1523                 EXFAIL_OUT(ret);
1524             }
1525 
1526         } /* new stage commit or abort */
1527     }
1528     
1529 out:
1530                 
1531     /* <Crash testing> */
1532     if (make_crash)
1533     {
1534         exit(1);
1535     }
1536     /* </Crash testing> */
1537 
1538     /* If failed to log the stage switch, restore original transaction
1539      * stage
1540      */
1541 
1542     if (forced)
1543     {
1544         return EXSUCCEED;
1545     }
1546     else if (EXSUCCEED!=ret && EXFAIL!=stage_org)
1547     {
1548         p_tl->txstage = stage_org;
1549     }
1550 
1551     /* if not forced, get the real result */
1552     return ret;
1553 }
1554 
1555 
1556 /**
1557  * Parse stage info from buffer read
1558  * @param buf - buffer read
1559  * @param p_tl - transaction record
1560  * @return SUCCEED/FAIL
1561  */
1562 exprivate int tms_parse_stage(char *buf, atmi_xa_log_t *p_tl)
1563 {
1564     int ret = EXSUCCEED;
1565     short stage;
1566     TOKEN_READ_VARS;
1567 
1568     TOKEN_READ("stage", "tstamp");
1569     p_tl->t_update = atol(p);
1570     TOKEN_READ("info", "cmdid");
1571 
1572     TOKEN_READ("stage", "txstage");
1573 
1574     /* In case ... if we are committing, there will be no abort.
1575     * and vice versa. that's in case if doing failover
1576     * the first state shall live on...
1577     * --- not true...
1578     * we could go to commit, but just right before logging
1579     * other node took over... starts to abort.
1580     * and first one wakes up and tries to write the commit stage...
1581     * thus we shall allow to go down from commit to abort.
1582     */
1583     stage=(short)atoi(p);
1584 
1585     if (stage <= p_tl->txstage)
1586     {
1587         NDRX_LOG(log_error, "Stage for [%s] downgrade was %hd, read %hd",
1588                 p_tl->tmxid, p_tl->txstage, stage);
1589         userlog("Stage for [%s] downgrade was %hd, read %hd",
1590                 p_tl->tmxid, p_tl->txstage, stage);
1591     }
1592 
1593     if (p_tl->txstage>=XA_TX_STAGE_ABORTING &&
1594         p_tl->txstage<=XA_TX_STAGE_ABFORGOT_HEU)
1595     {
1596         if (stage>=XA_TX_STAGE_ABORTING &&
1597             stage<=XA_TX_STAGE_ABFORGOT_HEU)
1598         {
1599             p_tl->txstage = stage;
1600         }
1601         else
1602         {
1603             NDRX_LOG(log_error, "Invalid stage for [%s] was %hd (abort range) read %hd - IGNORE",
1604                 p_tl->tmxid, p_tl->txstage, stage);
1605             userlog("Invalid stage for [%s] was %hd (abort range) read %hd - IGNORE",
1606                 p_tl->tmxid, p_tl->txstage, stage);
1607         }
1608     }
1609 #if 0
1610     else if (p_tl->txstage>=XA_TX_STAGE_COMMITTING &&
1611         p_tl->txstage<=XA_TX_STAGE_COMFORGOT_HEU)
1612     {
1613 
1614         /* if there was commit, we allow abort too.. */
1615         if (stage>=XA_TX_STAGE_COMMITTING &&
1616             stage<=XA_TX_STAGE_COMFORGOT_HEU)
1617         {
1618             p_tl->txstage = stage;
1619         }
1620         else
1621         {
1622             NDRX_LOG(log_error, "Invalid stage for [%s] was %hd (commit range) read %hd - IGNORE",
1623                 p_tl->tmxid, p_tl->txstage, stage);
1624             userlog("Invalid stage for [%s] was %hd (commit range) read %hd - IGNORE",
1625                 p_tl->tmxid, p_tl->txstage, stage);
1626         }
1627     }
1628 #endif
1629     else
1630     {
1631         p_tl->txstage = stage;
1632     }
1633    
1634 out:
1635     return ret;
1636 }
1637 
1638 /**
1639  * Change + write RM status to file
1640  * FORMAT: <rmid>:<rmstatus>:<rmerrorcode>:<rmreason>
1641  * @param p_tl transaction
1642  * @param[in] bt transaction branch
1643  * @param[in] rmstatus Resource manager status
1644  * @param[in] rmerrorcode Resource manager error code
1645  * @param[in] rmreason reason code
1646  * @return SUCCEED/FAIL
1647  */
1648 expublic int tms_log_rmstatus(atmi_xa_log_t *p_tl, atmi_xa_rm_status_btid_t *bt, 
1649         char rmstatus, int rmerrorcode, short rmreason)
1650 {
1651     int ret = EXSUCCEED;
1652     int do_log = EXFALSE;
1653     
1654     CHK_THREAD_ACCESS;
1655 
1656     if (bt->rmstatus != rmstatus)
1657     {
1658         do_log = EXTRUE;
1659     }
1660     
1661     bt->rmstatus = rmstatus;
1662     bt->rmerrorcode = rmerrorcode;
1663     bt->rmreason = rmreason;
1664     
1665     if (do_log)
1666     {
1667         if (EXSUCCEED!=tms_log_write_line(p_tl, LOG_COMMAND_RMSTAT, "%hd:%c:%d:%hd:%ld",
1668                 bt->rmid, rmstatus, rmerrorcode, rmreason, bt->btid))
1669         {
1670             ret=EXFAIL;
1671             goto out;
1672         }
1673     }
1674     
1675 out:
1676     return ret;
1677 }
1678 
1679 /**
1680  * Parse RM's status record from transaction file
1681  * @param buf - transaction record's line
1682  * @param p_tl - internal transaction record
1683  * @return SUCCEED/FAIL;
1684  */
1685 exprivate int tms_parse_rmstatus(char *buf, atmi_xa_log_t *p_tl)
1686 {
1687     int ret = EXSUCCEED;
1688     char rmstatus;
1689     int rmerrorcode;
1690     short rmreason;
1691     int rmid;
1692     long btid;
1693     atmi_xa_rm_status_btid_t *bt = NULL;
1694     
1695     TOKEN_READ_VARS;
1696    
1697     TOKEN_READ("rmstat", "tstamp");
1698     p_tl->t_update = atol(p);
1699     TOKEN_READ("info", "cmdid");
1700     
1701     TOKEN_READ("rmstat", "rmid");
1702    
1703     rmid = atoi(p);
1704     if (rmid<1 || rmid>NDRX_MAX_RMS)
1705     {
1706         NDRX_LOG(log_error, "Invalid RMID: %d!", rmid);
1707         EXFAIL_OUT(ret);
1708     }
1709    
1710     TOKEN_READ("rmstat", "rmstatus");
1711     rmstatus = *p;
1712 
1713     TOKEN_READ("rmstat", "rmerrorcode");
1714     rmerrorcode = atoi(p);
1715 
1716     TOKEN_READ("rmstat", "rmreason");
1717     rmreason = (short)atoi(p);
1718 
1719     TOKEN_READ_OPT("rmstat", "btid");
1720     
1721     if (NULL!=p)
1722     {
1723         btid = atol(p);
1724     }
1725     else
1726     {
1727         btid=0;
1728     }
1729     
1730     /*
1731     p_tl->rmstatus[rmid-1].rmstatus = rmstatus;
1732     p_tl->rmstatus[rmid-1].rmerrorcode = rmerrorcode;
1733     p_tl->rmstatus[rmid-1].rmreason = rmreason;
1734      * */
1735     
1736     ret = tms_btid_addupd(p_tl, rmid, 
1737             &btid, rmstatus, rmerrorcode, rmreason, NULL, &bt);
1738    
1739 out:
1740     return ret;    
1741 }
1742 
1743 /**
1744  * Copy the background items to the linked list.
1745  * The idea is that this is processed by background. During that time, it does not
1746  * remove any items from hash. Thus pointers should be still valid. 
1747  * TODO: We should copy here transaction info too....
1748  * 
1749  * @param p_tl
1750  * @return 
1751  */
1752 expublic atmi_xa_log_list_t* tms_copy_hash2list(int copy_mode)
1753 {
1754     atmi_xa_log_list_t *ret = NULL;
1755     atmi_xa_log_t * r, *rt;
1756     atmi_xa_log_list_t *tmp;
1757     
1758     if (copy_mode & COPY_MODE_ACQLOCK)
1759     {
1760         MUTEX_LOCK_V(M_tx_hash_lock);
1761     }
1762     
1763     /* No changes to hash list during the lock. */    
1764     
1765     EXHASH_ITER(hh, M_tx_hash, r, rt)
1766     {
1767         /* Only background items... */
1768         if (r->is_background && !(copy_mode & COPY_MODE_BACKGROUND))
1769             continue;
1770         
1771         if (!r->is_background && !(copy_mode & COPY_MODE_FOREGROUND))
1772             continue;
1773                 
1774         if (NULL==(tmp = NDRX_CALLOC(1, sizeof(atmi_xa_log_list_t))))
1775         {
1776             NDRX_LOG(log_error, "Failed to malloc %d: %s", 
1777                     sizeof(atmi_xa_log_list_t), strerror(errno));
1778             goto out;
1779         }
1780         
1781         /* we should copy full TL structure, because some other thread might
1782          * will use it.
1783          * Having some invalid pointers inside does not worry us, because
1784          * we just need a list for a printing or xids for background txn lookup
1785          */
1786         memcpy(&tmp->p_tl, r, sizeof(*r));
1787         
1788         LL_APPEND(ret, tmp);
1789     }
1790     
1791 out:
1792     if (copy_mode & COPY_MODE_ACQLOCK)
1793     {
1794         MUTEX_UNLOCK_V(M_tx_hash_lock);
1795     }
1796 
1797     return ret;
1798 }
1799 
1800 /**
1801  * Lock the transaction log hash
1802  */
1803 expublic void tms_tx_hash_lock(void)
1804 {
1805     MUTEX_LOCK_V(M_tx_hash_lock);
1806 }
1807 
1808 /**
1809  * Unlock the transaction log hash
1810  */
1811 expublic void tms_tx_hash_unlock(void)
1812 {
1813     MUTEX_UNLOCK_V(M_tx_hash_lock);
1814 }
1815 
1816 /**
1817  * Copy TLOG info to FB (for display purposes)
1818  * The function has mandatory task to fill in the general fields about
1819  * transaction.
1820  * The actual RM/Branch tids are fill up till we get UBF buffer error (full)
1821  * @param p_ub - Dest UBF
1822  * @param p_tl - Transaction log
1823  * @param incl_rm_stat include RM status in response buffer
1824  * @return EXSUCCEED/EXFAIL
1825  */
1826 expublic int tms_log_cpy_info_to_fb(UBFH *p_ub, atmi_xa_log_t *p_tl, int incl_rm_stat)
1827 {
1828     int ret = EXSUCCEED;
1829     long tspent;
1830     short i;
1831     tspent = p_tl->txtout - ndrx_stopwatch_get_delta_sec(&p_tl->ttimer);    
1832     atmi_xa_rm_status_btid_t *el, *elt;
1833     
1834     if (tspent<0)
1835     {
1836         tspent = 0;
1837     }
1838     
1839     if (
1840             EXSUCCEED!=Bchg(p_ub, TMXID, 0, (char *)p_tl->tmxid, 0L) ||
1841             EXSUCCEED!=Bchg(p_ub, TMRMID, 0, (char *)&(p_tl->tmrmid), 0L) ||
1842             EXSUCCEED!=Bchg(p_ub, TMNODEID, 0, (char *)&(p_tl->tmnodeid), 0L) ||
1843             EXSUCCEED!=Bchg(p_ub, TMSRVID, 0, (char *)&(p_tl->tmsrvid), 0L) ||
1844             EXSUCCEED!=Bchg(p_ub, TMTXTOUT, 0, (char *)&(p_tl->txtout), 0L) ||
1845             EXSUCCEED!=Bchg(p_ub, TMTXTOUT_LEFT, 0, (char *)&tspent, 0L) ||
1846             EXSUCCEED!=Bchg(p_ub, TMTXSTAGE, 0, (char *)&(p_tl->txstage), 0L) ||
1847             EXSUCCEED!=Bchg(p_ub, TMTXTRYCNT, 0, (char *)&(p_tl->trycount), 0L) ||
1848             EXSUCCEED!=Bchg(p_ub, TMTXTRYMAXCNT, 0, (char *)&(G_tmsrv_cfg.max_tries), 0L)
1849         )
1850     {
1851         NDRX_LOG(log_error, "Failed to return fields: [%s]", 
1852                 Bstrerror(Berror));
1853         EXFAIL_OUT(ret);
1854     }
1855     
1856     /* return the info about RMs: */
1857     for (i=0; incl_rm_stat && i<NDRX_MAX_RMS; i++)
1858     {
1859         /* loop over the Branch TIDs  */
1860         EXHASH_ITER(hh, p_tl->rmstatus[i].btid_hash, el, elt)
1861         {
1862             /* cast to long... */
1863             long rmerrorcode =  (long)el->rmerrorcode;
1864             
1865             if (
1866                 EXSUCCEED!=Badd(p_ub, TMTXRMID, (char *)&el->rmid, 0L) ||
1867                 EXSUCCEED!=Badd(p_ub, TMTXBTID, (char *)&el->btid, 0L) ||
1868                 EXSUCCEED!=Badd(p_ub, TMTXRMSTATUS,
1869                     (char *)&(el->rmstatus), 0L) ||
1870                 EXSUCCEED!=Badd(p_ub, TMTXRMERRCODE,
1871                     (char *)&rmerrorcode, 0L) ||
1872                 EXSUCCEED!=Badd(p_ub, TMTXRMREASON,
1873                     (char *)&(el->rmreason), 0L)
1874                 )
1875             {
1876                 /* there could be tons of these branches for 
1877                  * mysql/mariadb or posgresql 
1878                  */
1879                 NDRX_LOG(log_error, "Failed to return fields: [%s] - ignore", 
1880                             Bstrerror(Berror));
1881                 
1882                 /* finish off. */
1883                 goto out;
1884             }
1885         }
1886     } /* for */
1887     
1888 out:
1889     return ret;
1890 }
1891 
1892 /* vim: set ts=4 sw=4 et smartindent: */