Back to home page

Enduro/X

 
 

    


0001 /**
0002  * @brief keep the track of tmqueue transactions
0003  *  Needs transaction id, status (active, (preparing?), aborting, prepared)
0004  *  Needs counter for max file name, something like request next...?
0005  *  Needs time setting + timeout, for automatic rollback.
0006  *
0007  * @file qtran.c
0008  */
0009 /* -----------------------------------------------------------------------------
0010  * Enduro/X Middleware Platform for Distributed Transaction Processing
0011  * Copyright (C) 2009-2016, ATR Baltic, Ltd. All Rights Reserved.
0012  * Copyright (C) 2017-2023, Mavimax, Ltd. All Rights Reserved.
0013  * This software is released under one of the following licenses:
0014  * AGPL (with Java and Go exceptions) or Mavimax's license for commercial use.
0015  * See LICENSE file for full text.
0016  * -----------------------------------------------------------------------------
0017  * AGPL license:
0018  *
0019  * This program is free software; you can redistribute it and/or modify it under
0020  * the terms of the GNU Affero General Public License, version 3 as published
0021  * by the Free Software Foundation;
0022  *
0023  * This program is distributed in the hope that it will be useful, but WITHOUT ANY
0024  * WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR A
0025  * PARTICULAR PURPOSE. See the GNU Affero General Public License, version 3
0026  * for more details.
0027  *
0028  * You should have received a copy of the GNU Affero General Public License along 
0029  * with this program; if not, write to the Free Software Foundation, Inc.,
0030  * 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
0031  *
0032  * -----------------------------------------------------------------------------
0033  * A commercial use license is available from Mavimax, Ltd
0034  * contact@mavimax.com
0035  * -----------------------------------------------------------------------------
0036  */
0037 #include <stdio.h>
0038 #include <stdlib.h>
0039 #include <string.h>
0040 #include <errno.h>
0041 #include <regex.h>
0042 #include <utlist.h>
0043 #include <stdarg.h>
0044 
0045 #include <ndebug.h>
0046 #include <atmi.h>
0047 #include <atmi_int.h>
0048 #include <typed_buf.h>
0049 #include <ndrstandard.h>
0050 #include <ubf.h>
0051 #include <Exfields.h>
0052 #include <nstdutil.h>
0053 
0054 #include "userlog.h"
0055 #include <xa_cmn.h>
0056 #include <exhash.h>
0057 #include <unistd.h>
0058 #include <Exfields.h>
0059 #include "qtran.h"
0060 #include "qcommon.h"
0061 /*---------------------------Externs------------------------------------*/
0062 /*---------------------------Macros-------------------------------------*/
/**
 * Verify that the calling thread is the one holding the transaction lock.
 * Expects a `qtran_log_t *p_tl` variable in the enclosing scope. If the
 * recorded lock owner differs from the current thread id, the mismatch is
 * written to the debug log and to userlog and the enclosing function
 * returns EXFAIL.
 * Wrapped in do/while(0) so that the expansion is a single statement and
 * stays correct inside unbraced if/else bodies.
 */
