Back to home page

Enduro/X

 
 

    


0001 /**
0002  * @brief tmrecover logic driver
0003  *
0004  * @file tmrecover.c
0005  */
0006 /* -----------------------------------------------------------------------------
0007  * Enduro/X Middleware Platform for Distributed Transaction Processing
0008  * Copyright (C) 2009-2016, ATR Baltic, Ltd. All Rights Reserved.
0009  * Copyright (C) 2017-2023, Mavimax, Ltd. All Rights Reserved.
0010  * This software is released under one of the following licenses:
0011  * AGPL (with Java and Go exceptions) or Mavimax's license for commercial use.
0012  * See LICENSE file for full text.
0013  * -----------------------------------------------------------------------------
0014  * AGPL license:
0015  *
0016  * This program is free software; you can redistribute it and/or modify it under
0017  * the terms of the GNU Affero General Public License, version 3 as published
0018  * by the Free Software Foundation;
0019  *
0020  * This program is distributed in the hope that it will be useful, but WITHOUT ANY
0021  * WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR A
0022  * PARTICULAR PURPOSE. See the GNU Affero General Public License, version 3
0023  * for more details.
0024  *
0025  * You should have received a copy of the GNU Affero General Public License along 
0026  * with this program; if not, write to the Free Software Foundation, Inc.,
0027  * 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
0028  *
0029  * -----------------------------------------------------------------------------
0030  * A commercial use license is available from Mavimax, Ltd
0031  * contact@mavimax.com
0032  * -----------------------------------------------------------------------------
0033  */
0034 #include <string.h>
0035 #include <stdio.h>
0036 #include <stdlib.h>
0037 #include <memory.h>
0038 #include <sys/param.h>
0039 
0040 #include <ndrstandard.h>
0041 #include <ndebug.h>
0042 #include <nstdutil.h>
0043 
0044 #include <ndrxdcmn.h>
0045 #include <atmi_int.h>
0046 #include <gencall.h>
0047 #include <utlist.h>
0048 #include <Exfields.h>
0049 #include <xa.h>
0050 #include <ubfutil.h>
0051 
0052 #include "xa_cmn.h"
0053 #include "exbase64.h"
0054 #include <nclopt.h>
0055 #include <exhash.h>
0056 /*---------------------------Externs------------------------------------*/
0057 /*---------------------------Macros-------------------------------------*/
0058 /*---------------------------Enums--------------------------------------*/
0059 /*---------------------------Typedefs-----------------------------------*/
0060 
0061 /**
0062  * transaction status hashmap
0063  */
0064 typedef struct {
0065     
0066     /** transaction base id */
0067     char xid_str[NDRX_XID_SERIAL_BUFSIZE];
0068     
0069     /** atmi status of the transaction */
0070     int atmi_err;
0071     
0072     EX_hash_handle hh; /**< makes this structure hashable               */
0073 } xid_status_t;
0074 
0075 /*---------------------------Globals------------------------------------*/
0076 /*---------------------------Statics------------------------------------*/
0077 exprivate long M_aborted;   /**< Number of aborted global transactions  */
0078 exprivate xid_status_t *M_trans=NULL; /** list of transactions          */
0079 
0080 /*---------------------------Prototypes---------------------------------*/
0081 
0082 /**
0083  * Free up the list of cached transaction results
0084  */
0085 exprivate void free_trans(void)
0086 {
0087     xid_status_t * el, *elt;
0088     
0089     EXHASH_ITER(hh, M_trans, el, elt)
0090     {
0091         EXHASH_DEL(M_trans, el);
0092         NDRX_FPFREE(el);
0093     }
0094 }
0095 
0096 /**
0097  * This returns cached ATMI result
0098  * Cache is needed due to fact that we might have several branches for the
0099  * transaction, and if transaction is alive, then no need to query TM again.
0100  * @param xid_str base xid (btid/rmid is 0)
0101  * @param rmid_start RMID which started the transactions
0102  * @param nodeid cluster node id which started the transaction
0103  * @param srvid server id which started the transaction
0104  * @return 0 - transaction is alive, atmi error code in case if not found  (TPEMATCH) / error
0105  */
0106 exprivate int get_xid_status(char *xid_str, unsigned char rmid_start, short nodeid, short srvid)
0107 {
0108     int ret = EXSUCCEED;
0109     xid_status_t *stat;
0110     char svcnm[XATMI_SERVICE_NAME_LENGTH+1];
0111     UBFH *p_ub = NULL;
0112     char cmd = ATMI_XA_STATUS;
0113     long rsplen;
0114     
0115     EXHASH_FIND_STR( M_trans, xid_str, stat);
0116     
0117     if (NULL!=stat)
0118     {
0119         NDRX_LOG(log_info, "transaction [%s] result cached: %d", xid_str, stat->atmi_err);
0120         ret = stat->atmi_err;
0121     }
0122     
0123     /* if not found, query the service */
0124     snprintf(svcnm, sizeof(svcnm), NDRX_SVC_TM_I, (int)nodeid,  (int)rmid_start, (int)srvid);
0125     
0126     p_ub = (UBFH *)tpalloc("UBF", NULL, 1024);
0127     
0128     if (NULL==p_ub)
0129     {
0130         NDRX_LOG(log_error, "Failed to malloc UBF: %s", tpstrerror(tperrno));
0131         ret = tperrno;
0132         goto out;
0133         
0134     }
0135     
0136     /* set the xid field
0137      * set: TMCMD = ATMI_XA_STATUS:
0138      * set: TMXID = xid_str
0139      * out:
0140      * where is the error loaded?
0141      */
0142     if (EXSUCCEED!=Bchg(p_ub, TMCMD, 0, &cmd, 0L) 
0143             || EXSUCCEED!=Bchg(p_ub, TMXID, 0, xid_str, 0L))
0144     {
0145         NDRX_LOG(log_error, "Failed to set TMCMD or TMXID: %s", Bstrerror(Berror));
0146         ret = TPESYSTEM;
0147         goto out;
0148     }
0149     
0150     NDRX_LOG(log_debug, "Calling [%s] for transaction status", svcnm);
0151     
0152     if (EXSUCCEED!=tpcall(svcnm, (char *)p_ub, 0, (char **)&p_ub, &rsplen, 0))
0153     {
0154         NDRX_LOG(log_error, "Failed to call [%s]: %s", svcnm, tpstrerror(tperrno));
0155         
0156         /* save the current results.. */
0157         ret = tperrno;
0158         
0159         if (tperrno!=TPESVCFAIL)
0160         {
0161             /* nothing more to do here... */
0162             goto out;
0163         }
0164     }
0165 
0166     /* dump the response buffer... */
0167     ndrx_debug_dump_UBF(log_debug, "Response buffer:", p_ub);
0168     
0169     /* if got the error code back, read it... */
0170     if (Bpres(p_ub, TMERR_CODE, 0))
0171     {
0172         short sret;
0173         if (EXSUCCEED==Bget(p_ub, TMERR_CODE, 0, (char *)&sret, 0L))
0174         {
0175             ret=sret;
0176         }
0177     }
0178     
0179     /* OK got the final result: */
0180     NDRX_LOG(log_debug, "Transaction [%s] service [%s] reported status: %d - %s",
0181             xid_str, svcnm, ret, (TPEMATCH==ret?"tx not found":"tx found or error"));
0182     
0183     /* cache the result */
0184     stat = NDRX_FPMALLOC(sizeof(*stat), 0);
0185     
0186     if (NULL!=stat)
0187     {
0188         stat->atmi_err = ret;
0189         NDRX_STRCPY_SAFE(stat->xid_str, xid_str);
0190     }
0191     
0192     EXHASH_ADD_STR( M_trans, xid_str, stat);
0193     
0194 out:
0195     
0196     if (NULL!=p_ub)
0197     {
0198         tpfree((char *)p_ub);
0199     }
0200 
0201     return ret;
0202 }
0203 
0204 /**
0205  * Perform xid command on service
0206  * @param cmd ATMI_XA_ABORTLOCAL or ATMI_XA_FORGETLOCAL
0207  * @param recover_svcnm
0208  * @param tmxid
0209  * @return XA error code, so that we can decide is forget needed.
0210  */
0211 exprivate int tran_finalize(char cmd, char *recover_svcnm, char *tmxid)
0212 {
0213     short ret = EXSUCCEED;
0214     UBFH *p_ub = NULL;
0215     long rsplen, flags;
0216 
0217     p_ub = (UBFH *)tpalloc("UBF", NULL, 1024);
0218     if (NULL==p_ub)
0219     {
0220         NDRX_LOG(log_error, "Failed to malloc UBF: %s", tpstrerror(tperrno));
0221         ret = tperrno;
0222         goto out;
0223     }
0224     
0225     flags = TMFLAGS_NOCON;
0226     
0227     if (EXSUCCEED!=Bchg(p_ub, TMCMD, 0, &cmd, 0L) 
0228             || EXSUCCEED!=Bchg(p_ub, TMXID, 0, tmxid, 0L)
0229             || EXSUCCEED!=Bchg(p_ub, TMTXFLAGS, 0, (char *)&flags, 0L))
0230     {
0231         NDRX_LOG(log_error, "Failed to set TMCMD/TMXID/TMTXFLAGS: %s", Bstrerror(Berror));
0232         ret = XAER_RMERR;
0233         goto out;
0234     }
0235     
0236     NDRX_LOG(log_debug, "Calling command [%c] on [%s] for xid [%s]", cmd, recover_svcnm, tmxid);
0237     
0238     /* thought these only work on conv mode
0239      * here no conversion shall be established, just tpreturn
0240      * thus needs some kind of flag.
0241      */
0242     if (EXSUCCEED!=tpcall(recover_svcnm, (char *)p_ub, 0, (char **)&p_ub, &rsplen, 0))
0243     {
0244         NDRX_LOG(log_error, "Failed to call [%s]: %s", recover_svcnm, tpstrerror(tperrno));
0245         
0246         ret = XAER_RMERR;
0247         
0248         if (tperrno!=TPESVCFAIL)
0249         {
0250             /* nothing more to do here... */
0251             goto out;
0252         }
0253     }
0254     
0255     /* try to get xa reason
0256      * and override current status with it
0257      */
0258     if (Bpres(p_ub, TMERR_REASON, 0) && 
0259             EXSUCCEED!=Bget(p_ub, TMERR_REASON, 0, (char *)&ret, 0L))
0260     {
0261         NDRX_LOG(log_error, "Failed to get reason field: %s", Bstrerror(Berror));
0262         ret=XAER_RMERR;
0263     }
0264     
0265 out:
0266                 
0267     NDRX_LOG(log_debug, "Returning %d", ret);
0268     return ret;
0269 }
0270 
0271 /**
0272  * Process xid...
0273  * to avoid possible deadlocks while we recover the
0274  * transactions, we need to download full list of xid to the memory
0275  * and only the process xid by xid, as we might request the particular resource
0276  * to abort the transaction.
0277  * 
0278  * @param tmxid raw DB xid to process
0279  * @param recover_svcnm service name which returned a list of transactions
0280  * @return EXSUCCEED/EXFAIL
0281  */
0282 exprivate int process_xid(char *tmxid, char *recover_svcnm)
0283 {
0284     int ret = EXSUCCEED;
0285     XID xid;
0286     size_t sz;
0287 
0288     memset(&xid, 0, sizeof(xid));
0289     
0290     sz = sizeof(xid);
0291     if (NULL==ndrx_xa_base64_decode((unsigned char *)tmxid, strlen(tmxid),
0292             &sz, (char *)&xid))
0293     {
0294         NDRX_LOG(log_warn, "Failed to parse XID -> Corrupted base64?");
0295     }
0296     else
0297     {
0298         /* print general info */
0299         NDRX_LOG(log_debug, "DATA: formatID: 0x%lx (%s) gtrid_length: %ld bqual_length: %ld\n",
0300                 xid.formatID, 
0301                 ((NDRX_XID_FORMAT_ID==xid.formatID ||
0302                     NDRX_XID_FORMAT_ID==(long)ntohll(xid.formatID))?
0303                     "fmt OK":"Not Enduro/X or different arch"),
0304                 xid.gtrid_length, xid.bqual_length);
0305 
0306         /* 
0307          * both formats are fine, as contents are platform agnostic
0308          * Maybe the ID shall be stored in such way... ?
0309          */
0310         if (NDRX_XID_FORMAT_ID==xid.formatID ||
0311                 NDRX_XID_FORMAT_ID==(long)ntohll(xid.formatID))
0312         {
0313             short nodeid;
0314             short srvid;
0315             unsigned char rmid_start;
0316             unsigned char rmid_cur;
0317             long btid;
0318             char xid_str[NDRX_XID_SERIAL_BUFSIZE];
0319 
0320             NDRX_LOG(log_debug, "Format OK");
0321 
0322             /* get base xid... */
0323 
0324             /* Print Enduro/X related xid data */
0325             atmi_xa_xid_get_info(&xid, &nodeid, 
0326                 &srvid, &rmid_start, &rmid_cur, &btid);
0327 
0328             /* the generic xid would be with out branch & current rmid */
0329             /* reset xid trailer to have original xid_str */
0330             memset(xid.data + xid.gtrid_length - 
0331                 sizeof(long) - sizeof(unsigned char), 
0332                     0, sizeof(long)+sizeof(unsigned char));
0333 
0334             /* serialize to base xid, btid=0? */
0335             atmi_xa_serialize_xid(&xid, xid_str);
0336                 
0337             NDRX_LOG(log_debug, "Got base xid: [%s]", xid_str);
0338                 
0339             /* query status? */
0340             if (TPEMATCH==get_xid_status(xid_str, rmid_start,  nodeid, srvid))
0341             {
0342                 NDRX_LOG(log_error, "Aborting transaction: [%s]", xid_str);
0343                 
0344                 /* Load TMXID with tmxid.
0345                  * Load TMCMD with ATMI_XA_ABORTLOCAL
0346                  * If return codes are:
0347                  * - XA_HEURHAZ
0348                  * - XA_HEURRB
0349                  * - XA_HEURCOM
0350                  * - XA_HEURMIX
0351                  * we shall call the ATMI_XA_FORGETLOCAL too.
0352                  */
0353                 
0354                 ret = tran_finalize(ATMI_XA_ABORTLOCAL, recover_svcnm, tmxid);
0355                 
0356                 switch (ret)
0357                 {
0358                     
0359                     case XA_OK:
0360                     case XA_RDONLY:
0361                         M_aborted++;
0362                         NDRX_LOG(log_debug, "Aborted OK");
0363                         break;
0364                     case XA_HEURHAZ:
0365                     case XA_HEURCOM:
0366                     case XA_HEURRB:
0367                     case XA_HEURMIX:
0368                         NDRX_LOG(log_debug, "Heuristic result -> forgetting");
0369                         if (EXSUCCEED!=tran_finalize(ATMI_XA_FORGETLOCAL, recover_svcnm, tmxid))
0370                         {
0371                             NDRX_LOG(log_error, "Failed to forget [%s]: %d - ignore",
0372                                     tmxid, ret);
0373                         }
0374                             
0375                         break;
0376                 }
0377                 ret = EXSUCCEED;
0378             }
0379         }
0380     }
0381         
0382 out:
0383     return ret;
0384 }
0385 
0386 /**
0387  * Check can we bypass tmsrv call error
0388  * @param active call descriptor
0389  */
0390 exprivate int can_bypass_tm_err(int cd)
0391 {
0392     int ret = EXFALSE;
0393 
0394     switch (tperrno)
0395     {
0396         case TPETIME:
0397             tpdiscon(cd);
0398         case TPENOENT:
0399             ret = EXTRUE;
0400             break;
0401     }
0402 
0403 out:
0404     return ret;
0405 }
0406 
0407 /**
0408  * Fill up the growlist of xids from the server.
0409  * And then process xids one by one, if needed, perform abortlocal
0410  * @return EXSUCCEED/EXFAIL
0411  */
0412 exprivate int call_tm(UBFH *p_ub, char *svcnm, short parse)
0413 {
0414     int ret=EXSUCCEED;
0415     int cd, i;
0416     long revent;
0417     int recv_continue = 1;
0418     int tp_errno;
0419     ndrx_growlist_t list;
0420     char tmp[sizeof(XID)*3];
0421     BFLDLEN len;
0422     
0423     /* what is size of raw xid? */
0424     ndrx_growlist_init(&list, 100, sizeof(XID)*3);
0425             
0426     /* Setup the call buffer... */
0427     if (NULL==p_ub)
0428     {
0429         NDRX_LOG(log_error, "Failed to alloc FB!");
0430         EXFAIL_OUT(ret);
0431     }
0432     
0433     /* reset the call buffer only to request fields... */
0434     if (EXSUCCEED!=atmi_xa_reset_tm_call(p_ub))
0435     {
0436         NDRX_LOG(log_error, "Failed to prepare UBF for TM call!");
0437         EXFAIL_OUT(ret);
0438     }
0439     
0440     if (EXFAIL == (cd = tpconnect(svcnm,
0441                                     (char *)p_ub,
0442                                     0,
0443                                     TPNOTRAN |
0444                                     TPRECVONLY)))
0445     {
0446         NDRX_LOG(log_error, "Connect error [%s]", tpstrerror(tperrno) );
0447 
0448         /* generate no error if possible */
0449         if (can_bypass_tm_err(cd))
0450         {
0451             ret = EXSUCCEED;
0452         }
0453         else
0454         {
0455             ret = EXFAIL;
0456         }
0457         goto out;
0458     }
0459     NDRX_LOG(log_debug, "Connected OK, cd = %d", cd );
0460 
0461     while (recv_continue)
0462     {
0463         recv_continue=0;
0464         if (EXFAIL == tprecv(cd,
0465                             (char **)&p_ub,
0466                             0L,
0467                             0L,
0468                             &revent))
0469         {
0470             ret = EXFAIL;
0471             tp_errno = tperrno;
0472             if (TPEEVENT == tp_errno)
0473             {
0474                     if (TPEV_SVCSUCC == revent)
0475                             ret = EXSUCCEED;
0476                     else
0477                     {
0478                         NDRX_LOG(log_error,
0479                                  "Unexpected conv event %lx", revent );
0480 
0481                     }
0482             }
0483             else
0484             {
0485                 NDRX_LOG(log_error, "recv error %d", tp_errno  );
0486                 /* generate no error if possible */
0487                 if (can_bypass_tm_err(cd))
0488                 {
0489                     ret = EXSUCCEED;
0490                 }
0491                 else
0492                 {
0493                     ret = EXFAIL;
0494                 }
0495                 goto out;
0496             }
0497         }
0498         else
0499         {
0500             
0501             /* get the xid bytes... */            
0502             /* the xid is binary XID recovered in hex */
0503             len = sizeof(tmp);
0504             if (EXSUCCEED!=Bget(p_ub, TMXID, 0, (char *)tmp, &len))
0505             {
0506                 NDRX_LOG(log_error, "Failed to TMXID fld: [%s]",  Bstrerror(Berror));
0507                 EXFAIL_OUT(ret);
0508             }
0509             
0510             if (EXSUCCEED!=ndrx_growlist_append(&list, tmp))
0511             {
0512                 NDRX_LOG(log_error, "Failed to add data to growlist of xids");
0513                 EXFAIL_OUT(ret);
0514             }
0515             recv_continue=1;
0516         }
0517     }
0518     
0519     /* OK scan the list and process the xids */
0520     for (i=0; i<=list.maxindexused; i++)
0521     {
0522         if (EXSUCCEED!=process_xid(list.mem + (sizeof(XID)*3)*i, svcnm))
0523         {
0524             NDRX_LOG(log_error, "Failed to process xid [%s] at index %d for service [%s]",
0525                     list.mem + (sizeof(XID)*3)*i, i, svcnm);
0526             EXFAIL_OUT(ret);
0527         }
0528     }
0529     
0530 out:
0531 
0532     if (EXSUCCEED!=ret)
0533     {
0534         tpdiscon(cd);
0535     }
0536 
0537     /* delete the growlist */
0538     ndrx_growlist_free(&list);
0539 
0540 
0541     return ret;
0542 }
0543 
0544 /**
0545  * Scan all the TMSRVs for transaction
0546  * Check the status of transaction (base branch tid 0) by the originator
0547  * if TPEMATCH, then abort transaction & cache the result
0548  * @return >=0 count of aborted (succeed)/EXFAIL
0549  */
0550 expublic int ndrx_tmrecover_do(void)
0551 {
0552     int ret = EXSUCCEED;
0553     atmi_svc_list_t *el, *tmp, *list;
0554     UBFH *p_ub = atmi_xa_alloc_tm_call(ATMI_XA_RECOVERLOCAL);
0555     short parse = EXFALSE;
0556     int lev;
0557     
0558     if (NULL==p_ub)
0559     {
0560         NDRX_LOG(log_error, "Failed to alloc UBF!");
0561         EXFAIL_OUT(ret);
0562     }
0563     
0564     M_aborted = 0;
0565 
0566     list = ndrx_get_svc_list(ndrx_tmfilter_common);
0567 
0568     LL_FOREACH_SAFE(list,el,tmp)
0569     {
0570         NDRX_LOG(log_info, "About to call service: [%s]", el->svcnm);
0571         ret = call_tm(p_ub, el->svcnm, parse);
0572         LL_DELETE(list,el);
0573         NDRX_FREE(el);
0574     }
0575     
0576     lev = log_warn;
0577 
0578     
0579     if (M_aborted>0 || EXSUCCEED!=ret)
0580     {
0581         lev = log_error;
0582     }
0583     
0584     NDRX_LOG(lev, "Rolled back %d orphan transactions branches(ret=%d)", M_aborted, ret);
0585     
0586 out:
0587     
0588     if (NULL!=p_ub)
0589     {
0590         tpfree((char *)p_ub);
0591     }
0592 
0593     if (NULL!=M_trans)
0594     {
0595         free_trans();
0596     }
0597 
0598     if (EXSUCCEED==ret)
0599     {
0600         return M_aborted;
0601     }
0602     else
0603     {
0604         return ret;
0605     }
0606 }
0607 
0608 
0609 
0610 /* vim: set ts=4 sw=4 et smartindent: */