0001
0002
0003
0004
0005
0006
0007
0008
0009
0010
0011
0012
0013
0014
0015
0016
0017
0018
0019
0020
0021
0022
0023
0024
0025
0026
0027
0028
0029
0030
0031
0032
0033
0034 #include <stdio.h>
0035 #include <stdlib.h>
0036 #include <string.h>
0037 #include <errno.h>
0038 #include <assert.h>
0039
0040 #include <ndebug.h>
0041 #include <exhash.h>
0042 #include <atmi.h>
0043
0044 #include "tmqd.h"
0045 #include <utlist.h>
0046
0047
0048
0049
0050
0051
0052
0053
0054
0055
0056 exprivate MUTEX_LOCKDECL(M_statsh_lock);
0057
0058
0059 exprivate fwd_stats_t *M_statsh = NULL;
0060
0061
0062
0063
0064
0065
0066 expublic int tmq_fwd_stat_init(void)
0067 {
0068 return EXSUCCEED;
0069 }
0070
0071
0072
0073
0074
0075
0076
0077
0078 expublic int tmq_fwd_busy_cnt(char *qname, fwd_stats_t **p_stats)
0079 {
0080 int ret;
0081 fwd_stats_t *el = NULL;
0082
0083
0084
0085
0086 MUTEX_LOCK_V(M_statsh_lock);
0087
0088 EXHASH_FIND_STR( M_statsh, qname, el);
0089
0090 if (NULL==el)
0091 {
0092 el = NDRX_FPMALLOC(sizeof(fwd_stats_t), 0);
0093
0094 if (NULL==el)
0095 {
0096 NDRX_LOG(log_error, "Failed to malloc %d bytes", sizeof(fwd_stats_t));
0097 EXFAIL_OUT(ret);
0098 }
0099
0100 NDRX_STRCPY_SAFE(el->qname, qname);
0101 el->busy=0;
0102 el->sync_head = NULL;
0103
0104 pthread_cond_init(&el->sync_cond, NULL);
0105 NDRX_SPIN_INIT_V(el->busy_spin);
0106 NDRX_SPIN_INIT_V(el->sync_spin);
0107 MUTEX_VAR_INIT(el->sync_mut);
0108
0109 EXHASH_ADD_STR(M_statsh, qname, el);
0110 }
0111
0112 ret=el->busy;
0113 *p_stats = el;
0114
0115 out:
0116
0117
0118 MUTEX_UNLOCK_V(M_statsh_lock);
0119
0120 return ret;
0121 }
0122
0123
0124
0125
0126
0127
0128 expublic void tmq_fwd_busy_inc(fwd_stats_t *p_stats)
0129 {
0130 NDRX_SPIN_LOCK_V(p_stats->busy_spin);
0131 p_stats->busy++;
0132 NDRX_SPIN_UNLOCK_V(p_stats->busy_spin);
0133 }
0134
0135
0136
0137
0138
0139
0140 expublic void tmq_fwd_busy_dec(fwd_stats_t *p_stats)
0141 {
0142 NDRX_SPIN_LOCK_V(p_stats->busy_spin);
0143 p_stats->busy--;
0144 NDRX_SPIN_UNLOCK_V(p_stats->busy_spin);
0145
0146 }
0147
0148
0149
0150
0151
0152 expublic void tmq_fwd_sync_add(fwd_msg_t *fwd)
0153 {
0154 NDRX_SPIN_LOCK_V(fwd->stats->sync_spin);
0155 DL_APPEND(fwd->stats->sync_head, fwd);
0156 NDRX_SPIN_UNLOCK_V(fwd->stats->sync_spin);
0157 }
0158
0159
0160
0161
0162
0163 expublic void tmq_fwd_sync_del(fwd_msg_t *fwd)
0164 {
0165 NDRX_SPIN_LOCK_V(fwd->stats->sync_spin);
0166 DL_DELETE(fwd->stats->sync_head, fwd);
0167 NDRX_SPIN_UNLOCK_V(fwd->stats->sync_spin);
0168 }
0169
0170
0171
0172
0173
0174
0175 expublic int tmq_fwd_sync_cmp(fwd_msg_t *fwd)
0176 {
0177 int ret = EXFALSE;
0178 NDRX_SPIN_LOCK_V(fwd->stats->sync_spin);
0179
0180 if (fwd->stats->sync_head == fwd)
0181 {
0182 ret = EXTRUE;
0183 }
0184
0185 NDRX_SPIN_UNLOCK_V(fwd->stats->sync_spin);
0186
0187 return ret;
0188 }
0189
0190
0191
0192
0193
0194 expublic void tmq_fwd_sync_wait(fwd_msg_t *fwd)
0195 {
0196 MUTEX_LOCK_V(fwd->stats->sync_mut);
0197
0198 while (!tmq_fwd_sync_cmp(fwd))
0199 {
0200 pthread_cond_wait(&fwd->stats->sync_cond, &fwd->stats->sync_mut);
0201 }
0202
0203 MUTEX_UNLOCK_V(fwd->stats->sync_mut);
0204 }
0205
0206
0207
0208
0209
0210 expublic void tmq_fwd_sync_notify(fwd_msg_t *fwd)
0211 {
0212
0213 MUTEX_LOCK_V(fwd->stats->sync_mut);
0214
0215 tmq_fwd_sync_del(fwd);
0216
0217
0218 pthread_cond_broadcast(&fwd->stats->sync_cond);
0219
0220 MUTEX_UNLOCK_V(fwd->stats->sync_mut);
0221 }
0222
0223