Back to home page

Enduro/X

 
 

    


0001 /**
0002  * @brief Transaction state driver (uses libatmi/xastates.c for driving)
0003  *
0004  * @file statedrv.c
0005  */
0006 /* -----------------------------------------------------------------------------
0007  * Enduro/X Middleware Platform for Distributed Transaction Processing
0008  * Copyright (C) 2009-2016, ATR Baltic, Ltd. All Rights Reserved.
0009  * Copyright (C) 2017-2023, Mavimax, Ltd. All Rights Reserved.
0010  * This software is released under one of the following licenses:
0011  * AGPL (with Java and Go exceptions) or Mavimax's license for commercial use.
0012  * See LICENSE file for full text.
0013  * -----------------------------------------------------------------------------
0014  * AGPL license:
0015  *
0016  * This program is free software; you can redistribute it and/or modify it under
0017  * the terms of the GNU Affero General Public License, version 3 as published
0018  * by the Free Software Foundation;
0019  *
0020  * This program is distributed in the hope that it will be useful, but WITHOUT ANY
0021  * WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR A
0022  * PARTICULAR PURPOSE. See the GNU Affero General Public License, version 3
0023  * for more details.
0024  *
0025  * You should have received a copy of the GNU Affero General Public License along 
0026  * with this program; if not, write to the Free Software Foundation, Inc.,
0027  * 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
0028  *
0029  * -----------------------------------------------------------------------------
0030  * A commercial use license is available from Mavimax, Ltd
0031  * contact@mavimax.com
0032  * -----------------------------------------------------------------------------
0033  */
0034 #include <stdio.h>
0035 #include <stdlib.h>
0036 #include <string.h>
0037 #include <errno.h>
0038 #include <regex.h>
0039 #include <utlist.h>
0040 
0041 #include <ndebug.h>
0042 #include <atmi.h>
0043 #include <atmi_int.h>
0044 #include <typed_buf.h>
0045 #include <ndrstandard.h>
0046 #include <ubf.h>
0047 #include <Exfields.h>
0048 
0049 #include <exnet.h>
0050 #include <ndrxdcmn.h>
0051 
0052 #include "tmsrv.h"
0053 #include "../libatmisrv/srv_int.h"
0054 #include "tperror.h"
0055 #include <xa_cmn.h>
0056 #include <sys_test.h>
0057 /*---------------------------Externs------------------------------------*/
0058 /*---------------------------Macros-------------------------------------*/
0059 /*---------------------------Enums--------------------------------------*/
0060 /*---------------------------Typedefs-----------------------------------*/
0061 
/**
 * Single vote produced while driving a transaction: one resource manager
 * branch proposing the stage the transaction should move to.
 * Entries of this type are collected in the grow list used by tm_drive().
 */
