Back to home page

Enduro/X

 
 

    


0001 /**
0002  * @brief Red-Black tree support routines for future Q, cur Q, cor Q
0003  *  Note that delete is not implemented for trees, as tmq_memmsg_t is deleted by
0004  *  parent processes.
0005  *
0006  * @file rbtsupp.c
0007  */
0008 /* -----------------------------------------------------------------------------
0009  * Enduro/X Middleware Platform for Distributed Transaction Processing
0010  * Copyright (C) 2009-2016, ATR Baltic, Ltd. All Rights Reserved.
0011  * Copyright (C) 2017-2023, Mavimax, Ltd. All Rights Reserved.
0012  * This software is released under one of the following licenses:
0013  * AGPL (with Java and Go exceptions) or Mavimax's license for commercial use.
0014  * See LICENSE file for full text.
0015  * -----------------------------------------------------------------------------
0016  * AGPL license:
0017  *
0018  * This program is free software; you can redistribute it and/or modify it under
0019  * the terms of the GNU Affero General Public License, version 3 as published
0020  * by the Free Software Foundation;
0021  *
0022  * This program is distributed in the hope that it will be useful, but WITHOUT ANY
0023  * WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR A
0024  * PARTICULAR PURPOSE. See the GNU Affero General Public License, version 3
0025  * for more details.
0026  *
0027  * You should have received a copy of the GNU Affero General Public License along 
0028  * with this program; if not, write to the Free Software Foundation, Inc.,
0029  * 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
0030  *
0031  * -----------------------------------------------------------------------------
0032  * A commercial use license is available from Mavimax, Ltd
0033  * contact@mavimax.com
0034  * -----------------------------------------------------------------------------
0035  */
0036 #include <stdio.h>
0037 #include <stdlib.h>
0038 #include <string.h>
0039 #include <errno.h>
0040 #include <assert.h>
0041 
0042 #include <ndebug.h>
0043 #include <exhash.h>
0044 #include <atmi.h>
0045 
0046 #include "tmqd.h"
0047 #include <utlist.h>
0048 
0049 /*---------------------------Externs------------------------------------*/
0050 /*---------------------------Macros-------------------------------------*/
0051 /*---------------------------Enums--------------------------------------*/
0052 /*---------------------------Typedefs-----------------------------------*/
0053 /*---------------------------Globals------------------------------------*/
0054 /*---------------------------Statics------------------------------------*/
0055 /*---------------------------Prototypes---------------------------------*/
0056 
0057 
0058 /**
0059  * Found duplicate entry in current
0060  * print error and abort
0061  */
0062 expublic void tmq_rbt_combine_cur(ndrx_rbt_node_t *existing, const ndrx_rbt_node_t *newdata, void *arg)
0063 {
0064     tmq_memmsg_t *existing_msg = (tmq_memmsg_t *)existing;
0065     tmq_memmsg_t *newdata_msg = (tmq_memmsg_t *)newdata;
0066     char tmp[256];
0067 
0068     /* NOTE: Clock skew might happen, time is moved backwards and we get duplicate sequeuences. */
0069     snprintf(tmp, sizeof(tmp), "ERROR ! (cur) Two messages with the same key in current Q! "
0070             "Existing msgid [%s] new msgid [%s] dup key: msgtstamp=%ld, "
0071             "msgtstamp_usec=%ld, msgtstamp_cntr=%d. Clock skew?",
0072             existing_msg->msgid_str, newdata_msg->msgid_str, existing_msg->msg->msgtstamp, 
0073             existing_msg->msg->msgtstamp_usec, 
0074             existing_msg->msg->msgtstamp_cntr);
0075     NDRX_LOG(log_error, "%s", tmp);
0076     userlog("%s", tmp);
0077     abort();
0078 }
0079 
0080 /**
0081  * Found duplicate entry in correlator
0082  * print error and abort
0083  */
0084 expublic void tmq_rbt_combine_fut(ndrx_rbt_node_t *existing, const ndrx_rbt_node_t *newdata, void *arg)
0085 {
0086     tmq_memmsg_t *existing_msg = (tmq_memmsg_t *)existing;
0087     tmq_memmsg_t *newdata_msg = (tmq_memmsg_t *)newdata;
0088     char tmp[256];
0089     snprintf(tmp, sizeof(tmp), "ERROR ! (fut) Two messages with the same key in future Q! "
0090             "Existing msgid [%s] new msgid [%s] dup key: deq_time=%ld, msgtstamp=%ld, "
0091             "msgtstamp_usec=%ld, msgtstamp_cntr=%d. Clock skew?",
0092             existing_msg->msgid_str, newdata_msg->msgid_str, 
0093             existing_msg->msg->qctl.deq_time,
0094             existing_msg->msg->msgtstamp, 
0095             existing_msg->msg->msgtstamp_usec, 
0096             existing_msg->msg->msgtstamp_cntr);
0097     NDRX_LOG(log_error, "%s", tmp);
0098     userlog("%s", tmp);
0099     abort();
0100 }
0101 
0102 /**
0103  * Got duplicate messages in correlation Q
0104  */
0105 expublic void tmq_rbt_combine_cor(ndrx_rbt_node_t *existing, const ndrx_rbt_node_t *newdata, void *arg)
0106 {
0107     tmq_memmsg_t *existing_msg = TMQ_COR_GETMSG(existing);
0108     tmq_memmsg_t *newdata_msg = TMQ_COR_GETMSG(newdata);
0109     char tmp[256];
0110     snprintf(tmp, sizeof(tmp), "ERROR ! (cor) Two messages with the same key in correlation Q! "
0111             "Existing msgid [%s] new msgid [%s] dup key: msgtstamp=%ld, msgtstamp_usec=%ld, "
0112             "msgtstamp_cntr=%d. Clock skew?",
0113             existing_msg->msgid_str, newdata_msg->msgid_str, existing_msg->msg->msgtstamp, 
0114             existing_msg->msg->msgtstamp_usec, 
0115             existing_msg->msg->msgtstamp_cntr);
0116     NDRX_LOG(log_error, "%s", tmp);
0117     userlog("%s", tmp);
0118     abort();
0119 }
0120 
0121 /**
0122  * Compare current Q. This is based 
0123  */
0124 expublic int tmq_rbt_cmp_cur(const ndrx_rbt_node_t *a, const ndrx_rbt_node_t *b, void *arg)
0125 {
0126     tmq_memmsg_t *aa = (tmq_memmsg_t *)a;
0127     tmq_memmsg_t *bb = (tmq_memmsg_t *)b;
0128 
0129     return ndrx_compare3(aa->msg->msgtstamp, aa->msg->msgtstamp_usec, aa->msg->msgtstamp_cntr,
0130             bb->msg->msgtstamp, bb->msg->msgtstamp_usec, bb->msg->msgtstamp_cntr);
0131 }
0132 
0133 /**
0134  * Compare elements from the correlator Q
0135  */
0136 expublic int tmq_rbt_cmp_cor(const ndrx_rbt_node_t *a, const ndrx_rbt_node_t *b, void *arg)
0137 {
0138     tmq_memmsg_t *aa = TMQ_COR_GETMSG(a);
0139     tmq_memmsg_t *bb = TMQ_COR_GETMSG(b);
0140 
0141     return ndrx_compare3(aa->msg->msgtstamp, aa->msg->msgtstamp_usec, aa->msg->msgtstamp_cntr,
0142             bb->msg->msgtstamp, bb->msg->msgtstamp_usec, bb->msg->msgtstamp_cntr);
0143 }
0144 
0145 /**
0146  * Compare elements of the future Q.
0147  * If doing the re-scheduling of the failed forward/automatic message, will use
0148  * the same deq_time to schedule the next attempt for the message.
0149  */
0150 expublic int tmq_rbt_cmp_fut (const ndrx_rbt_node_t *a, const ndrx_rbt_node_t *b, void *arg)
0151 {
0152     /* future Q compare */
0153     tmq_memmsg_t *aa = (tmq_memmsg_t *)a;
0154     tmq_memmsg_t *bb = (tmq_memmsg_t *)b;
0155 
0156     return ndrx_compare4(aa->msg->qctl.deq_time, aa->msg->msgtstamp, 
0157                          aa->msg->msgtstamp_usec, aa->msg->msgtstamp_cntr,
0158                          bb->msg->qctl.deq_time, bb->msg->msgtstamp, 
0159                          bb->msg->msgtstamp_usec, bb->msg->msgtstamp_cntr);
0160 }
0161 
0162 
0163 /* vim: set ts=4 sw=4 et smartindent: */