Back to home page

Enduro/X

 
 

    


0001 /**
0002  * @brief tmrecover logic driver
0003  *
0004  * @file tmrecover.c
0005  */
0006 /* -----------------------------------------------------------------------------
0007  * Enduro/X Middleware Platform for Distributed Transaction Processing
0008  * Copyright (C) 2009-2016, ATR Baltic, Ltd. All Rights Reserved.
0009  * Copyright (C) 2017-2023, Mavimax, Ltd. All Rights Reserved.
0010  * This software is released under one of the following licenses:
0011  * AGPL (with Java and Go exceptions) or Mavimax's license for commercial use.
0012  * See LICENSE file for full text.
0013  * -----------------------------------------------------------------------------
0014  * AGPL license:
0015  *
0016  * This program is free software; you can redistribute it and/or modify it under
0017  * the terms of the GNU Affero General Public License, version 3 as published
0018  * by the Free Software Foundation;
0019  *
0020  * This program is distributed in the hope that it will be useful, but WITHOUT ANY
0021  * WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR A
0022  * PARTICULAR PURPOSE. See the GNU Affero General Public License, version 3
0023  * for more details.
0024  *
0025  * You should have received a copy of the GNU Affero General Public License along 
0026  * with this program; if not, write to the Free Software Foundation, Inc.,
0027  * 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
0028  *
0029  * -----------------------------------------------------------------------------
0030  * A commercial use license is available from Mavimax, Ltd
0031  * contact@mavimax.com
0032  * -----------------------------------------------------------------------------
0033  */
0034 #include <string.h>
0035 #include <stdio.h>
0036 #include <stdlib.h>
0037 #include <memory.h>
0038 #include <sys/param.h>
0039 
0040 #include <ndrstandard.h>
0041 #include <ndebug.h>
0042 #include <nstdutil.h>
0043 
0044 #include <ndrxdcmn.h>
0045 #include <atmi_int.h>
0046 #include <gencall.h>
0047 #include <utlist.h>
0048 #include <Exfields.h>
0049 #include <xa.h>
0050 #include <ubfutil.h>
0051 
0052 #include "xa_cmn.h"
0053 #include "exbase64.h"
0054 #include <nclopt.h>
0055 #include <exhash.h>
0056 /*---------------------------Externs------------------------------------*/
0057 /*---------------------------Macros-------------------------------------*/
0058 /*---------------------------Enums--------------------------------------*/
0059 /*---------------------------Typedefs-----------------------------------*/
0060 
0061 /**
0062  * transaction status hashmap
0063  */
0064 typedef struct {
0065     
0066     /** transaction base id */
0067     char xid_str[NDRX_XID_SERIAL_BUFSIZE];
0068     
0069     /** atmi status of the transaction */
0070     int atmi_err;
0071     
0072     EX_hash_handle hh; /**< makes this structure hashable               */
0073 } xid_status_t;
0074 
0075 /*---------------------------Globals------------------------------------*/
0076 /*---------------------------Statics------------------------------------*/
0077 exprivate long M_aborted;   /**< Number of aborted global transactions  */
0078 exprivate xid_status_t *M_trans=NULL; /** list of transactions          */
0079 
0080 /*---------------------------Prototypes---------------------------------*/
0081 
0082 /**
0083  * Free up the list of cached transaction results
0084  */
0085 exprivate void free_trans(void)
0086 {
0087     xid_status_t * el, *elt;
0088     
0089     EXHASH_ITER(hh, M_trans, el, elt)
0090     {
0091         EXHASH_DEL(M_trans, el);
0092         NDRX_FPFREE(el);
0093     }
0094 }
0095 
0096 /**
0097  * This returns cached ATMI result
0098  * Cache is needed due to fact that we might have several branches for the
0099  * transaction, and if transaction is alive, then no need to query TM again.
0100  * @param xid_str base xid (btid/rmid is 0)
0101  * @param rmid_start RMID which started the transactions
0102  * @param nodeid cluster node id which started the transaction
0103  * @param srvid server id which started the transaction
0104  * @return 0 - transaction is alive, atmi error code in case if not found  (TPEMATCH) / error
0105  */
0106 exprivate int get_xid_status(char *xid_str, unsigned char rmid_start, short nodeid, short srvid)
0107 {
0108     int ret = EXSUCCEED;
0109     xid_status_t *stat;
0110     char svcnm[XATMI_SERVICE_NAME_LENGTH+1];
0111     UBFH *p_ub = NULL;
0112     char cmd = ATMI_XA_STATUS;
0113     long rsplen;
0114     
0115     EXHASH_FIND_STR( M_trans, xid_str, stat);
0116     
0117     if (NULL!=stat)
0118     {
0119         NDRX_LOG(log_info, "transaction [%s] result cached: %d", xid_str, stat->atmi_err);
0120         ret = stat->atmi_err;
0121     }
0122     
0123     /* if not found, query the service */
0124     snprintf(svcnm, sizeof(svcnm), NDRX_SVC_TM_I, (int)nodeid,  (int)rmid_start, (int)srvid);
0125     
0126     p_ub = (UBFH *)tpalloc("UBF", NULL, 1024);
0127     
0128     if (NULL==p_ub)
0129     {
0130         NDRX_LOG(log_error, "Failed to malloc UBF: %s", tpstrerror(tperrno));
0131         ret = tperrno;
0132         goto out;
0133         
0134     }
0135     
0136     /* set the xid field
0137      * set: TMCMD = ATMI_XA_STATUS:
0138      * set: TMXID = xid_str
0139      * out:
0140      * where is the error loaded?
0141      */
0142     if (EXSUCCEED!=Bchg(p_ub, TMCMD, 0, &cmd, 0L) 
0143             || EXSUCCEED!=Bchg(p_ub, TMXID, 0, xid_str, 0L))
0144     {
0145         NDRX_LOG(log_error, "Failed to set TMCMD or TMXID: %s", Bstrerror(Berror));
0146         ret = TPESYSTEM;
0147         goto out;
0148     }
0149     
0150     NDRX_LOG(log_debug, "Calling [%s] for transaction status", svcnm);
0151     
0152     if (EXSUCCEED!=tpcall(svcnm, (char *)p_ub, 0, (char **)&p_ub, &rsplen, 0))
0153     {
0154         NDRX_LOG(log_error, "Failed to call [%s]: %s", svcnm, tpstrerror(tperrno));
0155         
0156         /* save the current results.. */
0157         ret = tperrno;
0158         
0159         if (tperrno!=TPESVCFAIL)
0160         {
0161             /* nothing more to do here... */
0162             goto out;
0163         }
0164         
0165     }
0166 
0167     /* dump the response buffer... */
0168     ndrx_debug_dump_UBF(log_debug, "Response buffer:", p_ub);
0169     
0170     /* if got the error code back, read it... */
0171     if (Bpres(p_ub, TMERR_CODE, 0))
0172     {
0173         short sret;
0174         if (EXSUCCEED==Bget(p_ub, TMERR_CODE, 0, (char *)&sret, 0L))
0175         {
0176             ret=sret;
0177         }
0178     }
0179     
0180     /* OK got the final result: */
0181     NDRX_LOG(log_debug, "Transaction [%s] service [%s] reported status: %d - %s",
0182             xid_str, svcnm, ret, (TPEMATCH==ret?"tx not found":"tx found or error"));
0183     
0184     
0185     /* cache the result */
0186     stat = NDRX_FPMALLOC(sizeof(*stat), 0);
0187     
0188     if (NULL!=stat)
0189     {
0190         stat->atmi_err = ret;
0191         NDRX_STRCPY_SAFE(stat->xid_str, xid_str);
0192     }
0193     
0194     EXHASH_ADD_STR( M_trans, xid_str, stat);
0195     
0196 out:
0197     
0198     if (NULL!=p_ub)
0199     {
0200         tpfree((char *)p_ub);
0201     }
0202 
0203     return ret;
0204 }
0205 
0206 /**
0207  * Perform xid command on service
0208  * @param cmd ATMI_XA_ABORTLOCAL or ATMI_XA_FORGETLOCAL
0209  * @param recover_svcnm
0210  * @param tmxid
0211  * @return XA error code, so that we can decide is forget needed.
0212  */
0213 exprivate int tran_finalize(char cmd, char *recover_svcnm, char *tmxid)
0214 {
0215     short ret = EXSUCCEED;
0216     UBFH *p_ub = NULL;
0217     long rsplen, flags;
0218 
0219     p_ub = (UBFH *)tpalloc("UBF", NULL, 1024);
0220     if (NULL==p_ub)
0221     {
0222         NDRX_LOG(log_error, "Failed to malloc UBF: %s", tpstrerror(tperrno));
0223         ret = tperrno;
0224         goto out;
0225     }
0226     
0227     flags = TMFLAGS_NOCON;
0228     
0229     if (EXSUCCEED!=Bchg(p_ub, TMCMD, 0, &cmd, 0L) 
0230             || EXSUCCEED!=Bchg(p_ub, TMXID, 0, tmxid, 0L)
0231             || EXSUCCEED!=Bchg(p_ub, TMTXFLAGS, 0, (char *)&flags, 0L))
0232     {
0233         NDRX_LOG(log_error, "Failed to set TMCMD/TMXID/TMTXFLAGS: %s", Bstrerror(Berror));
0234         ret = XAER_RMERR;
0235         goto out;
0236     }
0237     
0238     NDRX_LOG(log_debug, "Calling command [%c] on [%s] for xid [%s]", cmd, recover_svcnm, tmxid);
0239     
0240     /* thought these only work on conv mode
0241      * here no conversion shall be established, just tpreturn
0242      * thus needs some kind of flag.
0243      */
0244     if (EXSUCCEED!=tpcall(recover_svcnm, (char *)p_ub, 0, (char **)&p_ub, &rsplen, 0))
0245     {
0246         NDRX_LOG(log_error, "Failed to call [%s]: %s", recover_svcnm, tpstrerror(tperrno));
0247         
0248         ret = XAER_RMERR;
0249         
0250         if (tperrno!=TPESVCFAIL)
0251         {
0252             /* nothing more to do here... */
0253             goto out;
0254         }
0255     }
0256     
0257     /* try to get xa reason
0258      * and override current status with it
0259      */
0260     if (Bpres(p_ub, TMERR_REASON, 0) && 
0261             EXSUCCEED!=Bget(p_ub, TMERR_REASON, 0, (char *)&ret, 0L))
0262     {
0263         NDRX_LOG(log_error, "Failed to get reason field: %s", Bstrerror(Berror));
0264         ret=XAER_RMERR;
0265     }
0266     
0267 out:
0268                 
0269     NDRX_LOG(log_debug, "Returning %d", ret);
0270     return ret;
0271 }
0272 
0273 /**
0274  * Process xid...
0275  * to avoid possible deadlocks while we recover the
0276  * transactions, we need to download full list of xid to the memory
0277  * and only the process xid by xid, as we might request the particular resource
0278  * to abort the transaction.
0279  * 
0280  * @param tmxid raw DB xid to process
0281  * @param recover_svcnm service name which returned a list of transactions
0282  * @return EXSUCCEED/EXFAIL
0283  */
0284 exprivate int process_xid(char *tmxid, char *recover_svcnm)
0285 {
0286     int ret = EXSUCCEED;
0287     XID xid;
0288     size_t sz;
0289 
0290     memset(&xid, 0, sizeof(xid));
0291     
0292     sz = sizeof(xid);
0293     if (NULL==ndrx_xa_base64_decode((unsigned char *)tmxid, strlen(tmxid),
0294             &sz, (char *)&xid))
0295     {
0296         NDRX_LOG(log_warn, "Failed to parse XID -> Corrupted base64?");
0297     }
0298     else
0299     {
0300         /* print general info */
0301         NDRX_LOG(log_debug, "DATA: formatID: 0x%lx (%s) gtrid_length: %ld bqual_length: %ld\n",
0302                 xid.formatID, 
0303                 ((NDRX_XID_FORMAT_ID==xid.formatID ||
0304                     NDRX_XID_FORMAT_ID==(long)ntohll(xid.formatID))?
0305                     "fmt OK":"Not Enduro/X or different arch"),
0306                 xid.gtrid_length, xid.bqual_length);
0307 
0308         /* 
0309          * both formats are fine, as contents are platform agnostic
0310          * Maybe the ID shall be stored in such way... ?
0311          */
0312         if (NDRX_XID_FORMAT_ID==xid.formatID ||
0313                 NDRX_XID_FORMAT_ID==(long)ntohll(xid.formatID))
0314         {
0315             short nodeid;
0316             short srvid;
0317             unsigned char rmid_start;
0318             unsigned char rmid_cur;
0319             long btid;
0320             char xid_str[NDRX_XID_SERIAL_BUFSIZE];
0321 
0322             NDRX_LOG(log_debug, "Format OK");
0323 
0324             /* get base xid... */
0325 
0326             /* Print Enduro/X related xid data */
0327             atmi_xa_xid_get_info(&xid, &nodeid, 
0328                 &srvid, &rmid_start, &rmid_cur, &btid);
0329 
0330             /* the generic xid would be with out branch & current rmid */
0331             /* reset xid trailer to have original xid_str */
0332             memset(xid.data + xid.gtrid_length - 
0333                 sizeof(long) - sizeof(unsigned char), 
0334                     0, sizeof(long)+sizeof(unsigned char));
0335 
0336             /* serialize to base xid, btid=0? */
0337             atmi_xa_serialize_xid(&xid, xid_str);
0338                 
0339             NDRX_LOG(log_debug, "Got base xid: [%s]", xid_str);
0340                 
0341             /* query status? */
0342             if (TPEMATCH==get_xid_status(xid_str, rmid_start,  nodeid, srvid))
0343             {
0344                 NDRX_LOG(log_error, "Aborting transaction: [%s]", xid_str);
0345                 
0346                 /* Load TMXID with tmxid.
0347                  * Load TMCMD with ATMI_XA_ABORTLOCAL
0348                  * If return codes are:
0349                  * - XA_HEURHAZ
0350                  * - XA_HEURRB
0351                  * - XA_HEURCOM
0352                  * - XA_HEURMIX
0353                  * we shall call the ATMI_XA_FORGETLOCAL too.
0354                  */
0355                 
0356                 ret = tran_finalize(ATMI_XA_ABORTLOCAL, recover_svcnm, tmxid);
0357                 
0358                 switch (ret)
0359                 {
0360                     
0361                     case XA_OK:
0362                     case XA_RDONLY:
0363                         M_aborted++;
0364                         NDRX_LOG(log_debug, "Aborted OK");
0365                         break;
0366                     case XA_HEURHAZ:
0367                     case XA_HEURCOM:
0368                     case XA_HEURRB:
0369                     case XA_HEURMIX:
0370                         NDRX_LOG(log_debug, "Heuristic result -> forgetting");
0371                         if (EXSUCCEED!=tran_finalize(ATMI_XA_FORGETLOCAL, recover_svcnm, tmxid))
0372                         {
0373                             NDRX_LOG(log_error, "Failed to forget [%s]: %d - ignore",
0374                                     tmxid, ret);
0375                         }
0376                             
0377                         break;
0378                 }
0379                 ret = EXSUCCEED;
0380             }
0381         }
0382     }
0383         
0384 out:
0385     return ret;
0386 }
0387 
0388 /**
0389  * Fill up the growlist of xids from the server.
0390  * And then process xids one by one, if needed, perform abortlocal
0391  * @return EXSUCCEED/EXFAIL
0392  */
0393 exprivate int call_tm(UBFH *p_ub, char *svcnm, short parse)
0394 {
0395     int ret=EXSUCCEED;
0396     int cd, i;
0397     long revent;
0398     int recv_continue = 1;
0399     int tp_errno;
0400     ndrx_growlist_t list;
0401     char tmp[sizeof(XID)*3];
0402     BFLDLEN len;
0403     
0404     /* what is size of raw xid? */
0405     ndrx_growlist_init(&list, 100, sizeof(XID)*3);
0406             
0407     /* Setup the call buffer... */
0408     if (NULL==p_ub)
0409     {
0410         NDRX_LOG(log_error, "Failed to alloc FB!");
0411         EXFAIL_OUT(ret);
0412     }
0413     
0414     /* reset the call buffer only to request fields... */
0415     if (EXSUCCEED!=atmi_xa_reset_tm_call(p_ub))
0416     {
0417         NDRX_LOG(log_error, "Failed to prepare UBF for TM call!");
0418         EXFAIL_OUT(ret);
0419     }
0420     
0421     if (EXFAIL == (cd = tpconnect(svcnm,
0422                                     (char *)p_ub,
0423                                     0,
0424                                     TPNOTRAN |
0425                                     TPRECVONLY)))
0426     {
0427         NDRX_LOG(log_error, "Connect error [%s]", tpstrerror(tperrno) );
0428         ret = EXFAIL;
0429         goto out;
0430     }
0431     NDRX_LOG(log_debug, "Connected OK, cd = %d", cd );
0432 
0433     while (recv_continue)
0434     {
0435         recv_continue=0;
0436         if (EXFAIL == tprecv(cd,
0437                             (char **)&p_ub,
0438                             0L,
0439                             0L,
0440                             &revent))
0441         {
0442             ret = EXFAIL;
0443             tp_errno = tperrno;
0444             if (TPEEVENT == tp_errno)
0445             {
0446                     if (TPEV_SVCSUCC == revent)
0447                             ret = EXSUCCEED;
0448                     else
0449                     {
0450                         NDRX_LOG(log_error,
0451                                  "Unexpected conv event %lx", revent );
0452 
0453                     }
0454             }
0455             else
0456             {
0457                 NDRX_LOG(log_error, "recv error %d", tp_errno  );
0458                 EXFAIL_OUT(ret);
0459             }
0460         }
0461         else
0462         {
0463             
0464             /* get the xid bytes... */            
0465             /* the xid is binary XID recovered in hex */
0466             len = sizeof(tmp);
0467             if (EXSUCCEED!=Bget(p_ub, TMXID, 0, (char *)tmp, &len))
0468             {
0469                 NDRX_LOG(log_error, "Failed to TMXID fld: [%s]",  Bstrerror(Berror));
0470                 EXFAIL_OUT(ret);
0471             }
0472             
0473             if (EXSUCCEED!=ndrx_growlist_append(&list, tmp))
0474             {
0475                 NDRX_LOG(log_error, "Failed to add data to growlist of xids");
0476                 EXFAIL_OUT(ret);
0477             }
0478             recv_continue=1;
0479         }
0480     }
0481     
0482     /* OK scan the list and process the xids */
0483     for (i=0; i<=list.maxindexused; i++)
0484     {
0485         if (EXSUCCEED!=process_xid(list.mem + (sizeof(XID)*3)*i, svcnm))
0486         {
0487             NDRX_LOG(log_error, "Failed to process xid [%s] at index %d for service [%s]",
0488                     list.mem + (sizeof(XID)*3)*i, i, svcnm);
0489             EXFAIL_OUT(ret);
0490         }
0491     }
0492     
0493 out:
0494 
0495     if (EXSUCCEED!=ret)
0496     {
0497         tpdiscon(cd);
0498     }
0499 
0500     /* delete the growlist */
0501     ndrx_growlist_free(&list);
0502 
0503 
0504     return ret;
0505 }
0506 
0507 /**
0508  * Scan all the TMSRVs for transaction
0509  * Check the status of transaction (base branch tid 0) by the originator
0510  * if TPEMATCH, then abort transaction & cache the result
0511  * @return EXSUCCEED/EXFAIL
0512  */
0513 expublic int ndrx_tmrecover_do(void)
0514 {
0515     int ret = EXSUCCEED;
0516     atmi_svc_list_t *el, *tmp, *list;
0517     UBFH *p_ub = atmi_xa_alloc_tm_call(ATMI_XA_RECOVERLOCAL);
0518     short parse = EXFALSE;
0519     int lev;
0520     
0521     if (NULL==p_ub)
0522     {
0523         NDRX_LOG(log_error, "Failed to alloc UBF!");
0524         EXFAIL_OUT(ret);
0525     }
0526     
0527     M_aborted = 0;
0528 
0529     list = ndrx_get_svc_list(ndrx_tmfilter_common);
0530 
0531     LL_FOREACH_SAFE(list,el,tmp)
0532     {
0533         NDRX_LOG(log_info, "About to call service: [%s]", el->svcnm);
0534         ret = call_tm(p_ub, el->svcnm, parse);
0535         LL_DELETE(list,el);
0536         NDRX_FREE(el);
0537     }
0538     
0539     lev = log_warn;
0540 
0541     
0542     if (M_aborted>0 || EXSUCCEED!=ret)
0543     {
0544         lev = log_error;
0545     }
0546     
0547     NDRX_LOG(lev, "Rolled back %d orphan transactions branches(ret=%d)", M_aborted, ret);
0548     
0549 out:
0550     
0551     if (NULL!=p_ub)
0552     {
0553         tpfree((char *)p_ub);
0554     }
0555 
0556     if (NULL!=M_trans)
0557     {
0558         free_trans();
0559     }
0560 
0561     if (EXSUCCEED==ret)
0562     {
0563         return M_aborted;
0564     }
0565     else
0566     {
0567         return ret;
0568     }
0569 }
0570 
0571 
0572 
0573 /* vim: set ts=4 sw=4 et smartindent: */