Back to home page

Enduro/X

 
 

    


0001 /**
0002  * @brief Forward statistics used for QoS
0003  *
0004  * @file fwdstat.c
0005  */
0006 /* -----------------------------------------------------------------------------
0007  * Enduro/X Middleware Platform for Distributed Transaction Processing
0008  * Copyright (C) 2009-2016, ATR Baltic, Ltd. All Rights Reserved.
0009  * Copyright (C) 2017-2023, Mavimax, Ltd. All Rights Reserved.
0010  * This software is released under one of the following licenses:
0011  * AGPL (with Java and Go exceptions) or Mavimax's license for commercial use.
0012  * See LICENSE file for full text.
0013  * -----------------------------------------------------------------------------
0014  * AGPL license:
0015  *
0016  * This program is free software; you can redistribute it and/or modify it under
0017  * the terms of the GNU Affero General Public License, version 3 as published
0018  * by the Free Software Foundation;
0019  *
0020  * This program is distributed in the hope that it will be useful, but WITHOUT ANY
0021  * WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR A
0022  * PARTICULAR PURPOSE. See the GNU Affero General Public License, version 3
0023  * for more details.
0024  *
0025  * You should have received a copy of the GNU Affero General Public License along 
0026  * with this program; if not, write to the Free Software Foundation, Inc.,
0027  * 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
0028  *
0029  * -----------------------------------------------------------------------------
0030  * A commercial use license is available from Mavimax, Ltd
0031  * contact@mavimax.com
0032  * -----------------------------------------------------------------------------
0033  */
0034 #include <stdio.h>
0035 #include <stdlib.h>
0036 #include <string.h>
0037 #include <errno.h>
0038 #include <assert.h>
0039 
0040 #include <ndebug.h>
0041 #include <exhash.h>
0042 #include <atmi.h>
0043 
0044 #include "tmqd.h"
0045 #include <utlist.h>
0046 
0047 /*---------------------------Externs------------------------------------*/
0048 /*---------------------------Macros-------------------------------------*/
0049 /*---------------------------Enums--------------------------------------*/
0050 /*---------------------------Typedefs-----------------------------------*/
0051 
0052 /*---------------------------Globals------------------------------------*/
0053 /*---------------------------Statics------------------------------------*/
0054 
0055 /** Lock for statistics hash */
0056 exprivate MUTEX_LOCKDECL(M_statsh_lock);
0057 
0058 /** statistics hash by it self */
0059 exprivate fwd_stats_t *M_statsh = NULL;
0060 
0061 /*---------------------------Prototypes---------------------------------*/
0062 
0063 /**
0064  * Init the statistics engine
0065  */
0066 expublic int tmq_fwd_stat_init(void)
0067 {   
0068     return EXSUCCEED;
0069 }
0070 
0071 /**
0072  * Get forward busy count per queue
0073  * Allocate the handle here, if missing..
0074  * @param[in] qname queue name
0075  * @param[out] p_stats stats pointer out
0076  * @return Number of messages in forward threads
0077  */
0078 expublic int tmq_fwd_busy_cnt(char *qname, fwd_stats_t **p_stats)
0079 {
0080     int ret;
0081     fwd_stats_t *el = NULL;
0082     
0083     /* Have mutex sync, as wakeup-notify threads will lookup the queues
0084      * here
0085      */
0086     MUTEX_LOCK_V(M_statsh_lock);
0087     
0088     EXHASH_FIND_STR( M_statsh, qname, el);
0089     
0090     if (NULL==el)
0091     {
0092         el = NDRX_FPMALLOC(sizeof(fwd_stats_t), 0);
0093 
0094         if (NULL==el)
0095         {
0096             NDRX_LOG(log_error, "Failed to malloc %d bytes", sizeof(fwd_stats_t));
0097             EXFAIL_OUT(ret);
0098         }
0099 
0100         NDRX_STRCPY_SAFE(el->qname, qname);
0101         el->busy=0;
0102         el->sync_head = NULL;
0103         
0104         pthread_cond_init(&el->sync_cond, NULL);
0105         NDRX_SPIN_INIT_V(el->busy_spin);
0106         NDRX_SPIN_INIT_V(el->sync_spin);
0107         MUTEX_VAR_INIT(el->sync_mut);
0108         
0109         EXHASH_ADD_STR(M_statsh, qname, el);
0110     }
0111     
0112     ret=el->busy;
0113     *p_stats = el;
0114     
0115 out:
0116     
0117     /* clean off... */
0118     MUTEX_UNLOCK_V(M_statsh_lock);
0119 
0120     return ret;
0121 }
0122 
0123 /**
0124  * Increment the count by queue name
0125  * @param p_stats queue name for which to increment the counter
0126  * @return EXSUCCEED/EXFAIL
0127  */
0128 expublic void tmq_fwd_busy_inc(fwd_stats_t *p_stats)
0129 {
0130     NDRX_SPIN_LOCK_V(p_stats->busy_spin);
0131     p_stats->busy++;
0132     NDRX_SPIN_UNLOCK_V(p_stats->busy_spin);
0133 }
0134 
0135 /**
0136  * Decrement the queue counter. In case if 0, just remove the queue from the
0137  * hash.
0138  * @param p_stats stats entry to decrement
0139  */
0140 expublic void tmq_fwd_busy_dec(fwd_stats_t *p_stats)
0141 {
0142     NDRX_SPIN_LOCK_V(p_stats->busy_spin);
0143     p_stats->busy--;
0144     NDRX_SPIN_UNLOCK_V(p_stats->busy_spin);
0145     
0146 }
0147 
0148 /**
0149  * Add message to forward stat Q
0150  * @param fwd forward msg
0151  */
0152 expublic void tmq_fwd_sync_add(fwd_msg_t *fwd)
0153 {
0154     NDRX_SPIN_LOCK_V(fwd->stats->sync_spin);
0155     DL_APPEND(fwd->stats->sync_head, fwd);
0156     NDRX_SPIN_UNLOCK_V(fwd->stats->sync_spin);
0157 }
0158 
0159 /**
0160  * Delete message from sync list
0161  * @param fwd forward message
0162  */
0163 expublic void tmq_fwd_sync_del(fwd_msg_t *fwd)
0164 {
0165     NDRX_SPIN_LOCK_V(fwd->stats->sync_spin);
0166     DL_DELETE(fwd->stats->sync_head, fwd);
0167     NDRX_SPIN_UNLOCK_V(fwd->stats->sync_spin);
0168 }
0169 
0170 /**
0171  * Check is current our order for msg to process
0172  * @param fwd message to verify
0173  * @return EXTRUE (mine), EXFALSE (not mine turn)
0174  */
0175 expublic int tmq_fwd_sync_cmp(fwd_msg_t *fwd)
0176 {
0177     int ret = EXFALSE;
0178     NDRX_SPIN_LOCK_V(fwd->stats->sync_spin);
0179     
0180     if (fwd->stats->sync_head == fwd)
0181     {
0182         ret = EXTRUE;
0183     }
0184     
0185     NDRX_SPIN_UNLOCK_V(fwd->stats->sync_spin);
0186     
0187     return ret;
0188 }
0189 
0190 /**
0191  * Wait on mine message
0192  * @param fwd forward message
0193  */
0194 expublic void tmq_fwd_sync_wait(fwd_msg_t *fwd)
0195 {
0196     MUTEX_LOCK_V(fwd->stats->sync_mut);
0197     
0198     while (!tmq_fwd_sync_cmp(fwd))
0199     {
0200         pthread_cond_wait(&fwd->stats->sync_cond, &fwd->stats->sync_mut);
0201     }
0202     
0203     MUTEX_UNLOCK_V(fwd->stats->sync_mut);
0204 }
0205 
0206 /**
0207  * Remove msg, notify for wakup (once our msg is done...)
0208  * @param fwd fwd msg
0209  */
0210 expublic void tmq_fwd_sync_notify(fwd_msg_t *fwd)
0211 {
0212     
0213     MUTEX_LOCK_V(fwd->stats->sync_mut);
0214     
0215     tmq_fwd_sync_del(fwd);
0216      
0217     /* notify all... */
0218     pthread_cond_broadcast(&fwd->stats->sync_cond);
0219     
0220     MUTEX_UNLOCK_V(fwd->stats->sync_mut);
0221 }
0222 
0223 /* vim: set ts=4 sw=4 et smartindent: */