typedef struct {
    /** resource manager id that casts the vote */
    short rmid;
    /** transaction stage being voted for */
    short stage;
    /** branch transaction id the vote belongs to */
    long btid;
} btid_vote_t;
0071 
0072 /*---------------------------Globals------------------------------------*/
0073 /*---------------------------Statics------------------------------------*/
0074 /*---------------------------Prototypes---------------------------------*/
0075 
0076 /**
0077  * Do one try for transaciton processing using state machine defined in atmilib
0078  * @param p_xai - xa info structure
0079  * @param p_tl - transaction log
0080  * @param [in] flags shared tran system and tp flags
0081  * @return TPreturn code.
0082  */
0083 expublic int tm_drive(atmi_xa_tx_info_t *p_xai, atmi_xa_log_t *p_tl, int master_op,
0084                         short rmid, long flags)
0085 {
0086     int ret = EXSUCCEED;
0087     int i;
0088     int again;
0089     rmstatus_driver_t* vote_txstage;
0090     txstage_descriptor_t* descr;
0091     /* char stagearr[NDRX_MAX_RMS];*/
0092     ndrx_growlist_t stagearr; /**< grow list of voted stages */
0093     btid_vote_t vote;
0094     
0095     short min_in_group;
0096     short min_in_overall;
0097     short max_in_overall;
0098     short rm_vote_next_txstage;
0099     int try=0;
0100     int was_retry;
0101     int is_tx_finished = EXFALSE;
0102     
0103     NDRX_LOG(log_info, "tm_drive() enter from xid=[%s] flags=%ld", 
0104             p_xai->tmxid, flags);
0105     
0106     memset(&stagearr, 0, sizeof(stagearr));
0107     
0108     do
0109     {
0110         short new_txstage = XA_TX_STAGE_MAX_NEVER;
0111         int op_code = 0;
0112         int op_ret = 0;
0113         int op_reason = 0;
0114         int op_tperrno = 0;
0115         atmi_xa_rm_status_btid_t *el, *elt;
0116         was_retry = EXFALSE;
0117         
0118         again = EXFALSE;
0119         
0120         if (NULL==(descr = xa_stage_get_descr(p_tl->txstage)))
0121         {
0122             NDRX_LOG(log_error, "Invalid stage %hd", p_tl->txstage);
0123             ret=TPESYSTEM;
0124             goto out;
0125         }
0126         
0127         NDRX_LOG(log_info, "Entered in stage: %s", descr->descr);
0128         
0129         if (NULL!=stagearr.mem)
0130         {
0131             /* reset the momory, no reset needed. */
0132             stagearr.maxindexused = -1;
0133         }
0134         else if (0==stagearr.size) /* not initialized */
0135         {
0136             /* this does not allocate memory */
0137             ndrx_growlist_init(&stagearr, 100, sizeof(btid_vote_t));
0138         }
0139          
0140         for (i=0; i<NDRX_MAX_RMS; i++)
0141         {
0142             EXHASH_ITER(hh, p_tl->rmstatus[i].btid_hash, el, elt)
0143             {
0144 
0145                 NDRX_LOG(log_info, "RMID: %hd status %c", 
0146                                             i+1, el->rmstatus);
0147 
0148                 op_reason = XA_OK;
0149                 op_tperrno = 0;
0150                 op_code = xa_status_get_op(p_tl->txstage, el->rmstatus);
0151                 switch (op_code)
0152                 {
0153                     case XA_OP_NOP:
0154                         NDRX_LOG(log_info, "OP_NOP");
0155                         break;
0156                     case XA_OP_PREPARE:
0157                         NDRX_LOG(log_info, "Prepare RMID %d", i+1);
0158                         if (EXSUCCEED!=(op_ret = tm_prepare_combined(p_xai, i+1, el->btid)))
0159                         {
0160                             op_reason = atmi_xa_get_reason();
0161                             op_tperrno = tperrno;
0162                         }
0163                         break;
0164                     case XA_OP_COMMIT:
0165                         NDRX_LOG(log_info, "Commit RMID %d", i+1);
0166                         
0167                         /* system test entry point
0168                          * for case when tmsrv is unable to complete...
0169                          */
0170                         if (NDRX_SYSTEST_ENBLD && ndrx_systest_case(NDRX_SYSTEST_TMSCOMMIT))
0171                         {
0172                             op_reason = XAER_RMERR;
0173                             op_tperrno = TPESVCERR;
0174                         }
0175                         else if (EXSUCCEED!=(op_ret = tm_commit_combined(p_xai, i+1, el->btid)))
0176                         {
0177                             op_reason = atmi_xa_get_reason();
0178                             op_tperrno = tperrno;
0179                         }
0180                         break;
0181                     case XA_OP_ROLLBACK:
0182                         NDRX_LOG(log_info, "Rollback RMID %d", i+1);
0183                         if (EXSUCCEED!=(op_ret = tm_rollback_combined(p_xai, i+1, el->btid)))
0184                         {
0185                             op_reason = atmi_xa_get_reason();
0186                             op_tperrno = tperrno;
0187                         }
0188                         break;
0189                     case XA_OP_FORGET:
0190                         NDRX_LOG(log_info, "Forget RMID %d", i+1);
0191                         if (EXSUCCEED!=(op_ret = tm_forget_combined(p_xai, i+1, el->btid)))
0192                         {
0193                             op_reason = atmi_xa_get_reason();
0194                             op_tperrno = tperrno;
0195                         }
0196                         break;
0197                     default:
0198                         NDRX_LOG(log_error, "Invalid opcode %d", op_code);
0199                         ret=TPESYSTEM;
0200                         goto out;
0201                         break;
0202                 }
0203                 NDRX_LOG(log_info, "Operation tperrno: %d, xa return code: %d",
0204                                          op_tperrno, op_reason);
0205 
0206                 /* In case if not preparing
0207                  * allow some retries. 
0208                  * TODO: Probably would want to add some sleep to wait
0209                  * retry (for background ops, probably no retry processing
0210                  * required at all).
0211                  */
0212                 if (XA_TX_STAGE_PREPARING!=p_tl->txstage
0213                         && (op_reason==XA_RETRY || op_reason==XAER_RMFAIL))
0214                 {
0215                     was_retry = EXTRUE;   
0216                 }
0217 
0218                 /* Now get the transition of the state/vote */
0219                 if (XA_OP_NOP == op_code)
0220                 {
0221                     /* So this does not vote?
0222                      * But if it was recovered files? And it previously voted,
0223                      * the transaction should be aborted? Seems like this is
0224                      * not very correct.
0225                      * However this happens only if the row is defined. Thus
0226                      * all decision status mappings for stage must be defined.
0227                      */
0228                     if (NULL==(vote_txstage = xa_status_get_next_by_new_status(p_tl->txstage, 
0229                             el->rmstatus)))
0230                     {
0231                         NDRX_LOG(log_info, "No stage info for %hd/%c - ignore", p_tl->txstage, 
0232                             el->rmstatus);
0233                         /*
0234                         ret=TPESYSTEM;
0235                         goto out;
0236                         */
0237                         continue;
0238                     }
0239                     
0240                     rm_vote_next_txstage = vote_txstage->next_txstage;
0241                     
0242                 }
0243                 else
0244                 {
0245                     /* this will ULOG unexpected return codes: */
0246                     if (NULL==(vote_txstage = xa_status_get_next_by_op(p_tl->txstage, 
0247                             el->rmstatus, op_code, op_reason,
0248                             p_xai, i+1, el->btid)))
0249                     {
0250                         NDRX_LOG(log_error, "Invalid stage for %hd/%c/%d/%d", 
0251                                 p_tl->txstage, el->rmstatus, op_code, op_reason);
0252                         ret=TPESYSTEM;
0253                         goto out;
0254                     }
0255                     
0256                     /* Log RM status change... 
0257                      * not very critical, as we will retry with last op.
0258                      * if not logged.
0259                      * BUt if prepare logging fails, vote for aborting.
0260                      */
0261                     if (EXSUCCEED!=tms_log_rmstatus(p_tl, el, vote_txstage->next_rmstatus, 
0262                             tperrno, op_reason) && XA_TX_STAGE_PREPARING==p_tl->txstage)
0263                     {
0264                         /* vote for abort */
0265                         NDRX_LOG(log_error, "Failed to log RMID %d status during "
0266                                 "preparing of [%s]- disk error, "
0267                                 "aborting...", i+1, p_xai->tmxid);
0268                         userlog("Failed to log RMID %d status during "
0269                                 "preparing of [%s]- disk error, "
0270                                 "aborting...", i+1, p_xai->tmxid);
0271                         rm_vote_next_txstage = XA_TX_STAGE_ABORTING;
0272                     }
0273                     else
0274                     {
0275                         rm_vote_next_txstage = vote_txstage->next_txstage;
0276                     }
0277                     
0278                 }
0279                 /* Stage switching... */
0280                 vote.btid = el->btid;
0281                 vote.rmid = el->rmid;
0282                 vote.stage = rm_vote_next_txstage;
0283                 
0284                 if (EXSUCCEED!=ndrx_growlist_append(&stagearr, &vote))
0285                 {
0286                     NDRX_LOG(log_error, "Failed to add rmid=%hd, btid=%hd to "
0287                             "stagearr with stage=%hd",
0288                             vote.rmid, vote.btid, vote.stage);
0289                     ret=TPESYSTEM;
0290                     goto out;
0291                 }
0292                 
0293                 /* so if it is outside of our range and jump is permitted, then
0294                  * jump to lowest level we got.
0295                  */
0296                 if ((descr->txs_stage_min > rm_vote_next_txstage ||
0297                         descr->txs_max_complete < rm_vote_next_txstage) 
0298                         && descr->allow_jump 
0299                         /* allow to downgrade */
0300                         && rm_vote_next_txstage < XA_TX_STAGE_PREPARING)
0301                 {
0302                     /* 
0303                      * jump to lowest level we got.
0304                      */
0305                     new_txstage = rm_vote_next_txstage;
0306                     NDRX_LOG(log_info, "Voting to leave group for %hd!", new_txstage);
0307                     /* switch the stage */
0308                     again = EXTRUE;
0309                     goto break_all_groups;
0310                 }
0311 
0312                 /* Maybe we need some kind of arrays to put return stages in? 
0313                  We need to put all states in one array.
0314                  1. If there is any stage in the min & max ranges => Stick with the lowest from range
0315                  2. If there is nothing in range, but have stuff outside, then take lowest from outside
0316                  */
0317             }
0318         }
0319         
0320 break_all_groups:
0321         
0322         if (XA_TX_STAGE_MAX_NEVER==new_txstage)
0323         {
0324             min_in_group = XA_TX_STAGE_MAX_NEVER;
0325             min_in_overall = XA_TX_STAGE_MAX_NEVER;
0326             max_in_overall = XA_TX_STAGE_MIN_NEVER;
0327             /* Calculate from array */
0328             for (i=0; i<=stagearr.maxindexused; i++)
0329             {
0330                 btid_vote_t *ve = stagearr.mem+sizeof(btid_vote_t)*i;
0331                 
0332                 NDRX_LOG(log_info, "RM %hd btid=%ld votes for stage: %hd", 
0333                         ve->rmid, ve->btid, ve->stage);
0334 
0335                 /* Bug #150 */
0336                 if (ve->stage < min_in_overall)
0337                 {
0338                     min_in_overall = ve->stage;
0339                     NDRX_LOG(log_debug, "min_in_overall=>%hd", min_in_overall);
0340                 }
0341                 
0342                 if (ve->stage > max_in_overall)
0343                 {
0344                     max_in_overall = ve->stage;
0345                     NDRX_LOG(log_debug, "max_in_overall=>%hd", max_in_overall);
0346                 }
0347 
0348                 /* what is this? Descr and vote_txstage will be last
0349                  * from the loop - wrong!
0350                  * We play with next stages from arr: stagearr[i]
0351                  * What is group? Seems like same type of staging, i.e.
0352                  * still committing
0353                  */
0354                 if (descr->txs_stage_min<=ve->stage && 
0355                         descr->txs_max_complete>=ve->stage &&
0356                         min_in_group < ve->stage)
0357                 {
0358                     min_in_group = ve->stage;
0359                     NDRX_LOG(log_debug, "min_in_group=>%hd", min_in_group);
0360                 }
0361             }/* for */
0362             
0363             /* if min_in_overall is in completed range
0364              * and max_in_overall is higher than completed
0365              * then allow to switch to max_state
0366              */
0367             if (descr->txs_min_complete <= min_in_overall 
0368                     && min_in_overall <= descr->txs_max_complete
0369                     && max_in_overall> descr->txs_max_complete)
0370             {
0371                 new_txstage=max_in_overall;
0372                 NDRX_LOG(log_debug, "New tx stage set by max_in_overall=>%hd", new_txstage);
0373             }
0374             else if (min_in_group!=XA_TX_STAGE_MAX_NEVER)
0375             {
0376                 new_txstage=min_in_group;
0377                 NDRX_LOG(log_debug, "New tx stage set by min_in_group=>%hd", new_txstage);
0378             }
0379             else
0380             {
0381                 new_txstage=min_in_overall;
0382                 NDRX_LOG(log_debug, "New tx stage set by min_in_overall=>%hd", new_txstage);
0383             }
0384             
0385             if (XA_TX_STAGE_MAX_NEVER==new_txstage)
0386             {
0387                 NDRX_LOG(log_info, "Stage not switched - assume MAX COMPLETED!");
0388                 new_txstage=descr->txs_max_complete;
0389                 /*
0390                 ret=TPESYSTEM;
0391                 goto out;
0392                 */
0393             }
0394             
0395         } /* calc stage */
0396         
0397         /* Finally switch the stage & run again! */
0398         if (new_txstage!=descr->txstage && new_txstage!=XA_TX_STAGE_MAX_NEVER)
0399         {
0400             int is_forced = EXTRUE;
0401             
0402             if (XA_TX_STAGE_COMMITTING==new_txstage)
0403             {
0404                 is_forced = EXFALSE;
0405             }
0406             
0407             /* this will return FAIL only if we are switching to committing: */
0408             if (EXSUCCEED!=tms_log_stage(p_tl, new_txstage, is_forced))
0409             {
0410                 /* critical point here is if we decided to go for commit
0411                  * and we was not able to log that, then we must
0412                  * flip to abort.
0413                  * - If there will be no log after the restart, it shall pass
0414                  * under the timeout condition.
0415                  * - If we were in "preparing" stage, then it would be switched
0416                  * to aborting automatically.
0417                  */
0418                 NDRX_LOG(log_error, "Failed to log committing decision [%s] - disk error, "
0419                     "aborting...", p_xai->tmxid);
0420                 userlog("Failed to log committing decision [%s] - disk error, "
0421                     "aborting...", p_xai->tmxid);
0422 
0423                 new_txstage = XA_TX_STAGE_ABORTING;
0424                 /* it is super critial to mark that we are going for abort
0425                  * otherwise if above logs OK, network disk is removed
0426                  * we start to abort, program is restarted,
0427                  * disk restored, then we start to commit
0428                  * this we might get partial abort / partial commit
0429                  * so after the restart, we might re-process logs correclty
0430                  * if available.
0431                  */
0432                 if (EXSUCCEED!=tms_log_stage(p_tl, new_txstage, EXFALSE))
0433                 {
0434                     userlog("tmsrv logging device does not work - terminating");
0435                     exit(EXFAIL);
0436                 }
0437             }
0438             
0439             again = EXTRUE;
0440         }
0441         
0442         /* if switched to committing & requested decision logged, then return */
0443         if ( (flags & TP_CMT_LOGGED) && XA_TX_STAGE_COMMITTING == descr->txstage)
0444         {
0445             NDRX_LOG(log_info, "Decision logged for commit return");
0446             goto out;
0447         }
0448         
0449         if (was_retry)
0450         {
0451             try++;
0452             
0453             NDRX_LOG(log_warn, "XA_RETRY: current try: %d, max (-r): %d", 
0454                         try, G_tmsrv_cfg.xa_retries);
0455             
0456             if (try<G_tmsrv_cfg.xa_retries)
0457             {
0458                 again = EXTRUE;
0459                 NDRX_LOG(log_warn, "Retry on XA_RETRY");
0460             }
0461         }
0462         else
0463         {
0464             /* reset counter if no retry */
0465             try = 0;
0466         }
0467         
0468     } while (again);
0469     
0470     /* Check are we complete */
0471     if (descr->txstage >=descr->txs_min_complete &&
0472             descr->txstage <=descr->txs_max_complete)
0473     {
0474         NDRX_LOG(log_info, "Transaction completed - remove logs");
0475         
0476         /* p_tl becomes invalid! */
0477         tms_remove_logfile(p_tl, EXTRUE);
0478         
0479         is_tx_finished = EXTRUE;
0480     }
0481     
0482     /* map stage to return code */
0483     ret = xa_txstage2tperrno(descr->txstage, master_op);
0484     
0485 out:          
0486 
0487     /* Bug #199 if system error occurs transaction 
0488      * 
0489      */
0490     if (!is_tx_finished)
0491     {
0492         /* move transaction to background */
0493         if (!p_tl->is_background)
0494         {
0495             NDRX_LOG(log_info, "Transaction not completed - leave "
0496                     "to background");
0497             p_tl->is_background = EXTRUE;
0498         }
0499         else
0500         {
0501             NDRX_LOG(log_info, "Transaction not completed - will be processed with next"
0502                     "background cycle (if not expired)");
0503         }
0504         
0505         /* Unlock the transaction */
0506         tms_unlock_entry(p_tl);
0507     }
0508 
0509     if (NULL!=stagearr.mem)
0510     {
0511         ndrx_growlist_free(&stagearr);
0512     }
0513 
0514     NDRX_LOG(log_info, "tm_drive() returns %d", ret);
0515     return ret;
0516 }
0517 /* vim: set ts=4 sw=4 et smartindent: */