0001
0002
0003
0004
0005
0006
0007
0008
0009
0010
0011
0012
0013
0014
0015
0016
0017
0018
0019
0020
0021
0022
0023
0024
0025
0026
0027
0028
0029
0030
0031
0032
0033
0034 #include <stdio.h>
0035 #include <stdlib.h>
0036 #include <string.h>
0037 #include <errno.h>
0038 #include <assert.h>
0039
0040 #include <ndebug.h>
0041 #include <exhash.h>
0042 #include <atmi.h>
0043
0044 #include "tmqd.h"
0045 #include <utlist.h>
0046 #include <rbtree.h>
0047
0048
0049
0050
0051
0052
0053
0054
0055
0056
0057
0058
0059
0060
0061
0062
0063
0064 expublic int ndrx_infl_addmsg(tmq_qconfig_t * qconf, tmq_qhash_t *qhash, tmq_memmsg_t *mmsg)
0065 {
0066 int ret = EXSUCCEED;
0067 int isNew = EXFALSE;
0068 char corrid_str[TMCORRIDLEN_STR+1];
0069
0070 mmsg->qconf=qconf;
0071 mmsg->qhash=qhash;
0072
0073 if (mmsg->msg->lockthreadid)
0074 {
0075 CDL_APPEND(qhash->q_infligh, mmsg);
0076 mmsg->qstate = NDRX_TMQ_LOC_INFL;
0077 }
0078 else if ( (mmsg->msg->qctl.flags & TPQTIME_ABS) &&
0079 (mmsg->msg->qctl.deq_time > (long)time(NULL)))
0080 {
0081 ndrx_rbt_insert(&qhash->q_fut, (ndrx_rbt_node_t *)mmsg, &isNew);
0082 mmsg->qstate = NDRX_TMQ_LOC_FUTQ;
0083 }
0084 else
0085 {
0086
0087 ndrx_rbt_insert(&qhash->q, (ndrx_rbt_node_t *)mmsg, &isNew);
0088 mmsg->qstate = NDRX_TMQ_LOC_CURQ;
0089
0090 if (mmsg->msg->qctl.flags & TPQCORRID)
0091 {
0092 if (EXSUCCEED!=tmq_cor_msg_add(mmsg))
0093 {
0094 NDRX_LOG(log_error, "Failed to add msg to corhash!");
0095 EXFAIL_OUT(ret);
0096 }
0097 mmsg->qstate |= NDRX_TMQ_LOC_CORQ;
0098 }
0099 }
0100
0101
0102
0103 EXHASH_ADD_STR( G_msgid_hash, msgid_str, mmsg);
0104 mmsg->qstate |= NDRX_TMQ_LOC_MSGIDHASH;
0105
0106 out:
0107 return ret;
0108 }
0109
0110
0111
0112
0113
0114 expublic int ndrx_infl_mov2infl(tmq_memmsg_t *mmsg)
0115 {
0116 int ret = EXSUCCEED;
0117
0118 if (mmsg->qstate & NDRX_TMQ_LOC_FUTQ)
0119 {
0120 ndrx_rbt_delete(&mmsg->qhash->q_fut, (ndrx_rbt_node_t *)mmsg);
0121 mmsg->qstate &= ~NDRX_TMQ_LOC_FUTQ;
0122 }
0123 else if (mmsg->qstate & NDRX_TMQ_LOC_CURQ)
0124 {
0125 ndrx_rbt_delete(&mmsg->qhash->q, (ndrx_rbt_node_t *)mmsg);
0126 mmsg->qstate &= ~NDRX_TMQ_LOC_CURQ;
0127
0128
0129 if (mmsg->qstate & NDRX_TMQ_LOC_CORQ)
0130 {
0131 tmq_cor_msg_del(mmsg);
0132 mmsg->qstate &= ~NDRX_TMQ_LOC_CORQ;
0133 }
0134 }
0135 else
0136 {
0137 NDRX_LOG(log_error, "Message [%s] is not in cur/fut queue!",
0138 mmsg->msgid_str);
0139 userlog("Cannot move to inflight Q: messsage [%s] is not in cur/fut queue (state=%hd)!",
0140 mmsg->msgid_str, mmsg->qstate);
0141
0142 abort();
0143 }
0144
0145
0146 CDL_APPEND(mmsg->qhash->q_infligh, mmsg);
0147 mmsg->qstate |= NDRX_TMQ_LOC_INFL;
0148
0149 out:
0150 return ret;
0151 }
0152
0153
0154
0155
0156
0157
0158
0159
0160
0161 expublic int ndrx_infl_mov2cur(tmq_memmsg_t *mmsg)
0162 {
0163 int ret = EXSUCCEED;
0164
0165 if (mmsg->qstate & NDRX_TMQ_LOC_INFL)
0166 {
0167 CDL_DELETE(mmsg->qhash->q_infligh, mmsg);
0168 mmsg->qstate &= ~NDRX_TMQ_LOC_INFL;
0169
0170
0171 if ( (mmsg->msg->qctl.flags & TPQTIME_ABS) &&
0172 (mmsg->msg->qctl.deq_time > (long)time(NULL)))
0173 {
0174 ndrx_rbt_insert(&mmsg->qhash->q_fut, (ndrx_rbt_node_t *)mmsg, NULL);
0175 mmsg->qstate |= NDRX_TMQ_LOC_FUTQ;
0176 }
0177 else
0178 {
0179
0180 ndrx_rbt_insert(&mmsg->qhash->q, (ndrx_rbt_node_t *)mmsg, NULL);
0181 mmsg->qstate |= NDRX_TMQ_LOC_CURQ;
0182
0183 if (mmsg->msg->qctl.flags & TPQCORRID)
0184 {
0185 if (EXSUCCEED!=tmq_cor_msg_add(mmsg))
0186 {
0187 NDRX_LOG(log_error, "Failed to add msg [%s] to corhash!",
0188 mmsg->msgid_str);
0189 EXFAIL_OUT(ret);
0190 }
0191 mmsg->qstate |= NDRX_TMQ_LOC_CORQ;
0192 }
0193 }
0194 }
0195 else
0196 {
0197 NDRX_LOG(log_error, "Message [%s] is not in inflight queue!",
0198 mmsg->msgid_str);
0199 userlog("Cannot move to cur Q: messsage [%s] is not in inflight queue (state=%hd)!",
0200 mmsg->msgid_str, mmsg->qstate);
0201 abort();
0202 }
0203
0204 out:
0205 return ret;
0206 }
0207
0208
0209
0210
0211
0212
0213
0214
0215 expublic int ndrx_infl_delmsg(tmq_memmsg_t *mmsg)
0216 {
0217 int ret = EXSUCCEED;
0218
0219 if (mmsg->qstate & NDRX_TMQ_LOC_CORQ)
0220 {
0221
0222 tmq_cor_msg_del(mmsg);
0223 }
0224
0225 if (mmsg->qstate & NDRX_TMQ_LOC_MSGIDHASH)
0226 {
0227 EXHASH_DEL(G_msgid_hash, mmsg);
0228 }
0229
0230 if (mmsg->qstate & NDRX_TMQ_LOC_INFL)
0231 {
0232 CDL_DELETE(mmsg->qhash->q_infligh, mmsg);
0233 }
0234
0235 if (mmsg->qstate & NDRX_TMQ_LOC_FUTQ)
0236 {
0237 ndrx_rbt_delete(&mmsg->qhash->q_fut, (ndrx_rbt_node_t *)mmsg);
0238 }
0239
0240 if (mmsg->qstate & NDRX_TMQ_LOC_CURQ)
0241 {
0242
0243 ndrx_rbt_delete(&mmsg->qhash->q, (ndrx_rbt_node_t *)mmsg);
0244 }
0245
0246 mmsg->qstate=0;
0247
0248 return ret;
0249 }
0250
0251
0252
0253
0254
0255 expublic int ndrx_infl_fut2cur(tmq_qhash_t *qhash)
0256 {
0257 int ret = EXSUCCEED;
0258 tmq_memmsg_t *mmsg = NULL;
0259 ndrx_rbt_tree_iterator_t iter;
0260
0261 while (NULL != (mmsg = (tmq_memmsg_t*)ndrx_rbt_leftmost(&qhash->q_fut)) )
0262 {
0263
0264 if ( mmsg->msg->qctl.deq_time <= (long)time(NULL) )
0265 {
0266
0267 ndrx_rbt_delete(&mmsg->qhash->q_fut, (ndrx_rbt_node_t *)mmsg);
0268 mmsg->qstate &= ~NDRX_TMQ_LOC_FUTQ;
0269
0270
0271 ndrx_rbt_insert(&mmsg->qhash->q, (ndrx_rbt_node_t *)mmsg, NULL);
0272 mmsg->qstate |= NDRX_TMQ_LOC_CURQ;
0273
0274
0275 if (mmsg->msg->qctl.flags & TPQCORRID)
0276 {
0277
0278 if (EXSUCCEED!=tmq_cor_msg_add(mmsg))
0279 {
0280 NDRX_LOG(log_error, "Failed to add msg [%s] to corhash!",
0281 mmsg->msgid_str);
0282 EXFAIL_OUT(ret);
0283 }
0284 mmsg->qstate |= NDRX_TMQ_LOC_CORQ;
0285 }
0286 }
0287 else
0288 {
0289 break;
0290 }
0291 }
0292
0293 out:
0294 return ret;
0295 }
0296
0297