Back to home page

Enduro/X

 
 

    


0001 /**
0002  * @brief Q XA Backend
0003  *   Prepare the folders & subfolders
0004  *   So we will have following directory structure:
0005  *   - active
0006  *   - prepared
0007  *   - committed
0008  *   Initially file will be named after the XID
0009  *   Once it becomes committed, it is named after message_id
0010  *   If we have a update to Q (try counter for example)
0011  *   Then we issue new transaction file with different command inside, but it contains
0012  *   the message_id
0013  *   Once do the commit and it is not and message, but update file, then
0014  *   we update message file.
0015  *   If the command is delete, then we unlink the file.
0016  *   Once Queue record is completed (rolled back or committed) we shall send ACK
0017  *   of COMMAND BLOCK back to queue server via TPCALL command to QSPACE server.
0018  *   this will allow to synchronize internal status of the messages.
0019  *   Initially (active) file is named is named after XID. When doing commit, it is
0020  *   renamed to msg_id.
0021  *   If we restore the system after the restart, then committed & prepare directory is scanned.
0022  *   If msg is committed + there is command in prepared, then it is marked as locked.
0023  *   When scanning prepared directory, we shall read the msg_id from files.
0024  *   We shall support multiple TMQs running over single Q space, but they each should,
0025  *   manage it's own set of queued messages.
0026  *   The queued file names shall contain [QSPACE].[SERVERID].[MSG_ID|XID]
0027  *   To post the updates to proper TMQ, it should advertise QSPACE.[SERVER_ID]
0028  *   In case of Active-Active env, servers only on node shall be run.
0029  *   On the other node we could put in standby qspace+server_id on the same shared dir.
0030  * ----------
0031  *   TODO: needs to consider to splitting this into two:
0032  *   1) generic XA switch communicating with Qspace via minicall
0033  *   2) Qspace server which servers as a backend for XA commands.
0034  *
0035  * @file qdisk_xa.c
0036  */
0037 /* -----------------------------------------------------------------------------
0038  * Enduro/X Middleware Platform for Distributed Transaction Processing
0039  * Copyright (C) 2009-2016, ATR Baltic, Ltd. All Rights Reserved.
0040  * Copyright (C) 2017-2023, Mavimax, Ltd. All Rights Reserved.
0041  * This software is released under one of the following licenses:
0042  * AGPL (with Java and Go exceptions) or Mavimax's license for commercial use.
0043  * See LICENSE file for full text.
0044  * -----------------------------------------------------------------------------
0045  * AGPL license:
0046  *
0047  * This program is free software; you can redistribute it and/or modify it under
0048  * the terms of the GNU Affero General Public License, version 3 as published
0049  * by the Free Software Foundation;
0050  *
0051  * This program is distributed in the hope that it will be useful, but WITHOUT ANY
0052  * WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR A
0053  * PARTICULAR PURPOSE. See the GNU Affero General Public License, version 3
0054  * for more details.
0055  *
0056  * You should have received a copy of the GNU Affero General Public License along 
0057  * with this program; if not, write to the Free Software Foundation, Inc.,
0058  * 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
0059  *
0060  * -----------------------------------------------------------------------------
0061  * A commercial use license is available from Mavimax, Ltd
0062  * contact@mavimax.com
0063  * -----------------------------------------------------------------------------
0064  */
0065 #include <string.h>
0066 #include <stdio.h>
0067 #include <stdlib.h>
0068 #include <memory.h>
0069 #include <math.h>
0070 #include <errno.h>
0071 #include <dirent.h>
0072 #include <sys/stat.h>
0073 #include <assert.h>
0074 
0075 #include <atmi.h>
0076 #include <ubf.h>
0077 #include <ndebug.h>
0078 #include <ndrstandard.h>
0079 #include <nstopwatch.h>
0080 
0081 #include <xa.h>
0082 #include <atmi_int.h>
0083 #include <unistd.h>
0084 
0085 #include "userlog.h"
0086 #include "tmqueue.h"
0087 #include "nstdutil.h"
0088 #include "Exfields.h"
0089 #include "atmi_tls.h"
0090 #include "tmqd.h"
0091 #include <qcommon.h>
0092 #include <xa_cmn.h>
0093 #include <ubfutil.h>
0094 #include "qtran.h"
0095 #include <singlegrp.h>
0096 #include <sys_test.h>
0097 /*---------------------------Externs------------------------------------*/
0098 /*---------------------------Macros-------------------------------------*/
0099 /* #define TXN_TRACE */
0100 /*---------------------------Enums--------------------------------------*/
0101 /*---------------------------Typedefs-----------------------------------*/
0102 /*---------------------------Globals------------------------------------*/
0103 expublic char ndrx_G_qspace[XATMI_SERVICE_NAME_LENGTH+1];   /**< Name of the queue space  */
0104 expublic char ndrx_G_qspacesvc[XATMI_SERVICE_NAME_LENGTH+1];/**< real service name      */
0105 /*---------------------------Statics------------------------------------*/
0106 
0107 exprivate char M_folder[PATH_MAX+1] = {EXEOS}; /**< Where to store the q data         */
0108 exprivate char M_folder_active[PATH_MAX+1] = {EXEOS}; /**< Active transactions        */
0109 exprivate char M_folder_prepared[PATH_MAX+1] = {EXEOS}; /**< Prepared transactions    */
0110 exprivate char M_folder_committed[PATH_MAX+1] = {EXEOS}; /**< Committed transactions  */
0111 exprivate int volatile M_folder_set = EXFALSE;   /**< init flag                     */
0112 exprivate MUTEX_LOCKDECL(M_folder_lock); /**< protect against race codition during path make*/
0113 exprivate MUTEX_LOCKDECL(M_init);   /**< init lock      */
0114 
0115 exprivate int M_is_tmqueue = EXFALSE;   /**< is this process a tmqueue ?        */
0116 
0117 exprivate  int (*M_p_tmq_setup_cmdheader_dum)(tmq_cmdheader_t *hdr, char *qname, 
0118         short nodeid, short srvid, char *qspace, long flags);
0119 exprivate int (*M_p_tmq_dum_add)(char *tmxid);
0120 exprivate int (*M_p_tmq_unlock_msg)(union tmq_upd_block *b);
0121 
0122 exprivate int (*M_p_tmq_msgid_exists)(char *msgid_str);
0123 exprivate void (*M_p_tpexit)(void);
0124 
0125 /** stopwatch for triggering disk checks */
0126 exprivate ndrx_stopwatch_t M_chkdisk_stopwatch;
0127 exprivate MUTEX_LOCKDECL(M_chkdisk_stopwatch_lock);   /**< init lock      */
0128 
0129 /*---------------------------Prototypes---------------------------------*/
0130 
0131 expublic int xa_open_entry_stat(char *xa_info, int rmid, long flags);
0132 expublic int xa_close_entry_stat(char *xa_info, int rmid, long flags);
0133 expublic int xa_start_entry_stat(XID *xid, int rmid, long flags);
0134 expublic int xa_end_entry_stat(XID *xid, int rmid, long flags);
0135 expublic int xa_rollback_entry_stat(XID *xid, int rmid, long flags);
0136 expublic int xa_prepare_entry_stat(XID *xid, int rmid, long flags);
0137 expublic int xa_commit_entry_stat(XID *xid, int rmid, long flags);
0138 expublic int xa_recover_entry_stat(XID *xid, long count, int rmid, long flags);
0139 expublic int xa_forget_entry_stat(XID *xid, int rmid, long flags);
0140 expublic int xa_complete_entry_stat(int *handle, int *retval, int rmid, long flags);
0141 
0142 expublic int xa_open_entry_dyn(char *xa_info, int rmid, long flags);
0143 expublic int xa_close_entry_dyn(char *xa_info, int rmid, long flags);
0144 expublic int xa_start_entry_dyn(XID *xid, int rmid, long flags);
0145 expublic int xa_end_entry_dyn(XID *xid, int rmid, long flags);
0146 expublic int xa_rollback_entry_dyn(XID *xid, int rmid, long flags);
0147 expublic int xa_prepare_entry_dyn(XID *xid, int rmid, long flags);
0148 expublic int xa_commit_entry_dyn(XID *xid, int rmid, long flags);
0149 expublic int xa_recover_entry_dyn(XID *xid, long count, int rmid, long flags);
0150 expublic int xa_forget_entry_dyn(XID *xid, int rmid, long flags);
0151 expublic int xa_complete_entry_dyn(int *handle, int *retval, int rmid, long flags);
0152 
0153 expublic int xa_open_entry(struct xa_switch_t *sw, char *xa_info, int rmid, long flags);
0154 expublic int xa_close_entry(struct xa_switch_t *sw, char *xa_info, int rmid, long flags);
0155 expublic int xa_start_entry(struct xa_switch_t *sw, XID *xid, int rmid, long flags);
0156 expublic int xa_end_entry(struct xa_switch_t *sw, XID *xid, int rmid, long flags);
0157 expublic int xa_rollback_entry(struct xa_switch_t *sw, XID *xid, int rmid, long flags);
0158 expublic int xa_prepare_entry(struct xa_switch_t *sw, XID *xid, int rmid, long flags);
0159 expublic int xa_commit_entry(struct xa_switch_t *sw, XID *xid, int rmid, long flags);
0160 expublic int xa_recover_entry(struct xa_switch_t *sw, XID *xid, long count, int rmid, long flags);
0161 expublic int xa_forget_entry(struct xa_switch_t *sw, XID *xid, int rmid, long flags);
0162 expublic int xa_complete_entry(struct xa_switch_t *sw, int *handle, int *retval, int rmid, long flags);
0163 
0164 exprivate int read_tx_block(FILE *f, char *block, int len, char *fname, char *dbg_msg, 
0165         int *err, int *tmq_err);
0166 exprivate int read_tx_from_file(char *fname, char *block, int len, int *err,
0167         int *tmq_err);
0168 
0169 exprivate void dirent_free(struct dirent **namelist, int n);
0170 
0171 exprivate int xa_rollback_entry_tmq(char *tmxid, long flags);
0172 exprivate int xa_prepare_entry_tmq(char *tmxid, long flags);
0173 exprivate int xa_commit_entry_tmq(char *tmxid, long flags);
0174 exprivate int write_to_tx_file(char *block, int len, char *cust_tmxid, int *int_diag);
0175 exprivate void tmq_chkdisk_th(void *ptr, int *p_finish_off);
0176 exprivate int tmq_check_prepared_exists_on_disk(char *tmxid);
0177 
0178 struct xa_switch_t ndrxqstatsw = 
0179 { 
0180     .name = "ndrxqstatsw",
0181     .flags = TMNOFLAGS,
0182     .version = 0,
0183     .xa_open_entry = xa_open_entry_stat,
0184     .xa_close_entry = xa_close_entry_stat,
0185     .xa_start_entry = xa_start_entry_stat,
0186     .xa_end_entry = xa_end_entry_stat,
0187     .xa_rollback_entry = xa_rollback_entry_stat,
0188     .xa_prepare_entry = xa_prepare_entry_stat,
0189     .xa_commit_entry = xa_commit_entry_stat,
0190     .xa_recover_entry = xa_recover_entry_stat,
0191     .xa_forget_entry = xa_forget_entry_stat,
0192     .xa_complete_entry = xa_complete_entry_stat
0193 };
0194 
0195 struct xa_switch_t ndrxqdynsw = 
0196 { 
0197     .name = "ndrxqdynsw",
0198     .flags = TMREGISTER,
0199     .version = 0,
0200     .xa_open_entry = xa_open_entry_dyn,
0201     .xa_close_entry = xa_close_entry_dyn,
0202     .xa_start_entry = xa_start_entry_dyn,
0203     .xa_end_entry = xa_end_entry_dyn,
0204     .xa_rollback_entry = xa_rollback_entry_dyn,
0205     .xa_prepare_entry = xa_prepare_entry_dyn,
0206     .xa_commit_entry = xa_commit_entry_dyn,
0207     .xa_recover_entry = xa_recover_entry_dyn,
0208     .xa_forget_entry = xa_forget_entry_dyn,
0209     .xa_complete_entry = xa_complete_entry_dyn
0210 };
0211 
0212 /**
0213  * Mark the current instance as part or not as part of tmqueue
0214  * @param setting EXTRUE/EXFALSE
0215  */
0216 expublic void tmq_set_tmqueue(
0217     int setting
0218     , int (*p_tmq_setup_cmdheader_dum)(tmq_cmdheader_t *hdr, char *qname, 
0219         short nodeid, short srvid, char *qspace, long flags)
0220     , int (*p_tmq_dum_add)(char *tmxid)
0221     , int (*p_tmq_unlock_msg)(union tmq_upd_block *b)
0222     , void (**p_tmq_chkdisk_th)(void *ptr, int *p_finish_off)
0223     , int (*p_tmq_msgid_exists)(char *msgid_str)
0224     , void (*p_tpexit)(void)
0225 )
0226 {
0227     M_is_tmqueue = setting;
0228     M_p_tmq_setup_cmdheader_dum = p_tmq_setup_cmdheader_dum;
0229     M_p_tmq_dum_add = p_tmq_dum_add;
0230     M_p_tmq_unlock_msg = p_tmq_unlock_msg;
0231 
0232     M_p_tmq_msgid_exists=p_tmq_msgid_exists;
0233     M_p_tpexit=p_tpexit;
0234     
0235     NDRX_LOG(log_debug, "qdisk_xa config: M_is_tmqueue=%d "
0236         "M_p_tmq_setup_cmdheader_dum=%p M_p_tmq_dum_add=%p M_p_tmq_unlock_msg=%p "
0237         "M_p_tmq_msgid_exists=%p M_p_tpexit=%p",
0238         M_is_tmqueue, M_p_tmq_setup_cmdheader_dum, M_p_tmq_dum_add, M_p_tmq_unlock_msg,
0239         M_p_tmq_msgid_exists, M_p_tpexit);
0240 
0241     /* Return some functions from our scope back */
0242     *p_tmq_chkdisk_th = tmq_chkdisk_th;
0243 
0244 }
0245 
0246 /**
0247  * Set filename base
0248  * @param xid
0249  * @param rmid
0250  * @return 
0251  */
0252 exprivate char *set_filename_base(XID *xid)
0253 {
0254     atmi_xa_serialize_xid(xid, G_atmi_tls->qdisk_tls->filename_base);    
0255     return G_atmi_tls->qdisk_tls->filename_base;
0256 }
0257 
0258 /**
0259  * Set base xid
0260  * @param tmxid xid string
0261  * @return ptr to buffer
0262  */
0263 exprivate char *set_filename_base_tmxid(char *tmxid)
0264 {
0265     /* may overlap in dynamic reg mode */
0266     if (tmxid!=(char *)G_atmi_tls->qdisk_tls->filename_base)
0267     {
0268         NDRX_STRCPY_SAFE(G_atmi_tls->qdisk_tls->filename_base, tmxid);
0269     }
0270 
0271     return G_atmi_tls->qdisk_tls->filename_base;
0272 }
0273 
0274 /**
0275  * Get the next file name for current transaction
0276  * 
0277  * @return EXSUCCEED (names set) / EXFAIL (transaction not found)
0278  */
0279 exprivate int set_filenames(int *p_seqno)
0280 {
0281     /* get next sequence number of tran 
0282      * tmxid is encoded in G_atmi_tls->qdisk_tls->filename_base
0283      * thus lookup the transaction, and get the next number
0284      */
0285     int ret = EXSUCCEED;
0286     int locke=EXFALSE;
0287     int seqno;
0288     qtran_log_t * p_tl = tmq_log_get_entry(G_atmi_tls->qdisk_tls->filename_base, 
0289         NDRX_LOCK_WAIT_TIME, &locke);
0290     
0291     if (NULL==p_tl)
0292     {
0293         NDRX_LOG(log_error, "Transaction [%s] not found", 
0294                 G_atmi_tls->qdisk_tls->filename_base);
0295         EXFAIL_OUT(ret);
0296     }
0297     
0298     seqno = tmq_log_next(p_tl);
0299     
0300     snprintf(G_atmi_tls->qdisk_tls->filename_active, sizeof(G_atmi_tls->qdisk_tls->filename_active), 
0301                 "%s/%s-%03d", M_folder_active, G_atmi_tls->qdisk_tls->filename_base, seqno);
0302     
0303     snprintf(G_atmi_tls->qdisk_tls->filename_prepared, sizeof(G_atmi_tls->qdisk_tls->filename_prepared), 
0304             "%s/%s-%03d", M_folder_prepared, G_atmi_tls->qdisk_tls->filename_base, seqno);
0305         
0306     NDRX_LOG(log_info, "Filenames set to: [%s] [%s] (base: [%s])", 
0307                 G_atmi_tls->qdisk_tls->filename_active, 
0308                 G_atmi_tls->qdisk_tls->filename_prepared,
0309                 G_atmi_tls->qdisk_tls->filename_base);
0310     
0311     *p_seqno = seqno;
0312 out:
0313                 
0314     /* if not recursive lock, then release */
0315     if (NULL!=p_tl && !locke)
0316     {
0317         tmq_log_unlock(p_tl);
0318     }
0319 
0320     return ret;
0321 }
0322 
0323 /**
0324  * Return max file number
0325  * @return 0 (no files), >=1 got something
0326  */
0327 exprivate int get_filenames_max(void)
0328 {
0329     int i=0;
0330     char filename_active[PATH_MAX+1];
0331     char filename_prepared[PATH_MAX+1];
0332     
0333     while(1)
0334     {
0335         snprintf(filename_active, sizeof(filename_active), "%s/%s-%03d", 
0336                 M_folder_active, G_atmi_tls->qdisk_tls->filename_base, i+1);
0337         snprintf(filename_prepared, sizeof(filename_prepared), "%s/%s-%03d", 
0338                 M_folder_prepared, G_atmi_tls->qdisk_tls->filename_base, i+1);
0339         NDRX_LOG(log_debug, "Testing act: [%s] prep: [%s]", filename_active, 
0340                 filename_prepared);
0341         if (ndrx_file_exists(filename_active) || 
0342                 ndrx_file_exists(filename_prepared))
0343         {
0344             i++;
0345         }
0346         else
0347         {
0348             break;
0349         }
0350     }
0351     
0352     NDRX_LOG(log_info, "max file names %d", i);
0353     return i;
0354 }
0355 
0356 /**
0357  * Get the full file name for `i' occurrence 
0358  * @param i
0359  * @param folder
0360  * @return path to file
0361  */
0362 exprivate char *get_filename_i(int i, char *folder, int slot)
0363 {
0364     static __thread char filename[2][PATH_MAX+1];
0365     
0366     snprintf(filename[slot], sizeof(filename[0]), "%s/%s-%03d", folder, 
0367         G_atmi_tls->qdisk_tls->filename_base, i);
0368     
0369     return filename[slot];
0370 }
0371 
0372 /**
0373  * Special Q file name
0374  * @param fname
0375  * @return 
0376  */
0377 exprivate char *get_file_name_final(char *fname)
0378 {
0379     static __thread char buf[PATH_MAX+1];
0380     
0381     snprintf(buf, sizeof(buf), "%s/%s", M_folder_committed, fname);
0382     NDRX_LOG(log_debug, "Filename built: %s", buf);
0383     
0384     return buf;
0385 }
0386 
0387 /**
0388  * Rename file from one folder to another...
0389  * @param xid
0390  * @param rmid
0391  * @param from_folder
0392  * @param to_folder
0393  * @return 
0394  */
0395 exprivate int file_move(int i, char *from_folder, char *to_folder)
0396 {
0397     int ret = EXSUCCEED;
0398     char *f;
0399     char *t;
0400     
0401 
0402     f = get_filename_i(i, from_folder, 0);
0403     t = get_filename_i(i, to_folder, 1);
0404         
0405     NDRX_LOG(log_info, "Rename [%s]->[%s]", 
0406                 f,t);
0407 
0408     /* ndrx_G_systest_lockloss -> IO fence for test */
0409     if (ndrx_G_systest_lockloss || EXSUCCEED!=rename(f, t))
0410     {
0411         NDRX_LOG(log_error, "Failed to rename [%s]->[%s]: %s", 
0412                 f,t,
0413                 strerror(errno));
0414         EXFAIL_OUT(ret);
0415     }
0416     
0417 out:
0418     return ret;
0419 }
0420 
0421 /**
0422  * Move the file to committed storage
0423  * @param from_filename source file name with path
0424  * @param to_filename_only dest only filename
0425  * @return final file name
0426  */
0427 exprivate char * file_move_final_names(char *from_filename, char *to_filename_only)
0428 {
0429     int ret = EXSUCCEED;
0430     
0431     char *to_filename = get_file_name_final(to_filename_only);
0432     
0433     NDRX_LOG(log_debug, "Rename [%s] -> [%s]", from_filename, to_filename);
0434 
0435     return to_filename;
0436 }
0437 
0438 
0439 /**
0440  * Send notification to tmqueue server so that we have finished this
0441  * particular message & we can unlock that for further processing
0442  * TODO: in case of rename if we get noent, then check the destination
0443  *  if the destination name exists, then assume that rename was fine.
0444  *  this is needed to avoid infinte loop when tlog have gone async with disk
0445  * @param p_hdr
0446  * @param fname1 filename 1 to unlink (priority 1 - after this message is unblocked)
0447  * @param fname2 filename 2 to unlink (priority 2)
0448  * @param fcmd file cmd, if U, then fname1 & 2 is both unlinks, if R, then f1 src name, f2 dest name
0449  * @param tcmd transaction command
0450  * @return EXSUCCEED/EXFAIL
0451  */
0452 exprivate int tmq_finalize_file(union tmq_upd_block *p_upd, char *fname1, 
0453         char *fname2, char fcmd, qtran_log_cmd_t *tcmd)
0454 {
0455     
0456     int ret = EXSUCCEED;
0457     BFLDOCC occ;
0458     char name1[PATH_MAX+1]="";
0459     char name2[PATH_MAX+1]="";
0460     char *p;
0461     char *files[2];
0462     
0463     /* Load args... */
0464     if (NULL!=fname1)
0465     {
0466         NDRX_STRCPY_SAFE(name1, fname1);
0467         files[0]=name1;
0468     }
0469     else
0470     {
0471         files[0]=NULL;
0472     }
0473     
0474     if (NULL!=fname2)
0475     {
0476         NDRX_STRCPY_SAFE(name2, fname2);
0477         files[1]=name2;
0478     }
0479     else
0480     {
0481         files[1]=NULL;
0482     }    
0483     
0484     /* rename -> mandatory. Also if on remove file does not exists, this is the
0485      *  same as removed OK
0486      * first unlink -> mandatory
0487      * second unlink -> optional 
0488      */
0489     if (TMQ_FILECMD_UNLINK==fcmd)
0490     {            
0491         for (occ=0; occ<N_DIM(files) && NULL!=files[occ]; occ++)
0492         {
0493             NDRX_LOG(log_debug, "Unlinking file [%s]", files[occ]);
0494 
0495             /* IO fence test */
0496             if (ndrx_G_systest_lockloss || EXSUCCEED!=unlink(files[occ]))
0497             {
0498                 if (ENOENT!=errno)
0499                 {
0500                     int err = errno;
0501                     NDRX_LOG(log_error, "Failed to unlinking file [%s] occ %d: %s", 
0502                             files[occ], occ, strerror(err));
0503                     userlog("Failed to unlinking file [%s] occ %d: %s", 
0504                             files[occ], occ, strerror(err));
0505 
0506                     if (0==occ)
0507                     {
0508                         ret=XAER_RMERR;
0509                         goto out;
0510                     }
0511 
0512                 }
0513             }
0514 
0515             if (0==occ)
0516             {
0517                 /* get the folder form file name */
0518                 p=strrchr(files[occ], '/');
0519 
0520                 if (NULL!=p)
0521                 {
0522                     *p=EXEOS;
0523                 }
0524 
0525                 /* io fence test */
0526                 if (ndrx_G_systest_lockloss || EXSUCCEED!=ndrx_fsync_dsync(files[occ], G_atmi_env.xa_fsync_flags))
0527                 {
0528                     NDRX_LOG(log_error, "Failed to dsync [%s]", files[occ]);
0529                     ret=XAER_RMERR;
0530                     goto out;
0531                 }
0532             }
0533         }
0534     }
0535     else if (TMQ_FILECMD_RENAME==fcmd)
0536     {
0537         if (NULL==files[0] || NULL==files[1])
0538         {
0539             NDRX_LOG(log_error, "File 1 or 2 is NULL %p %p - cannot rename",
0540                     files[0], files[1]);
0541             ret=XAER_RMERR;
0542             goto out;
0543         }
0544 
0545         NDRX_LOG(log_debug, "About to rename: [%s] -> [%s]",
0546                 name1, name2);
0547 
0548         /* wth io fence test: */
0549         if (ndrx_G_systest_lockloss|| EXSUCCEED!=rename(name1, name2))
0550         {
0551             int err = errno;
0552             
0553             if (ENOENT==err && ndrx_file_exists(name2))
0554             {
0555                 NDRX_LOG(log_error, "Failed to rename file [%s] -> [%s] occ %d: "
0556                         "%s, but dest exists - assume retry", 
0557                         name1, name2, occ, strerror(err));
0558             }
0559             else
0560             {
0561                 NDRX_LOG(log_error, "Failed to rename file [%s] -> [%s] occ %d: %s", 
0562                         name1, name2, occ, strerror(err));
0563                 userlog("Failed to rename file [%s] -> [%s] occ %d: %s", 
0564                         name1, name2, occ, strerror(err));
0565                 ret=XAER_RMERR;
0566                 goto out;
0567             }
0568         }
0569 
0570         /* get the folder form file name */
0571         p=strrchr(name2, '/');
0572 
0573         if (NULL!=p)
0574         {
0575             *p=EXEOS;
0576         }
0577 
0578         /* write io fence */
0579         if (ndrx_G_systest_lockloss || EXSUCCEED!=ndrx_fsync_dsync(name2, G_atmi_env.xa_fsync_flags))
0580         {
0581             NDRX_LOG(log_error, "Failed to dsync [%s]", name2);
0582             ret=XAER_RMERR;
0583             goto out;
0584         }
0585     }
0586     else
0587     {
0588         NDRX_LOG(log_error, "Unsupported file command %c", fcmd);
0589         ret=XAER_RMERR;
0590         goto out;
0591     }
0592     
0593     /* If all OK, lets unlock the message. */
0594     if (!tcmd->no_unlock && EXSUCCEED!=M_p_tmq_unlock_msg(p_upd))
0595     {
0596         ret=XAER_RMERR;
0597         goto out;
0598     }
0599 
0600 out:
0601     return ret;
0602 }
0603 
0604 /**
0605  * Finalize files by update block
0606  * @param p_upd
0607  * @param fname1 filename1 to unlink
0608  * @param fname2 filename2 to unlink
0609  * @param fcmd file cmd
0610  * @param tcmd transaction command
0611  * @return EXSUCCEED/EXFAIL
0612  */
0613 exprivate int tmq_finalize_files_upd(tmq_msg_upd_t *p_upd, char *fname1, 
0614         char *fname2, char fcmd, qtran_log_cmd_t *tcmd)
0615 {
0616     union tmq_upd_block block;
0617     
0618     memset(&block, 0, sizeof(block));
0619     
0620     memcpy(&block.upd, p_upd, sizeof(*p_upd));
0621     
0622     return tmq_finalize_file(&block, fname1, fname2, fcmd, tcmd);
0623 }
0624 
0625 /**
0626  * Finalize files by header block
0627  * @param p_hdr
0628  * @param fname1 filename1 to unlink
0629  * @param fname2 filename2 to unlink
0630  * @param fcmd file commmand code
0631  * @param tcmd tran command
0632  * @return EXSUCCEED/EXFAIL
0633  */
0634 exprivate int tmq_finalize_files_hdr(tmq_cmdheader_t *p_hdr, char *fname1, 
0635         char *fname2, char fcmd, qtran_log_cmd_t *tcmd)
0636 {
0637     union tmq_upd_block block;
0638     
0639     memset(&block, 0, sizeof(block));
0640     memcpy(&block.hdr, p_hdr, sizeof(*p_hdr));
0641     
0642     return tmq_finalize_file(&block, fname1, fname2, fcmd, tcmd);
0643 }
0644 
0645 /**
0646  * Create required folders
0647  * @param xa_info root folder
0648  * @return EXSUCCEED/RM ERR
0649  */
0650 expublic int xa_open_entry_mkdir(char *xa_info)
0651 {
0652     int ret;
0653     /* The xa_info is directory, where to store the data...*/
0654     NDRX_STRNCPY(M_folder, xa_info, sizeof(M_folder)-2);
0655     M_folder[sizeof(M_folder)-1] = EXEOS;
0656     
0657     NDRX_LOG(log_info, "Q data directory: [%s]", xa_info);
0658     
0659     /* The xa_info is directory, where to store the data...*/
0660     NDRX_STRNCPY(M_folder_active, xa_info, sizeof(M_folder_active)-8);
0661     M_folder_active[sizeof(M_folder_active)-7] = EXEOS;
0662     NDRX_STRCAT_S(M_folder_active, sizeof(M_folder_active), "/active");
0663     
0664     NDRX_STRNCPY(M_folder_prepared, xa_info, sizeof(M_folder_prepared)-10);
0665     M_folder_prepared[sizeof(M_folder_prepared)-9] = EXEOS;
0666     NDRX_STRCAT_S(M_folder_prepared, sizeof(M_folder_prepared), "/prepared");
0667     
0668     NDRX_STRNCPY(M_folder_committed, xa_info, sizeof(M_folder_committed)-11);
0669     M_folder_committed[sizeof(M_folder_committed)-10] = EXEOS;
0670     NDRX_STRCAT_S(M_folder_committed, sizeof(M_folder_committed), "/committed");
0671     
0672     /* Test the directories */
0673     if (EXSUCCEED!=(ret=mkdir(M_folder, NDRX_DIR_PERM)) && ret!=EEXIST )
0674     {
0675         int err = errno;
0676 
0677         if (err!=EEXIST)
0678         {
0679             NDRX_LOG(log_error, "xa_open_entry() Q driver: failed to create directory "
0680                 "[%s] - [%s]!", M_folder, strerror(err));
0681 
0682             userlog("xa_open_entry() Q driver: failed to create directory "
0683                     "[%s] - [%s]!", M_folder, strerror(err));
0684             return XAER_RMERR;
0685         }
0686         else
0687         {
0688             NDRX_LOG(log_info, "xa_open_entry() Q driver: failed to create directory "
0689                 "[%s] - [%s]!", M_folder, strerror(err));
0690         }
0691     }
0692     
0693     if (EXSUCCEED!=(ret=mkdir(M_folder_active, NDRX_DIR_PERM)) && ret!=EEXIST )
0694     {
0695         int err = errno;
0696         
0697         if (err!=EEXIST)
0698         {
0699             NDRX_LOG(log_error, "xa_open_entry() Q driver: failed to create directory "
0700                 "[%s] - [%s]!", M_folder_active, strerror(err));
0701 
0702             userlog("xa_open_entry() Q driver: failed to create directory "
0703                     "[%s] - [%s]!", M_folder_active, strerror(err));
0704             return XAER_RMERR;
0705         }
0706         else
0707         {
0708             NDRX_LOG(log_info, "xa_open_entry() Q driver: failed to create directory "
0709                 "[%s] - [%s]!", M_folder_active, strerror(err));
0710         }
0711     }
0712     
0713     if (EXSUCCEED!=(ret=mkdir(M_folder_prepared, NDRX_DIR_PERM)) && ret!=EEXIST )
0714     {
0715         int err = errno;
0716         
0717         if (err!=EEXIST)
0718         {
0719             NDRX_LOG(log_error, "xa_open_entry() Q driver: failed to create directory "
0720                 "[%s] - [%s]!", M_folder_prepared, strerror(err));
0721             userlog("xa_open_entry() Q driver: failed to create directory "
0722                     "[%s] - [%s]!", M_folder_prepared, strerror(err));
0723             return XAER_RMERR;
0724         }
0725         else
0726         {
0727             NDRX_LOG(log_info, "xa_open_entry() Q driver: failed to create directory "
0728                 "[%s] - [%s]!", M_folder_prepared, strerror(err));
0729         }
0730     }
0731     
0732     if (EXSUCCEED!=(ret=mkdir(M_folder_committed, NDRX_DIR_PERM)) && ret!=EEXIST )
0733     {
0734         int err = errno;
0735         
0736         if (err!=EEXIST)
0737         {
0738             NDRX_LOG(log_error, "xa_open_entry() Q driver: failed to create directory "
0739                 "[%s] - [%s]!", M_folder_committed, strerror(err));
0740             userlog("xa_open_entry() Q driver: failed to create directory "
0741                     "[%s] - [%s]!", M_folder_committed, strerror(err));
0742             return XAER_RMERR;
0743         }
0744         else
0745         {
0746             NDRX_LOG(log_info, "xa_open_entry() Q driver: failed to create directory "
0747                 "[%s] - [%s]!", M_folder_committed, strerror(err));
0748         }
0749     }
0750     
0751     NDRX_LOG(log_info, "Prepared M_folder=[%s]", M_folder);
0752     NDRX_LOG(log_info, "Prepared M_folder_active=[%s]", M_folder_active);
0753     NDRX_LOG(log_info, "Prepared M_folder_prepared=[%s]", M_folder_prepared);
0754     NDRX_LOG(log_info, "Prepared M_folder_committed=[%s]", M_folder_committed);
0755     
0756     
0757     return XA_OK;
0758 }
0759 
0760 /**
0761  * Open API.
0762  * Now keeps the settings of the queue space too.
0763  * @param sw Current switch
0764  * @param xa_info New format: dir="/path_to_dir",qspace='SAMPLESPACE' (escaped)
0765  * @param rmid
0766  * @param flags
0767  * @return XA_ return codes
0768  */
0769 expublic int xa_open_entry(struct xa_switch_t *sw, char *xa_info, int rmid, long flags)
0770 {
0771     int ret = XA_OK, err, i;
0772     static int first = EXTRUE;
0773     char *info_tmp = NULL;
0774     char *p, *val;
0775     
0776     /* mark that suspend no required by this resource... */
0777     if (first)
0778     {
0779         MUTEX_LOCK_V(M_init);
0780         if (first)
0781         {
0782             ndrx_xa_nosuspend(EXTRUE);
0783             first=EXFALSE;
0784         }
0785         MUTEX_UNLOCK_V(M_init);
0786     }
0787     
0788     if (G_atmi_tls->qdisk_is_open)
0789     {
0790         NDRX_LOG(log_warn, "xa_open_entry() - already open!");
0791         return XA_OK;
0792     }
0793     
0794     G_atmi_tls->qdisk_tls=NDRX_FPMALLOC(sizeof(ndrx_qdisk_tls_t), 0);
0795             
0796     if (NULL==G_atmi_tls->qdisk_tls)
0797     {
0798         int err=errno;
0799         
0800         NDRX_LOG(log_warn, "xa_open_entry() - failed to malloc TLS data: %s",
0801                 strerror(err));
0802         return XAER_RMERR;
0803     }
0804 
0805     G_atmi_tls->qdisk_tls->is_reg=EXFALSE;
0806     G_atmi_tls->qdisk_tls->filename_base[0]=EXEOS;
0807     G_atmi_tls->qdisk_tls->filename_active[0]=EXEOS;
0808     G_atmi_tls->qdisk_tls->filename_prepared[0]=EXEOS;
0809 
0810     G_atmi_tls->qdisk_tls->recover_namelist = NULL;
0811     G_atmi_tls->qdisk_tls->recover_open=EXFALSE;
0812     G_atmi_tls->qdisk_tls->recover_i=EXFAIL;
0813     G_atmi_tls->qdisk_tls->recover_last_loaded=EXFALSE;
0814             
0815     G_atmi_tls->qdisk_is_open = EXTRUE;
0816     G_atmi_tls->qdisk_rmid = rmid;
0817     
0818 #define UNLOCK_OUT MUTEX_UNLOCK_V(M_folder_lock);\
0819                 ret=XAER_RMERR;\
0820                 goto out;
0821                 
0822     /* Load only once? */
0823     if (!M_folder_set)
0824     {
0825         /* LOCK & Check */
0826         MUTEX_LOCK_V(M_folder_lock);
0827         
0828         if (!M_folder_set)
0829         {
0830             
0831             info_tmp = NDRX_STRDUP(xa_info);
0832             
0833             if (NULL==info_tmp)
0834             {
0835                 err=errno;
0836                 NDRX_LOG(log_error, "Failed to strdup: %s", strerror(err));
0837                 userlog("Failed to strdup: %s", strerror(err));
0838                 UNLOCK_OUT;
0839             }
0840             
0841 #define ARGS_DELIM      ","
0842 #define ARGS_QUOTE      "'\""
0843 #define ARG_DIR         "datadir"
0844 #define ARG_QSPACE      "qspace"
0845             
0846             /* preserve values in quotes... as atomic values */
0847             for (p = ndrx_strtokblk ( info_tmp, ARGS_DELIM, ARGS_QUOTE), i=0; 
0848                     NULL!=p;
0849                     p = ndrx_strtokblk (NULL, ARGS_DELIM, ARGS_QUOTE), i++)
0850             {
0851                 if (NULL!=(val = strchr(p, '=')))
0852                 {
0853                     *val = EXEOS;
0854                     val++;
0855                 }
0856                 
0857                 /* set data dir. */
0858                 if (0==strcmp(ARG_DIR, p))
0859                 {
0860                     /* Do parse of the string... */
0861                     ret=xa_open_entry_mkdir(val);
0862                     
0863                     if (EXSUCCEED!=ret)
0864                     {
0865                         NDRX_LOG(log_error, "Failed to prepare data directory [%s]", val);
0866                         UNLOCK_OUT;
0867                     }   
0868                 }
0869                 else if (0==strcmp(ARG_QSPACE, p))
0870                 {
0871                     NDRX_STRCPY_SAFE(ndrx_G_qspace, val);
0872                 }
0873                 /* all other ignored... */
0874             }
0875             
0876             if (EXEOS==ndrx_G_qspace[0])
0877             {
0878                 NDRX_LOG(log_error, "[%s] setting not found in open string!", ARG_QSPACE);
0879                 UNLOCK_OUT;
0880             }
0881             
0882             if (EXEOS==M_folder[0])
0883             {
0884                 NDRX_LOG(log_error, "[%s] setting not found in open string!", ARG_DIR);
0885                 UNLOCK_OUT;
0886             }
0887             
0888             snprintf(ndrx_G_qspacesvc, sizeof(ndrx_G_qspacesvc),
0889                     NDRX_SVC_QSPACE, ndrx_G_qspace);
0890 
0891             NDRX_LOG(log_debug, "Qspace set to: [%s]", ndrx_G_qspace);
0892             NDRX_LOG(log_debug, "Qspace svc set to: [%s]", ndrx_G_qspacesvc);
0893             M_folder_set=EXTRUE;
0894             
0895         }   
0896         MUTEX_UNLOCK_V(M_folder_lock);
0897         
0898     }
0899 out:
0900     
0901     if (NULL!=info_tmp)
0902     {
0903         NDRX_FREE(info_tmp);
0904     }
0905     return ret;
0906 }
0907 /**
0908  * Close entry
0909  * @param sw
0910  * @param xa_info
0911  * @param rmid
0912  * @param flags
0913  * @return 
0914  */
0915 expublic int xa_close_entry(struct xa_switch_t *sw, char *xa_info, int rmid, long flags)
0916 {
0917     NDRX_LOG(log_info, "xa_close_entry() called");
0918     
0919     if (NULL!=G_atmi_tls->qdisk_tls)
0920     {
0921         NDRX_FPFREE(G_atmi_tls->qdisk_tls);
0922         G_atmi_tls->qdisk_tls=NULL;
0923     }
0924     
0925     G_atmi_tls->qdisk_is_open = EXFALSE;
0926     
0927     return XA_OK;
0928 }
0929 
0930 /**
0931  * TMQ queue internal version of transaction starting
0932  * @return XA error code
0933  */
0934 exprivate int xa_start_entry_tmq(char *tmxid, long flags)
0935 {
0936     int locke = EXFALSE;
0937     qtran_log_t * p_tl = NULL;
0938     int ret = XA_OK;
0939     
0940     set_filename_base_tmxid(tmxid);
0941     
0942     /* Firstly try to locate the tran */
0943     p_tl = tmq_log_get_entry(tmxid, NDRX_LOCK_WAIT_TIME, &locke);
0944 
0945     if ( (flags & TMJOIN) || (flags & TMRESUME) )
0946     {
0947         if (NULL==p_tl && !locke)
0948         {
0949             NDRX_LOG(log_error, "Xid [%s] TMJOIN/TMRESUME but tran not found",
0950                     tmxid);
0951             ret = XAER_NOTA;
0952             goto out;
0953         }
0954 
0955         NDRX_LOG(log_info, "Xid [%s] join OK", tmxid);
0956     }
0957     else
0958     {
0959         /* this is new tran */
0960         if (NULL!=p_tl || locke)
0961         {
0962             NDRX_LOG(log_error, "Cannot start Xid [%s] already in progress",
0963                     tmxid);
0964             ret = XAER_DUPID;
0965             goto out;
0966         }
0967         else
0968         {
0969             
0970             if (EXSUCCEED!=tmq_log_start(tmxid))
0971             {
0972                 NDRX_LOG(log_error, "Failed to start transaction for tmxid [%s]",
0973                         tmxid);
0974                 ret = XAER_RMERR;
0975                 goto out;
0976             }
0977             NDRX_LOG(log_info, "Queue transaction Xid [%s] started OK", tmxid);
0978         }
0979     }
0980     
0981 out:
0982     
0983     if (NULL!=p_tl && !locke)
0984     {
0985         tmq_log_unlock(p_tl);
0986     }
0987 
0988     return ret;
0989 }
0990 
0991 /**
0992  * Minimal XA call to tmqueue, used by external processes.
0993  * @param tmxid serialized xid
0994  * @param cmd TMQ_CMD* command code
0995  * @return XA error code
0996  */
0997 exprivate int ndrx_xa_qminicall(char *tmxid, char cmd)
0998 {
0999     long rsplen;
1000     UBFH *p_ub = NULL;
1001     long ret = XA_OK;
1002     short nodeid = (short)tpgetnodeid();
1003    
1004     p_ub = (UBFH *)tpalloc("UBF", "", 1024 );
1005     
1006     if (NULL==p_ub)
1007     {
1008         NDRX_LOG(log_error, "Failed to allocate notif buffer");
1009         ret = XAER_RMERR;
1010         goto out;
1011     }
1012     
1013     if (EXSUCCEED!=Bchg(p_ub, EX_QCMD, 0, &cmd, 0L))
1014     {
1015         NDRX_LOG(log_error, "Failed to setup EX_QMSGID!");
1016         ret = XAER_RMERR;
1017         goto out;
1018     }
1019     
1020     if (EXSUCCEED!=Bchg(p_ub, TMXID, 0, tmxid, 0L))
1021     {
1022         NDRX_LOG(log_error, "Failed to setup TMXID!");
1023         ret = XAER_RMERR;
1024         goto out;
1025     }
1026 
1027     if (EXSUCCEED!=Bchg(p_ub, TMNODEID, 0, (char *)&nodeid, 0L))
1028     {
1029         NDRX_LOG(log_error, "Failed to setup TMNODEID!");
1030         ret = XAER_RMERR;
1031         goto out;
1032     }
1033     
1034     NDRX_LOG(log_info, "Calling QSPACE [%s] for tmxid [%s], command %c",
1035                 ndrx_G_qspacesvc, tmxid, cmd);
1036     
1037     ndrx_debug_dump_UBF(log_info, "calling Q space with", p_ub);
1038 
1039     if (EXFAIL == tpcall(ndrx_G_qspacesvc, (char *)p_ub, 0L, (char **)&p_ub, 
1040         &rsplen, TPNOTRAN))
1041     {
1042         NDRX_LOG(log_error, "%s failed: %s", ndrx_G_qspacesvc, tpstrerror(tperrno));
1043         
1044         /* best guess -> not available */
1045         ret = XAER_RMFAIL;
1046         
1047         /* anyway if have detailed response, use bellow. */
1048     }
1049     
1050     ndrx_debug_dump_UBF(log_info, "Reply from RM", p_ub);
1051     
1052     /* try to get the result of the OP */
1053     if (Bpres(p_ub, TMTXRMERRCODE, 0) &&
1054             EXSUCCEED!=Bget(p_ub, TMTXRMERRCODE, 0, (char *)&ret, 0L))
1055     {
1056         NDRX_LOG(log_debug, "Failed to get TMTXRMERRCODE: %s", Bstrerror(Berror));
1057         ret = XAER_RMERR;
1058     }
1059     
1060 out:
1061 
1062     if (NULL!=p_ub)
1063     {
1064         tpfree((char *)p_ub);
1065     }
1066 
1067     NDRX_LOG(log_info, "returns %d", ret);
1068     
1069     return ret;
1070 }
1071 
1072 /**
1073  * Serve the transaction call
1074  * @param p_ub UBF buffer
1075  * @param cmd tmq command code
1076  * @return XA error code
1077  */
1078 expublic int ndrx_xa_qminiservce(UBFH *p_ub, char cmd)
1079 {
1080     long ret = XA_OK;
1081     char tmxid[NDRX_XID_SERIAL_BUFSIZE+1];
1082     short nodeid, nodeid_loc;
1083     
1084     BFLDLEN len = sizeof(tmxid);
1085     
1086     /* 
1087      * ensure that we are recving requests only from our transaction
1088      * manager
1089      */
1090 
1091     if (EXSUCCEED!=Bget(p_ub, TMXID, 0, tmxid, &len))
1092     {
1093         NDRX_LOG(log_error, "Failed to get TMXID!");
1094         ret = XAER_INVAL;
1095         goto out;
1096     }
1097     
1098     if (EXSUCCEED!=Bget(p_ub, TMNODEID, 0, (char *)&nodeid, 0L))
1099     {
1100         NDRX_LOG(log_error, "Failed to get TMNODEID!");
1101         ret = XAER_INVAL;
1102         goto out;
1103     }
1104 
1105     if (G_atmi_env.procgrp_no && ndrx_sg_is_singleton(G_atmi_env.procgrp_no)
1106         /* and start transaction from any node */
1107         && TMQ_CMD_STARTTRAN!=cmd)
1108     {
1109         /* if we are in singleton group mode, ensure
1110          * that our nodeid matches with caller nodeid. To avoid any dead
1111          * communication with failed node.
1112          */
1113         nodeid_loc=tpgetnodeid();
1114         if (nodeid!=nodeid_loc)
1115         {
1116             NDRX_LOG(log_error, "ndrx_xa_qminiservce: singleton group tmqueue+tmsrv must be on "
1117             "the same node but our node is %hd rcvd call from %hd - XAER_RMFAIL",
1118             nodeid_loc, nodeid);
1119             userlog("ndrx_xa_qminiservce: singleton group tmqueue+tmsrv must be on "
1120             "the same node but our node is %hd rcvd call from %hd - XAER_RMFAIL",
1121             nodeid_loc, nodeid);
1122             ret = XAER_RMFAIL;
1123             goto out;
1124         }
1125     }
1126 
1127     switch (cmd)
1128     {
1129         case TMQ_CMD_STARTTRAN:
1130             ret = xa_start_entry_tmq(tmxid, 0);
1131             break;
1132         case TMQ_CMD_ABORTTRAN:
1133             ret = xa_rollback_entry_tmq(tmxid, 0);
1134             break;
1135         case TMQ_CMD_PREPARETRAN:
1136             ret = xa_prepare_entry_tmq(tmxid, 0);
1137             break;
1138         case TMQ_CMD_COMMITRAN:
1139             ret = xa_commit_entry_tmq(tmxid, 0);
1140             break;
1141             /* these two are not used: */
1142         case TMQ_CMD_CHK_MEMLOG:
1143         case TMQ_CMD_CHK_MEMLOG2:
1144 
1145             ret = tmq_log_exists_entry(tmxid);
1146 
1147             if (EXTRUE==ret)
1148             {
1149                 ret=XA_OK;
1150             }
1151             else
1152             {
1153                 ret=XAER_NOTA;
1154             }
1155 
1156             if (TMQ_CMD_CHK_MEMLOG2==cmd)
1157             {
1158                 NDRX_LOG(log_error, "Expected transaction to exist in "
1159                     "log [%s] but not found - restarting", tmxid);
1160                 userlog("Expected transaction to exist in "
1161                     "log [%s] but not found - restarting", tmxid);
1162                 M_p_tpexit();
1163             }
1164             
1165             break;
1166         default:
1167             NDRX_LOG(log_error, "Invalid command code [%c]", cmd);
1168             ret = XAER_INVAL;
1169             break;
1170     }
1171     
1172 out:
1173             
1174     NDRX_LOG(log_info, "returns XA status: %d", ret);
1175     
1176     if (EXSUCCEED!=Bchg(p_ub, TMTXRMERRCODE, 0, (char *)&ret, 0L))
1177     {
1178         NDRX_LOG(log_error, "Failed to setup TMTXRMERRCODE: %s", 
1179                 Bstrerror(Berror));
1180         ret = XAER_RMERR;
1181     }
1182 
1183     return ret;
1184 }
1185 
1186 
1187 /**
1188  * Set the file name of transaciton file (the base)
1189  * If exists and join - ok. Otherwise fail.
1190  * @param xa_info
1191  * @param rmid
1192  * @param flags
1193  * @return 
1194  */
1195 expublic int xa_start_entry(struct xa_switch_t *sw, XID *xid, int rmid, long flags)
1196 {
1197     char *tmxid;
1198     int ret = XA_OK;
1199     
1200     if (!G_atmi_tls->qdisk_is_open)
1201     {
1202         NDRX_LOG(log_error, "ERROR! xa_start_entry() - XA not open!");
1203         ret = XAER_RMERR;
1204         goto out;
1205     }
1206     
1207     tmxid = set_filename_base(xid);
1208     
1209     /* if we are tmq -> for join, perform lookup.
1210      * for start check, if tran exists -> error, if not exists, start
1211      * if doing from other process call the tmqueue for start/join (just chk)
1212      */
1213     
1214     if (M_is_tmqueue)
1215     {
1216         ret = xa_start_entry_tmq(tmxid, flags);
1217     }
1218     else if (! ( (flags & TMJOIN) || (flags & TMRESUME) ))
1219     {
1220         /* in case if no join from external process
1221          * request the tmsrv to start the transaction
1222          * also we are interested in return code
1223          */
1224         ret=ndrx_xa_qminicall(tmxid, TMQ_CMD_STARTTRAN);
1225     }
1226     
1227 out:
1228 
1229     return ret;
1230 }
1231 
1232 /**
1233  * XA call end entry. Nothing special to do here.
1234  * @param sw
1235  * @param xid
1236  * @param rmid
1237  * @param flags
1238  * @return 
1239  */
1240 expublic int xa_end_entry(struct xa_switch_t *sw, XID *xid, int rmid, long flags)
1241 {
1242     if (!G_atmi_tls->qdisk_is_open)
1243     {
1244         NDRX_LOG(log_error, "ERROR! xa_end_entry() - XA not open!");
1245         return XAER_RMERR;
1246     }
1247     
1248     if (G_atmi_tls->qdisk_tls->is_reg)
1249     {
1250         if (EXSUCCEED!=ax_unreg(rmid, 0))
1251         {
1252             NDRX_LOG(log_error, "ERROR! xa_end_entry() - "
1253                     "ax_unreg() fail!");
1254             return XAER_RMERR;
1255         }
1256         
1257         G_atmi_tls->qdisk_tls->is_reg = EXFALSE;
1258     }
1259     
1260     /* no special processing for tran */
1261     
1262 out:
1263 
1264     return XA_OK;
1265 }
1266 
1267 /**
1268  * Internal TMQ version of rollback
1269  * @param tmxid xid serialized
1270  * @param flags any flags
1271  * @return  XA error code
1272  */
1273 exprivate int xa_rollback_entry_tmq(char *tmxid, long flags)
1274 {
1275     union tmq_upd_block b;
1276     char *fn = "xa_rollback_entry_tmq";
1277     qtran_log_cmd_t *el, *elt;
1278     int ret=XA_OK;
1279     
1280     int locke = EXFALSE;
1281     qtran_log_t * p_tl = NULL;
1282     
1283     if (!G_atmi_tls->qdisk_is_open)
1284     {
1285         NDRX_LOG(log_error, "ERROR! xa_rollback_entry() - XA not open!");
1286         return XAER_RMERR;
1287     }
1288     
1289     set_filename_base_tmxid(tmxid);
1290 
1291     /* Firstly try to locate the tran */
1292     p_tl = tmq_log_get_entry(tmxid, NDRX_LOCK_WAIT_TIME, &locke);
1293     
1294     if (NULL==p_tl)
1295     {
1296         if (locke)
1297         {
1298             NDRX_LOG(log_error, "Q transaction [%s] locked", tmxid);
1299             return XAER_RMFAIL;
1300         }
1301         else
1302         {
1303             NDRX_LOG(log_info, "Q transaction [%s] does not exists", tmxid);
1304             /* return XAER_NOTA; */
1305 
1306             /* TODO: verify the disk... (only in case) sinelgrp ... */
1307             ret=tmq_check_prepared_exists_on_disk(tmxid);
1308 
1309             if (EXTRUE==ret)
1310             {
1311                 /* it really failure here. transaction must not exists
1312                  * on the disk if log does not exists.
1313                  * possible concurrent run.
1314                  * Restart now...
1315                  */
1316                NDRX_LOG(log_error, "(rollback) Integrity problem, transaction [%s] "
1317                     "exists on disk, but not in mem-log - restarting", tmxid);
1318                 userlog("(rollback) Integrity problem, transaction [%s] "
1319                     "exists on disk, but not in mem-log - restarting", tmxid);
1320 
1321                 M_p_tpexit();
1322                 return XAER_RMFAIL;
1323             }
1324             else if (EXFAIL==ret)
1325             {
1326                 return XAER_RMFAIL;
1327             }
1328             else
1329             {
1330                 return XAER_NOTA;
1331             }
1332         }
1333     }
1334     
1335     p_tl->txstage = XA_TX_STAGE_ABORTING;
1336     p_tl->is_abort_only=EXTRUE;
1337     
1338 #ifdef TXN_TRACE
1339     userlog("ABORT: tmxid=[%s] seqno=%d", tmxid, p_tl->seqno);
1340     NDRX_LOG(log_error, "ABORT: tmxid=[%s] seqno=%d", tmxid, p_tl->seqno);
1341 #endif
1342 
1343     /* Process files according to the log... */
1344     DL_FOREACH_SAFE(p_tl->cmds, el, elt)
1345     {
1346         char *fname = NULL;
1347         
1348 #ifdef TXN_TRACE
1349         userlog("ABORT_ENT: tmxid=[%s] command_code=[%c]",
1350             tmxid, el->b.hdr.command_code);
1351         NDRX_LOG(log_error, "ABORT_ENT: tmxid=[%s] command_code=[%c]",
1352             tmxid, el->b.hdr.command_code);
1353 #endif
1354 
1355         if (XA_RM_STATUS_ACTIVE==el->cmd_status)
1356         {
1357             /* run on active folder */
1358             fname = get_filename_i(el->seqno, M_folder_active, 0);
1359         }
1360         else if (XA_RM_STATUS_PREP==el->cmd_status)
1361         {
1362             /* run on prepared folder */
1363             fname = get_filename_i(el->seqno, M_folder_prepared, 0);
1364         }
1365         else
1366         {
1367             NDRX_LOG(log_error, "Invalid QCMD status %c", el->cmd_status);
1368             userlog("Invalid QCMD status %c", el->cmd_status);
1369             continue;
1370         }
1371         
1372         memcpy(&b, &el->b, sizeof(b));
1373         
1374         /* Send the notification */
1375         if (TMQ_STORCMD_DUM == el->b.hdr.command_code)
1376         {
1377             /* nothing special here... */
1378         }
1379         else if (TMQ_STORCMD_NEWMSG == el->b.hdr.command_code)
1380         {
1381             NDRX_LOG(log_info, "%s: issuing delete command...", fn);
1382             b.hdr.command_code = TMQ_STORCMD_DEL;
1383         }
1384         else
1385         {
1386             NDRX_LOG(log_info, "%s: unlock command...", fn);
1387 
1388             b.hdr.command_code = TMQ_STORCMD_UNLOCK;
1389         }
1390 
1391         /* if tmq server is not working at this moment
1392          * then we cannot complete the rollback
1393          */
1394         if (EXSUCCEED!=tmq_finalize_files_hdr(&b.hdr, fname, 
1395                 NULL, TMQ_FILECMD_UNLINK, el))
1396         {
1397             NDRX_LOG(log_error, "Failed to unlink [%s]", fname);
1398             continue;
1399         }
1400         
1401         /* Finally remove command entry */
1402         DL_DELETE(p_tl->cmds, el);
1403         NDRX_FPFREE(el);
1404         NDRX_LOG(log_debug, "Abort [%s] OK", fname);
1405         
1406     }
1407     
1408     if (NULL!=p_tl->cmds)
1409     {
1410         NDRX_LOG(log_error, "Failed to abort Q transaction [%s] -> commands exists",
1411                 tmxid);
1412         return XAER_RMERR;
1413     }
1414     
1415     /* delete message from log */
1416     tmq_remove_logfree(p_tl, EXTRUE);
1417     
1418     return XA_OK;
1419 }
1420 
1421 /**
1422  * Process local or remote call
1423  * @return 
1424  */
1425 expublic int xa_rollback_entry(struct xa_switch_t *sw, XID *xid, int rmid, long flags)
1426 {
1427     char *tmxid;
1428     int ret = XA_OK;
1429     
1430     if (!G_atmi_tls->qdisk_is_open)
1431     {
1432         NDRX_LOG(log_error, "ERROR! xa_rollback_entry() - XA not open!");
1433         ret = XAER_RMERR;
1434         goto out;
1435     }
1436     
1437     tmxid = set_filename_base(xid);
1438     
1439     if (M_is_tmqueue)
1440     {
1441         ret = xa_rollback_entry_tmq(tmxid, flags);
1442     }
1443     else
1444     {
1445         ret=ndrx_xa_qminicall(tmxid, TMQ_CMD_ABORTTRAN);
1446     }
1447     
1448 out:
1449 
1450     return ret;
1451 }
1452 
1453 /**
1454  * Prepare transactions (according to TL log)
1455  * @return XA status
1456  */
1457 exprivate int xa_prepare_entry_tmq(char *tmxid, long flags)
1458 {
1459     int locke = EXFALSE;
1460     qtran_log_t * p_tl = NULL;
1461     int ret = XA_OK;
1462     int did_move=EXFALSE;
1463     qtran_log_cmd_t *el, *elt;
1464     
1465     if (!G_atmi_tls->qdisk_is_open)
1466     {
1467         NDRX_LOG(log_error, "ERROR! xa_prepare_entry_tmq() - XA not open!");
1468         ret=XAER_RMERR;
1469         goto out;
1470     }
1471     
1472     set_filename_base_tmxid(tmxid);
1473     
1474     /* Firstly try to locate the tran */
1475     p_tl = tmq_log_get_entry(tmxid, NDRX_LOCK_WAIT_TIME, &locke);
1476     
1477     if (NULL==p_tl)
1478     {
1479         if (locke)
1480         {
1481             NDRX_LOG(log_error, "Q transaction [%s] locked", tmxid);
1482             ret=XAER_RMFAIL;
1483             goto out;
1484         }
1485         else
1486         {
1487             NDRX_LOG(log_error, "Q transaction [%s] does not exists", tmxid);
1488             ret=XAER_NOTA;
1489             goto out;
1490         }
1491     }
1492     
1493     if (p_tl->is_abort_only)
1494     {
1495         NDRX_LOG(log_error, "Q transaction [%s] is abort only!", tmxid);
1496         ret = XAER_RMERR;
1497         goto out;
1498     }
1499     
1500     if (XA_TX_STAGE_ACTIVE!=p_tl->txstage)
1501     {
1502         NDRX_LOG(log_error, "Q transaction [%s] expected stage %hd (active) got %hd",
1503                 tmxid, XA_TX_STAGE_ACTIVE, p_tl->txstage);
1504         
1505         ret = XAER_RMERR;
1506         p_tl->is_abort_only=EXTRUE;
1507         goto out;
1508     }
1509 
1510     /* check lock & sequence */
1511     tmq_log_checkpointseq(p_tl);
1512     
1513     p_tl->txstage = XA_TX_STAGE_PREPARING;
1514     
1515     /* TODO:  
1516      * Read active records -> as normal. If file cannot be read, create dummy
1517      * record for given tran. And thus if having active record or prepared+active 
1518      * at startup will automatically mean that transaction is abort-only. 
1519      * 
1520      * At the startup, all abort-only transaction shall be aborted
1521      * 
1522      * If doing prepare with empty RO transaction, inject dummy command prior
1523      * (transaction marking) so that at restart we have transaction log and
1524      * thus we can respond commit OK, thought might be optional as TMSRV
1525      * prepared resources with XAER_NOTA transaction would assume that committed OK
1526      */
1527     
1528     if (NULL==p_tl->cmds)
1529     {
1530         /* do not write the file, as we will use existing on disk */
1531         if (EXSUCCEED!=M_p_tmq_dum_add(p_tl->tmxid))
1532         {
1533             ret = XAER_RMERR;
1534             p_tl->is_abort_only=EXTRUE;
1535             goto out;
1536         }
1537         
1538         NDRX_LOG(log_debug, "Dummy marker added");
1539         
1540     }
1541     
1542     /* process command by command to stage to prepared ... */
1543     DL_FOREACH_SAFE(p_tl->cmds, el, elt)
1544     {
1545         if (EXSUCCEED!=file_move(el->seqno, M_folder_active, M_folder_prepared))
1546         {
1547             NDRX_LOG(log_error, "Q tran tmxid [%s] seq %d failed to prepare (file move)",
1548                     tmxid, el->seqno);
1549             p_tl->is_abort_only=EXTRUE;
1550             ret=XAER_RMERR;
1551             goto out;
1552         }
1553         
1554         el->cmd_status = XA_RM_STATUS_PREP;
1555         NDRX_LOG(log_info, "tmxid [%s] seq %d prepared OK", tmxid, el->seqno);
1556         did_move=EXTRUE;
1557     }
1558     
1559     if (did_move)
1560     {
1561         /* sync the  */
1562     /* + write io fence */
1563         if (ndrx_G_systest_lockloss || EXSUCCEED!=ndrx_fsync_dsync(M_folder_prepared, G_atmi_env.xa_fsync_flags))
1564         {
1565             NDRX_LOG(log_error, "Failed to dsync [%s]", M_folder_prepared);
1566             
1567             /* mark transaction for abort only! */
1568             p_tl->is_abort_only=EXTRUE;
1569             ret=XAER_RMERR;
1570             goto out;
1571         }
1572     }
1573     
1574     /* If all OK, switch transaction to prepared */
1575     p_tl->txstage = XA_TX_STAGE_PREPARED;
1576     
1577 out:
1578             
1579     if (NULL!=p_tl && !locke)
1580     {
1581         tmq_log_unlock(p_tl);
1582     }
1583 
1584     return ret;
1585 }
1586 
1587 /**
1588  * XA prepare entry call
1589  * Basically we move all messages for active to prepared folder (still named by
1590  * xid_str)
1591  * 
1592  * @return 
1593  */
1594 expublic int xa_prepare_entry(struct xa_switch_t *sw, XID *xid, int rmid, long flags)
1595 {
1596     char *tmxid;
1597     int ret = XA_OK;
1598     
1599     if (!G_atmi_tls->qdisk_is_open)
1600     {
1601         NDRX_LOG(log_error, "ERROR! xa_prepare_entry() - XA not open!");
1602         ret = XAER_RMERR;
1603         goto out;
1604     }
1605     
1606     tmxid = set_filename_base(xid);
1607     
1608     if (M_is_tmqueue)
1609     {
1610         ret = xa_prepare_entry_tmq(tmxid, flags);
1611     }
1612     else
1613     {
1614         ret=ndrx_xa_qminicall(tmxid, TMQ_CMD_PREPARETRAN);
1615     }
1616     
1617 out:
1618 
1619     return ret;
1620 }
1621 
1622 /**
1623  * check the prepard transaction file status
1624  * @param tmxid transaction id
1625  * @return EXTRUE if exists, EXFAIL on error, EXFALSE(EXSUCCEED) no file exists
1626  */
1627 exprivate int tmq_check_prepared_exists_on_disk(char *tmxid)
1628 {
1629     char tmp[PATH_MAX+1];
1630     int i;
1631     int ret=EXSUCCEED;
1632     DIR *dir=NULL;
1633     struct dirent *entry;
1634     int len;
1635 
1636     dir = opendir(M_folder_prepared);
1637 
1638     if (dir == NULL) {
1639 
1640         NDRX_LOG(log_error, "opendir [%s] failed: %s", M_folder_prepared, strerror(errno));
1641         EXFAIL_OUT(ret);
1642     }
1643 
1644     snprintf(tmp, sizeof(tmp), "%s-", tmxid);
1645     len = strlen(tmp);
1646 
1647     while ((entry = readdir(dir)) != NULL)
1648     {
1649         if (0==strncmp(entry->d_name, tmp, len))
1650         {
1651             ret=EXTRUE;
1652             break;
1653         }
1654     }
1655 
1656 out:
1657     if (NULL!=dir)
1658     {
1659         closedir(dir);
1660     }
1661 
1662     return ret;
1663 }
1664 
1665 /**
1666  * Commit the transaction.
1667  * Move messages from prepared folder to files names with msgid
1668  * @param tmxid serialized branch xid
1669  * @param flags not used
1670  * @return XA error code
1671  */
1672 exprivate int xa_commit_entry_tmq(char *tmxid, long flags)
1673 {
1674     char msgid_str[TMMSGIDLEN_STR+1];
1675     FILE *f = NULL;
1676     char *fname;
1677     char *fname_msg;
1678     int err, tmq_err;
1679     int locke = EXFALSE;
1680     qtran_log_t * p_tl = NULL;
1681     int ret = XA_OK;
1682     qtran_log_cmd_t *el, *elt;
1683 
1684     if (!G_atmi_tls->qdisk_is_open)
1685     {
1686         NDRX_LOG(log_error, "ERROR! xa_commit_entry() - XA not open!");
1687         return XAER_RMERR;
1688     }
1689 
1690     set_filename_base_tmxid(tmxid);
1691     
1692     /* Firstly try to locate the tran */
1693     p_tl = tmq_log_get_entry(tmxid, NDRX_LOCK_WAIT_TIME, &locke);
1694     
1695     if (NULL==p_tl)
1696     {
1697         if (locke)
1698         {
1699             NDRX_LOG(log_error, "Q transaction [%s] locked", tmxid);
1700             ret=XAER_RMFAIL;
1701             goto out;
1702         }
1703         else
1704         {
1705             NDRX_LOG(log_error, "Q transaction [%s] does not exists (in mem log)", tmxid);
1706             
1707             ret=tmq_check_prepared_exists_on_disk(tmxid);
1708 
1709             if (EXTRUE==ret)
1710             {
1711                 /* it really failure here. transaction must not exists
1712                  * on the disk if log does not exists.
1713                  * possible concurrent run.
1714                  * Restart now...
1715                  */
1716                NDRX_LOG(log_error, "(commit) Integrity problem, transaction [%s] "
1717                     "exists on disk, but not in mem-log - restarting", tmxid);
1718                 userlog("(commit) Integrity problem, transaction [%s] "
1719                     "exists on disk, but not in mem-log - restarting", tmxid);
1720 
1721                 ret=XAER_RMFAIL;
1722 
1723                 M_p_tpexit();
1724                 goto out;
1725             }
1726             else if (EXFAIL==ret)
1727             {
1728                 ret=XAER_RMFAIL;
1729                 goto out;
1730             }
1731             else
1732             {
1733                 ret=XAER_NOTA;
1734                 goto out;
1735             }
1736         }
1737     }
1738     
1739     if (p_tl->is_abort_only)
1740     {
1741         NDRX_LOG(log_error, "Q transaction [%s] is abort only!", tmxid);
1742         ret = XAER_RMERR;
1743         goto out;
1744     }
1745     
1746     /* allow to retry... */
1747     if (XA_TX_STAGE_PREPARED!=p_tl->txstage &&
1748         XA_TX_STAGE_COMMITTING!=p_tl->txstage)
1749     {
1750         NDRX_LOG(log_error, "Q transaction [%s] expected stage %hd (prepared) or %hd (committing) got %hd",
1751                 tmxid, XA_TX_STAGE_PREPARED, XA_TX_STAGE_COMMITTING, p_tl->txstage);
1752         
1753         ret = XAER_RMERR;
1754         p_tl->is_abort_only=EXTRUE;
1755         goto out;
1756     }
1757     
1758     p_tl->txstage = XA_TX_STAGE_COMMITTING;
1759     
1760     /* process command by command to stage to prepared ... */
1761 #ifdef TXN_TRACE
1762     /* Enable these commands (and for ABORT: too), to trace down
1763      * transaction loss. I.e. defective transaction is in case,
1764      * if we see in the log COMMIT: and ABORT:. There
1765      * shall be only one of these, not both
1766      *
1767      * To post-process, say run-suspend.sh of test 104:
1768      * extract the xids:
1769      * $ grep ":COMMIT:" ULOG.20231028  | cut -d '[' -f 2 | cut -d ']' -f1 > commit.xids
1770      *
1771      * verify that there aren't any aborts:
1772      * $ cat commit.xids | xargs -i grep {} ULOG.20231028 | grep --color=auto ABORT
1773      *
1774      */
1775     userlog("COMMIT: tmxid=[%s] seqno=%d", tmxid, p_tl->seqno);
1776     NDRX_LOG(log_error, "COMMIT: tmxid=[%s] seqno=%d", tmxid, p_tl->seqno);
1777 #endif
1778     DL_FOREACH_SAFE(p_tl->cmds, el, elt)
1779     {
1780         /* run on prepared folder */
1781         fname = get_filename_i(el->seqno, M_folder_prepared, 0);
1782             
1783 #ifdef TXN_TRACE
1784         userlog("COMMIT_ENT: tmxid=[%s] command_code=[%c] fname=[%s]",
1785             tmxid, el->b.hdr.command_code, fname);
1786         NDRX_LOG(log_error, "COMMIT_ENT: tmxid=[%s] command_code=[%c] fname=[%s]",
1787             tmxid, el->b.hdr.command_code, fname);
1788 #endif
1789 
1790         /* Do the task... */
1791         if (TMQ_STORCMD_NEWMSG == el->b.hdr.command_code)
1792         {
1793              char *to_filename;
1794              
1795             /* also here  move shall be finalized by tmq
1796              * as if move fails, tmsrv could retry
1797              * and only after retry to unli
1798              */
1799              to_filename = file_move_final_names(fname, 
1800                     tmq_msgid_serialize(el->b.hdr.msgid, msgid_str));
1801             
1802             if (EXSUCCEED!=tmq_finalize_files_hdr(&el->b.hdr, fname,
1803                     to_filename, TMQ_FILECMD_RENAME, el))
1804             {
1805                 ret = XAER_RMFAIL;
1806                 goto out;
1807             }
1808         }
1809         else if (TMQ_STORCMD_UPD == el->b.hdr.command_code)
1810         {
1811             tmq_msg_t msg_to_upd; /* Message to update */
1812             int ret_len;
1813             
1814             fname_msg = get_file_name_final(tmq_msgid_serialize(el->b.hdr.msgid, 
1815                     msgid_str));
1816             NDRX_LOG(log_info, "Updating message file: [%s]", fname_msg);
1817             
1818             if (ndrx_G_systest_lockloss || NULL==(f = NDRX_FOPEN(fname_msg, "r+b")))
1819             {
1820                 int err = errno;
1821                 NDRX_LOG(log_error, "ERROR! xa_commit_entry() - failed to open file[%s]: %s!", 
1822                         fname_msg, strerror(err));
1823 
1824                 userlog( "ERROR! xa_commit_entry() - failed to open file[%s]: %s!", 
1825                         fname_msg, strerror(err));
1826                 ret = XAER_RMFAIL;
1827                 goto out;
1828             }
1829 
1830             ndrx_fadvise_donotneed(fileno(f), TMQ_FSCACHE_LEN);
1831 
1832             if (EXFAIL==read_tx_block(f, (char *)&msg_to_upd, sizeof(msg_to_upd), 
1833                     fname_msg, "xa_commit_entry", &err, &tmq_err))
1834             {
1835                 NDRX_LOG(log_error, "ERROR! xa_commit_entry() - failed to read data block!");
1836                 ret = XAER_RMFAIL;
1837                 goto out;
1838             }
1839             
1840             /* seek the start */
1841             if (EXSUCCEED!=fseek (f, 0 , SEEK_SET ))
1842             {
1843                 NDRX_LOG(log_error, "Seekset failed: %s", strerror(errno));
1844                 ret = XAER_RMFAIL;
1845                 goto out;
1846             }
1847             
1848             UPD_MSG((&msg_to_upd), (&el->b.upd));
1849             
1850             /* Write th block */
1851         /* ndrx_G_systest_lockloss -> IO fence for testing */
1852             if (ndrx_G_systest_lockloss || sizeof(msg_to_upd)!=(ret_len=fwrite((char *)&msg_to_upd, 1, 
1853                     sizeof(msg_to_upd), f)))
1854             {
1855                 int err = errno;
1856                 NDRX_LOG(log_error, "ERROR! Failed to write to msg file [%s]: "
1857                         "req_len=%d, written=%d: %s", fname_msg,
1858                         sizeof(msg_to_upd), ret_len, strerror(err));
1859 
1860                 userlog("ERROR! Failed to write to msg file[%s]: req_len=%d, "
1861                         "written=%d: %s",
1862                         fname_msg, sizeof(msg_to_upd), ret_len, strerror(err));
1863 
1864                 ret = XAER_RMFAIL;
1865                 goto out;
1866             }
1867             NDRX_FCLOSE(f);
1868             f = NULL;
1869             
1870             /* remove the update file */
1871             NDRX_LOG(log_info, "Removing update command file: [%s]", fname);
1872             
1873             if (EXSUCCEED!=tmq_finalize_files_upd(&el->b.upd, fname, NULL,
1874                     TMQ_FILECMD_UNLINK, el))
1875             {
1876                 ret = XAER_RMFAIL;
1877                 goto out;
1878             }
1879             
1880         }
1881         else if (TMQ_STORCMD_DEL == el->b.hdr.command_code)
1882         {
1883             fname_msg = get_file_name_final(tmq_msgid_serialize(el->b.hdr.msgid, 
1884                     msgid_str));
1885             
1886             NDRX_LOG(log_info, "Message file to remove: [%s]", fname_msg);
1887             NDRX_LOG(log_info, "Command file to remove: [%s]", fname);
1888             
1889             /* Remove the message (it must be marked for delete)
1890              */
1891             if (EXSUCCEED!=tmq_finalize_files_hdr(&el->b.hdr, fname_msg, fname, 
1892                     TMQ_FILECMD_UNLINK, el))
1893             {
1894                 ret = XAER_RMFAIL;
1895                 goto out;
1896             }
1897         }
1898         else if (TMQ_STORCMD_DUM == el->b.hdr.command_code)
1899         {
1900             NDRX_LOG(log_info, "Dummy command to remove: [%s]", fname);
1901             
1902             /* Remove the message (it must be marked for delete)
1903              */
1904             if (EXSUCCEED!=tmq_finalize_files_hdr(&el->b.hdr, fname, NULL, 
1905                     TMQ_FILECMD_UNLINK, el))
1906             {
1907                 ret = XAER_RMFAIL;
1908                 goto out;
1909             }
1910         }
1911         else
1912         {
1913             NDRX_LOG(log_error, "ERROR! xa_commit_entry() - invalid command [%c]!",
1914                     el->b.hdr.command_code);
1915 
1916             ret = XAER_RMERR;
1917             goto out;
1918         }
1919         
1920         /* msg is unlocked, thus just remove ptr  */
1921         NDRX_LOG(log_debug, "Commit [%s] sequence OK", tmxid, el->seqno);
1922         DL_DELETE(p_tl->cmds, el);
1923         NDRX_FPFREE(el);
1924         
1925     }
1926     
1927     /* delete message from log */
1928     tmq_remove_logfree(p_tl, EXTRUE);
1929     p_tl = NULL;
1930     
1931     NDRX_LOG(log_info, "tmxid [%s] all sequences committed ok", tmxid);
1932     
1933 out:
1934 
1935     NDRX_LOG(log_info, "Commit returns %d", ret);
1936 
1937     /* unlock if tran is still alive */
1938     if (NULL!=p_tl && !locke)
1939     {
1940         tmq_log_unlock(p_tl);
1941     }
1942 
1943     return ret;
1944 }
1945 
1946 
1947 /**
1948  * Local or remove transaction commit
1949  * @return XA error 
1950  */
1951 expublic int xa_commit_entry(struct xa_switch_t *sw, XID *xid, int rmid, long flags)
1952 {
1953     char *tmxid;
1954     int ret = XA_OK;
1955     
1956     if (!G_atmi_tls->qdisk_is_open)
1957     {
1958         NDRX_LOG(log_error, "ERROR! xa_commit_entry() - XA not open!");
1959         ret = XAER_RMERR;
1960         goto out;
1961     }
1962     
1963     tmxid = set_filename_base(xid);
1964     
1965     if (M_is_tmqueue)
1966     {
1967         ret = xa_commit_entry_tmq(tmxid, flags);
1968     }
1969     else
1970     {
1971         ret=ndrx_xa_qminicall(tmxid, TMQ_CMD_COMMITRAN);
1972     }
1973     
1974 out:
1975 
1976     return ret;
1977 }
1978 
1979 #define READ_BLOCK(BUF, LEN) do {\
1980         if (LEN!=(act_read=fread(BUF, 1, LEN, f)))\
1981         {\
1982             *err = ferror(f);\
1983             \
1984             if (EXSUCCEED==*err)\
1985             {\
1986                 *tmq_err = TMQ_ERR_EOF;\
1987             }\
1988             NDRX_LOG(log_error, "ERROR! Failed to read tx file (%s: %s): req_read=%d, read=%d: %s, tmq_err: %d",\
1989                     dbg_msg, fname, LEN, act_read, (*err==0?"EOF":strerror(*err)), *tmq_err);\
1990             \
1991             userlog("ERROR! Failed to read tx file (%s: %s): req_read=%d, read=%d: %s, tmq_err: %d",\
1992                 dbg_msg, fname, LEN, act_read, (*err==0?"EOF":strerror(*err)), *tmq_err);\
1993             EXFAIL_OUT(ret);\
1994         }\
1995     } while (0)
1996 
1997 /** verify magic */
1998 #define VERIFY_MAGIC(FIELD, MAGIC_VAL, LEN, DESCR, ERROR_CODE) do { \
1999         /* validate magics and command code... */\
2000         if (0!=strncmp(FIELD, MAGIC_VAL, LEN))\
2001         {\
2002             NDRX_LOG(log_error, "ERROR! file: [%s] invalid %s: expected: [%s] got: [%.*s] command: [%x] error_code: %d", \
2003                     fname, DESCR, MAGIC_VAL, LEN, FIELD, (unsigned int)p_hdr->command_code, ERROR_CODE);\
2004             NDRX_DUMP(log_error, "Invalid header", p_hdr, sizeof(tmq_cmdheader_t));\
2005             userlog("ERROR! file: [%s] invalid %s: expected: [%s] got: [%.*s] command: [%x] error_code: %d", \
2006                     fname, DESCR, MAGIC_VAL, LEN, FIELD, (unsigned int)p_hdr->command_code, ERROR_CODE);\
2007             *tmq_err = ERROR_CODE;\
2008             EXFAIL_OUT(ret);\
2009         }\
2010     } while(0)
2011 
2012 /**
2013  * Reads the header block
2014  * Either full read or partial.
2015  * - Check header validity if ftell() position is 0.
2016  * @param block where to unload the data
2017  * @param p_len for TMQ_STORCMD_NEWMSG this indicates number of mandatory bytes to read
2018  *  for other message types, validate the read size against the actual structure size.
2019  * @param fname name for debug
2020  * @param dbg_msg debug message
2021  * @param err ptr to error code if failed to read
2022  * @param tmq_err internal error code
2023  * @return EXFAIL or number of bytes read
2024  */
2025 exprivate int read_tx_block(FILE *f, char *block, int len, char *fname, 
2026         char *dbg_msg, int *err, 
2027         int *tmq_err)
2028 {
2029     int act_read;
2030     int read = 0;
2031     int ret = EXSUCCEED;
2032     int offset = 0;
2033     tmq_cmdheader_t *p_hdr;
2034     
2035     *err=0;
2036     *tmq_err = 0;
2037     
2038     if (0==ftell(f))
2039     {
2040         assert(len >= sizeof(tmq_cmdheader_t));
2041         /* read header */
2042         READ_BLOCK(block, sizeof(tmq_cmdheader_t));
2043         
2044         read+=sizeof(tmq_cmdheader_t);
2045         
2046         /* validate header */
2047         p_hdr = (tmq_cmdheader_t *)block;
2048         
2049         /* validate magics and command code... */
2050         VERIFY_MAGIC(p_hdr->magic,  TMQ_MAGICBASE,  TMQ_MAGICBASE_LEN,  "base magic1",  TMQ_ERR_CORRUPT);
2051         VERIFY_MAGIC(p_hdr->magic2, TMQ_MAGICBASE2, TMQ_MAGICBASE_LEN,  "base magic2",  TMQ_ERR_CORRUPT);
2052         VERIFY_MAGIC(p_hdr->magic,  TMQ_MAGIC,      TMQ_MAGIC_LEN,      "magic1",       TMQ_ERR_VERSION);
2053         VERIFY_MAGIC(p_hdr->magic2, TMQ_MAGIC2,     TMQ_MAGIC_LEN,      "magic2",       TMQ_ERR_VERSION);
2054         
2055         /* validate command code */
2056         switch (p_hdr->command_code)
2057         {
2058             case TMQ_STORCMD_NEWMSG:
2059                 NDRX_LOG(log_debug, "Command: New message");
2060                 /* as this is biggest message & dynamic size: */
2061                 len = len - sizeof(tmq_cmdheader_t);
2062                 break;
2063             case TMQ_STORCMD_UPD:
2064                 
2065                 NDRX_LOG(log_debug, "Command: Update message");
2066                 if (len > sizeof(tmq_cmdheader_t))
2067                 {
2068                     len = sizeof(tmq_msg_upd_t) - sizeof(tmq_cmdheader_t);
2069                 }
2070                 break;
2071             case TMQ_STORCMD_DEL:
2072                 
2073                 NDRX_LOG(log_debug, "Command: Delete message");
2074                 
2075                 if (len > sizeof(tmq_cmdheader_t))
2076                 {
2077                     len = sizeof(tmq_msg_del_t) - sizeof(tmq_cmdheader_t);
2078                 }
2079                 break;
2080             case TMQ_STORCMD_UNLOCK:
2081                 
2082                 NDRX_LOG(log_debug, "Command: Unlock message");
2083                 
2084                 if (len > sizeof(tmq_cmdheader_t))
2085                 {
2086                     len = sizeof(tmq_msg_unl_t) - sizeof(tmq_cmdheader_t);
2087                 }
2088                 break;
2089             case TMQ_STORCMD_DUM:
2090 
2091                 NDRX_LOG(log_debug, "Command: dummy message");
2092 
2093                 if (len > sizeof(tmq_cmdheader_t))
2094                 {
2095                     len = sizeof(tmq_msg_dum_t) - sizeof(tmq_cmdheader_t);
2096                 }
2097                 break;
2098             default:
2099                 NDRX_LOG(log_error, "ERROR! file [%s] invalid command code [%x]", 
2100                     fname, (unsigned int)p_hdr->command_code);
2101                 NDRX_DUMP(log_error, "Invalid header", p_hdr, sizeof(tmq_cmdheader_t));
2102                 userlog("ERROR! file [%s] invalid command code [%x]", 
2103                     fname, (unsigned int)p_hdr->command_code);
2104                 *tmq_err = TMQ_ERR_CORRUPT;
2105                 EXFAIL_OUT(ret);
2106 
2107                 break;
2108         }
2109         
2110         offset = sizeof(tmq_cmdheader_t);
2111     }
2112     
2113     if (len > 0)
2114     {
2115         READ_BLOCK((block+offset), len);
2116         
2117         read+=len;
2118     }
2119     
2120     
2121 out:
2122                 
2123     if (EXSUCCEED==ret)
2124     {
2125         return read;
2126     }
2127     else
2128     {    
2129         return ret;
2130     }
2131 }
2132 
2133 /**
2134  * Read the block file file
2135  * @param fname full path to file
2136  * @param block buffer where to store the data block
2137  * @param len length to read.
2138  * @param err error code
2139  * @param tmq_err internal error code
2140  * @return EXFAIL or number of bytes read
2141  */
2142 exprivate int read_tx_from_file(char *fname, char *block, int len, int *err,
2143         int *tmq_err)
2144 {
2145     int ret = EXSUCCEED;
2146     FILE *f = NULL;
2147     
2148     *err=0;
2149     if (ndrx_G_systest_lockloss || NULL==(f = NDRX_FOPEN(fname, "r+b")))
2150     {
2151         *err = errno;
2152         NDRX_LOG(log_error, "ERROR! xa_commit_entry() - failed to open file[%s]: %s!", 
2153                 fname, strerror(*err));
2154 
2155         userlog( "ERROR! xa_commit_entry() - failed to open file[%s]: %s!", 
2156                 fname, strerror(*err));
2157         EXFAIL_OUT(ret);
2158     }
2159 
2160     ndrx_fadvise_donotneed(fileno(f), TMQ_FSCACHE_LEN);
2161 
2162     ret = read_tx_block(f, block, len, fname, "read_tx_from_file", err, tmq_err);
2163     
2164 out:
2165 
2166     if (NULL!=f)
2167     {
2168         NDRX_FCLOSE(f);
2169     }
2170     
2171     return ret;
2172 }
2173 
2174 /** Write th block */
2175 #define WRITE_TO_DISK(PTR, OFFSET, LEN) \
2176     ret_len = 0;\
2177     if (ndrx_G_systest_lockloss || G_atmi_env.test_qdisk_write_fail || LEN!=(ret_len=fwrite( ((char *)PTR) + OFFSET, 1, LEN, f)))\
2178     {\
2179         int err = errno;\
2180         /* For Q/A purposes - simulate no space error, if requested */\
2181         if (G_atmi_env.test_qdisk_write_fail)\
2182         {\
2183             NDRX_LOG(log_error, "test point: test_qdisk_write_fail TRUE");\
2184             err = ENOSPC;\
2185         }\
2186         NDRX_LOG(log_error, "ERROR! Failed to write to msgblock/tx file [%s]: "\
2187                             "offset=%d req_len=%d, written=%d: %s",\
2188                             G_atmi_tls->qdisk_tls->filename_active,\
2189                             OFFSET, LEN, ret_len, strerror(err));\
2190         userlog("ERROR! Failed to write msgblock/tx file [%s]: "\
2191                 "offset=%d req_len=%d, written=%d: %s",\
2192                 G_atmi_tls->qdisk_tls->filename_active,\
2193                 OFFSET, LEN, ret_len, strerror(err));\
2194         EXFAIL_OUT(ret);\
2195     }
2196 
2197 /** flush changes to disk  */
2198 #define WRITE_FLUSH \
2199     ret_len = 0;\
2200     if (ndrx_G_systest_lockloss || EXSUCCEED!=fflush(f))\
2201     {\
2202         int err = errno;\
2203         NDRX_LOG(log_error, "ERROR! fflush() on [%s] failed: %s", \
2204             G_atmi_tls->qdisk_tls->filename_active, strerror(err));\
2205         userlog("ERROR! fflush() on [%s] failed: %s", \
2206             G_atmi_tls->qdisk_tls->filename_active, strerror(err));\
2207         EXFAIL_OUT(ret);\
2208     }
2209 
2210 #define WRITE_REWIND \
2211         if (EXSUCCEED!=fseek(f, 0L, SEEK_SET))\
2212         {\
2213             int err = errno;\
2214             NDRX_LOG(log_error, "ERROR! fseek() rewind on [%s] failed: %s", \
2215                 G_atmi_tls->qdisk_tls->filename_active, strerror(err));\
2216             userlog("ERROR! fseek() rewind on [%s] failed: %s", \
2217                 G_atmi_tls->qdisk_tls->filename_active, strerror(err));\
2218             EXFAIL_OUT(ret);\
2219         }
2220 
2221 /**
2222  * Write data to transaction file.
2223  * This works only with new files (new transactions)
2224  * @param block
2225  * @param len
2226  * @param cust_tmxid custom tmxid, if not running in global tran
2227  * @param int_diag internal diagnostics, flags
2228  * @return SUCCEED/FAIL
2229  */
2230 exprivate int write_to_tx_file(char *block, int len, char *cust_tmxid, int *int_diag)
2231 {
2232     int ret = EXSUCCEED;
2233     XID xid;
2234     size_t ret_len;
2235     FILE *f = NULL;
2236     int ax_ret;
2237     char mode_str[16];
2238     tmq_cmdheader_t dum;
2239     int seqno;
2240     long xaflags=0;
2241     char *tmxid=NULL;
2242     
2243     NDRX_STRCPY_SAFE(mode_str, "wb");
2244     
2245     if (ndrx_get_G_atmi_env()->xa_sw->flags & TMREGISTER && !G_atmi_tls->qdisk_tls->is_reg)
2246     {
2247         ax_ret = ax_reg(G_atmi_tls->qdisk_rmid, &xid, 0);
2248                 
2249         if (TM_JOIN!=ax_ret && TM_OK!=ax_ret)
2250         {
2251             if (NULL!=int_diag)
2252             {
2253                 *int_diag|=TMQ_INT_DIAG_EJOIN;
2254             }
2255             
2256             NDRX_LOG(log_error, "ERROR! xa_reg() failed!");
2257             EXFAIL_OUT(ret);
2258         }
2259         
2260         if (TM_JOIN==ax_ret)
2261         {
2262             xaflags = TMJOIN;
2263         }
2264         
2265         /* done by ax_reg! */
2266         if (XA_OK!=xa_start_entry(ndrx_get_G_atmi_env()->xa_sw, &xid, G_atmi_tls->qdisk_rmid, xaflags))
2267         {
2268             
2269             if (NULL!=int_diag)
2270             {
2271                 *int_diag|=TMQ_INT_DIAG_EJOIN;
2272             }
2273             
2274             NDRX_LOG(log_error, "ERROR! xa_start_entry() failed!");
2275             EXFAIL_OUT(ret);
2276         }
2277         
2278         G_atmi_tls->qdisk_tls->is_reg = EXTRUE;
2279     }
2280     
2281     /* get the current transaction? 
2282      * If the same thread is locked, then no problem...
2283      */
2284     if (EXSUCCEED!=set_filenames(&seqno))
2285     {
2286         EXFAIL_OUT(ret);
2287     }
2288 
2289     if (NULL!=cust_tmxid)
2290     {
2291         tmxid = cust_tmxid;
2292     }
2293     else
2294     {
2295         /* part of global trn */
2296         tmxid = G_atmi_tls->qdisk_tls->filename_base;
2297     }
2298     
2299     
2300     /* Open file for write... */
2301     NDRX_LOG(log_info, "Writing command file: [%s] mode: [%s] (seqno: %d)", 
2302         G_atmi_tls->qdisk_tls->filename_active, mode_str, seqno);
2303     
2304     /* io fence test. */
2305     if (ndrx_G_systest_lockloss || NULL==(f = NDRX_FOPEN(G_atmi_tls->qdisk_tls->filename_active, mode_str)))
2306     {
2307         int err = errno;
2308         NDRX_LOG(log_error, "ERROR! write_to_tx_file() - failed to open file[%s]: %s!", 
2309                 G_atmi_tls->qdisk_tls->filename_active, strerror(err));
2310         
2311         userlog( "ERROR! write_to_tx_file() - failed to open file[%s]: %s!", 
2312                 G_atmi_tls->qdisk_tls->filename_active, strerror(err));
2313         EXFAIL_OUT(ret);
2314     }
2315 
2316     ndrx_fadvise_donotneed(fileno(f), TMQ_FSCACHE_LEN);
2317 
2318     /* single step write..., as temp files now we discard
2319      * no problem with we get temprorary files incomplete...
2320      */
2321     WRITE_TO_DISK(block, 0, len);
2322     
2323     WRITE_FLUSH;
2324     
2325     /* sync the file, if required so... 
2326      * file updates are optional..
2327      */
2328     if (ndrx_G_systest_lockloss || EXSUCCEED!=ndrx_fsync_fsync(f, G_atmi_env.xa_fsync_flags))
2329     {
2330         NDRX_LOG(log_error, "failed to fsync");
2331         EXFAIL_OUT(ret);
2332     }
2333     
2334     /* 
2335      * If all OK, add command transaction log 
2336      */
2337     if (EXSUCCEED!=tmq_log_addcmd(tmxid, seqno, block, XA_RM_STATUS_ACTIVE))
2338     {
2339         NDRX_LOG(log_error, "Failed to add [%s] seqno %d to transaction log",
2340                 G_atmi_tls->qdisk_tls->filename_base, seqno);
2341         userlog("Failed to add [%s] seqno %d to transaction log",
2342                 G_atmi_tls->qdisk_tls->filename_base, seqno);
2343         EXFAIL_OUT(ret);
2344     }
2345     
2346 out:
2347     
2348     if (NULL!=f)
2349     {
2350         /* unlink if failed to write to the folder... */
2351         if (EXSUCCEED!=ret)
2352         {
2353             NDRX_LOG(log_error, "Unlink: [%s]", G_atmi_tls->qdisk_tls->filename_active);
2354 
2355             /* io fencing test */
2356             if (ndrx_G_systest_lockloss 
2357                 || EXSUCCEED!=unlink(G_atmi_tls->qdisk_tls->filename_active))
2358             {
2359                  int err = errno;
2360                  NDRX_LOG(log_error, "Failed to unlink [%s]: %s",
2361                     G_atmi_tls->qdisk_tls->filename_active, strerror(err));
2362                  userlog("Failed to unlink [%s]: %s",
2363                     G_atmi_tls->qdisk_tls->filename_active, strerror(err));
2364             }
2365         }
2366         
2367         NDRX_FCLOSE(f);
2368     }
2369 
2370     /* mark abort only, if hade issues with disk */
2371     if (EXSUCCEED!=ret && NULL!=tmxid)
2372     {
2373         tmq_log_set_abort_only(tmxid);
2374     }
2375 
2376     return ret;
2377 }
2378 
2379 /* COMMANDS:
2380  * - write qmessage
2381  * - write status/try update
2382  * - unlink the message
2383  * 
2384  * Read only commands:
2385  * - Query messages (find first & find next)
2386  */
2387 
2388 /**
2389  * Write the message data to TX file
2390  * @param msg message (the structure is projected on larger memory block to fit in the whole msg
2391  * @param int_diag internal diagnostics, flags
2392  * @return SUCCEED/FAIL
2393  */
2394 expublic int tmq_storage_write_cmd_newmsg(tmq_msg_t *msg, int *int_diag)
2395 {
2396     int ret = EXSUCCEED;
2397     char tmp[TMMSGIDLEN_STR+1];
2398     size_t len;
2399     /*
2400     uint64_t lockt =  msg->lockthreadid;
2401     
2402      do not want to lock be written out to files: 
2403     msg->lockthreadid = 0;
2404     NO NO NO !!!! We still need a lock!
2405     */
2406     
2407      /* detect the actual len... */
2408     len = tmq_get_block_len((char *)msg);
2409     
2410     NDRX_DUMP(log_debug, "Writing new message to disk", 
2411                 (char *)msg, len);
2412     
2413     if (EXSUCCEED!=write_to_tx_file((char *)msg, len, NULL, int_diag))
2414     {
2415         NDRX_LOG(log_error, "tmq_storage_write_cmd_newmsg() failed for msg %s", 
2416                 tmq_msgid_serialize(msg->hdr.msgid, tmp));
2417         EXFAIL_OUT(ret);
2418     }
2419     /*
2420     msg->lockthreadid = lockt;
2421      */
2422     
2423     NDRX_LOG(log_info, "Message [%s] written ok to active TX file", 
2424             tmq_msgid_serialize(msg->hdr.msgid, tmp));
2425     
2426 out:
2427 
2428     return ret;
2429 }
2430 
2431 /**
2432  * Get the data size from any block
2433  * @param data generic data block 
2434  * @return data size or 0 on error (should not happen anyway)
2435  */
2436 expublic size_t tmq_get_block_len(char *data)
2437 {
2438     size_t ret = 0;
2439     
2440     tmq_cmdheader_t *p_hdr = (tmq_cmdheader_t *)data;
2441     tmq_msg_t *p_msg;
2442     tmq_msg_del_t *p_del;
2443     tmq_msg_upd_t *p_upd;
2444     tmq_msg_unl_t *p_unl;
2445     tmq_msg_dum_t *p_dum;
2446     
2447     switch (p_hdr->command_code)
2448     {
2449         case TMQ_STORCMD_NEWMSG:
2450             p_msg = (tmq_msg_t *)data;
2451             ret = sizeof(*p_msg) + p_msg->len;
2452             break;
2453         case TMQ_STORCMD_UPD:
2454             ret = sizeof(*p_upd);
2455             break;
2456         case TMQ_STORCMD_DEL:
2457             ret = sizeof(*p_del);
2458             break;
2459         case TMQ_STORCMD_UNLOCK:
2460             ret = sizeof(*p_unl);
2461             break;
2462         case TMQ_STORCMD_DUM:
2463             ret = sizeof(*p_dum);
2464             break;
2465         default:
2466             NDRX_LOG(log_error, "Unknown command code: %c", p_hdr->command_code);
2467             break;
2468     }
2469     
2470     return ret;
2471 }
2472 
2473 /**
2474  * Delete/Update message block write
2475  * @param p_block ptr to union of commands
2476  * @param cust_tmxid custom transaction id (not part of global tran)
2477  * @param int_diag internal diagnostics, flags
2478  * @return SUCCEED/FAIL
2479  */
2480 expublic int tmq_storage_write_cmd_block(char *data, char *descr, char *cust_tmxid, int *int_diag)
2481 {
2482     int ret = EXSUCCEED;
2483     char msgid_str[TMMSGIDLEN_STR+1];
2484     size_t len;
2485     tmq_cmdheader_t *p_hdr = (tmq_cmdheader_t *)data;
2486     
2487     /* detect the actual len... */
2488     len = tmq_get_block_len((char *)data);
2489     
2490     NDRX_LOG(log_info, "Writing command block: %s msg [%s]", descr, 
2491             tmq_msgid_serialize(p_hdr->msgid, msgid_str));
2492     
2493     NDRX_DUMP(log_debug, "Writing command block to disk", 
2494                 (char *)data, len);
2495     
2496     if (EXSUCCEED!=write_to_tx_file((char *)data, len, cust_tmxid, int_diag))
2497     {
2498         NDRX_LOG(log_error, "tmq_storage_write_cmd_block() failed for msg %s", 
2499                 tmq_msgid_serialize(p_hdr->msgid, msgid_str));
2500         EXFAIL_OUT(ret);
2501     }
2502     
2503 out:
2504     return ret;
2505 
2506 }
2507 
2508 /**
2509  * Return msgid from filename
2510  * @param filename_in
2511  * @param msgid_out
2512  * @return SUCCEED/FAIL
2513  */
2514 exprivate int tmq_get_msgid_from_filename(char *filename_in, char *msgid_out)
2515 {
2516     int ret = EXSUCCEED;
2517     
2518     tmq_msgid_deserialize(filename_in, msgid_out);
2519     
2520 out:
2521     return ret;    
2522 }
2523 
2524 /** continue with dirent free */
2525 #define DIRENT_CONTINUE \
2526             NDRX_FREE(namelist[n]);\
2527             continue;\
2528 /**
2529  * Restore messages from storage device.
2530  * TODO: File naming include 03d so that multiple tasks per file sorts alphabetically.
2531  * Any active transactions get's aborted automatically.
2532  * @param process_block callback function to process the data block
2533  * @return SUCCEED/FAIL
2534  */
2535 expublic int tmq_storage_get_blocks(int (*process_block)(char *tmxid, 
2536         union tmq_block **p_block, int state, int seqno), short nodeid, short srvid)
2537 {
2538     int ret = EXSUCCEED;
2539     struct dirent **namelist = NULL;
2540     int n, seqno;
2541     int j;
2542     char *p;
2543     union tmq_block *p_block = NULL;
2544     FILE *f = NULL;
2545     char filename[PATH_MAX+1];
2546     char tmxid[PATH_MAX+1];
2547     int err, tmq_err;
2548     int read;
2549     int state;
2550     qtran_log_t *p_tl;
2551     
2552     /* prepared & active messages are all locked
2553      * Also if delete command is found in active, this means that committed
2554      * message is locked. 
2555      * In case if there was concurrent tmsrv operation, then it might move file
2556      * from active to prepare and we have finished with prepared, and start to
2557      * scan active. Thus we might not see this file. Thus to solve this issue,
2558      * the prepared folder is scanned twice. In the second time only markings
2559      * are put that message is busy.
2560      */
2561     char *folders[] = {M_folder_committed, M_folder_prepared, M_folder_active};
2562     short msg_nodeid, msg_srvid;
2563     char msgid[TMMSGIDLEN];
2564     
2565     if (!G_atmi_tls->qdisk_is_open)
2566     {
2567         NDRX_LOG(log_error, "ERROR! tmq_storage_get_blocks() - XA not open!");
2568         return XAER_RMERR;
2569     }
2570     
2571     for (j = 0; j < N_DIM(folders); j++)
2572     {
2573         
2574         switch (j)
2575         {
2576             case 0:
2577                 /* committed */
2578                 state=TMQ_TXSTATE_COMMITTED;
2579                 break;
2580             case 1:
2581             case 3:
2582                 /* prepared */
2583                 state=TMQ_TXSTATE_PREPARED;
2584                 break;
2585             case 2:
2586                 /* active */
2587                 state=TMQ_TXSTATE_ACTIVE;
2588                 break;
2589         }
2590         
2591         n = scandir(folders[j], &namelist, 0, alphasort);
2592         if (n < 0)
2593         {
2594            NDRX_LOG(log_error, "Failed to scan q directory [%s]: %s", 
2595                    folders[j], strerror(errno));
2596            EXFAIL_OUT(ret);
2597         }
2598         
2599         while (n--)
2600         {
2601             if (0==strcmp(namelist[n]->d_name, ".") || 
2602                 0==strcmp(namelist[n]->d_name, ".."))
2603             {
2604                 DIRENT_CONTINUE;
2605             }
2606 
2607             /* filter nodeid & serverid from the filename... */
2608             if (0==j) /*  For committed folder we can detect stuff from filename */
2609             {
2610                 /* early filter... */
2611                 if (EXSUCCEED!=tmq_get_msgid_from_filename(namelist[n]->d_name, msgid))
2612                 {
2613                     EXFAIL_OUT(ret);
2614                 }
2615                 
2616                 tmq_msgid_get_info(msgid, &msg_nodeid, &msg_srvid);
2617                 
2618                 NDRX_LOG(log_info, "our nodeid/srvid %hd/%hd msg: %hd/%hd",
2619                         nodeid, srvid, msg_nodeid, msg_srvid);
2620 
2621                 if (nodeid!=msg_nodeid || srvid!=msg_srvid)
2622                 {
2623                     NDRX_LOG(log_warn, "our nodeid/srvid %hd/%hd msg: %hd/%hd - IGNORE",
2624                         nodeid, srvid, msg_nodeid, msg_srvid);
2625                     DIRENT_CONTINUE;
2626                 }
2627             }
2628             
2629             snprintf(filename, sizeof(filename), "%s/%s", folders[j], 
2630                     namelist[n]->d_name);
2631             
2632             
2633             /* In case of active or prepared messages,
2634              * extract the sequence number and create transaction
2635              * Active messages we do not as there might be incomplete
2636              * contents and really what only matters is to rollback
2637              * abort only transactions at startup.
2638              */
2639             NDRX_LOG(log_warn, "Loading [%s]", filename);
2640             
2641             /* Read header */            
2642             if (NULL==(p_block = NDRX_MALLOC(sizeof(*p_block))))
2643             {
2644                 NDRX_LOG(log_error, "Failed to alloc [%s]: %s", 
2645                    filename, strerror(errno));
2646                 EXFAIL_OUT(ret);
2647             }
2648             
2649             /* read only if it is committed or prepared */
2650             if (j>0)
2651             {
2652                 NDRX_STRCPY_SAFE(tmxid, namelist[n]->d_name);
2653                 
2654                 p = strrchr(tmxid, '-');
2655                 
2656                 if (NULL==p)
2657                 {
2658                     NDRX_LOG(log_error, "Invalid file name [%s] missing dash!", 
2659                             filename);
2660                     userlog("Invalid file name [%s] missing dash!", 
2661                             filename);
2662                     NDRX_FREE((char *)p_block);
2663                     p_block=NULL;
2664                     DIRENT_CONTINUE;
2665                 }
2666                 
2667                 *p=EXEOS;
2668                 p++;
2669                 
2670                 seqno = atoi(p);
2671                 
2672                 /* get or create a transaction! */
2673                 p_tl = tmq_log_start_or_get(tmxid);
2674                 
2675                 if (NULL==p_tl)
2676                 {
2677                     NDRX_LOG(log_error, "Failed to get transaction object for [%s] seqno %d",
2678                             tmxid, seqno);
2679                     NDRX_FREE((char *)p_block);
2680                     p_block=NULL;
2681                     EXFAIL_OUT(ret);
2682                 }
2683                 
2684                 if (2==j)
2685                 {
2686                     /* mark transaction as abort only! */
2687                     p_tl->is_abort_only=EXTRUE;
2688                     p_tl->txstage=XA_TX_STAGE_ACTIVE;
2689                 }
2690                 else
2691                 {
2692                     p_tl->txstage=XA_TX_STAGE_PREPARED;
2693                 }
2694                 
2695                 /* unlock tran */
2696                 tmq_log_unlock(p_tl);
2697             }
2698 
2699             if (j<2)
2700             {
2701                 if (ndrx_G_systest_lockloss || NULL==(f=NDRX_FOPEN(filename, "rb")))
2702                 {
2703                     err = errno;
2704                     NDRX_LOG(log_error, "Failed to open for read [%s]: %s", 
2705                        filename, strerror(err));
2706                     userlog("Failed to open for read [%s]: %s", 
2707                        filename, strerror(err));
2708                     NDRX_FREE((char *)p_block);
2709                     p_block=NULL;
2710 
2711                     EXFAIL_OUT(ret);
2712                 }
2713 
2714                 ndrx_fadvise_donotneed(fileno(f), TMQ_FSCACHE_LEN);
2715 
2716                 /* here we read maximum header size.
2717                  * For smaller messages (fixed struct messages) all data is read
2718                  */
2719                 if (EXFAIL==(read=read_tx_block(f, (char *)p_block, sizeof(*p_block), 
2720                         filename, "tmq_storage_get_blocks", &err, &tmq_err)))
2721                 {
2722                     NDRX_LOG(log_error, "ERROR! Failed to read [%s] hdr (%d bytes) tmqerr: %d: %s - cannot start, resolve manually", 
2723                        filename, sizeof(*p_block), tmq_err, (err==0?"EOF":strerror(err)) );
2724                     userlog("ERROR! Failed to read [%s] hdr (%d bytes) tmqerr: %d: %s - cannot start, resolve manually", 
2725                        filename, sizeof(*p_block), tmq_err, (err==0?"EOF":strerror(err)) );
2726 
2727                     /* skip & continue with next */
2728                     NDRX_FCLOSE(f);
2729                     f=NULL;
2730 
2731                     NDRX_FREE((char *)p_block);
2732                     p_block = NULL;
2733 
2734                     EXFAIL_OUT(ret);
2735                 }
2736 
2737                 /* Late filter 
2738                  * Not sure what will happen if file will be processed/removed
2739                  * by other server if for example we boot up...read the folder
2740                  * but other server on same qspace/folder will remove the file
2741                  * So better use different folder for each server...!
2742                  */
2743                 if (nodeid!=p_block->hdr.nodeid || srvid!=p_block->hdr.srvid)
2744                 {
2745                     NDRX_LOG(log_warn, "our nodeid/srvid %hd/%hd msg: %hd/%hd - IGNORE",
2746                         nodeid, srvid, p_block->hdr.nodeid, p_block->hdr.srvid);
2747 
2748                     NDRX_FREE((char *)p_block);
2749                     p_block = NULL;
2750                     NDRX_FCLOSE(f);
2751                     f=NULL;
2752 
2753                     DIRENT_CONTINUE;
2754                 }
2755 
2756                 NDRX_DUMP(log_debug, "Got command block", p_block, read);
2757 
2758                 /* if it is message, the re-alloc  */
2759                 if (TMQ_STORCMD_NEWMSG==p_block->hdr.command_code)
2760                 {
2761                     int bytes_extra;
2762                     int bytes_to_read;
2763                     if (NULL==(p_block = NDRX_REALLOC(p_block, sizeof(tmq_msg_t) + p_block->msg.len)))
2764                     {
2765                         NDRX_LOG(log_error, "Failed to alloc [%d]: %s", 
2766                            (sizeof(tmq_msg_t) + p_block->msg.len), strerror(errno));
2767                         EXFAIL_OUT(ret);
2768                     }
2769                     /* Read some more */
2770                     /* Bug #178
2771                      * Under raspberry-pi looks like msg.msg is closer to the start than
2772                      * whole message, and problem is that two bytes gets lost or over
2773                      * written. Thus needs some kind of correction - advance the
2774                      * pointer to msg over the extra bytes we have read.
2775                      * Also needs correction against size to read.
2776                      * This assumes that "tmq_msg_t" is largest structure...
2777                      */
2778                     bytes_extra = sizeof(*p_block)-EXOFFSET(tmq_msg_t, msg);
2779                     bytes_to_read = p_block->msg.len - bytes_extra;
2780 
2781                     NDRX_LOG(log_info, "bytes_extra=%d bytes_to_read=%d", 
2782                             bytes_extra, bytes_to_read);
2783 
2784                     if (bytes_to_read > 0)
2785                     {
2786                         if (EXFAIL==read_tx_block(f, 
2787                                 p_block->msg.msg+bytes_extra, 
2788                                 bytes_to_read, filename, "tmq_storage_get_blocks 2", &err, &tmq_err))
2789                         {
2790                             NDRX_LOG(log_error, "ERROR! Failed to read [%s] %d bytes: %s - cannot start, resolve manually", 
2791                                filename, bytes_to_read, strerror(err));
2792 
2793                             userlog("ERROR! Failed to read [%s] %d bytes: %s - cannot start, resolve manually", 
2794                                     filename, bytes_to_read, strerror(err));
2795                                             /* skip & continue with next */
2796                             NDRX_FCLOSE(f);
2797                             f=NULL;
2798                             NDRX_FREE((char *)p_block);
2799                             p_block = NULL;
2800 
2801                             EXFAIL_OUT(ret);
2802                         }
2803                     }
2804                     else
2805                     {
2806                         NDRX_LOG(log_info, "Full message already read by command block!");
2807                     }
2808 
2809                     /* any message not committed automatically means locked */
2810                     if (0!=j)
2811                     {
2812                         /* if message is active or prepared, then message is locked */
2813                         p_block->msg.lockthreadid = ndrx_gettid();
2814                     }
2815                     else
2816                     {
2817                         p_block->msg.lockthreadid = 0;
2818                     }
2819 
2820                     NDRX_DUMP(6, "Read message from disk", 
2821                             p_block->msg.msg, p_block->msg.len);
2822                 }
2823 
2824                 NDRX_FCLOSE(f);
2825                 f=NULL;
2826             }
2827             else
2828             {
2829                 /* just create dummy entry for active transactions */
2830                 M_p_tmq_setup_cmdheader_dum(&p_block->hdr, NULL, nodeid, 
2831                         0, ndrx_G_qspace, 0);
2832                 p_block->hdr.command_code = TMQ_STORCMD_DUM;
2833             }
2834             
2835             /* Process message block 
2836              * It is up to caller to free the mem & make null
2837              */
2838             if (EXSUCCEED!=process_block(tmxid, &p_block, state, seqno))
2839             {
2840                 NDRX_LOG(log_error, "Failed to process block!");
2841                 EXFAIL_OUT(ret);
2842             }
2843 
2844             NDRX_FREE(namelist[n]);
2845         }
2846         NDRX_FREE(namelist);
2847         namelist = NULL;
2848     }
2849     
2850 out:
2851 
2852     /* this will check for struct init or not init */
2853     if (NULL!=namelist)
2854     {
2855         dirent_free(namelist, n);
2856         namelist=NULL;
2857     }
2858 
2859     if (NULL!=p_block)
2860     {
2861         NDRX_FREE((char *)p_block);
2862     }
2863 
2864     /* close the resources */
2865     if (NULL!=f)
2866     {
2867         NDRX_FCLOSE(f);
2868     }
2869     return ret;
2870 }
2871 
2872 /**
2873  * Free up the current scan of the directory entries
2874  * used by recover
2875  */
2876 exprivate void dirent_free(struct dirent **namelist, int n)
2877 {
2878     while (n>=0)
2879     {
2880         NDRX_FREE(namelist[n]);
2881         n--;
2882     }
2883     NDRX_FREE(namelist);
2884 }
2885 
2886 /**
2887  * Verify that:
2888  * - Prepared must exist in the log. If it does not exist,
2889  *  then verify that file is removed from disk (some concurrent commit...)
2890  *  if file still exist, then we got a problem and basically we shall
2891  *  shutdown, as integrity is broken.
2892  * @param tmxid transaction id
2893  * @param fname file on disk
2894  */
2895 expublic int tmq_check_prepared(char *tmxid, char *fname)
2896 {
2897     int ret = EXSUCCEED;
2898     char tmp[PATH_MAX+1];
2899 
2900     /* Check entry by ndrx_xa_qminicall TMQ_CMD_CHK_MEMLOG */
2901 
2902     if (XA_OK!=(ret=ndrx_xa_qminicall(tmxid, TMQ_CMD_CHK_MEMLOG)))
2903     {
2904         snprintf(tmp, sizeof(tmp), "%s/%s", M_folder_prepared, fname);
2905 
2906         if (ndrx_file_exists(tmp))
2907         {
2908             /* Request the service to restart if log is missing (it will) */
2909             ndrx_xa_qminicall(tmxid, TMQ_CMD_CHK_MEMLOG2);
2910 
2911             NDRX_LOG(log_error, "Storage integrity problem. File [%s] exists, "
2912                 "but transaction not - XAER_RMFAIL (tmqueue server will reboot)!", 
2913                     tmp);
2914             userlog("Integrity problem. File [%s] exists, "
2915                 "but transaction not - XAER_RMFAIL (tmqueue server will reboot)!", 
2916                     tmp);
2917 
2918             EXFAIL_OUT(ret);
2919         }
2920         else
2921         {
2922             NDRX_LOG(log_debug, "File [%s] does not exists any more", tmp);
2923         }
2924     }
2925 out:
2926     return ret;
2927 }
2928 
2929 /** continue with dirent free */
2930 #define RECOVER_CONTINUE \
2931         NDRX_FREE(G_atmi_tls->qdisk_tls->recover_namelist[G_atmi_tls->qdisk_tls->recover_i]);\
2932             continue;\
2933    
2934 /** Close cursor */
2935 #define RECOVER_CLOSE_CURSOR \
2936         /* reset any stuff left open from previous scan... */\
2937         if (NULL!=G_atmi_tls->qdisk_tls->recover_namelist)\
2938         {\
2939             dirent_free(G_atmi_tls->qdisk_tls->recover_namelist, G_atmi_tls->qdisk_tls->recover_i);\
2940             G_atmi_tls->qdisk_tls->recover_namelist = NULL;\
2941         }\
2942         G_atmi_tls->qdisk_tls->recover_open=EXFALSE;\
2943         G_atmi_tls->qdisk_tls->recover_i=EXFAIL;\
2944         G_atmi_tls->qdisk_tls->recover_last_loaded=EXFALSE;\
2945 
2946 /**
2947  * Lists currently prepared transactions
2948  * NOTE! Currently messages does not store RMID.
2949  * This means that one directory cannot shared between several 
2950  * Might have race condition with in-progress prepare. But I think it is
2951  * is not a big problem. As anyway we normally will run abort or commit.
2952  * And commit will require that all is prepared.
2953  * @param sw XA switch
2954  * @param xid where to unload the xids
2955  * @param count positions avaialble
2956  * @param rmid RM id
2957  * @param flags
2958  * @return error or nr Xids recovered
2959  */
2960 expublic int xa_recover_entry(struct xa_switch_t *sw, XID *xid, long count, int rmid, long flags)
2961 {
2962     int ret = XA_OK;
2963     int err;
2964     XID xtmp;
2965     char *p, *fname;
2966     char fname_full[PATH_MAX+1];
2967     int current_unload_pos=0; /* where to unload the stuff.. */
2968     
2969     if (!G_atmi_tls->qdisk_is_open)
2970     {
2971         NDRX_LOG(log_error, "ERROR! xa_recover_entry() - XA not open!");
2972         ret=XAER_PROTO;
2973         goto out;
2974     }
2975     
2976     if (NULL==xid && count >0)
2977     {
2978         NDRX_LOG(log_error, "ERROR: xid is NULL and count >0");
2979         ret=XAER_INVAL;
2980         goto out;
2981     }
2982     
2983     if (!G_atmi_tls->qdisk_tls->recover_open && ! (flags & TMSTARTRSCAN))
2984     {
2985         NDRX_LOG(log_error, "ERROR: Scan not open and TMSTARTRSCAN not specified");
2986         ret=XAER_INVAL;
2987         goto out;
2988     }
2989     
2990     /* close the scan */
2991     if (flags & TMSTARTRSCAN)
2992     {
2993         /* if was not open, no problem.. */
2994         RECOVER_CLOSE_CURSOR;
2995     }
2996     
2997     /* start the scan if requested so ...  */
2998     if (flags & TMSTARTRSCAN)
2999     {
3000         G_atmi_tls->qdisk_tls->recover_i = scandir(M_folder_prepared, 
3001                 &G_atmi_tls->qdisk_tls->recover_namelist, 0, alphasort);
3002         
3003         if (G_atmi_tls->qdisk_tls->recover_i < 0)
3004         {
3005             err=errno;
3006             NDRX_LOG(log_error, "Failed to scan q directory [%s]: %s", 
3007                     M_folder_prepared, strerror(err));
3008             userlog("Failed to scan q directory [%s]: %s", 
3009                     M_folder_prepared, strerror(err));
3010             ret=XAER_RMERR;
3011             goto out;
3012         }
3013         
3014         G_atmi_tls->qdisk_tls->recover_open=EXTRUE;
3015     }
3016     
3017     /* nothing to return */
3018     if (NULL==G_atmi_tls->qdisk_tls->recover_namelist)
3019     {
3020         ret=0;
3021         goto out;
3022     }
3023     
3024     /** start to unload xids, we got to match the same names */
3025     while ((count - current_unload_pos) > 0 && 
3026             G_atmi_tls->qdisk_tls->recover_i--)
3027     {
3028         fname = G_atmi_tls->qdisk_tls->recover_namelist[G_atmi_tls->qdisk_tls->recover_i]->d_name;
3029         
3030         if (0==strcmp(fname, ".") || 
3031             0==strcmp(fname, ".."))
3032         {
3033             RECOVER_CONTINUE;
3034         }
3035         
3036         p = strchr(fname, '-');
3037         
3038         /* invalid file name, skip... */
3039         if (NULL==p)
3040         {
3041             NDRX_LOG(log_error, "Invalid prepared name [%s] - skip", fname);
3042             RECOVER_CONTINUE;
3043         }
3044                 
3045         NDRX_STRCPY_SAFE(fname_full, fname);
3046 
3047         /* terminate so that we have good name */
3048         *p = EXEOS;
3049         p++;
3050         
3051         if (NULL==atmi_xa_deserialize_xid((unsigned char *)fname, &xtmp))
3052         {
3053             NDRX_LOG(log_error, "Failed to deserialize xid: %s - skip", fname);
3054             RECOVER_CONTINUE;
3055         }
3056 
3057         /* TODO: if we will support several RMIDs in the same folder
3058          * then we need to read the file block and check the RMID
3059          * currently file blocks does not contain RMID. Thus that would
3060          * require to be appended.
3061          */
3062         
3063         /* check is it duplicate? */
3064         if ( (current_unload_pos>0 &&
3065                 0==memcmp(&xid[current_unload_pos-1], &xtmp, sizeof(XID)))
3066                 /* if it was processed in previous scan: 
3067                  */
3068                 || (G_atmi_tls->qdisk_tls->recover_last_loaded 
3069                         && 0==memcmp(&G_atmi_tls->qdisk_tls->recover_last, &xtmp, sizeof(XID)))
3070                 )
3071         {
3072             NDRX_LOG(log_debug, "Got part [%s] of xid [%s]", p, fname);
3073             RECOVER_CONTINUE;
3074         }
3075             
3076         /* Okey unload the xid finally */
3077         memcpy(&xid[current_unload_pos], &xtmp, sizeof(XID));
3078 
3079         /* ensure that have a log entry 
3080          * probably will not work... as all stuff must go through
3081          * the Qspace...?
3082          * however we can choose not to do this. As xa_rollback will bypass these and 
3083          * would be cleaned up at the next boot.
3084          * if going for commit, then tmq will reboot, as no tlog, but file exists on disk.
3085          */
3086 #if 0
3087         if (EXSUCCEED!=tmq_check_prepared(fname, fname_full))
3088         {
3089             /* having some issues with leaks: */
3090             NDRX_FREE(G_atmi_tls->qdisk_tls->recover_namelist[G_atmi_tls->qdisk_tls->recover_i]);
3091             ret=XAER_RMFAIL;
3092             goto out;
3093         }
3094 #endif
3095         
3096         NDRX_LOG(log_debug, "Xid [%s] unload to position %d", fname, current_unload_pos);
3097         ret++;
3098         current_unload_pos++;
3099         RECOVER_CONTINUE;
3100     }
3101     
3102     if (ret>0)
3103     {
3104         /* save the last xid for reetry skipping.. */
3105         memcpy(&G_atmi_tls->qdisk_tls->recover_last, &xid[ret-1], sizeof(XID));
3106         G_atmi_tls->qdisk_tls->recover_last_loaded=EXTRUE;
3107     }
3108     
3109 out:
3110 
3111     /* terminate the scan */
3112     NDRX_LOG(log_debug, "recover: count=%ld, ret=%d", count, ret);
3113 
3114     if (    ret>=0 
3115             && ( (flags & TMENDRSCAN)  || ret < count)
3116             && G_atmi_tls->qdisk_tls->recover_open
3117             )
3118     {
3119         NDRX_LOG(log_debug, "recover: closing cursor");
3120         
3121         /* if was not open, no problem.. */
3122         RECOVER_CLOSE_CURSOR;
3123     }
3124 
3125     return ret; /* no transactions found */
3126 }
3127 
3128 /**
3129  * CURRENTLY NOT USED!!!
3130  * @param sw
3131  * @param xid
3132  * @param rmid
3133  * @param flags
3134  * @return 
3135  */
3136 expublic int xa_forget_entry(struct xa_switch_t *sw, XID *xid, int rmid, long flags)
3137 {
3138     
3139     if (!G_atmi_tls->qdisk_is_open)
3140     {
3141         NDRX_LOG(log_error, "ERROR! xa_forget_entry() - XA not open!");
3142         return XAER_RMERR;
3143     }
3144     
3145     NDRX_LOG(log_error, "ERROR! xa_forget_entry() - not using!!");
3146     return XAER_RMERR;
3147 }
3148 
3149 /**
3150  * CURRENTLY NOT USED!!!
3151  * @param sw
3152  * @param handle
3153  * @param retval
3154  * @param rmid
3155  * @param flags
3156  * @return 
3157  */
3158 expublic int xa_complete_entry(struct xa_switch_t *sw, int *handle, int *retval, int rmid, long flags)
3159 {
3160     if (!G_atmi_tls->qdisk_is_open)
3161     {
3162         NDRX_LOG(log_error, "ERROR! xa_complete_entry() - XA not open!");
3163         return XAER_RMERR;
3164     }
3165     
3166     NDRX_LOG(log_error, "ERROR! xa_complete_entry() - not using!!");
3167     return XAER_RMERR;
3168 }
3169 
3170 
3171 /* Static entries */
3172 expublic int xa_open_entry_stat( char *xa_info, int rmid, long flags)
3173 {
3174     return xa_open_entry(&ndrxqstatsw, xa_info, rmid, flags);
3175 }
3176 expublic int xa_close_entry_stat(char *xa_info, int rmid, long flags)
3177 {
3178     return xa_close_entry(&ndrxqstatsw, xa_info, rmid, flags);
3179 }
3180 expublic int xa_start_entry_stat(XID *xid, int rmid, long flags)
3181 {
3182     return xa_start_entry(&ndrxqstatsw, xid, rmid, flags);
3183 }
3184 expublic int xa_end_entry_stat(XID *xid, int rmid, long flags)
3185 {
3186     return xa_end_entry(&ndrxqstatsw, xid, rmid, flags);
3187 }
3188 expublic int xa_rollback_entry_stat(XID *xid, int rmid, long flags)
3189 {
3190     return xa_rollback_entry(&ndrxqstatsw, xid, rmid, flags);
3191 }
3192 expublic int xa_prepare_entry_stat(XID *xid, int rmid, long flags)
3193 {
3194     return xa_prepare_entry(&ndrxqstatsw, xid, rmid, flags);
3195 }
3196 expublic int xa_commit_entry_stat(XID *xid, int rmid, long flags)
3197 {
3198     return xa_commit_entry(&ndrxqstatsw, xid, rmid, flags);
3199 }
3200 expublic int xa_recover_entry_stat(XID *xid, long count, int rmid, long flags)
3201 {
3202     return xa_recover_entry(&ndrxqstatsw, xid, count, rmid, flags);
3203 }
3204 expublic int xa_forget_entry_stat(XID *xid, int rmid, long flags)
3205 {
3206     return xa_forget_entry(&ndrxqstatsw, xid, rmid, flags);
3207 }
3208 expublic int xa_complete_entry_stat(int *handle, int *retval, int rmid, long flags)
3209 {
3210     return xa_complete_entry(&ndrxqstatsw, handle, retval, rmid, flags);
3211 }
3212 
3213 /* Dynamic entries */
3214 expublic int xa_open_entry_dyn( char *xa_info, int rmid, long flags)
3215 {
3216     return xa_open_entry(&ndrxqdynsw, xa_info, rmid, flags);
3217 }
3218 expublic int xa_close_entry_dyn(char *xa_info, int rmid, long flags)
3219 {
3220     return xa_close_entry(&ndrxqdynsw, xa_info, rmid, flags);
3221 }
3222 expublic int xa_start_entry_dyn(XID *xid, int rmid, long flags)
3223 {
3224     return xa_start_entry(&ndrxqdynsw, xid, rmid, flags);
3225 }
3226 expublic int xa_end_entry_dyn(XID *xid, int rmid, long flags)
3227 {
3228     return xa_end_entry(&ndrxqdynsw, xid, rmid, flags);
3229 }
3230 expublic int xa_rollback_entry_dyn(XID *xid, int rmid, long flags)
3231 {
3232     return xa_rollback_entry(&ndrxqdynsw, xid, rmid, flags);
3233 }
3234 expublic int xa_prepare_entry_dyn(XID *xid, int rmid, long flags)
3235 {
3236     return xa_prepare_entry(&ndrxqdynsw, xid, rmid, flags);
3237 }
3238 expublic int xa_commit_entry_dyn(XID *xid, int rmid, long flags)
3239 {
3240     return xa_commit_entry(&ndrxqdynsw, xid, rmid, flags);
3241 }
3242 expublic int xa_recover_entry_dyn(XID *xid, long count, int rmid, long flags)
3243 {
3244     return xa_recover_entry(&ndrxqdynsw, xid, count, rmid, flags);
3245 }
3246 expublic int xa_forget_entry_dyn(XID *xid, int rmid, long flags)
3247 {
3248     return xa_forget_entry(&ndrxqdynsw, xid, rmid, flags);
3249 }
3250 expublic int xa_complete_entry_dyn(int *handle, int *retval, int rmid, long flags)
3251 {
3252     return xa_complete_entry(&ndrxqdynsw, handle, retval, rmid, flags);
3253 }
3254 
3255 /**
3256  * Reset message check stopwatch
3257  */
3258 expublic void tmq_chkdisk_stopwatch_reset(void)
3259 {
3260     MUTEX_LOCK_V(M_chkdisk_stopwatch_lock);
3261     ndrx_stopwatch_reset(&M_chkdisk_stopwatch);
3262     MUTEX_UNLOCK_V(M_chkdisk_stopwatch_lock);
3263     
3264 }
3265 
3266 /**
3267  * Get stopwatch delta in seconds
3268  * @return current stopwatch value in seconds
3269  */
3270 expublic long tmq_chkdisk_stopwatch_get_delta_sec(void)
3271 {
3272     long ret;
3273 
3274     MUTEX_LOCK_V(M_chkdisk_stopwatch_lock);
3275     ret=ndrx_stopwatch_get_delta_sec(&M_chkdisk_stopwatch);
3276     MUTEX_UNLOCK_V(M_chkdisk_stopwatch_lock);
3277 
3278     return ret;
3279 }
3280 
3281 
3282 /**
3283  * Verify that all committed messages on the disk, actually are present
3284  * in the memory...
3285  */
3286 exprivate void tmq_chkdisk_th(void *ptr, int *p_finish_off)
3287 {
3288     static int volatile into_func = EXFALSE;
3289     int *p_chkdisk_time = (int *)ptr;
3290 
3291     char tmp[PATH_MAX+1];
3292     int i;
3293     int ret=EXSUCCEED;
3294     DIR *dir=NULL;
3295     struct dirent *entry;
3296     int len;
3297     char tranmask[256];
3298 
3299     if (EXTRUE==into_func)
3300     {
3301         /* already in function, avoid duplicate run */
3302         return;
3303     }
3304 
3305     /* Check that any enqueue was in pool, but last call
3306      * finished and did reset
3307      */
3308     if ( tmq_chkdisk_stopwatch_get_delta_sec() <= *p_chkdisk_time)
3309     {
3310         /* no time from last run... */
3311         return;
3312     }
3313 
3314     dir = opendir(M_folder_committed);
3315 
3316     if (dir == NULL) {
3317 
3318         NDRX_LOG(log_error, "opendir [%s] failed: %s", 
3319             M_folder_committed, strerror(errno));
3320         EXFAIL_OUT(ret);
3321     }
3322 
3323     while ((entry = readdir(dir)) != NULL)
3324     {
3325         /* for performance reasons which check
3326          * any file (even if several Q spaces works on the same
3327          * directory). with -X flag it is mandatory that
3328          * any Q space have it's own directory of work
3329          * otherwise it will just regulary restart due to
3330          * unknown messages found. Message IDs does not encode
3331          * Q space name, thus cannot filter our file or not from the
3332          * filename.
3333          */
3334         if (entry->d_name[0] == '.')
3335         {
3336             continue;
3337         }
3338 
3339         /* try to lookup in message hash */
3340         if (!M_p_tmq_msgid_exists(entry->d_name))
3341         {
3342             snprintf(tmp, sizeof(tmp), "%s/%s", 
3343                 M_folder_committed, entry->d_name);
3344 
3345             if (ndrx_file_exists(tmp))
3346             {
3347                 NDRX_LOG(log_error, "ERROR: Unkown message file "
3348                         "exists [%s] (duplicate processes or two Q "
3349                         "spaces works in the same directory) - restarting...",
3350                     tmp);
3351                 userlog("ERROR: Unkown message file exists"
3352                         " [%s] (duplicate processes or two Q "
3353                         "spaces works in the same directory) - restarting...",
3354                     tmp);
3355                 M_p_tpexit();
3356                 break;
3357             }
3358         }
3359     }
3360 
3361 out:
3362     if (NULL!=dir)
3363     {
3364         closedir(dir);
3365     }
3366 
3367 }
3368 
3369 
3370 /* vim: set ts=4 sw=4 et smartindent: */