#define CHK_THREAD_ACCESS do {\
    if (ndrx_gettid()!=p_tl->lockthreadid)\
    {\
        NDRX_LOG(log_error, "Transaction [%s] not locked for thread %" PRIu64 ", but for %" PRIu64,\
                p_tl->tmxid, ndrx_gettid(), p_tl->lockthreadid);\
        userlog("Transaction [%s] not locked for thread %" PRIu64 ", but for %" PRIu64,\
                p_tl->tmxid, ndrx_gettid(), p_tl->lockthreadid);\
        return EXFAIL;\
    }\
} while (0)
0071 /*---------------------------Enums--------------------------------------*/
0072 /*---------------------------Typedefs-----------------------------------*/
0073 /*---------------------------Globals------------------------------------*/
/* In-memory transaction logs; entries are added under their tmxid string key */
exprivate qtran_log_t *M_qtran_hash = NULL;    /**< Hash for transactions  */
/* Taken around hash lookups/updates and around changes of an entry's lockthreadid */
exprivate MUTEX_LOCKDECL(M_qtran_hash_lock);   /**< Transaction hash lock  */
0076 
0077 /*---------------------------Statics------------------------------------*/
0078 /*---------------------------Prototypes---------------------------------*/
0079 
0080 /**
0081  * Get next sequence number
0082  * @param p_tl transaction (must be locked)
0083  * @return next seq no 
0084  */
0085 expublic int tmq_log_next(qtran_log_t *p_tl)
0086 {
0087     p_tl->seqno++;
0088     
0089     return p_tl->seqno;
0090 }
0091 
0092 /**
0093  * Unlock transaction
0094  * @param p_tl
0095  * @return SUCCEED/FAIL
0096  */
0097 expublic int tmq_log_unlock(qtran_log_t *p_tl)
0098 {
0099     CHK_THREAD_ACCESS;
0100     
0101     NDRX_LOG(log_info, "Transaction [%s] unlocked by thread %" PRIu64, p_tl->tmxid,
0102             p_tl->lockthreadid);
0103     
0104     MUTEX_LOCK_V(M_qtran_hash_lock);
0105     p_tl->lockthreadid = 0;
0106     MUTEX_UNLOCK_V(M_qtran_hash_lock);
0107     
0108     return EXSUCCEED;
0109 }
0110 
0111 
0112 /**
0113  * Abort all active transactions
0114  * this assumes that other threads is not doing anything with the hash list
0115  * so basically this must be executed by single thread at the startup of
0116  * the tmqueue.
0117  * @return EXSUCCEED/EXFAIL
0118  */
0119 expublic int tmq_log_abortall(void)
0120 {
0121     qtran_log_t *el, *elt;
0122     int ret = EXSUCCEED;
0123     XID xid;
0124     
0125     EXHASH_ITER(hh, M_qtran_hash, el, elt)
0126     {
0127         if (el->is_abort_only)
0128         {
0129             NDRX_LOG(log_error, "Aborting active transaction tmxid [%s]", el->tmxid);
0130         
0131             if (NULL==atmi_xa_deserialize_xid((unsigned char *)el->tmxid, &xid))
0132             {
0133                 NDRX_LOG(log_error, "Failed to deserialize tmxid [%s]", el->tmxid);
0134                 EXFAIL_OUT(ret);
0135             }
0136 
0137             /* try to rollback the stuff...! */
0138             if (EXSUCCEED!=atmi_xa_rollback_entry(&xid, 0))
0139             {
0140                 NDRX_LOG(log_error, "Failed to abort [%s]", el->tmxid);
0141                 EXFAIL_OUT(ret);
0142             }
0143         }
0144     }
0145 out:
0146     return ret;
0147     
0148 }
0149 
0150 
0151 /**
0152  * Check that log entry exists (lock or not, does not matter)
0153  * @return EXFAIL/EXFALSE/EXTRUE
0154  */
0155 expublic int tmq_log_exists_entry(char *tmxid)
0156 {
0157     int ret = EXFALSE;
0158     qtran_log_t *r = NULL;
0159 
0160     MUTEX_LOCK_V(M_qtran_hash_lock);
0161     EXHASH_FIND_STR( M_qtran_hash, tmxid, r);
0162     MUTEX_UNLOCK_V(M_qtran_hash_lock);
0163 
0164     if (NULL!=r)
0165     {
0166         ret = EXTRUE;
0167     }
0168 
0169     return ret;
0170 }
0171 
0172 /**
0173  * Get the log entry of the transaction
0174  * Now we should lock it for thread.
0175  * TODO: Add option for wait on lock.
0176  * This would be needed for cases for example when Postgres may report
0177  * the status in background, and for some reason TMSRV already processes the
0178  * transaction (ether abort, or new register branch, etc...) and some stalled
0179  * PG process wants to finish the work off. Thus we need to waited lock for
0180  * foreground operations.
0181  * TODO: if log is locked by the same thread, then set the locke, but continue.
0182  *  As there might be re-cursive usage.
0183  * @param tmxid - serialized XID
0184  * @param[in] dowait milliseconds to wait for lock, before give up
0185  * @param[out] locke lock error
0186  * @return NULL or log entry
0187  */
0188 expublic qtran_log_t * tmq_log_get_entry(char *tmxid, int dowait, int *locke)
0189 {
0190     qtran_log_t *r = NULL;
0191     ndrx_stopwatch_t w;
0192     
0193     if (dowait)
0194     {
0195         ndrx_stopwatch_reset(&w);
0196     }
0197     
0198     if (NULL!=locke)
0199     {
0200         *locke=EXFALSE;
0201     }
0202     
0203 restart:
0204     MUTEX_LOCK_V(M_qtran_hash_lock);
0205     EXHASH_FIND_STR( M_qtran_hash, tmxid, r);
0206     
0207     if (NULL!=r)
0208     {
0209         if (r->lockthreadid && r->lockthreadid!=ndrx_gettid())
0210         {
0211             if (dowait && ndrx_stopwatch_get_delta(&w) < dowait)
0212             {
0213                 MUTEX_UNLOCK_V(M_qtran_hash_lock);
0214                 /* sleep 100 msec */
0215                 usleep(100000);
0216                 goto restart;
0217                 
0218             }
0219             
0220             NDRX_LOG(log_error, "Q Transaction [%s] already locked for thread_id: %"
0221                     PRIu64 " lock time: %d msec",
0222                     tmxid, r->lockthreadid, dowait);
0223             
0224             userlog("tmqueue: Transaction [%s] already locked for thread_id: %" PRIu64
0225                     "lock time: %d msec",
0226                     tmxid, r->lockthreadid, dowait);
0227             r = NULL;
0228             
0229             /* cannot get lock */
0230             if (NULL!=locke)
0231             {
0232                 *locke=EXTRUE;
0233             }
0234             
0235         }
0236         else if (r->lockthreadid)
0237         {
0238             NDRX_LOG(log_info, "Transaction [%s] sub-locked for thread_id: %" PRIu64,
0239                     tmxid, r->lockthreadid);
0240             
0241             /* give hint that no unlocking shall be done
0242              * as already locked.
0243              */
0244             if (NULL!=locke)
0245             {
0246                 *locke=EXTRUE;
0247             }
0248         }
0249         else
0250         {
0251             r->lockthreadid = ndrx_gettid();
0252             NDRX_LOG(log_debug, "Transaction [%s] locked for thread_id: %" PRIu64,
0253                     tmxid, r->lockthreadid);
0254         }
0255     }
0256     
0257     MUTEX_UNLOCK_V(M_qtran_hash_lock);
0258     
0259     return r;
0260 }
0261 
0262 /**
0263  * Start the log entry.
0264  * Log shall be started from xa_start() only when not perform join.
0265  * If performing join, and if we are tmqueue, then check the transaction log
0266  * for entry.
0267  * @return SUCCEED/FAIL
0268  */
0269 expublic int tmq_log_start(char *tmxid)
0270 {
0271     int ret = EXSUCCEED;
0272     int hash_added = EXFALSE;
0273     qtran_log_t *tmp = NULL;
0274     
0275     /* 1. Add stuff to hash list */
0276     if (NULL==(tmp = NDRX_FPMALLOC(sizeof(qtran_log_t), 0)))
0277     {
0278         NDRX_LOG(log_error, "NDRX_CALLOC() failed: %s", strerror(errno));
0279         ret=EXFAIL;
0280         goto out;
0281     }
0282     
0283     /* reset the log */
0284     memset(tmp, 0, sizeof(*tmp));
0285     
0286     tmp->txstage = XA_TX_STAGE_ACTIVE;
0287     tmp->t_start = ndrx_utc_tstamp();
0288     tmp->t_update = ndrx_utc_tstamp();
0289     NDRX_STRCPY_SAFE(tmp->tmxid, tmxid);
0290     ndrx_stopwatch_reset(&tmp->ttimer);
0291 
0292     /* get cluster sequence */
0293     tmq_log_setseq(tmp);
0294     
0295     /* TODO: write initial tran-info message...
0296      * if we add initial message, we can overwrite this active file
0297      * with actual msg contents. As in case if we have read-only branch
0298      * we need some infos that transaction actually exists.
0299      * But to avoid more moves + fsync, we will re-write this file.
0300      */
0301     
0302     /* lock for us, yet it is not shared*/
0303     tmp->lockthreadid = ndrx_gettid();
0304     
0305     MUTEX_LOCK_V(M_qtran_hash_lock);
0306     EXHASH_ADD_STR( M_qtran_hash, tmxid, tmp);
0307     MUTEX_UNLOCK_V(M_qtran_hash_lock);
0308     
0309     hash_added = EXTRUE;
0310     
0311 out:
0312     
0313     /* unlock */
0314     if (EXSUCCEED==ret && NULL!=tmp)
0315     {
0316         tmq_log_unlock(tmp);
0317     }
0318 
0319     return ret;
0320 }
0321 
0322 /**
0323  * Get transaction existing or create new
0324  * @param tmxid transaction id 
0325  * @return transaction log entry or NULL on error
0326  */
0327 expublic qtran_log_t * tmq_log_start_or_get(char *tmxid)
0328 {
0329     int locke;
0330     
0331     qtran_log_t * ret = tmq_log_get_entry(tmxid, NDRX_LOCK_WAIT_TIME, &locke);
0332     
0333     if (NULL==ret)
0334     {
0335         if (locke)
0336         {
0337             ret=NULL;
0338         }
0339         else if (EXSUCCEED!=tmq_log_start(tmxid))
0340         {
0341             ret=NULL;
0342         }
0343         else
0344         {
0345             ret = tmq_log_get_entry(tmxid, NDRX_LOCK_WAIT_TIME, &locke);
0346         }
0347     }
0348     
0349     return ret;
0350 }
0351 
0352 /**
0353  * mark transaction as abort only
0354  * @param tmxid
0355  */
0356 expublic void tmq_log_set_abort_only(char *tmxid)
0357 {
0358     int locke=EXFALSE;
0359     qtran_log_t * p_tl = tmq_log_get_entry(tmxid, NDRX_LOCK_WAIT_TIME, &locke);
0360     
0361     if (NULL!=p_tl)
0362     {
0363         NDRX_LOG(log_error, "Marking [%s] Q tran as abort only", tmxid);
0364         p_tl->is_abort_only=EXTRUE;
0365     }
0366     
0367     if (NULL!=p_tl && !locke)
0368     {
0369         /* unlock */
0370         tmq_log_unlock(p_tl);
0371     }
0372 }
0373 
0374 /**
0375  * Add command to the log
0376  * @param tmxid transaction id (serialized)
0377  * @param seqno command sequence number
0378  * @param b command block, convert to char *block, so that we detect the
0379  *  type and length here internally. With having base block set to zeros.
0380  * @param bsz block size
0381  * @param entry_status status according to XA_RM_STATUS* consts
0382  * @return EXSUCCEED/EXFAIL
0383  */
0384 expublic int tmq_log_addcmd(char *tmxid, int seqno, char *b, char entry_status)
0385 {
0386     int ret = EXSUCCEED;
0387     qtran_log_t *p_tl= NULL;
0388     qtran_log_cmd_t *cmd=NULL;
0389     size_t len;
0390     int locke;
0391     tmq_cmdheader_t *p_hdr=(tmq_cmdheader_t *)b;
0392     
0393     NDRX_LOG(log_info, "Adding Q tran cmd: [%s] seqno: %d, "
0394             "command_code: %c, status: %c",
0395             tmxid, seqno, p_hdr->command_code, entry_status);
0396     
0397     if (NULL==(p_tl = tmq_log_get_entry(tmxid, NDRX_LOCK_WAIT_TIME, &locke)))
0398     {
0399         NDRX_LOG(log_error, "No Q transaction/lock timeout under xid_str: [%s]", 
0400                 tmxid);
0401         ret=EXFAIL;
0402         goto out_nolock;
0403     }
0404     
0405     /* Alloc new command block */
0406     if (NULL==(cmd = NDRX_FPMALLOC(sizeof(qtran_log_cmd_t), 0)))
0407     {
0408         NDRX_LOG(log_error, "Failed to fpmalloc %d bytes: %s", 
0409                 sizeof(qtran_log_cmd_t), strerror(errno));
0410         userlog("Failed to fpmalloc %d bytes: %s", 
0411                 sizeof(qtran_log_cmd_t), strerror(errno));
0412         EXFAIL_OUT(ret);
0413     }
0414 
0415     /* Update session timeout... */
0416     ndrx_stopwatch_reset(&p_tl->ttimer);
0417     
0418     memset(cmd, 0, sizeof(*cmd));
0419     
0420     cmd->seqno=seqno;
0421     cmd->cmd_status = entry_status;
0422     cmd->command_code = p_hdr->command_code;
0423     
0424     /* in case if doing recovery... */
0425     if (p_tl->seqno<seqno)
0426     {
0427         p_tl->seqno=seqno;
0428     }
0429     
0430     /* select the size of update block
0431      * non init bytes after the struct in cases on non upd
0432      * does not matter
0433      */
0434     if (TMQ_STORCMD_UPD==p_hdr->command_code)
0435     {
0436         len = sizeof(tmq_msg_upd_t);
0437     }
0438     else
0439     {
0440         len = sizeof(tmq_cmdheader_t);
0441     }
0442     
0443     /* store the update block */
0444     memcpy(&cmd->b, b, len);
0445     
0446     DL_APPEND(p_tl->cmds, cmd);
0447    
0448 out:
0449     /* unlock transaction from thread */
0450     if (NULL!=p_tl && !locke)
0451     {
0452         tmq_log_unlock(p_tl);
0453     }
0454 
0455 out_nolock:
0456     
0457     return ret;
0458 }
0459 
0460 /**
0461  * Free up log file (just memory)
0462  * @param p_tl log handle
0463  * @param hash_rm remove log entry from tran hash
0464  */
0465 expublic void tmq_remove_logfree(qtran_log_t *p_tl, int hash_rm)
0466 {
0467     if (hash_rm)
0468     {
0469         MUTEX_LOCK_V(M_qtran_hash_lock);
0470         EXHASH_DEL(M_qtran_hash, p_tl); 
0471         MUTEX_UNLOCK_V(M_qtran_hash_lock);
0472     }
0473     
0474     NDRX_FPFREE(p_tl);
0475 }
0476 
0477 /**
0478  * Copy the background items to the linked list.
0479  * The idea is that this is processed by background. During that time, it does not
0480  * remove any items from hash. Thus pointers should be still valid. 
0481  * TODO: We should copy here transaction info too....
0482  * 
0483  * @param p_tl
0484  * @return 
0485  */
0486 expublic qtran_log_list_t* tmq_copy_hash2list(int copy_mode)
0487 {
0488     qtran_log_list_t *ret = NULL;
0489     qtran_log_t * r, *rt;
0490     qtran_log_list_t *tmp;
0491     
0492     if (copy_mode & COPY_MODE_ACQLOCK)
0493     {
0494         MUTEX_LOCK_V(M_qtran_hash_lock);
0495     }
0496     
0497     /* No changes to hash list during the lock. */    
0498     
0499     EXHASH_ITER(hh, M_qtran_hash, r, rt)
0500     {
0501         /* Only background items... */
0502         if (r->is_background && !(copy_mode & COPY_MODE_BACKGROUND))
0503             continue;
0504         
0505         if (!r->is_background && !(copy_mode & COPY_MODE_FOREGROUND))
0506             continue;
0507                 
0508         if (NULL==(tmp = NDRX_FPMALLOC(sizeof(qtran_log_list_t), 0)))
0509         {
0510             NDRX_LOG(log_error, "Failed to fpmalloc %d: %s", 
0511                     sizeof(qtran_log_list_t), strerror(errno));
0512             goto out;
0513         }
0514         
0515         /* we should copy full TL structure, because some other thread might
0516          * will use it.
0517          * Having some invalid pointers inside does not worry us, because
0518          * we just need a list for a printing or xids for background txn lookup
0519          */
0520         memcpy(&tmp->p_tl, r, sizeof(*r));
0521         
0522         LL_APPEND(ret, tmp);
0523     }
0524     
0525 out:
0526     if (copy_mode & COPY_MODE_ACQLOCK)
0527     {
0528         MUTEX_UNLOCK_V(M_qtran_hash_lock);
0529     }
0530 
0531     return ret;
0532 }
0533 
0534 /**
0535  * Lock the transaction log hash
0536  */
0537 expublic void tmq_tx_hash_lock(void)
0538 {
0539     MUTEX_LOCK_V(M_qtran_hash_lock);
0540 }
0541 
0542 /**
0543  * Unlock the transaction log hash
0544  */
0545 expublic void tmq_tx_hash_unlock(void)
0546 {
0547     MUTEX_UNLOCK_V(M_qtran_hash_lock);
0548 }
0549 
0550 
0551 /**
0552  * set current lock sequence number
0553  * @param p_tl log entry
0554  */
0555 expublic int tmq_log_setseq(qtran_log_t *p_tl)
0556 {
0557     int ret= EXSUCCEED;
0558     long grp_flags=0;
0559     /* if we are in singleton group mode, validate that we still
0560      * own the lock
0561      */
0562     if (G_atmi_env.procgrp_no)
0563     {
0564         p_tl->sg_sequence=tpsgislocked(G_atmi_env.procgrp_no
0565             , TPPG_SGVERIFY|TPPG_NONSGSUCC
0566             , &grp_flags);
0567 
0568         if (EXFAIL==p_tl->sg_sequence)
0569         {
0570             NDRX_LOG(log_error, "tpsgislocked failed %s", tpstrerror(tperrno));
0571             EXFAIL_OUT(ret);
0572         }
0573 
0574         if (grp_flags & TPPG_SINGLETON && p_tl->sg_sequence<=0)
0575         {
0576             NDRX_LOG(log_error, "Singleton group %d lock lost (at start) - exit(-1)",
0577                 G_atmi_env.procgrp_no);
0578             userlog("Singleton group %d lock lost (at start) - exit(-1)",
0579                 G_atmi_env.procgrp_no);
0580             /* !!!! */
0581             exit(EXFAIL);
0582         }
0583     }
0584 
0585 out:
0586     return ret;
0587 }
0588 
0589 
0590 /**
0591  * Check current sequence (are we still locked on?)
0592  */
0593 expublic int tmq_log_checkpointseq(qtran_log_t *p_tl)
0594 {
0595     int ret=EXSUCCEED;
0596     long seq;
0597     long grp_flags=0;
0598     /* if we are in singleton group mode, validate that we still
0599      * own the lock
0600      */
0601     if (G_atmi_env.procgrp_no)
0602     {
0603         seq=tpsgislocked(G_atmi_env.procgrp_no, TPPG_SGVERIFY|TPPG_NONSGSUCC, &grp_flags);
0604 
0605         if (seq < 0)
0606         {
0607             NDRX_LOG(log_error, "tpsgislocked returns %s", tpstrerror(tperrno));
0608             EXFAIL_OUT(ret);
0609         }
0610 
0611         if ((grp_flags & TPPG_SINGLETON) && 0==seq)
0612         {
0613             NDRX_LOG(log_error, "Singleton group %d on node %ld lock lost - exit(-1)",
0614                 G_atmi_env.procgrp_no, tpgetnodeid());
0615             userlog("Singleton group %d on node %ld lock lost - exit(-1)",
0616                 G_atmi_env.procgrp_no, tpgetnodeid());
0617             /* !!!! */
0618             exit(EXFAIL);
0619         }
0620 
0621         /* failover has happened during transaction processing
0622          * thus cannot proceed with decsion, only after restart
0623          */
0624         if ( (grp_flags & TPPG_SINGLETON) && (seq - p_tl->sg_sequence >= G_atmi_env.sglockinc))
0625         {
0626             NDRX_LOG(log_error, "Singleton group %d on node %ld lock lost (tl seq %ld, cur seq %ld) - exit(-1), ",
0627                 G_atmi_env.procgrp_no, tpgetnodeid(), p_tl->sg_sequence, seq);
0628             userlog("Singleton group %d on node %ld lock lost (tl seq %ld, cur seq %ld) - exit(-1), ",
0629                 G_atmi_env.procgrp_no, tpgetnodeid(), p_tl->sg_sequence, seq);
0630             /* !!!! */
0631             exit(EXFAIL);
0632         }
0633 
0634         /* we are safe to continue... */
0635         p_tl->sg_sequence=seq;
0636     }
0637 out:
0638     return ret;
0639 }
0640 
0641 
0642 /* vim: set ts=4 sw=4 et smartindent: */
0643