0001
0002
0003
0004
0005
0006
0007
0008
0009
0010
0011
0012
0013
0014
0015
0016
0017
0018
0019
0020
0021
0022
0023
0024
0025
0026
0027
0028
0029
0030
0031
0032
0033
0034
0035
0036 #include <stdio.h>
0037 #include <stdlib.h>
0038 #include <string.h>
0039 #include <errno.h>
0040 #include <assert.h>
0041
0042 #include <ndebug.h>
0043 #include <exhash.h>
0044 #include <atmi.h>
0045
0046 #include "tmqd.h"
0047 #include <utlist.h>
0048
0049
0050
0051
0052
0053
0054
0055
0056
0057
0058
0059
0060
0061
0062 expublic void tmq_rbt_combine_cur(ndrx_rbt_node_t *existing, const ndrx_rbt_node_t *newdata, void *arg)
0063 {
0064 tmq_memmsg_t *existing_msg = (tmq_memmsg_t *)existing;
0065 tmq_memmsg_t *newdata_msg = (tmq_memmsg_t *)newdata;
0066 char tmp[256];
0067
0068
0069 snprintf(tmp, sizeof(tmp), "ERROR ! (cur) Two messages with the same key in current Q! "
0070 "Existing msgid [%s] new msgid [%s] dup key: msgtstamp=%ld, "
0071 "msgtstamp_usec=%ld, msgtstamp_cntr=%d. Clock skew?",
0072 existing_msg->msgid_str, newdata_msg->msgid_str, existing_msg->msg->msgtstamp,
0073 existing_msg->msg->msgtstamp_usec,
0074 existing_msg->msg->msgtstamp_cntr);
0075 NDRX_LOG(log_error, "%s", tmp);
0076 userlog("%s", tmp);
0077 abort();
0078 }
0079
0080
0081
0082
0083
0084 expublic void tmq_rbt_combine_fut(ndrx_rbt_node_t *existing, const ndrx_rbt_node_t *newdata, void *arg)
0085 {
0086 tmq_memmsg_t *existing_msg = (tmq_memmsg_t *)existing;
0087 tmq_memmsg_t *newdata_msg = (tmq_memmsg_t *)newdata;
0088 char tmp[256];
0089 snprintf(tmp, sizeof(tmp), "ERROR ! (fut) Two messages with the same key in future Q! "
0090 "Existing msgid [%s] new msgid [%s] dup key: deq_time=%ld, msgtstamp=%ld, "
0091 "msgtstamp_usec=%ld, msgtstamp_cntr=%d. Clock skew?",
0092 existing_msg->msgid_str, newdata_msg->msgid_str,
0093 existing_msg->msg->qctl.deq_time,
0094 existing_msg->msg->msgtstamp,
0095 existing_msg->msg->msgtstamp_usec,
0096 existing_msg->msg->msgtstamp_cntr);
0097 NDRX_LOG(log_error, "%s", tmp);
0098 userlog("%s", tmp);
0099 abort();
0100 }
0101
0102
0103
0104
0105 expublic void tmq_rbt_combine_cor(ndrx_rbt_node_t *existing, const ndrx_rbt_node_t *newdata, void *arg)
0106 {
0107 tmq_memmsg_t *existing_msg = TMQ_COR_GETMSG(existing);
0108 tmq_memmsg_t *newdata_msg = TMQ_COR_GETMSG(newdata);
0109 char tmp[256];
0110 snprintf(tmp, sizeof(tmp), "ERROR ! (cor) Two messages with the same key in correlation Q! "
0111 "Existing msgid [%s] new msgid [%s] dup key: msgtstamp=%ld, msgtstamp_usec=%ld, "
0112 "msgtstamp_cntr=%d. Clock skew?",
0113 existing_msg->msgid_str, newdata_msg->msgid_str, existing_msg->msg->msgtstamp,
0114 existing_msg->msg->msgtstamp_usec,
0115 existing_msg->msg->msgtstamp_cntr);
0116 NDRX_LOG(log_error, "%s", tmp);
0117 userlog("%s", tmp);
0118 abort();
0119 }
0120
0121
0122
0123
0124 expublic int tmq_rbt_cmp_cur(const ndrx_rbt_node_t *a, const ndrx_rbt_node_t *b, void *arg)
0125 {
0126 tmq_memmsg_t *aa = (tmq_memmsg_t *)a;
0127 tmq_memmsg_t *bb = (tmq_memmsg_t *)b;
0128
0129 return ndrx_compare3(aa->msg->msgtstamp, aa->msg->msgtstamp_usec, aa->msg->msgtstamp_cntr,
0130 bb->msg->msgtstamp, bb->msg->msgtstamp_usec, bb->msg->msgtstamp_cntr);
0131 }
0132
0133
0134
0135
0136 expublic int tmq_rbt_cmp_cor(const ndrx_rbt_node_t *a, const ndrx_rbt_node_t *b, void *arg)
0137 {
0138 tmq_memmsg_t *aa = TMQ_COR_GETMSG(a);
0139 tmq_memmsg_t *bb = TMQ_COR_GETMSG(b);
0140
0141 return ndrx_compare3(aa->msg->msgtstamp, aa->msg->msgtstamp_usec, aa->msg->msgtstamp_cntr,
0142 bb->msg->msgtstamp, bb->msg->msgtstamp_usec, bb->msg->msgtstamp_cntr);
0143 }
0144
0145
0146
0147
0148
0149
0150 expublic int tmq_rbt_cmp_fut (const ndrx_rbt_node_t *a, const ndrx_rbt_node_t *b, void *arg)
0151 {
0152
0153 tmq_memmsg_t *aa = (tmq_memmsg_t *)a;
0154 tmq_memmsg_t *bb = (tmq_memmsg_t *)b;
0155
0156 return ndrx_compare4(aa->msg->qctl.deq_time, aa->msg->msgtstamp,
0157 aa->msg->msgtstamp_usec, aa->msg->msgtstamp_cntr,
0158 bb->msg->qctl.deq_time, bb->msg->msgtstamp,
0159 bb->msg->msgtstamp_usec, bb->msg->msgtstamp_cntr);
0160 }
0161
0162
0163