Back to home page

Enduro/X

 
 

    


0001 /**
0002  * @brief Correlation id queue handler
0003  *
0004  * @file corhandle.c
0005  */
0006 /* -----------------------------------------------------------------------------
0007  * Enduro/X Middleware Platform for Distributed Transaction Processing
0008  * Copyright (C) 2009-2016, ATR Baltic, Ltd. All Rights Reserved.
0009  * Copyright (C) 2017-2023, Mavimax, Ltd. All Rights Reserved.
0010  * This software is released under one of the following licenses:
0011  * AGPL (with Java and Go exceptions) or Mavimax's license for commercial use.
0012  * See LICENSE file for full text.
0013  * -----------------------------------------------------------------------------
0014  * AGPL license:
0015  *
0016  * This program is free software; you can redistribute it and/or modify it under
0017  * the terms of the GNU Affero General Public License, version 3 as published
0018  * by the Free Software Foundation;
0019  *
0020  * This program is distributed in the hope that it will be useful, but WITHOUT ANY
0021  * WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR A
0022  * PARTICULAR PURPOSE. See the GNU Affero General Public License, version 3
0023  * for more details.
0024  *
0025  * You should have received a copy of the GNU Affero General Public License along 
0026  * with this program; if not, write to the Free Software Foundation, Inc.,
0027  * 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
0028  *
0029  * -----------------------------------------------------------------------------
0030  * A commercial use license is available from Mavimax, Ltd
0031  * contact@mavimax.com
0032  * -----------------------------------------------------------------------------
0033  */
0034 #include <stdio.h>
0035 #include <stdlib.h>
0036 #include <string.h>
0037 #include <errno.h>
0038 #include <regex.h>
0039 #include <stdarg.h>
0040 
0041 #include <ndebug.h>
0042 #include "qcommon.h"
0043 #include "tmqd.h"
0044 #include <utlist2.h>
0045 #include <rbtree.h>
0046 /*---------------------------Externs------------------------------------*/
0047 /*---------------------------Macros-------------------------------------*/
0048 /*---------------------------Enums--------------------------------------*/
0049 /*---------------------------Typedefs-----------------------------------*/
0050 /*---------------------------Globals------------------------------------*/
0051 /*---------------------------Statics------------------------------------*/
0052 /*---------------------------Prototypes---------------------------------*/
0053 
0054 /**
0055  * Find the correlator entry in the hash
0056  * @param qhash queue entry
0057  * @param corrid_str correlator id 
0058  * @return ptr or NULL (if not found)
0059  */
0060 expublic tmq_corhash_t * tmq_cor_find(tmq_qhash_t *qhash, char *corrid_str)
0061 {
0062     tmq_corhash_t *ret = NULL;
0063 
0064     EXHASH_FIND_STR( (qhash->corhash), corrid_str, ret);
0065 
0066     return ret;
0067 
0068 }
0069 
0070 /**
0071  * Add correlator to the hash
0072  * @param qhash queue entry
0073  * @param corrid_str identifier to add
0074  * @return ptr or NULL (if failed to add)
0075  */
0076 expublic tmq_corhash_t * tmq_cor_add(tmq_qhash_t *qhash, char *corrid_str)
0077 {
0078     /* allocate the handle */
0079     tmq_corhash_t *corhash = NDRX_FPMALLOC(sizeof(tmq_corhash_t), 0);
0080 
0081     if (NULL==corhash)
0082     {
0083         NDRX_LOG(log_error, "Failed to malloc %d bytes: %s",
0084             sizeof(tmq_corhash_t), strerror(errno));
0085         goto out;
0086     }
0087 
0088     /* add stuff to hash: */
0089     memset(corhash, 0, sizeof(tmq_corhash_t));
0090     NDRX_STRCPY_SAFE(corhash->corrid_str, corrid_str);
0091     EXHASH_ADD_STR( qhash->corhash, corrid_str, corhash);
0092 
0093     /* setup red-black trees */
0094     ndrx_rbt_init(&corhash->corq, tmq_rbt_cmp_cor, tmq_rbt_combine_cor, NULL, corhash);
0095 
0096     NDRX_LOG(log_debug, "Added corrid_str [%s] %p",
0097             corhash->corrid_str, corhash);
0098 
0099 out:
0100     return corhash;
0101 }
0102 
0103 /**
0104  * Remove message from correlator hash / linked list
0105  * @param qhash queue entry
0106  * @param mmsg remove from CDL, remove from hash, if CDL is free, remove HASH
0107  *  entry
0108  */
0109 expublic void tmq_cor_msg_del(tmq_memmsg_t *mmsg)
0110 {
0111     /* find the corhash entry, if have one remove from from hash
0112      * remove msg from CDL
0113      */
0114     tmq_corhash_t * corhash = mmsg->corhash;
0115 
0116     /* remove correlator from hash if empty */
0117     ndrx_rbt_delete(&corhash->corq, (ndrx_rbt_node_t *)&mmsg->cor);
0118 
0119     mmsg->corhash = NULL;
0120 
0121     /* if sub-Q is empty, remove correlator */
0122     if (ndrx_rbt_is_empty(&corhash->corq))
0123     {
0124 
0125         NDRX_LOG(log_debug, "Removing corrid_str [%s] %p",
0126             corhash->corrid_str, corhash);
0127 
0128         /* remove empty hash node */
0129         EXHASH_DEL(mmsg->qhash->corhash, corhash);
0130         NDRX_FPFREE(corhash);
0131     }
0132 
0133     return;
0134 }
0135 
0136 /**
0137  * Add message to corelator hash / linked list
0138  * @param mmsg message to add
0139  * @return Qerror code 
0140  */
0141 expublic int tmq_cor_msg_add(tmq_memmsg_t *mmsg)
0142 {
0143     int ret = EXSUCCEED;
0144     int isNew = EXFALSE;
0145     tmq_corhash_t * corhash;
0146 
0147     corhash = tmq_cor_find(mmsg->qhash, mmsg->corrid_str);
0148     
0149     if (NULL==corhash)
0150     {
0151         corhash=tmq_cor_add(mmsg->qhash, mmsg->corrid_str);
0152     }
0153     
0154     if (NULL==corhash)
0155     {
0156         ret = QMEOS;
0157         goto out;
0158     }
0159     
0160     /* add backref */
0161     mmsg->corhash = corhash;
0162 
0163     ndrx_rbt_insert(&corhash->corq, (ndrx_rbt_node_t *)&mmsg->cor, &isNew);
0164     
0165 out:
0166     return ret;
0167 }
0168 
0169 /* vim: set ts=4 sw=4 et smartindent: */