Back to home page

Enduro/X

 
 

    


0001 /**
0002  * @brief tmsrv - transaction logging & accounting
0003  *   for systems which are not support TMJOIN, we shall create additional
0004  *   XIDs for each of the involved process session. These XIDs shall be used
0005  *   as "sub-xids". There will be Master XID involved in process and
0006  *   sub-xids will be logged in tmsrv and will be known by processes locally
0007  *   Thus p_tl->rmstatus needs to be extended with hash list of the local
0008  *   transaction ids. We shall keep the structure universal, and use
0009  *   sub-xids even TMJOIN is supported.
0010  *
0011  * @file log.c
0012  */
0013 /* -----------------------------------------------------------------------------
0014  * Enduro/X Middleware Platform for Distributed Transaction Processing
0015  * Copyright (C) 2009-2016, ATR Baltic, Ltd. All Rights Reserved.
0016  * Copyright (C) 2017-2023, Mavimax, Ltd. All Rights Reserved.
0017  * This software is released under one of the following licenses:
0018  * AGPL (with Java and Go exceptions) or Mavimax's license for commercial use.
0019  * See LICENSE file for full text.
0020  * -----------------------------------------------------------------------------
0021  * AGPL license:
0022  *
0023  * This program is free software; you can redistribute it and/or modify it under
0024  * the terms of the GNU Affero General Public License, version 3 as published
0025  * by the Free Software Foundation;
0026  *
0027  * This program is distributed in the hope that it will be useful, but WITHOUT ANY
0028  * WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR A
0029  * PARTICULAR PURPOSE. See the GNU Affero General Public License, version 3
0030  * for more details.
0031  *
0032  * You should have received a copy of the GNU Affero General Public License along 
0033  * with this program; if not, write to the Free Software Foundation, Inc.,
0034  * 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
0035  *
0036  * -----------------------------------------------------------------------------
0037  * A commercial use license is available from Mavimax, Ltd
0038  * contact@mavimax.com
0039  * -----------------------------------------------------------------------------
0040  */
0041 #include <stdio.h>
0042 #include <stdlib.h>
0043 #include <string.h>
0044 #include <errno.h>
0045 #include <regex.h>
0046 #include <utlist.h>
0047 #include <stdarg.h>
0048 
0049 #include <ndebug.h>
0050 #include <atmi.h>
0051 #include <atmi_int.h>
0052 #include <typed_buf.h>
0053 #include <ndrstandard.h>
0054 #include <ubf.h>
0055 #include <Exfields.h>
0056 #include <nstdutil.h>
0057 
0058 #include <exnet.h>
0059 #include <ndrxdcmn.h>
0060 
0061 #include "tmsrv.h"
0062 #include "../libatmisrv/srv_int.h"
0063 #include "userlog.h"
0064 #include <xa_cmn.h>
0065 #include <exhash.h>
0066 #include <unistd.h>
0067 #include <Exfields.h>
0068 #include <singlegrp.h>
0069 #include <sys_test.h>
0070 /*---------------------------Externs------------------------------------*/
0071 /*---------------------------Macros-------------------------------------*/
0072 #define LOG_MAX         1024
0073 
0074 #define LOG_COMMAND_STAGE           'S' /**< Identify stage of txn           */
0075 #define LOG_COMMAND_I               'I' /**< Info about txn                  */
0076 #define LOG_COMMAND_J               'J' /**< Version 2 format with checksums */
0077 #define LOG_COMMAND_RMSTAT          'R' /**< Log the RM status               */
0078 
0079 #define LOG_VERSION_1                1   /**< Initial version                  */
0080 #define LOG_VERSION_2                2   /**< Version 1, contains crc32 checks */
0081 
0082 #define CHK_THREAD_ACCESS if (ndrx_gettid()!=p_tl->lockthreadid)\
0083     {\
0084         NDRX_LOG(log_error, "Transaction [%s] not locked for thread %" PRIu64 ", but for %" PRIu64,\
0085                 p_tl->tmxid, ndrx_gettid(), p_tl->lockthreadid);\
0086         userlog("Transaction [%s] not locked for thread %" PRIu64 ", but for %" PRIu64,\
0087                 p_tl->tmxid, ndrx_gettid(), p_tl->lockthreadid);\
0088         return EXFAIL;\
0089     }
0090 #define LOG_RS_SEP  ';'            /**< record seperator               */
0091 /*---------------------------Enums--------------------------------------*/
0092 /*---------------------------Typedefs-----------------------------------*/
0093 /*---------------------------Globals------------------------------------*/
0094 exprivate atmi_xa_log_t *M_tx_hash = NULL; 
0095 exprivate MUTEX_LOCKDECL(M_tx_hash_lock); /* Operations with hash list            */
0096 /*---------------------------Statics------------------------------------*/
0097 /*---------------------------Prototypes---------------------------------*/
0098 exprivate int tms_log_write_line(atmi_xa_log_t *p_tl, char command, const char *fmt, ...);
0099 exprivate int tms_parse_info(char *buf, atmi_xa_log_t *p_tl);
0100 exprivate int tms_parse_stage(char *buf, atmi_xa_log_t *p_tl);
0101 exprivate int tms_parse_rmstatus(char *buf, atmi_xa_log_t *p_tl);
0102 exprivate void tms_gen_file_name(char *fname, size_t fnamesz, char *tmxid);
0103 
0104 /**
0105  * Unlock transaction
0106  * @param p_tl
0107  * @return SUCCEED/FAIL
0108  */
0109 expublic int tms_unlock_entry(atmi_xa_log_t *p_tl)
0110 {
0111     CHK_THREAD_ACCESS;
0112     
0113     NDRX_LOG(log_info, "Transaction [%s] unlocked by thread %" PRIu64, p_tl->tmxid,
0114             p_tl->lockthreadid);
0115     
0116     MUTEX_LOCK_V(M_tx_hash_lock);
0117     p_tl->lockthreadid = 0;
0118     MUTEX_UNLOCK_V(M_tx_hash_lock);
0119     
0120     return EXSUCCEED;
0121 }
0122 
0123 /**
0124  * Check that log file actually exists on the disk.
0125  * @param tmxid transaction
0126  * @return EXTRUE/EXFALSE/EXFAIL
0127  */
0128 expublic int tms_log_exists_file(char *tmxid)
0129 {
0130     char fname[PATH_MAX+1];
0131     int err=0;
0132 
0133     tms_gen_file_name(fname, sizeof(fname), tmxid);
0134 
0135     if (EXSUCCEED == access(fname, 0))
0136     {
0137         return EXTRUE;
0138     }
0139 
0140     err=errno;
0141 
0142     if (ENOENT==err)
0143     {
0144         return EXFALSE;
0145     }
0146 
0147     NDRX_LOG(log_error, "Failed to check file [%s] presence: %s",
0148         fname, strerror(err));
0149     userlog("Failed to check file [%s] presence: %s",
0150         fname, strerror(err));
0151 
0152     return EXFAIL;
0153 }
0154 
0155 /**
0156  * Check that memory based log entry exists.
0157  * @param tmxid - serialized XID
0158  * @return NULL or log entry
0159  */
0160 expublic int tms_log_exists_entry(char *tmxid)
0161 {
0162     atmi_xa_log_t *r = NULL;
0163     
0164     MUTEX_LOCK_V(M_tx_hash_lock);
0165     EXHASH_FIND_STR( M_tx_hash, tmxid, r);
0166     MUTEX_UNLOCK_V(M_tx_hash_lock);
0167     
0168     if (NULL!=r)
0169     {
0170         return EXTRUE;
0171     }
0172     else
0173     {
0174         return EXFALSE;
0175     }
0176 }
0177 
0178 /**
0179  * Get the log entry of the transaction
0180  * Now we should lock it for thread.
0181  * TODO: Add option for wait on lock.
0182  * This would be needed for cases for example when Postgres may report
0183  * the status in background, and for some reason TMSRV already processes the
0184  * transaction (ether abort, or new register branch, etc...) and some stalled
0185  * PG process wants to finish the work off. Thus we need to waited lock for
0186  * foreground operations.
0187  * TODO: In future release lock could be based on real mutex within the
0188  * transaction object, so that we do not have to employ the sleep
0189  * @param tmxid - serialized XID
0190  * @param[in] dowait milliseconds to wait for lock, before give up
0191  * @param[out] locke lock error
0192  * @return NULL or log entry
0193  */
0194 expublic atmi_xa_log_t * tms_log_get_entry(char *tmxid, int dowait, int *locke)
0195 {
0196     atmi_xa_log_t *r = NULL;
0197     ndrx_stopwatch_t w;
0198     
0199     if (dowait)
0200     {
0201         ndrx_stopwatch_reset(&w);
0202     }
0203     
0204     if (NULL!=locke)
0205     {
0206         *locke=EXFALSE;
0207     }
0208     
0209 restart:
0210     MUTEX_LOCK_V(M_tx_hash_lock);
0211     EXHASH_FIND_STR( M_tx_hash, tmxid, r);
0212     
0213     if (NULL!=r)
0214     {
0215         if (r->lockthreadid)
0216         {
0217             if (dowait && ndrx_stopwatch_get_delta(&w) < dowait)
0218             {
0219                 MUTEX_UNLOCK_V(M_tx_hash_lock);
0220                 /* sleep 100 msec */
0221                 usleep(100000);
0222                 goto restart;
0223                 
0224             }
0225             
0226             NDRX_LOG(log_error, "Transaction [%s] already locked for thread_id: %"
0227                     PRIu64 " lock time: %d msec",
0228                     tmxid, r->lockthreadid, dowait);
0229             
0230             userlog("tmsrv: Transaction [%s] already locked for thread_id: %" PRIu64
0231                     "lock time: %d msec",
0232                     tmxid, r->lockthreadid, dowait);
0233             r = NULL;
0234             
0235             /* cannot get lock */
0236             if (NULL!=locke)
0237             {
0238                 *locke=EXTRUE;
0239             }
0240             
0241         }
0242         else
0243         {
0244             r->lockthreadid = ndrx_gettid();
0245             NDRX_LOG(log_info, "Transaction [%s] locked for thread_id: %" PRIu64,
0246                     tmxid, r->lockthreadid);
0247         }
0248     }
0249     
0250     MUTEX_UNLOCK_V(M_tx_hash_lock);
0251     
0252     return r;
0253 }
0254 
0255 /**
0256  * set current lock sequence number
0257  * @param p_tl log entry
0258  */
0259 exprivate int tms_log_setseq(atmi_xa_log_t *p_tl)
0260 {
0261     int ret= EXSUCCEED;
0262     long grp_flags=0;
0263     /* if we are in singleton group mode, validate that we still
0264      * own the lock
0265      */
0266     if (G_atmi_env.procgrp_no)
0267     {
0268         p_tl->sg_sequence=tpsgislocked(G_atmi_env.procgrp_no
0269             , TPPG_SGVERIFY|TPPG_NONSGSUCC
0270             , &grp_flags);
0271 
0272         if (EXFAIL==p_tl->sg_sequence)
0273         {
0274             NDRX_LOG(log_error, "tpsgislocked failed %s", tpstrerror(tperrno));
0275             EXFAIL_OUT(ret);
0276         }
0277         
0278         if (grp_flags & TPPG_SINGLETON && p_tl->sg_sequence<=0)
0279         {
0280             NDRX_LOG(log_error, "Singleton group %d lock lost (at start) - exit(-1)",
0281                 G_atmi_env.procgrp_no);
0282             userlog("Singleton group %d lock lost (at start) - exit(-1)",
0283                 G_atmi_env.procgrp_no);
0284             /* !!!! */
0285             exit(EXFAIL);
0286         }
0287     }
0288 
0289 out:
0290     return ret;
0291 }
0292 
0293 /**
0294  * run checkpoint on transaction
0295  * @param p_tl transaction log
0296  */
0297 expublic int tms_log_checkpointseq(atmi_xa_log_t *p_tl)
0298 {
0299     int ret=EXSUCCEED;
0300     long seq;
0301     long grp_flags=0;
0302     /* if we are in singleton group mode, validate that we still
0303      * own the lock
0304      */
0305     if (G_atmi_env.procgrp_no)
0306     {
0307         seq=tpsgislocked(G_atmi_env.procgrp_no, TPPG_SGVERIFY|TPPG_NONSGSUCC, &grp_flags);
0308 
0309         if (seq < 0)
0310         {
0311             NDRX_LOG(log_error, "tpsgislocked returns %s", tpstrerror(tperrno));
0312             EXFAIL_OUT(ret);
0313         }
0314 
0315         if ((grp_flags & TPPG_SINGLETON) && 0==seq)
0316         {
0317             NDRX_LOG(log_error, "Singleton group %d on node %ld lock lost - exit(-1)",
0318                 G_atmi_env.procgrp_no, tpgetnodeid());
0319             userlog("Singleton group %d on node %ld lock lost - exit(-1)",
0320                 G_atmi_env.procgrp_no, tpgetnodeid());
0321             /* !!!! */
0322             exit(EXFAIL);
0323         }
0324 
0325         /* allow simple lock grou pbased checks */
0326         if (NULL!=p_tl)
0327         {
0328             /* failover has happened during transaction processing
0329             * thus cannot proceed with decsion, only after restart
0330             */
0331             if ( (grp_flags & TPPG_SINGLETON) && (seq - p_tl->sg_sequence >= G_atmi_env.sglockinc))
0332             {
0333                 NDRX_LOG(log_error, "Singleton group %d on node %ld lock lost (tl seq %ld, cur seq %ld) - exit(-1), ",
0334                     G_atmi_env.procgrp_no, tpgetnodeid(), p_tl->sg_sequence, seq);
0335                 userlog("Singleton group %d on node %ld lock lost (tl seq %ld, cur seq %ld) - exit(-1), ",
0336                     G_atmi_env.procgrp_no, tpgetnodeid(), p_tl->sg_sequence, seq);
0337                 /* !!!! */
0338                 exit(EXFAIL);
0339             }
0340 
0341             /* we are safe to continue... */
0342             p_tl->sg_sequence=seq;
0343         }
0344     }
0345 out:
0346     return ret;
0347 }
0348 
0349 /**
0350  * Log transaction as started
0351  * @param xai - XA Info struct
0352  * @param txtout - transaction timeout
0353  * @param[out] btid branch transaction id (if not dynamic reg)
0354  * @return SUCCEED/FAIL
0355  */
0356 expublic int tms_log_start(atmi_xa_tx_info_t *xai, int txtout, long tmflags,
0357         long *btid)
0358 {
0359     int ret = EXSUCCEED;
0360     int hash_added = EXFALSE;
0361     atmi_xa_log_t *tmp = NULL;
0362     atmi_xa_rm_status_btid_t *bt;
0363     /* 1. Add stuff to hash list */
0364     if (NULL==(tmp = NDRX_CALLOC(sizeof(atmi_xa_log_t), 1)))
0365     {
0366         NDRX_LOG(log_error, "NDRX_CALLOC() failed: %s", strerror(errno));
0367         ret=EXFAIL;
0368         goto out;
0369     }
0370     tmp->txstage = XA_TX_STAGE_ACTIVE;
0371     
0372     tmp->tmnodeid = xai->tmnodeid;
0373     tmp->tmsrvid = xai->tmsrvid;
0374     tmp->tmrmid = xai->tmrmid;
0375     NDRX_STRCPY_SAFE(tmp->tmxid, xai->tmxid);
0376     
0377     tmp->t_start = ndrx_utc_tstamp();
0378     tmp->t_update = ndrx_utc_tstamp();
0379     tmp->txtout = txtout;
0380     tmp->log_version = LOG_VERSION_2;   /**< Now with CRC32 groups */
0381     ndrx_stopwatch_reset(&tmp->ttimer);
0382 
0383     if (EXSUCCEED!=tms_log_setseq(tmp))
0384     {
0385         EXFAIL_OUT(ret);
0386     }
0387 
0388     /* lock for us, yet it is not shared*/
0389     tmp->lockthreadid = ndrx_gettid();
0390 
0391     /* well... I guess firstly we shall
0392      * add transaction to hashmap, and only then open the file
0393      * otherwise we can for file, they are found and then
0394      * we request to remove the files...
0395      */
0396     MUTEX_LOCK_V(M_tx_hash_lock);
0397     EXHASH_ADD_STR( M_tx_hash, tmxid, tmp);
0398     MUTEX_UNLOCK_V(M_tx_hash_lock);
0399     hash_added = EXTRUE;
0400     
0401     /* TODO: Open the log file & write tms_close_logfile
0402      * Only question, how long 
0403      */
0404     
0405     /* Open log file */
0406     if (EXSUCCEED!=tms_open_logfile(tmp, "a"))
0407     {
0408         NDRX_LOG(log_error, "Failed to create transaction log file");
0409         userlog("Failed to create transaction log file");
0410         EXFAIL_OUT(ret);
0411     }
0412     
0413     /* log the opening infos */
0414     if (EXSUCCEED!=tms_log_info(tmp))
0415     {
0416         NDRX_LOG(log_error, "Failed to log tran info");
0417         userlog("Failed to log tran info");
0418         EXFAIL_OUT(ret);
0419     }
0420     
0421     /* Only for static... */
0422     if (!(tmflags & TMFLAGS_DYNAMIC_REG))
0423     {
0424         /* assign btid? 
0425         tmp->rmstatus[xai->tmrmid-1].rmstatus = XA_RM_STATUS_ACTIVE;
0426         */
0427         char start_stat = XA_RM_STATUS_ACTIVE;
0428         
0429         *btid = tms_btid_gettid(tmp, xai->tmrmid);
0430 
0431         /* Just get the TID, status will be changed with file update  */
0432         if (EXSUCCEED!=tms_btid_add(tmp, xai->tmrmid, *btid, XA_RM_STATUS_NULL, 
0433                 0, 0, &bt))
0434         {
0435             NDRX_LOG(log_error, "Failed to add BTID: %ld", *btid);
0436             EXFAIL_OUT(ret);
0437         }
0438         
0439         if (tmflags & TMFLAGS_TPNOSTARTXID)
0440         {
0441             NDRX_LOG(log_info, "TPNOSTARTXID => starting as %c - prepared", 
0442                     XA_RM_STATUS_PREP);
0443             start_stat = XA_RM_STATUS_UNKOWN;
0444         }
0445         
0446         /* log to transaction file -> Write off RM status */
0447         if (EXSUCCEED!=tms_log_rmstatus(tmp, bt, start_stat, 0, 0))
0448         {
0449             NDRX_LOG(log_error, "Failed to write RM status to file: %ld", *btid);
0450             EXFAIL_OUT(ret);
0451         }
0452     }
0453     
0454 out:
0455     
0456     /* unlock */
0457     if (EXSUCCEED!=ret)
0458     {
0459         tms_remove_logfile(tmp, hash_added);
0460     }
0461     else
0462     {
0463         if (NULL!=tmp)
0464         {
0465             tms_unlock_entry(tmp);
0466         }
0467     }
0468     return ret;
0469 }
0470 
0471 /**
0472  * Add RM to transaction.
0473  * If tid is known, then check and/or add.
0474  * If add, then ensure that we step up the BTID counter, if the number is
0475  * higher than counter have.
0476  * @param xai
0477  * @param rmid - 1 based rmid
0478  * @param[in,out] btid branch tid (if it is known, i.e. 0), if not known, then -1
0479  *  for unknown BTID requests, new BTID is generated
0480  * @return EXSUCCEED/EXFAIL
0481  */
0482 expublic int tms_log_addrm(atmi_xa_tx_info_t *xai, short rmid, int *p_is_already_logged,
0483         long *btid, long flags)
0484 {
0485     int ret = EXSUCCEED;
0486     atmi_xa_log_t *p_tl= NULL;
0487     atmi_xa_rm_status_btid_t *bt = NULL;
0488     
0489     /* atmi_xa_rm_status_t stat; */
0490     /*
0491     memset(&stat, 0, sizeof(stat));
0492     stat.rmstatus = XA_RM_STATUS_ACTIVE;
0493      * 
0494     */
0495     
0496     if (NULL==(p_tl = tms_log_get_entry(xai->tmxid, NDRX_LOCK_WAIT_TIME, NULL)))
0497     {
0498         NDRX_LOG(log_error, "No transaction/lock timeout under xid_str: [%s]", 
0499                 xai->tmxid);
0500         ret=EXFAIL;
0501         goto out_nolock;
0502     }
0503     
0504     if (1 > rmid || rmid>NDRX_MAX_RMS)
0505     {
0506         NDRX_LOG(log_error, "RMID %hd out of bounds!!!");
0507         EXFAIL_OUT(ret);
0508     }
0509     
0510     /* if processing in background (say time-out rollback, the commit shall not be
0511      * accepted)
0512      */
0513     if (p_tl->is_background || XA_TX_STAGE_ACTIVE!=p_tl->txstage)
0514     {
0515         NDRX_LOG(log_error, "cannot register xid [%s] is_background: (%d) stage: (%hd)", 
0516                 xai->tmxid, p_tl->is_background, p_tl->txstage);
0517         EXFAIL_OUT(ret);
0518     }
0519     
0520     /*
0521     if (p_tl->rmstatus[rmid-1].rmstatus && NULL!=p_is_already_logged)
0522     {
0523         *p_is_already_logged = EXTRUE;
0524     }
0525      * */
0526     /*
0527     p_tl->rmstatus[rmid-1] = stat;
0528     */
0529     
0530     ret = tms_btid_addupd(p_tl, rmid, btid, XA_RM_STATUS_NULL, 
0531             0, 0, p_is_already_logged, &bt);
0532     
0533     /* log to transaction file */
0534     if (!(*p_is_already_logged))
0535     {
0536         char start_stat = XA_RM_STATUS_ACTIVE;
0537         
0538         NDRX_LOG(log_info, "RMID %hd/%ld added to xid_str: [%s]", 
0539             rmid, *btid, xai->tmxid);
0540         
0541         if (flags & TMFLAGS_TPNOSTARTXID)
0542         {
0543             NDRX_LOG(log_info, "TPNOSTARTXID => adding as %c - unknown", 
0544                     XA_RM_STATUS_UNKOWN);
0545             start_stat = XA_RM_STATUS_UNKOWN;
0546         }
0547         
0548         if (EXSUCCEED!=tms_log_rmstatus(p_tl, bt, start_stat, 0, 0))
0549         {
0550             NDRX_LOG(log_error, "Failed to write RM status to file: %ld", *btid);
0551             EXFAIL_OUT(ret);
0552         }
0553     }
0554     else
0555     {
0556         NDRX_LOG(log_info, "RMID %hd/%ld already joined to xid_str: [%s]", 
0557             rmid, *btid, xai->tmxid);
0558     }
0559    
0560 out:
0561     /* unlock transaction from thread */
0562     tms_unlock_entry(p_tl);
0563 
0564 out_nolock:
0565     
0566     return ret;
0567 }
0568 
0569 /**
0570  * Change RM status
0571  * @param xai
0572  * @param rmid - 1 based rmid
0573  * @param[in] btid branch tid 
0574  * @param[in] rmstatus RM status to set
0575  * @return EXSUCCEED/EXFAIL
0576  */
0577 expublic int tms_log_chrmstat(atmi_xa_tx_info_t *xai, short rmid, 
0578         long btid, char rmstatus, UBFH *p_ub)
0579 {
0580     int ret = EXSUCCEED;
0581     atmi_xa_log_t *p_tl= NULL;
0582     atmi_xa_rm_status_btid_t *bt = NULL;
0583     int locke;
0584     
0585     NDRX_LOG(log_debug, "xid: [%s] BTID %ld change status to [%c]",
0586             xai->tmxid, btid, rmstatus);
0587     
0588     if (NULL==(p_tl = tms_log_get_entry(xai->tmxid, NDRX_LOCK_WAIT_TIME, &locke)))
0589     {
0590         if (locke)
0591         {
0592             NDRX_LOG(log_error, "Lock acquire timed out for xid_str: [%s] - TPETIME", 
0593                     xai->tmxid);
0594             atmi_xa_set_error_fmt(p_ub, TPETIME, NDRX_XA_ERSN_LOCK, 
0595                         "Failed to acquire locked!");
0596         }
0597         else
0598         {
0599             NDRX_LOG(log_error, "No transaction under xid_str: [%s] - match ", 
0600                     xai->tmxid);
0601 
0602             atmi_xa_set_error_fmt(p_ub, TPEMATCH, NDRX_XA_ERSN_NOTX, 
0603                         "Failed to get transaction or locked for processing!");
0604         }
0605         
0606         ret=EXFAIL;
0607         goto out_nolock;
0608     }
0609     
0610     bt = tms_btid_find(p_tl, rmid, btid);
0611     
0612     if (rmstatus==bt->rmstatus)
0613     {
0614         NDRX_LOG(log_warn, "xid: [%s] BTID %ld already in status [%c]",
0615             xai->tmxid, btid, rmstatus);
0616     }
0617     
0618     if (XA_RM_STATUS_UNKOWN!=bt->rmstatus)
0619     {
0620         NDRX_LOG(log_error, "No transaction under xid_str: [%s] - match ", 
0621                 xai->tmxid);
0622         
0623         atmi_xa_set_error_fmt(p_ub, TPEMATCH, NDRX_XA_ERSN_INVPARAM, 
0624                     "BTID %ld in status %c but want to set to: %c!",
0625                 btid, bt->rmstatus, rmstatus);
0626         
0627         EXFAIL_OUT(ret);
0628     }
0629     
0630     /* Change status to expected one.. */
0631     
0632     if (EXSUCCEED!=tms_log_rmstatus(p_tl, bt, rmstatus, 0, 0))
0633     {
0634         NDRX_LOG(log_error, "Failed to write RM status to file: %ld, "
0635                 "new stat: %c old stat: [%c]", 
0636                 btid, rmstatus, bt->rmstatus);
0637         
0638         atmi_xa_set_error_fmt(p_ub, TPEMATCH, NDRX_XA_ERSN_RMLOGFAIL, 
0639                     "BTID %ld in status %c but want to set to: %c!",
0640                 btid, bt->rmstatus, rmstatus);
0641         
0642         EXFAIL_OUT(ret);
0643     }
0644     
0645     NDRX_LOG(log_debug, "xid: [%s] BTID %ld change status to [%c] OK",
0646             xai->tmxid, btid, rmstatus);
0647    
0648 out:
0649     /* unlock transaction from thread */
0650     tms_unlock_entry(p_tl);
0651 
0652 out_nolock:
0653     
0654     return ret;
0655 }
0656 
0657 /**
0658  * Generate logfile path + name for transaction
0659  * @param fname output buffer
0660  * @param fnamesz output buffer size
0661  * @param tmxid transaction id (str)
0662  */
0663 exprivate void tms_gen_file_name(char *fname, size_t fnamesz, char *tmxid)
0664 {
0665         snprintf(fname, fnamesz, "%s/TRN-%ld-%hd-%d-%s",
0666             G_tmsrv_cfg.tlog_dir, G_tmsrv_cfg.vnodeid, G_atmi_env.xa_rmid,
0667             G_server_conf.srv_id, tmxid);
0668 }
0669 
0670 /**
0671  * Get the log file name for particular transaction
0672  * @param p_tl
0673  * @return 
0674  */
0675 exprivate void tms_get_file_name(atmi_xa_log_t *p_tl)
0676 {
0677     tms_gen_file_name(p_tl->fname, sizeof(p_tl->fname), p_tl->tmxid);
0678 }
0679 
0680 /**
0681  * Get the log file name for particular transaction
0682  * @param p_tl
0683  * @return 
0684  */
0685 expublic int tms_open_logfile(atmi_xa_log_t *p_tl, char *mode)
0686 {
0687     int ret = EXSUCCEED;
0688     
0689     if (EXEOS==p_tl->fname[0])
0690     {
0691         tms_get_file_name(p_tl);
0692     }
0693     
0694     if (NULL!=p_tl->f)
0695     {
0696         /*
0697         NDRX_LOG(log_warn, "Log file [%s] already open", p_tl->fname);
0698          */
0699         goto out;
0700     }
0701     
0702     /* Try to open the file */
0703     if (ndrx_G_systest_lockloss || NULL==(p_tl->f=NDRX_FOPEN(p_tl->fname, mode)))
0704     {
0705         userlog("Failed to open XA transaction log file: [%s]: %s", 
0706                 p_tl->fname, strerror(errno));
0707         NDRX_LOG(log_error, "Failed to open XA transaction "
0708                 "log file: [%s]: %s", 
0709                 p_tl->fname, strerror(errno));
0710         ret=EXFAIL;
0711         goto out;
0712     }
0713     
0714     NDRX_LOG(log_debug, "XA tx log file [%s] open for [%s]", 
0715             p_tl->fname, mode);
0716 
0717 out:    
0718     return ret;
0719 }
0720 
0721 /**
0722  * Do housekeeping on log file
0723  * @param logfile - path to log file
0724  * @return EXTRUE (did remove) / EXFALSE (did not remove)
0725  */
0726 expublic int tms_housekeep(char *logfile)
0727 {
0728     long diff;
0729     int err;
0730     int ret = EXFALSE;
0731 
0732     /* remove old corrupted logs... */
0733     if ((diff=ndrx_file_age(logfile)) > G_tmsrv_cfg.housekeeptime)
0734     {
0735         NDRX_LOG(log_error, "Corrupted log file [%s] age %ld sec (housekeep %d) - removing",
0736                 logfile, diff, G_tmsrv_cfg.housekeeptime);
0737         userlog("Corrupted log file [%s] age %ld sec (housekeep %d) - removing",
0738                 logfile, diff, G_tmsrv_cfg.housekeeptime);
0739 
0740         if (ndrx_G_systest_lockloss || EXSUCCEED!=unlink(logfile))
0741         {
0742             err = errno;
0743             NDRX_LOG(log_error, "Failed to unlink [%s]: %s", logfile, strerror(err));
0744             userlog("Failed to unlink [%s]: %s", logfile, strerror(err));
0745         }
0746         else
0747         {
0748             ret=EXTRUE;
0749         }
0750     }
0751 
0752     return ret;
0753 }
0754 
0755 /**
0756  * Read the log file from disk
0757  * @param logfile - path to log file
0758  * @param tmxid - transaction ID string
0759  * @param pp_tl - transaction log (double ptr). Returned if all ok.
0760  * @param log_removed did we remove log file (housekeep) executed?
0761  * @param housekeepable is log file housekeepable?
0762  * @return SUCCEED/FAIL
0763  */
0764 expublic int tms_load_logfile(char *logfile, char *tmxid, atmi_xa_log_t **pp_tl,
0765     int *log_removed, int *housekeepable)
0766 {
0767     int ret = EXSUCCEED;
0768     int len;
0769     char buf[1024]; /* Should be enough... */
0770     char *p;
0771     int line = 1;
0772     int got_term_last=EXFALSE, infos_ok=EXFALSE, missing_term_in_file=EXFALSE;
0773     int do_housekeep = EXFALSE;
0774     int wrote, err;
0775     long diff;
0776     *pp_tl = NULL;
0777 
0778     *log_removed = EXFALSE;
0779     *housekeepable = EXFALSE;
0780 
0781     if (NULL==(*pp_tl = NDRX_CALLOC(sizeof(atmi_xa_log_t), 1)))
0782     {
0783         NDRX_LOG(log_error, "NDRX_CALLOC() failed: %s", strerror(errno));
0784         EXFAIL_OUT(ret);
0785     }
0786     /* set file name */
0787     if (strlen(logfile)>PATH_MAX)
0788     {
0789         NDRX_LOG(log_error, "Invalid file name!");
0790         EXFAIL_OUT(ret);
0791     }
0792     
0793     NDRX_STRCPY_SAFE((*pp_tl)->fname, logfile);
0794     
0795     len = strlen(tmxid);
0796     if (len>sizeof((*pp_tl)->tmxid)-1)
0797     {
0798         NDRX_LOG(log_error, "tmxid too long [%s]!", tmxid);
0799         EXFAIL_OUT(ret);
0800     }
0801     
0802     NDRX_STRCPY_SAFE((*pp_tl)->tmxid, tmxid);
0803     /* we start with active... */
0804     (*pp_tl)->txstage = XA_TX_STAGE_ACTIVE;
0805 
0806     /* set lock sequence number, if in singleton group */
0807     if (EXSUCCEED!=tms_log_setseq(*pp_tl))
0808     {
0809         EXFAIL_OUT(ret);
0810     }
0811 
0812     /* Open the file */
0813     if (EXSUCCEED!=tms_open_logfile(*pp_tl, "a+"))
0814     {
0815         NDRX_LOG(log_error, "Failed to open transaction file!");
0816         EXFAIL_OUT(ret);
0817     }
0818 
0819     /* On mac and freebsd seem that reading happens at end of the
0820      * file if file was opened with a+
0821      */
0822     if (EXSUCCEED!=fseek((*pp_tl)->f, 0, SEEK_SET))
0823     {
0824         NDRX_LOG(log_error, "Failed to fseek: %s", strerror(errno));
0825         EXFAIL_OUT(ret);
0826     }
0827 
0828     /* Read line by line & call parsing functions 
0829      * we should also parse start of the transaction (date/time)
0830      */
0831     while (fgets(buf, sizeof(buf), (*pp_tl)->f))
0832     {
0833         got_term_last = EXFALSE;
0834         len = strlen(buf);
0835         
0836         if (1 < len && '\n'==buf[len-1])
0837         {
0838             buf[len-1]=EXEOS;
0839             len--;
0840             got_term_last = EXTRUE;
0841         }
0842 
0843         /* if somebody operated with Windows... */
0844         if (1 < len && '\r'==buf[len-1])
0845         {
0846             buf[len-1]=EXEOS;
0847             len--;
0848         }
0849         
0850         NDRX_LOG(log_debug, "Parsing line: [%s]", buf);
0851         
0852         if (!got_term_last)
0853         {
0854             /* Corrupted file, terminator expected 
0855              * - mark record as corrupted.
0856              * - Also we will try to repair the situation, if last line was corrupted
0857              * then we mark with with NAK.
0858              */
0859             missing_term_in_file = EXTRUE;
0860             continue;
0861         }
0862         
0863         /* If there was power off, then we might get corrupted lines...
0864          */
0865         p = strchr(buf, ':');
0866         
0867         if (NULL==p)
0868         {
0869             NDRX_LOG(log_error, "Symbol ':' not found on line [%d]!", 
0870                     line);
0871             EXFAIL_OUT(ret);
0872         }
0873         p++;
0874         
0875         /* Classify & parse record */
0876         NDRX_LOG(log_error, "Record: %c log_version: %d", *p, (*pp_tl)->log_version);
0877         
0878         /* If this is initial line && record J
0879          * or log_version > 0, perform checksuming.
0880          */
0881         if ((!infos_ok && LOG_COMMAND_J==*p) || ((*pp_tl)->log_version > LOG_VERSION_1))
0882         {
0883             char *rs = strchr(buf, LOG_RS_SEP);
0884             unsigned long crc32_calc=0, crc32_got=0;
0885             
0886             if (NULL==rs)
0887             {
0888                 NDRX_LOG(log_error, "TMSRV log file [%s] missing %c sep on line [%s] - ignore line", 
0889                             logfile, LOG_RS_SEP, buf);
0890                 userlog("TMSRV log file [%s] missing %c sep on line [%s] - ignore line", 
0891                             logfile, LOG_RS_SEP, buf);
0892                 continue;
0893             }
0894             
0895             *rs = EXEOS;
0896             rs++;
0897             
0898             /* Now we get crc32 */
0899             crc32_calc = ndrx_Crc32_ComputeBuf(0, buf, strlen(buf));
0900             sscanf(rs, "%lx", &crc32_got);
0901             
0902             /* verify the log... */
0903             if (crc32_calc!=crc32_got)
0904             {
0905                 NDRX_LOG(log_error, "TMSRV log file [%s] invalid log [%s] line "
0906                         "crc32 calc: [%lx] vs rec: [%lx] - ignore line", 
0907                         logfile, buf, crc32_calc, crc32_got);
0908                 userlog("TMSRV log file [%s] invalid log [%s] line "
0909                         "crc32 calc: [%lx] vs rec: [%lx] - ignore line", 
0910                          logfile, buf, crc32_calc, crc32_got);
0911                 continue;
0912             }
0913         }
0914         
0915         switch (*p)
0916         {
0917             /* Version 0: */
0918             case LOG_COMMAND_I:
0919             /* Version 1: */
0920             case LOG_COMMAND_J:
0921                 
0922                 if (infos_ok)
0923                 {
0924                     NDRX_LOG(log_error, "TMSRV log file [%s] duplicate info block", 
0925                             logfile);
0926                     userlog("TMSRV log file [%s] duplicate info block", 
0927                             logfile);
0928                     do_housekeep=EXTRUE;
0929                     EXFAIL_OUT(ret);
0930                 }
0931                 
0932                 /* set the version number */
0933                 (*pp_tl)->log_version = *p - LOG_COMMAND_I + 1;
0934                 
0935                 if (EXSUCCEED!=tms_parse_info(buf, *pp_tl))
0936                 {
0937                     EXFAIL_OUT(ret);
0938                 }
0939                 
0940                 infos_ok = EXTRUE;
0941                 break;
0942             case LOG_COMMAND_STAGE: 
0943                 
0944                 if (!infos_ok)
0945                 {
0946                     NDRX_LOG(log_error, "TMSRV log file [%s] Info record required first", 
0947                             logfile);
0948                     userlog("TMSRV log file [%s] Info record required first", 
0949                             logfile);
0950                     do_housekeep=EXTRUE;
0951                     EXFAIL_OUT(ret);
0952                 }
0953                 
0954                 if (EXSUCCEED!=tms_parse_stage(buf, *pp_tl))
0955                 {
0956                     EXFAIL_OUT(ret);
0957                 }
0958                 
0959                 break;
0960             case LOG_COMMAND_RMSTAT:
0961                 
0962                 if (!infos_ok)
0963                 {
0964                     NDRX_LOG(log_error, "TMSRV log file [%s] Info record required first", 
0965                             logfile);
0966                     userlog("TMSRV log file [%s] Info record required first", 
0967                             logfile);
0968                     do_housekeep=EXTRUE;
0969                     EXFAIL_OUT(ret);
0970                 }
0971                 
0972                 if (EXSUCCEED!=tms_parse_rmstatus(buf, *pp_tl))
0973                 {
0974                     EXFAIL_OUT(ret);
0975                 }
0976                 
0977                 break;
0978             default:
0979                 NDRX_LOG(log_warn, "Unknown record %c on line %d", *p, line);
0980                 break;
0981         }
0982         
0983         line++;
0984     }
0985     
0986     /* check was last read OK */
0987     if (!feof((*pp_tl)->f))
0988     {
0989         err = ferror((*pp_tl)->f);
0990         
0991         NDRX_LOG(log_error, "TMSRV log file [%s] failed to read: %s", 
0992                     logfile, strerror(err));
0993         userlog("TMSRV log file [%s] failed to read: %s", 
0994                     logfile, strerror(err));
0995         EXFAIL_OUT(ret);
0996     }
0997     
0998     if (!infos_ok)
0999     {
1000         NDRX_LOG(log_error, "TMSRV log file [%s] no [%c] info record - not loading", 
1001                     logfile, LOG_COMMAND_J);
1002         userlog("TMSRV log file [%s] no [%c] info record - not loading", 
1003                     logfile, LOG_COMMAND_J);
1004         do_housekeep=EXTRUE;
1005         EXFAIL_OUT(ret);
1006     }
1007     
1008     /* so that next time we can parse OK */
1009     if (missing_term_in_file)
1010     {
1011         /* so last line bad, lets fix... */
1012         if (got_term_last)
1013         {
1014             NDRX_LOG(log_error, "TMSRV log file [%s] - invalid read, "
1015                     "previous lines seen without \\n", 
1016                     logfile);
1017             userlog("TMSRV log file [%s] - invalid read, "
1018                     "previous lines without \\n", 
1019                     logfile);
1020             EXFAIL_OUT(ret);
1021         }
1022         else
1023         {
1024             /* append with \n */
1025             NDRX_LOG(log_error, "Terminating last line (with out checksum)");
1026 
1027         if (ndrx_G_systest_lockloss)
1028         {
1029                 wrote=EXFAIL;
1030         }
1031         else
1032         {
1033                 wrote=fprintf((*pp_tl)->f, "\n");
1034         }
1035 
1036             if (wrote!=1)
1037             {
1038                 err = ferror((*pp_tl)->f);
1039                 NDRX_LOG(log_error, "TMSRV log file [%s] failed to terminate line: %s", 
1040                         logfile, strerror(err));
1041                 userlog("TMSRV log file [%s] failed to terminate line: %s", 
1042                         logfile, strerror(err));
1043                 EXFAIL_OUT(ret);
1044             }
1045         }
1046     }
1047     
1048     /* thus it will start to drive it by background thread
1049      * Any transaction will be completed in background.
1050      */
1051     (*pp_tl)->is_background = EXTRUE;
1052     
1053     /* 
1054      * If we keep in active state, then timeout will kill the transaction (or it will be finished by in progress 
1055      *  binary, as maybe transaction is not yet completed). Thought we might loss the infos
1056      *  of some registered process, but then it would be aborted transaction anyway.
1057      *  but then record might be idling in the resource.
1058      * 
1059      * If we had stage logged, then transaction would be completed accordingly,
1060      * and would complete with those resources for which states is not yet
1061      * finalized (according to logs)
1062      */
1063     
1064     /* Add transaction to the hash. We need locking here. 
1065      * 
1066      * If transaction was in prepare stage XA_TX_STAGE_PREPARING (40)
1067      * We need to abort it because, there is no chance that caller will get
1068      * response back.
1069      */
1070     if (XA_TX_STAGE_PREPARING == (*pp_tl)->txstage 
1071             || XA_TX_STAGE_ACTIVE == (*pp_tl)->txstage)
1072     {
1073         NDRX_LOG(log_error, "XA Transaction [%s] was in active or preparing stage and "
1074                 "tmsrv is restarted - ABORTING", (*pp_tl)->tmxid);
1075         
1076         userlog("XA Transaction [%s] was in  active or preparing stage and "
1077                 "tmsrv is restarted - ABORTING", (*pp_tl)->tmxid);
1078         
1079         /* change the status (+ log) */
1080         (*pp_tl)->lockthreadid = ndrx_gettid();
1081         tms_log_stage(*pp_tl, XA_TX_STAGE_ABORTING, EXTRUE);
1082         (*pp_tl)->lockthreadid = 0;
1083     }
1084     
1085     MUTEX_LOCK_V(M_tx_hash_lock);
1086     EXHASH_ADD_STR( M_tx_hash, tmxid, (*pp_tl));
1087     MUTEX_UNLOCK_V(M_tx_hash_lock);
1088     
1089     NDRX_LOG(log_debug, "TX [%s] loaded OK", tmxid);
1090 out:
1091 
1092     /* Clean up if error. */
1093     if (EXSUCCEED!=ret)
1094     {
1095         userlog("Failed to load/recover transaction [%s]", logfile);
1096         if (NULL!=*pp_tl)
1097         {
1098             /* remove any stuff added... */
1099             
1100             if (tms_is_logfile_open(*pp_tl))
1101             {
1102                 tms_close_logfile(*pp_tl);
1103             }
1104 
1105             tms_remove_logfree(*pp_tl, EXFALSE);
1106             *pp_tl = NULL;
1107         }
1108     }
1109 
1110     /* clean up corrupted files */
1111     if (do_housekeep)
1112     {
1113         *housekeepable=EXTRUE;
1114 
1115         /* so that hashmaps can be cleaned up... */
1116         if (tms_housekeep(logfile))
1117         {
1118             *log_removed=EXTRUE;
1119         }
1120     }
1121 
1122     return ret;
1123 }
1124 
1125 /**
1126  * Test to see is log file open
1127  * @param p_tl
1128  * @return TRUE/FALSE
1129  */
1130 expublic int tms_is_logfile_open(atmi_xa_log_t *p_tl)
1131 {
1132     if (p_tl->f)
1133         return EXTRUE;
1134     else
1135         return EXFALSE;
1136 }
1137 
1138 /**
1139  * Close the log file
1140  * @param p_tl
1141  */
1142 expublic void tms_close_logfile(atmi_xa_log_t *p_tl)
1143 {
1144     if (NULL!=p_tl->f)
1145     {
1146         NDRX_FCLOSE(p_tl->f);
1147         p_tl->f = NULL;
1148     }
1149 }
1150 
1151 /**
1152  * Free up log file (just memory)
1153  * @param p_tl log handle
1154  * @param hash_rm remove log entry from tran hash
1155  */
1156 expublic void tms_remove_logfree(atmi_xa_log_t *p_tl, int hash_rm)
1157 {
1158     int i;
1159     atmi_xa_rm_status_btid_t *el, *elt;
1160     
1161     if (hash_rm)
1162     {
1163         MUTEX_LOCK_V(M_tx_hash_lock);
1164         EXHASH_DEL(M_tx_hash, p_tl); 
1165         MUTEX_UNLOCK_V(M_tx_hash_lock);
1166     }
1167 
1168     /* Remove branch TIDs */
1169     
1170     for (i=0; i<NDRX_MAX_RMS; i++)
1171     {
1172         EXHASH_ITER(hh, p_tl->rmstatus[i].btid_hash, el, elt)
1173         {
1174             EXHASH_DEL(p_tl->rmstatus[i].btid_hash, el);
1175             NDRX_FREE(el);
1176         }
1177     }
1178     
1179     NDRX_FREE(p_tl);
1180 }
1181 
1182 /**
1183  * Removes also transaction from hash. Either fully
1184  * committed or fully aborted...
1185  * @param p_tl - becomes invalid after removal!
1186  * @param hash_rm remove from hash lists
1187  */
1188 expublic void tms_remove_logfile(atmi_xa_log_t *p_tl, int hash_rm)
1189 {
1190     int have_file = EXFALSE;
1191     
1192     if (tms_is_logfile_open(p_tl))
1193     {
1194         have_file = EXTRUE;
1195         tms_close_logfile(p_tl);
1196     }/* Check for file existance, if was not open */
1197     else if (0 == access(p_tl->fname, 0))
1198     {
1199         have_file = EXTRUE;
1200     }
1201         
1202     if (have_file)
1203     {
1204         if (ndrx_G_systest_lockloss || EXSUCCEED!=unlink(p_tl->fname))
1205         {
1206             int err = errno;
1207             NDRX_LOG(log_debug, "Failed to remove tx log file [%s]: %d (%s)", 
1208                     p_tl->fname, err, strerror(err));
1209             userlog("Failed to remove tx log file [%s]: %d (%s)", 
1210                     p_tl->fname, err, strerror(err));
1211         }
1212     }
1213     
1214     /* Remove the log for hash! */
1215     tms_remove_logfree(p_tl, hash_rm);
1216 }
1217 
1218 /**
1219  * We should have universal log writter
1220  */
1221 exprivate int tms_log_write_line(atmi_xa_log_t *p_tl, char command, const char *fmt, ...)
1222 {
1223     int ret = EXSUCCEED;
1224     char msg[LOG_MAX+1] = {EXEOS};
1225     char msg2[LOG_MAX+1] = {EXEOS};
1226     int len, wrote, exp;
1227     int make_error = EXFALSE;
1228     unsigned long crc32;
1229     va_list ap;
1230     
1231     CHK_THREAD_ACCESS;
1232     
1233     /* If log not open - just skip... */
1234     if (NULL==p_tl->f)
1235     {
1236         return EXSUCCEED;
1237     }
1238      
1239     va_start(ap, fmt);
1240     (void) vsnprintf(msg, sizeof(msg), fmt, ap);
1241     va_end(ap);
1242     
1243     snprintf(msg2, sizeof(msg2), "%lld:%c:%s", ndrx_utc_tstamp(), command, msg);
1244     len = strlen(msg2);
1245     
1246     /* check exactly how much bytes was written */
1247     
1248     if (1==G_atmi_env.test_tmsrv_write_fail)
1249     {
1250         make_error = EXTRUE;
1251     }
1252     else if (2==G_atmi_env.test_tmsrv_write_fail)
1253     {
1254         /* generate 25% random errors */
1255         if (ndrx_rand() % 4 == 0)
1256         {
1257             make_error = EXTRUE;
1258         }
1259         
1260     }
1261     
1262     crc32 = ndrx_Crc32_ComputeBuf(0, msg2, len);
1263     
1264     /* Simulate that only partial message is written in case
1265      * if disk is full or crash happens
1266      */
1267     wrote=0;
1268     
1269     /* if write file test case - write partially, also omit EOL terminator 
1270      * Also... needs flag for commit crash simulation... i.e. 
1271      * So that if commit crash is set, we partially write the command (stage change)
1272      * and afterwards tmsrv exits.
1273      * And then when flag is removed, transaction shall commit OK
1274      */
1275     if (p_tl->log_version == LOG_VERSION_1)
1276     {
1277         NDRX_LOG(log_debug, "Log format: v%d", p_tl->log_version);
1278         
1279         exp = len+1;
1280         if (make_error)
1281         {
1282             exp++;
1283         }
1284 
1285     if (ndrx_G_systest_lockloss)
1286     {
1287             wrote=EXFAIL;
1288     }
1289     else
1290     {
1291             wrote=fprintf(p_tl->f, "%s\n", msg2);
1292     }
1293 
1294     }
1295     else
1296     {
1297         /* version 2+ */
1298         exp = len+8+1+1;
1299         
1300         if (make_error)
1301         {
1302             crc32+=1;
1303             exp++;
1304         }
1305         
1306     if (ndrx_G_systest_lockloss)
1307     {
1308             wrote=EXFAIL;
1309     }
1310     else
1311     {
1312             wrote=fprintf(p_tl->f, "%s%c%08lx\n", msg2, LOG_RS_SEP, crc32);
1313     }
1314     }
1315     
1316     if (wrote != exp)
1317     {
1318         int err = errno;
1319         
1320         /* For Q/A purposes - simulate no space error, if requested */
1321         if (make_error)
1322         {
1323             NDRX_LOG(log_error, "QA point: make_error TRUE");
1324             err = ENOSPC;
1325         }
1326         
1327         NDRX_LOG(log_error, "ERROR! Failed to write transaction log file: req_len=%d, written=%d: %s",
1328                 exp, wrote, strerror(err));
1329         userlog("ERROR! Failed to write transaction log file: req_len=%d, written=%d: %s",
1330                 exp, wrote, strerror(err));
1331         
1332         ret=EXFAIL;
1333         goto out;
1334     }
1335     
1336     
1337 out:
1338     /* flush what ever we have */
1339     /* in case of ndrx_G_systest_lockloss -> IO fence used */
1340     if (ndrx_G_systest_lockloss || EXSUCCEED!=fflush(p_tl->f))
1341     {
1342         int err=errno;
1343         userlog("ERROR! Failed to fflush(): %s", strerror(err));
1344         NDRX_LOG(log_error, "ERROR! Failed to fflush(): %s", strerror(err));
1345     ret=EXFAIL;
1346     }
1347     /*fsync(fileno(p_tl->f));*/
1348     return ret;
1349 }
1350 
1351 /**
1352  * Write general info about transaction
1353  * FORMAT: <RM_ID>:<NODE_ID>:<SRV_ID>
1354  * @param p_tl
1355  * @param stage
1356  * @return 
1357  */
1358 expublic int tms_log_info(atmi_xa_log_t *p_tl)
1359 {
1360     int ret = EXSUCCEED;
1361     char rmsbuf[NDRX_MAX_RMS*3+1]={EXEOS};
1362     int i;
1363     int len;
1364     
1365     CHK_THREAD_ACCESS;
1366     
1367     /* log the RMs participating in transaction 
1368     for (i=0; i<NDRX_MAX_RMS; i++)
1369     {
1370         if (p_tl->rmstatus[i].rmstatus)
1371         {
1372             len = strlen(rmsbuf);
1373             if (len>0)
1374             {
1375                 rmsbuf[len]=',';
1376                 rmsbuf[len+1]=',';
1377                 len++;
1378             }
1379             sprintf(rmsbuf+len, "%d", i+1);
1380         }
1381     }
1382      *
1383      */
1384     
1385     if (EXSUCCEED!=tms_log_write_line(p_tl, LOG_COMMAND_J, "%hd:%hd:%hd:%ld:%s", 
1386             p_tl->tmrmid, p_tl->tmnodeid, p_tl->tmsrvid, p_tl->txtout, rmsbuf))
1387     {
1388         ret=EXFAIL;
1389         goto out;
1390     }
1391     
1392 out:
1393     return ret;
1394 }
1395 /* Tokenizer variables */
1396 #define TOKEN_READ_VARS \
1397             int first = EXTRUE;\
1398             char *p;
1399 
1400 /* Read token for parsing */
1401 #define TOKEN_READ(X, Y)\
1402     if (NULL==(p = strtok(first?buf:NULL, ":")))\
1403     {\
1404         NDRX_LOG(log_error, "Missing token: %s.%s!", X, Y);\
1405         EXFAIL_OUT(ret);\
1406     }\
1407     if (first)\
1408         first = EXFALSE;\
1409         
1410 #define TOKEN_READ_OPT(X, Y)\
1411     if (NULL==(p = strtok(first?buf:NULL, ":")))\
1412     {\
1413         NDRX_LOG(log_warn, "Missing token: %s.%s - optional, ignore!", X, Y);\
1414     }\
1415     if (first)\
1416         first = EXFALSE;\
1417 
1418 /**
1419  * Parse & rebuild the info from file record
1420  * @param buf - record from transaction file
1421  * @param p_tl - transaction record
1422  * @return SUCCEED/FAIL
1423  */
1424 exprivate int tms_parse_info(char *buf, atmi_xa_log_t *p_tl)
1425 {
1426     int ret = EXSUCCEED;
1427     TOKEN_READ_VARS;
1428     char *p2;
1429     char *saveptr1 = NULL;
1430     int rmid;
1431     long long tdiff;
1432     
1433     /* Read first time stamp */
1434     TOKEN_READ("info", "tstamp");
1435     p_tl->t_start = strtoull (p, NULL, 10);
1436     
1437     tdiff = ndrx_utc_tstamp() - p_tl->t_start;
1438     /* adjust the timer...  */
1439     ndrx_stopwatch_reset(&p_tl->ttimer);
1440     ndrx_stopwatch_minus(&p_tl->ttimer, tdiff);
1441     NDRX_LOG(log_debug, "Transaction age: %ld ms (t_start: %llu tdiff: %lld)",
1442         ndrx_stopwatch_get_delta(&p_tl->ttimer), p_tl->t_start, tdiff);
1443              
1444     /* read tmrmid, tmnodeid, tmsrvid = ignore, but they must be in place! */
1445     TOKEN_READ("info", "cmdid");
1446     /* Basically we ignore these values, and read from current process!!!: */
1447     TOKEN_READ("info", "tmrmid");
1448     TOKEN_READ("info", "tmnodeid");
1449     TOKEN_READ("info", "tmsrvid");
1450     p_tl->tmrmid = G_atmi_env.xa_rmid;
1451     p_tl->tmnodeid = G_tmsrv_cfg.vnodeid;
1452     p_tl->tmsrvid = G_server_conf.srv_id;
1453     
1454     TOKEN_READ("info", "txtout");
1455     p_tl->txtout = atol(p);
1456     
1457     /* Read involved resrouce managers - put them in join state */
1458 #if 0
1459     TOKEN_READ("info", "rmsbuf");
1460     
1461     - not used anymore. All active branches are logged as RM status
1462     p2 = strtok_r (p, ",", &saveptr1);
1463     while (p2 != NULL)
1464     {
1465         /* Put the p2 in join state */
1466         rmid = atoi(p2);
1467         if (rmid<1 || rmid>NDRX_MAX_RMS)
1468         {
1469             NDRX_LOG(log_error, "Invalid RMID: %d!", rmid);
1470             EXFAIL_OUT(ret);
1471         }
1472         
1473         p_tl->rmstatus[rmid-1].rmstatus =XA_RM_STATUS_ACTIVE;
1474         
1475         p2 = strtok_r (NULL, ",", &saveptr1);
1476     }
1477 #endif
1478     
1479 out:                
1480     return ret;
1481 }
1482 
1483 #define CRASH_CLASS_EXIT            0 /**< exit at crash                  */
1484 #define CRASH_CLASS_NO_WRITE        1 /**< Do not write and report error  */
1485 
1486 /**
1487  * Change tx state + write transaction stage
1488  * FORMAT: <STAGE_CODE>
1489  * @param p_tl
1490  * @param forced is decision forced? I.e. no restore on error.
1491  * @return EXSUCCEED/EXFAIL
1492  */
1493 expublic int tms_log_stage(atmi_xa_log_t *p_tl, short stage, int forced)
1494 {
1495     int ret = EXSUCCEED;
1496     short stage_org=EXFAIL;
1497     /* <Crash testing> */
1498     int make_crash = EXFALSE; /**< Crash simulation */
1499     int crash_stage, crash_class;
1500     /* </Crash testing> */
1501     
1502     CHK_THREAD_ACCESS;
1503     
1504     if (p_tl->txstage!=stage)
1505     {
1506         stage_org = p_tl->txstage;
1507         p_tl->txstage = stage;
1508 
1509         NDRX_LOG(log_debug, "tms_log_stage: new stage - %hd (cc %d)", 
1510                 p_tl->txstage, G_atmi_env.test_tmsrv_commit_crash);
1511         
1512         
1513         /* <Crash testing> */
1514         
1515         /* QA: commit crash test point... 
1516          * Once crash flag is disabled, commit shall be finished by background
1517          * process.
1518          */
1519         crash_stage = G_atmi_env.test_tmsrv_commit_crash % 100;
1520         crash_class = G_atmi_env.test_tmsrv_commit_crash / 100;
1521         
1522         /* let write && exit */
1523         if (stage > 0 && crash_class==CRASH_CLASS_EXIT && stage == crash_stage)
1524         {
1525             NDRX_LOG(log_debug, "QA commit crash...");
1526             G_atmi_env.test_tmsrv_write_fail=EXTRUE;
1527             make_crash=EXTRUE;
1528         }
1529 
1530         /* no write & report error */
1531         if (stage > 0 && crash_class==CRASH_CLASS_NO_WRITE && stage == crash_stage)
1532         {
1533             NDRX_LOG(log_debug, "QA no write crash");
1534             ret=EXFAIL;
1535             goto out;
1536         }
1537         /* </Crash testing> */
1538         else
1539         {
1540             /* in case if ... there was failover and concurrent write of commit
1541              * which might overwrite the given abort position, lets write abort
1542              * states few times more, so that recovery processes would finally
1543              * see that abort has started...
1544              * Note that other tmsrv will not go further than last commit entry
1545              * as after that there is check is that tmsrv still on locked node.
1546              * ------
1547              * Also... this might not be 100% safe, as shared FS might become 
1548              * corrutped, if stale node starts to write somethink after the
1549              * other node took over. Thus hardware based fencing is mandatory
1550              * for production failover modes.
1551              */
1552             int w_cnt=1, i;
1553 
1554             if (XA_TX_STAGE_ABORTING==stage)
1555             {
1556                 w_cnt=3;
1557             }
1558             else if (XA_TX_STAGE_COMMITTING==stage)
1559             {
1560                 /* ensure that we are still locked, as we are going to write
1561                  * commit message down. If the abort was in progress,
1562                  * then our previous prepares might have overwritten  their
1563                  * abort state. Thus if we are ok here, then there will be
1564                  * 1x commit at given position, but if after then or before
1565                  * the actual disk write take over, they will put 3x aborts
1566                  * and then recovery process shall pick that up
1567                  */
1568                 if (EXSUCCEED!=tms_log_checkpointseq(p_tl))
1569                 {
1570                     EXFAIL_OUT(ret);
1571                 }
1572             }
1573 
1574             for (i=0; i<w_cnt; i++)
1575             {
1576                 if (EXSUCCEED!=tms_log_write_line(p_tl, LOG_COMMAND_STAGE, "%hd", stage))
1577                 {
1578                     ret=EXFAIL;
1579                     goto out;
1580                 }
1581             }
1582         }
1583 
1584         /* in case if switching to committing, we must sync the log & directory */
1585         if ((XA_TX_STAGE_COMMITTING==stage) || (XA_TX_STAGE_ABORTING==stage))
1586         {
1587         if (ndrx_G_systest_lockloss)
1588         {
1589         /*IO fence test */
1590             EXFAIL_OUT(ret);
1591         }
1592         else if (EXSUCCEED!=ndrx_fsync_fsync(p_tl->f, G_atmi_env.xa_fsync_flags) ||
1593                 EXSUCCEED!=ndrx_fsync_dsync(G_tmsrv_cfg.tlog_dir, G_atmi_env.xa_fsync_flags))
1594             {
1595                 EXFAIL_OUT(ret);
1596             }
1597 
1598             /* if we are in singleton group mode, validate that we still 
1599              * own the lock
1600              */
1601             if (EXSUCCEED!=tms_log_checkpointseq(p_tl))
1602             {
1603                 EXFAIL_OUT(ret);
1604             }
1605 
1606         } /* new stage commit or abort */
1607     }
1608     
1609 out:
1610                 
1611     /* <Crash testing> */
1612     if (make_crash)
1613     {
1614         exit(1);
1615     }
1616     /* </Crash testing> */
1617 
1618     /* If failed to log the stage switch, restore original transaction
1619      * stage
1620      */
1621 
1622     if (forced)
1623     {
1624         return EXSUCCEED;
1625     }
1626     else if (EXSUCCEED!=ret && EXFAIL!=stage_org)
1627     {
1628         p_tl->txstage = stage_org;
1629     }
1630 
1631     /* if not forced, get the real result */
1632     return ret;
1633 }
1634 
1635 
1636 /**
1637  * Parse stage info from buffer read
1638  * @param buf - buffer read
1639  * @param p_tl - transaction record
1640  * @return SUCCEED/FAIL
1641  */
1642 exprivate int tms_parse_stage(char *buf, atmi_xa_log_t *p_tl)
1643 {
1644     int ret = EXSUCCEED;
1645     short stage;
1646     TOKEN_READ_VARS;
1647 
1648     TOKEN_READ("stage", "tstamp");
1649     p_tl->t_update = atol(p);
1650     TOKEN_READ("info", "cmdid");
1651 
1652     TOKEN_READ("stage", "txstage");
1653 
1654     /* In case ... if we are committing, there will be no abort.
1655     * and vice versa. that's in case if doing failover
1656     * the first state shall live on...
1657     * --- not true...
1658     * we could go to commit, but just right before logging
1659     * other node took over... starts to abort.
1660     * and first one wakes up and tries to write the commit stage...
1661     * thus we shall allow to go down from commit to abort.
1662     */
1663     stage=(short)atoi(p);
1664 
1665     if (stage < p_tl->txstage)
1666     {
1667         NDRX_LOG(log_error, "Transaction logs recover: stage for [%s] downgrade was %hd, read %hd",
1668                 p_tl->tmxid, p_tl->txstage, stage);
1669         userlog("Transaction logs recover: stage for [%s] downgrade was %hd, read %hd",
1670                 p_tl->tmxid, p_tl->txstage, stage);
1671     }
1672 
1673     if (p_tl->txstage>=XA_TX_STAGE_ABORTING &&
1674         p_tl->txstage<=XA_TX_STAGE_ABFORGOT_HEU)
1675     {
1676         if (stage>=XA_TX_STAGE_ABORTING &&
1677             stage<=XA_TX_STAGE_ABFORGOT_HEU)
1678         {
1679             p_tl->txstage = stage;
1680         }
1681         else
1682         {
1683             NDRX_LOG(log_error, "Invalid stage for [%s] was %hd (abort range) read %hd - IGNORE",
1684                 p_tl->tmxid, p_tl->txstage, stage);
1685             userlog("Invalid stage for [%s] was %hd (abort range) read %hd - IGNORE",
1686                 p_tl->tmxid, p_tl->txstage, stage);
1687         }
1688     }
1689     else
1690     {
1691         p_tl->txstage = stage;
1692     }
1693    
1694 out:
1695     return ret;
1696 }
1697 
1698 /**
1699  * Change + write RM status to file
1700  * FORMAT: <rmid>:<rmstatus>:<rmerrorcode>:<rmreason>
1701  * @param p_tl transaction
1702  * @param[in] bt transaction branch
1703  * @param[in] rmstatus Resource manager status
1704  * @param[in] rmerrorcode Resource manager error code
1705  * @param[in] rmreason reason code
1706  * @return SUCCEED/FAIL
1707  */
1708 expublic int tms_log_rmstatus(atmi_xa_log_t *p_tl, atmi_xa_rm_status_btid_t *bt, 
1709         char rmstatus, int rmerrorcode, short rmreason)
1710 {
1711     int ret = EXSUCCEED;
1712     int do_log = EXFALSE;
1713     
1714     CHK_THREAD_ACCESS;
1715 
1716     if (bt->rmstatus != rmstatus)
1717     {
1718         do_log = EXTRUE;
1719     }
1720     
1721     bt->rmstatus = rmstatus;
1722     bt->rmerrorcode = rmerrorcode;
1723     bt->rmreason = rmreason;
1724     
1725     if (do_log)
1726     {
1727         if (EXSUCCEED!=tms_log_write_line(p_tl, LOG_COMMAND_RMSTAT, "%hd:%c:%d:%hd:%ld",
1728                 bt->rmid, rmstatus, rmerrorcode, rmreason, bt->btid))
1729         {
1730             ret=EXFAIL;
1731             goto out;
1732         }
1733     }
1734     
1735 out:
1736     return ret;
1737 }
1738 
1739 /**
1740  * Parse RM's status record from transaction file
1741  * @param buf - transaction record's line
1742  * @param p_tl - internal transaction record
1743  * @return SUCCEED/FAIL;
1744  */
1745 exprivate int tms_parse_rmstatus(char *buf, atmi_xa_log_t *p_tl)
1746 {
1747     int ret = EXSUCCEED;
1748     char rmstatus;
1749     int rmerrorcode;
1750     short rmreason;
1751     int rmid;
1752     long btid;
1753     atmi_xa_rm_status_btid_t *bt = NULL;
1754     
1755     TOKEN_READ_VARS;
1756    
1757     TOKEN_READ("rmstat", "tstamp");
1758     p_tl->t_update = atol(p);
1759     TOKEN_READ("info", "cmdid");
1760     
1761     TOKEN_READ("rmstat", "rmid");
1762    
1763     rmid = atoi(p);
1764     if (rmid<1 || rmid>NDRX_MAX_RMS)
1765     {
1766         NDRX_LOG(log_error, "Invalid RMID: %d!", rmid);
1767         EXFAIL_OUT(ret);
1768     }
1769    
1770     TOKEN_READ("rmstat", "rmstatus");
1771     rmstatus = *p;
1772 
1773     TOKEN_READ("rmstat", "rmerrorcode");
1774     rmerrorcode = atoi(p);
1775 
1776     TOKEN_READ("rmstat", "rmreason");
1777     rmreason = (short)atoi(p);
1778 
1779     TOKEN_READ_OPT("rmstat", "btid");
1780     
1781     if (NULL!=p)
1782     {
1783         btid = atol(p);
1784     }
1785     else
1786     {
1787         btid=0;
1788     }
1789     
1790     /*
1791     p_tl->rmstatus[rmid-1].rmstatus = rmstatus;
1792     p_tl->rmstatus[rmid-1].rmerrorcode = rmerrorcode;
1793     p_tl->rmstatus[rmid-1].rmreason = rmreason;
1794      * */
1795     
1796     ret = tms_btid_addupd(p_tl, rmid, 
1797             &btid, rmstatus, rmerrorcode, rmreason, NULL, &bt);
1798    
1799 out:
1800     return ret;    
1801 }
1802 
1803 /**
1804  * Copy the background items to the linked list.
1805  * The idea is that this is processed by background. During that time, it does not
1806  * remove any items from hash. Thus pointers should be still valid. 
1807  * TODO: We should copy here transaction info too....
1808  * 
1809  * @param p_tl
1810  * @return 
1811  */
1812 expublic atmi_xa_log_list_t* tms_copy_hash2list(int copy_mode)
1813 {
1814     atmi_xa_log_list_t *ret = NULL;
1815     atmi_xa_log_t * r, *rt;
1816     atmi_xa_log_list_t *tmp;
1817     
1818     if (copy_mode & COPY_MODE_ACQLOCK)
1819     {
1820         MUTEX_LOCK_V(M_tx_hash_lock);
1821     }
1822     
1823     /* No changes to hash list during the lock. */    
1824     
1825     EXHASH_ITER(hh, M_tx_hash, r, rt)
1826     {
1827         /* Only background items... */
1828         if (r->is_background && !(copy_mode & COPY_MODE_BACKGROUND))
1829             continue;
1830         
1831         if (!r->is_background && !(copy_mode & COPY_MODE_FOREGROUND))
1832             continue;
1833                 
1834         if (NULL==(tmp = NDRX_CALLOC(1, sizeof(atmi_xa_log_list_t))))
1835         {
1836             NDRX_LOG(log_error, "Failed to malloc %d: %s", 
1837                     sizeof(atmi_xa_log_list_t), strerror(errno));
1838             goto out;
1839         }
1840         
1841         /* we should copy full TL structure, because some other thread might
1842          * will use it.
1843          * Having some invalid pointers inside does not worry us, because
1844          * we just need a list for a printing or xids for background txn lookup
1845          */
1846         memcpy(&tmp->p_tl, r, sizeof(*r));
1847         
1848         LL_APPEND(ret, tmp);
1849     }
1850     
1851 out:
1852     if (copy_mode & COPY_MODE_ACQLOCK)
1853     {
1854         MUTEX_UNLOCK_V(M_tx_hash_lock);
1855     }
1856 
1857     return ret;
1858 }
1859 
1860 /**
1861  * Lock the transaction log hash
1862  */
1863 expublic void tms_tx_hash_lock(void)
1864 {
1865     MUTEX_LOCK_V(M_tx_hash_lock);
1866 }
1867 
1868 /**
1869  * Unlock the transaction log hash
1870  */
1871 expublic void tms_tx_hash_unlock(void)
1872 {
1873     MUTEX_UNLOCK_V(M_tx_hash_lock);
1874 }
1875 
1876 /**
1877  * Copy TLOG info to FB (for display purposes)
1878  * The function has mandatory task to fill in the general fields about
1879  * transaction.
1880  * The actual RM/Branch tids are fill up till we get UBF buffer error (full)
1881  * @param p_ub - Dest UBF
1882  * @param p_tl - Transaction log
1883  * @param incl_rm_stat include RM status in response buffer
1884  * @return EXSUCCEED/EXFAIL
1885  */
1886 expublic int tms_log_cpy_info_to_fb(UBFH *p_ub, atmi_xa_log_t *p_tl, int incl_rm_stat)
1887 {
1888     int ret = EXSUCCEED;
1889     long tspent;
1890     short i;
1891     tspent = p_tl->txtout - ndrx_stopwatch_get_delta_sec(&p_tl->ttimer);    
1892     atmi_xa_rm_status_btid_t *el, *elt;
1893     
1894     if (tspent<0)
1895     {
1896         tspent = 0;
1897     }
1898     
1899     if (
1900             EXSUCCEED!=Bchg(p_ub, TMXID, 0, (char *)p_tl->tmxid, 0L) ||
1901             EXSUCCEED!=Bchg(p_ub, TMRMID, 0, (char *)&(p_tl->tmrmid), 0L) ||
1902             EXSUCCEED!=Bchg(p_ub, TMNODEID, 0, (char *)&(p_tl->tmnodeid), 0L) ||
1903             EXSUCCEED!=Bchg(p_ub, TMSRVID, 0, (char *)&(p_tl->tmsrvid), 0L) ||
1904             EXSUCCEED!=Bchg(p_ub, TMTXTOUT, 0, (char *)&(p_tl->txtout), 0L) ||
1905             EXSUCCEED!=Bchg(p_ub, TMTXTOUT_LEFT, 0, (char *)&tspent, 0L) ||
1906             EXSUCCEED!=Bchg(p_ub, TMTXSTAGE, 0, (char *)&(p_tl->txstage), 0L) ||
1907             EXSUCCEED!=Bchg(p_ub, TMTXTRYCNT, 0, (char *)&(p_tl->trycount), 0L) ||
1908             EXSUCCEED!=Bchg(p_ub, TMTXTRYMAXCNT, 0, (char *)&(G_tmsrv_cfg.max_tries), 0L)
1909         )
1910     {
1911         NDRX_LOG(log_error, "Failed to return fields: [%s]", 
1912                 Bstrerror(Berror));
1913         EXFAIL_OUT(ret);
1914     }
1915     
1916     /* return the info about RMs: */
1917     for (i=0; incl_rm_stat && i<NDRX_MAX_RMS; i++)
1918     {
1919         /* loop over the Branch TIDs  */
1920         EXHASH_ITER(hh, p_tl->rmstatus[i].btid_hash, el, elt)
1921         {
1922             /* cast to long... */
1923             long rmerrorcode =  (long)el->rmerrorcode;
1924             
1925             if (
1926                 EXSUCCEED!=Badd(p_ub, TMTXRMID, (char *)&el->rmid, 0L) ||
1927                 EXSUCCEED!=Badd(p_ub, TMTXBTID, (char *)&el->btid, 0L) ||
1928                 EXSUCCEED!=Badd(p_ub, TMTXRMSTATUS,
1929                     (char *)&(el->rmstatus), 0L) ||
1930                 EXSUCCEED!=Badd(p_ub, TMTXRMERRCODE,
1931                     (char *)&rmerrorcode, 0L) ||
1932                 EXSUCCEED!=Badd(p_ub, TMTXRMREASON,
1933                     (char *)&(el->rmreason), 0L)
1934                 )
1935             {
1936                 /* there could be tons of these branches for 
1937                  * mysql/mariadb or posgresql 
1938                  */
1939                 NDRX_LOG(log_error, "Failed to return fields: [%s] - ignore", 
1940                             Bstrerror(Berror));
1941                 
1942                 /* finish off. */
1943                 goto out;
1944             }
1945         }
1946     } /* for */
1947     
1948 out:
1949     return ret;
1950 }
1951 
1952 /* vim: set ts=4 sw=4 et smartindent: */