Back to home page

Enduro/X

 
 

    


0001 /**
0002  * @brief Infligth support routines, to manage inflight messages
0003  *
0004  * @file inflight.c
0005  */
0006 /* -----------------------------------------------------------------------------
0007  * Enduro/X Middleware Platform for Distributed Transaction Processing
0008  * Copyright (C) 2009-2016, ATR Baltic, Ltd. All Rights Reserved.
0009  * Copyright (C) 2017-2023, Mavimax, Ltd. All Rights Reserved.
0010  * This software is released under one of the following licenses:
0011  * AGPL (with Java and Go exceptions) or Mavimax's license for commercial use.
0012  * See LICENSE file for full text.
0013  * -----------------------------------------------------------------------------
0014  * AGPL license:
0015  *
0016  * This program is free software; you can redistribute it and/or modify it under
0017  * the terms of the GNU Affero General Public License, version 3 as published
0018  * by the Free Software Foundation;
0019  *
0020  * This program is distributed in the hope that it will be useful, but WITHOUT ANY
0021  * WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR A
0022  * PARTICULAR PURPOSE. See the GNU Affero General Public License, version 3
0023  * for more details.
0024  *
0025  * You should have received a copy of the GNU Affero General Public License along 
0026  * with this program; if not, write to the Free Software Foundation, Inc.,
0027  * 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
0028  *
0029  * -----------------------------------------------------------------------------
0030  * A commercial use license is available from Mavimax, Ltd
0031  * contact@mavimax.com
0032  * -----------------------------------------------------------------------------
0033  */
0034 #include <stdio.h>
0035 #include <stdlib.h>
0036 #include <string.h>
0037 #include <errno.h>
0038 #include <assert.h>
0039 
0040 #include <ndebug.h>
0041 #include <exhash.h>
0042 #include <atmi.h>
0043 
0044 #include "tmqd.h"
0045 #include <utlist.h>
0046 #include <rbtree.h>
0047 /*---------------------------Externs------------------------------------*/
0048 /*---------------------------Macros-------------------------------------*/
0049 /*---------------------------Enums--------------------------------------*/
0050 /*---------------------------Typedefs-----------------------------------*/
0051 /*---------------------------Globals------------------------------------*/
0052 /*---------------------------Statics------------------------------------*/
0053 /*---------------------------Prototypes---------------------------------*/
0054 
0055 /**
0056  * @brief Add message to inflight or (cur + cor )/fut (depending on p_msg->flags)
0057  *
0058  * @param qconf queue config
0059  * @param qhash queue hash
0060  * @param p_msg message to add
0061  *
0062  * @return NOTE! migth return error code
0063  */
0064 expublic int ndrx_infl_addmsg(tmq_qconfig_t * qconf, tmq_qhash_t *qhash, tmq_memmsg_t *mmsg)
0065 {
0066     int ret = EXSUCCEED;
0067     int isNew = EXFALSE;
0068     char corrid_str[TMCORRIDLEN_STR+1];
0069 
0070     mmsg->qconf=qconf;
0071     mmsg->qhash=qhash;
0072 
0073     if (mmsg->msg->lockthreadid)
0074     {
0075         CDL_APPEND(qhash->q_infligh, mmsg);
0076         mmsg->qstate = NDRX_TMQ_LOC_INFL;
0077     }
0078     else if ( (mmsg->msg->qctl.flags & TPQTIME_ABS) && 
0079             (mmsg->msg->qctl.deq_time > (long)time(NULL)))
0080     {
0081         ndrx_rbt_insert(&qhash->q_fut, (ndrx_rbt_node_t *)mmsg, &isNew);
0082         mmsg->qstate = NDRX_TMQ_LOC_FUTQ;
0083     }
0084     else
0085     {
0086         /* insert to cur */
0087         ndrx_rbt_insert(&qhash->q, (ndrx_rbt_node_t *)mmsg, &isNew);
0088         mmsg->qstate = NDRX_TMQ_LOC_CURQ;
0089 
0090         if (mmsg->msg->qctl.flags & TPQCORRID)
0091         {
0092             if (EXSUCCEED!=tmq_cor_msg_add(mmsg))
0093             {
0094                 NDRX_LOG(log_error, "Failed to add msg to corhash!");
0095                 EXFAIL_OUT(ret);
0096             }
0097             mmsg->qstate |= NDRX_TMQ_LOC_CORQ;
0098         }
0099     }
0100     /* add message to G_msgid_hash */
0101 
0102     /*  mmsg->msgid_str must be setup too! */
0103     EXHASH_ADD_STR( G_msgid_hash, msgid_str, mmsg);
0104     mmsg->qstate |= NDRX_TMQ_LOC_MSGIDHASH;
0105 
0106 out:
0107     return ret;
0108 }
0109 
0110 /**
0111  * Move message from current/cur/fut to inflight
0112  * NOTE that context must be with M_q_lock locked.
0113  */
0114 expublic int ndrx_infl_mov2infl(tmq_memmsg_t *mmsg)
0115 {
0116     int ret = EXSUCCEED;
0117 
0118     if (mmsg->qstate & NDRX_TMQ_LOC_FUTQ)
0119     {
0120         ndrx_rbt_delete(&mmsg->qhash->q_fut, (ndrx_rbt_node_t *)mmsg);
0121         mmsg->qstate &= ~NDRX_TMQ_LOC_FUTQ;
0122     }
0123     else if (mmsg->qstate & NDRX_TMQ_LOC_CURQ)
0124     {
0125         ndrx_rbt_delete(&mmsg->qhash->q, (ndrx_rbt_node_t *)mmsg);
0126         mmsg->qstate &= ~NDRX_TMQ_LOC_CURQ;
0127 
0128         /* remove from correlator too */
0129         if (mmsg->qstate & NDRX_TMQ_LOC_CORQ)
0130         {
0131             tmq_cor_msg_del(mmsg);
0132             mmsg->qstate &= ~NDRX_TMQ_LOC_CORQ;
0133         }
0134     }
0135     else
0136     {
0137         NDRX_LOG(log_error, "Message [%s] is not in cur/fut queue!",
0138                 mmsg->msgid_str);
0139         userlog("Cannot move to inflight Q: messsage [%s] is not in cur/fut queue (state=%hd)!",
0140                 mmsg->msgid_str, mmsg->qstate);
0141         /*EXFAIL_OUT(ret);*/
0142         abort();
0143     }
0144 
0145     /* add message to inflight */
0146     CDL_APPEND(mmsg->qhash->q_infligh, mmsg);
0147     mmsg->qstate |= NDRX_TMQ_LOC_INFL;
0148 
0149 out:
0150     return ret;
0151 }
0152 
0153 /**
0154  * Move message from inflight to current/cur/fut
0155  * NOTE that context must be with M_q_lock locked.
0156  * @param qconf queue config
0157  * @param qhash queue hash
0158  * @param mmsg message to move
0159  * @return EXSUCCEED/EXFAIL 
0160  */
0161 expublic int ndrx_infl_mov2cur(tmq_memmsg_t *mmsg)
0162 {
0163     int ret = EXSUCCEED;
0164 
0165     if (mmsg->qstate & NDRX_TMQ_LOC_INFL)
0166     {
0167         CDL_DELETE(mmsg->qhash->q_infligh, mmsg);
0168         mmsg->qstate &= ~NDRX_TMQ_LOC_INFL;
0169 
0170         /* enqueue to cur/cor or fut */
0171         if ( (mmsg->msg->qctl.flags & TPQTIME_ABS) && 
0172                 (mmsg->msg->qctl.deq_time > (long)time(NULL)))
0173         {
0174             ndrx_rbt_insert(&mmsg->qhash->q_fut, (ndrx_rbt_node_t *)mmsg, NULL);
0175             mmsg->qstate |= NDRX_TMQ_LOC_FUTQ;
0176         }
0177         else
0178         {
0179             /* insert to cur */
0180             ndrx_rbt_insert(&mmsg->qhash->q, (ndrx_rbt_node_t *)mmsg, NULL);
0181             mmsg->qstate |= NDRX_TMQ_LOC_CURQ;
0182 
0183             if (mmsg->msg->qctl.flags & TPQCORRID)
0184             {
0185                 if (EXSUCCEED!=tmq_cor_msg_add(mmsg))
0186                 {
0187                     NDRX_LOG(log_error, "Failed to add msg [%s] to corhash!",
0188                             mmsg->msgid_str);
0189                     EXFAIL_OUT(ret);
0190                 }
0191                 mmsg->qstate |= NDRX_TMQ_LOC_CORQ;
0192             }
0193         }
0194     }
0195     else
0196     {
0197         NDRX_LOG(log_error, "Message [%s] is not in inflight queue!",
0198                 mmsg->msgid_str);
0199         userlog("Cannot move to cur Q: messsage [%s] is not in inflight queue (state=%hd)!",
0200                 mmsg->msgid_str, mmsg->qstate);
0201         abort();
0202     }
0203 
0204 out:
0205     return ret;
0206 }
0207 
0208 
0209 /**
0210  * Remove message from Qs
0211  * @param qhash queue hash
0212  * @param mmsg message to remove
0213  * NOTE that context must be with M_q_lock locked.
0214  */
0215 expublic int ndrx_infl_delmsg(tmq_memmsg_t *mmsg)
0216 {
0217     int ret = EXSUCCEED;
0218 
0219     if (mmsg->qstate & NDRX_TMQ_LOC_CORQ)
0220     {
0221         /* remove from correlator Q */
0222         tmq_cor_msg_del(mmsg);
0223     }
0224 
0225     if (mmsg->qstate & NDRX_TMQ_LOC_MSGIDHASH)
0226     {
0227         EXHASH_DEL(G_msgid_hash, mmsg);
0228     }
0229 
0230     if (mmsg->qstate & NDRX_TMQ_LOC_INFL)
0231     {
0232         CDL_DELETE(mmsg->qhash->q_infligh, mmsg);
0233     }
0234 
0235     if (mmsg->qstate & NDRX_TMQ_LOC_FUTQ)
0236     {
0237         ndrx_rbt_delete(&mmsg->qhash->q_fut, (ndrx_rbt_node_t *)mmsg);
0238     }
0239 
0240     if (mmsg->qstate & NDRX_TMQ_LOC_CURQ)
0241     {
0242         /* really shall not happenn.... firstly it shall go to inflight... */
0243         ndrx_rbt_delete(&mmsg->qhash->q, (ndrx_rbt_node_t *)mmsg);
0244     }
0245 
0246     mmsg->qstate=0;
0247 
0248     return ret;
0249 }
0250 
0251 /**
0252  * Move message from future to cur or/and cor
0253  * @param qhash queue hash
0254  */
0255 expublic int ndrx_infl_fut2cur(tmq_qhash_t *qhash)
0256 {
0257     int ret = EXSUCCEED;
0258     tmq_memmsg_t *mmsg = NULL;
0259     ndrx_rbt_tree_iterator_t iter;
0260 
0261     while (NULL != (mmsg = (tmq_memmsg_t*)ndrx_rbt_leftmost(&qhash->q_fut)) )
0262     {
0263         /* do something with mmsg */
0264         if ( mmsg->msg->qctl.deq_time <= (long)time(NULL) )
0265         {
0266             /* remove from future */
0267             ndrx_rbt_delete(&mmsg->qhash->q_fut, (ndrx_rbt_node_t *)mmsg);
0268             mmsg->qstate &= ~NDRX_TMQ_LOC_FUTQ;
0269 
0270             /* insert to cur */
0271             ndrx_rbt_insert(&mmsg->qhash->q, (ndrx_rbt_node_t *)mmsg, NULL);
0272             mmsg->qstate |= NDRX_TMQ_LOC_CURQ;
0273 
0274             /* do the correlator too... if needed */
0275             if (mmsg->msg->qctl.flags & TPQCORRID)
0276             {
0277                 /* insert to cor */
0278                 if (EXSUCCEED!=tmq_cor_msg_add(mmsg))
0279                 {
0280                     NDRX_LOG(log_error, "Failed to add msg [%s] to corhash!",
0281                             mmsg->msgid_str);
0282                     EXFAIL_OUT(ret);
0283                 }
0284                 mmsg->qstate |= NDRX_TMQ_LOC_CORQ;
0285             }
0286         }
0287         else
0288         {
0289             break;
0290         }
0291     }
0292 
0293 out:
0294     return ret;
0295 }
0296 
0297 /* vim: set ts=4 sw=4 et smartindent: */