0001
0002
0003
0004
0005
0006
0007
0008
0009
0010
0011
0012
0013
0014
0015
0016
0017
0018
0019
0020
0021
0022
0023
0024
0025
0026
0027
0028
0029
0030
0031
0032
0033
0034
0035 #ifndef TMQD_H
0036 #define TMQD_H
0037
0038 #ifdef __cplusplus
0039 extern "C" {
0040 #endif
0041
0042
0043 #include <sys_unix.h>
0044 #include <xa_cmn.h>
0045 #include <atmi.h>
0046 #include <exhash.h>
0047 #include <exthpool.h>
0048 #include <rbtree.h>
0049 #include "tmqueue.h"
0050
0051
/*
 * Forwarder thread handle plus two volatile int flags.
 * NOTE(review): judging by the names, the flags form a shutdown
 * request / acknowledge pair -- confirm against the forwarder code.
 */
extern pthread_t    G_forward_thread;
extern volatile int G_forward_req_shutdown;
extern volatile int ndrx_G_forward_req_shutdown_ack;
0055
/*
 * Built-in default values.
 * NOTE(review): units and the settings they back are not visible in this
 * header -- confirm against the option parsing code.
 */
#define SCAN_TIME_DFLT      10
#define MAX_TRIES_DFTL      100             /* historical (misspelled) name, kept for existing users */
#define MAX_TRIES_DFLT      MAX_TRIES_DFTL  /* correctly spelled alias, matches the other *_DFLT names */
#define THREADPOOL_DFLT     10
#define TXTOUT_DFLT         30
#define SES_TOUT_DFLT       180
0061
/* Queue ordering mode codes (single characters). */
#define TMQ_MODE_FIFO       'F'     /* first in, first out */
#define TMQ_MODE_LIFO       'L'     /* last in, first out  */
0064
0065
/*
 * Automatic-queue flag codes (single characters).
 * NOTE(review): exact meaning of each code is inferred from its name
 * (manual / automatic / automatic-transactional) -- confirm.
 */
#define TMQ_AUTOQ_MANUAL    'N'
#define TMQ_AUTOQ_AUTO      'Y'
#define TMQ_AUTOQ_AUTOTX    'T'
/* All accepted flag characters, as a string. */
#define TMQ_AUTOQ_ALLFLAGS  "NYT"
/*
 * True when X is one of the two automatic codes (AUTO or AUTOTX).
 * The argument is parenthesized so that compound expressions (e.g. a
 * conditional expression) compare as a whole. X is still evaluated up to
 * twice, so do not pass expressions with side effects.
 */
#define TMQ_AUTOQ_ISAUTO(X) ((TMQ_AUTOQ_AUTO==(X)) || (TMQ_AUTOQ_AUTOTX==(X)))
0071
0072
/* Transaction state codes. */
#define TMQ_TXSTATE_ACTIVE      0
#define TMQ_TXSTATE_PREPARED    1
#define TMQ_TXSTATE_COMMITTED   2
0076
/* NOTE(review): presumably the queue name that denotes a service/default entry -- confirm. */
#define TMQ_QUEUE_SERVICE   "@"

/* NOTE(review): presumably the set of argument characters that select "commit" -- confirm. */
#define TMQ_ARGS_COMMIT     "Cc"
0080
/*
 * Synchronisation mode codes (see the `sync` members of the queue
 * configuration and forward structures in this header).
 * NOTE(review): the names suggest "no sync", "sync on tpacall" and
 * "sync on tpcommit" -- confirm against the forwarder.
 */
#define TMQ_SYNC_NONE       0
#define TMQ_SYNC_TPACALL    1
#define TMQ_SYNC_TPCOMMIT   2
0084
/*
 * Message location flags; each non-zero value is a distinct single bit,
 * so they can be OR-ed together.
 * NOTE(review): names suggest in-flight list, future queue, current
 * queue, correlator queue and message-id hash -- confirm.
 */
#define NDRX_TMQ_LOC_UNKNOWN    0x0000
#define NDRX_TMQ_LOC_INFL       0x0001
#define NDRX_TMQ_LOC_FUTQ       0x0002
#define NDRX_TMQ_LOC_CURQ       0x0004
#define NDRX_TMQ_LOC_CORQ       0x0008
#define NDRX_TMQ_LOC_MSGIDHASH  0x0010
0091
0092
0093
/*
 * Recover the enclosing tmq_memmsg_t from a pointer to its `cor` member
 * by subtracting the member offset.
 * The argument is parenthesized so the (char *) cast applies to the whole
 * expression passed in, not just to its first operand.
 * NOTE(review): assumes EXOFFSET(type, member) yields the byte offset of
 * the member, like offsetof -- confirm.
 */
#define TMQ_COR_GETMSG(ptr) ((tmq_memmsg_t *)((char *)(ptr) - EXOFFSET(tmq_memmsg_t, cor)))
0095
0096
0097
0098
0099
0100
0101
0102 typedef struct
0103 {
0104 long dflt_timeout;
0105
0106 long ses_timeout;
0107
0108
0109 int scan_time;
0110
0111 int tout_check_time;
0112
0113 char qconfig[PATH_MAX+1];
0114 int threadpoolsize;
0115 threadpool thpool;
0116
0117 int notifpoolsize;
0118 threadpool notifthpool;
0119
0120 int fwdpoolsize;
0121 threadpool fwdthpool;
0122
0123 threadpool shutdownseq;
0124
0125 long fsync_flags;
0126
0127 int no_chkrun;
0128
0129 long vnodeid;
0130
0131 int chkdisk_time;
0132
0133 } tmqueue_cfg_t;
0134
0135
/*
 * Forward type names for structures that reference each other; the
 * structures themselves are defined further down in this header.
 */
typedef struct tmq_cormsg   tmq_corhash_t;  /* correlator entry */
typedef struct tmq_qhash    tmq_qhash_t;    /* per-queue entry */
typedef struct tmq_qconfig  tmq_qconfig_t;  /* queue configuration entry */
0143
0144
0145
0146
0147
0148
0149
0150
0151
0152
0153 typedef struct tmq_memmsg tmq_memmsg_t;
0154 struct tmq_memmsg
0155 {
0156 ndrx_rbt_node_t cur;
0157 ndrx_rbt_node_t cor;
0158
0159 char msgid_str[TMMSGIDLEN_STR+1];
0160 char corrid_str[TMCORRIDLEN_STR+1];
0161
0162 EX_hash_handle hh;
0163
0164
0165 tmq_memmsg_t *next;
0166 tmq_memmsg_t *prev;
0167
0168
0169 tmq_corhash_t *corhash;
0170
0171 tmq_qconfig_t *qconf;
0172 tmq_qhash_t *qhash;
0173
0174 tmq_msg_t *msg;
0175
0176
0177
0178
0179
0180
0181
0182
0183
0184
0185
0186
0187
0188
0189
0190
0191
0192
0193
0194
0195
0196
0197 short qstate;
0198
0199 };
0200
0201
0202
0203
0204 struct tmq_cormsg
0205 {
0206 char corrid_str[TMCORRIDLEN_STR+1];
0207
0208
0209
0210 ndrx_rbt_tree_t corq;
0211
0212 EX_hash_handle hh;
0213 };
0214
0215
0216
0217
0218 struct tmq_qhash
0219 {
0220 char qname[TMQNAMELEN+1];
0221 long succ;
0222 long fail;
0223
0224 long numenq;
0225 long numdeq;
0226
0227 EX_hash_handle hh;
0228
0229 ndrx_rbt_tree_t q;
0230 ndrx_rbt_tree_t q_fut;
0231
0232
0233
0234
0235
0236 tmq_memmsg_t *q_infligh;
0237
0238 tmq_corhash_t *corhash;
0239 };
0240
0241
0242
0243
0244
0245
0246 struct tmq_qconfig
0247 {
0248 char qname[TMQNAMELEN+1];
0249 char svcnm[XATMI_SERVICE_NAME_LENGTH+1];
0250 char autoq;
0251 int tries;
0252 int waitinit;
0253 int waitretry;
0254 int waitretrymax;
0255 int memonly;
0256 char mode;
0257 int txtout;
0258 char errorq[TMQNAMELEN];
0259 int workers;
0260 int sync;
0261
0262 EX_hash_handle hh;
0263 };
0264
0265
0266
0267
0268 typedef struct fwd_qlist fwd_qlist_t;
0269 struct fwd_qlist
0270 {
0271 char qname[TMQNAMELEN+1];
0272 long succ;
0273 long fail;
0274
0275 long numenq;
0276 long numdeq;
0277 int workers;
0278 int sync;
0279 fwd_qlist_t *next;
0280 fwd_qlist_t *prev;
0281 };
0282
0283
0284 typedef struct fwd_msg fwd_msg_t;
0285
0286
0287
0288
0289 typedef struct {
0290
0291 char qname[TMQNAMELEN+1];
0292 int busy;
0293 NDRX_SPIN_LOCKDECL(busy_spin);
0294
0295
0296
0297
0298
0299
0300 NDRX_SPIN_LOCKDECL(sync_spin);
0301 MUTEX_LOCKDECLN(sync_mut);
0302 pthread_cond_t sync_cond;
0303
0304 fwd_msg_t *sync_head;
0305
0306 EX_hash_handle hh;
0307
0308 } fwd_stats_t;
0309
0310
0311
0312
0313 struct fwd_msg {
0314 fwd_stats_t *stats;
0315 tmq_msg_t *msg;
0316 int sync;
0317 unsigned long seq;
0318 fwd_msg_t *prev;
0319 fwd_msg_t *next;
0320
0321 };
0322
0323
0324 extern tmqueue_cfg_t G_tmqueue_cfg;
0325 extern tmq_memmsg_t *G_msgid_hash;
0326 extern void (*G_tmq_chkdisk_th)(void *ptr, int *p_finish_off);
0327
0328
0329
0330
/*
 * Thread lifecycle entry points.
 * NOTE(review): the (ptr, p_finish_off) signature matches the other
 * thread-pool style callbacks in this header -- confirm usage.
 */
extern void tmq_thread_init(void);
extern void tmq_thread_uninit(void);
extern void tmq_thread_shutdown(void *ptr,
                                int *p_finish_off);
0334
0335
0336
0337 extern int tmq_mqlq(UBFH *p_ub, int cd);
0338 extern int tmq_mqlc(UBFH *p_ub, int cd);
0339 extern int tmq_mqlm(UBFH *p_ub, int cd);
0340
0341 extern int tmq_mqrc(UBFH *p_ub);
0342 extern int tmq_mqch(UBFH *p_ub);
0343
0344 extern int tmq_enqueue(UBFH *p_ub, int *int_diag);
0345 extern int tmq_dequeue(UBFH **pp_ub, int *int_diag);
0346
0347
/* Background and forwarder control entry points. */
extern int  background_read_log(void);
extern void forward_shutdown_wake(void);
extern int  forward_process_init(void);
extern void forward_lock(void);
extern void forward_unlock(void);
extern void thread_shutdown(void *ptr,
                            int *p_finish_off);
0354
0355
0356 extern int tmq_reload_conf(char *cf);
0357 extern int tmq_qconf_addupd(char *qconfstr, char *name);
0358 extern int tmq_dum_add(char *tmxid);
0359 extern int tmq_msg_add(tmq_msg_t **msg, int is_recovery, TPQCTL *diag, int *int_diag);
0360 extern int tmq_unlock_msg(union tmq_upd_block *b);
0361 extern tmq_msg_t * tmq_msg_dequeue(char *qname, long flags, int is_auto, long *diagnostic,
0362 char *diagmsg, size_t diagmsgsz, char *corrid_str, int *int_diag);
0363 extern tmq_msg_t * tmq_msg_dequeue_by_msgid(char *msgid, long flags, long *diagnostic,
0364 char *diagmsg, size_t diagmsgsz, int *int_diag);
0365 extern int tmq_unlock_msg_by_msgid(char *msgid, int chkrun);
0366
0367 extern int tmq_msgid_exists(char *msgid_str);
0368
0369 extern int tmq_load_msgs(void);
0370 extern fwd_qlist_t *tmq_get_qlist(int auto_only, int incl_def);
0371 extern int tmq_qconf_get_with_default_static(char *qname, tmq_qconfig_t *qconf_out);
0372 extern tmq_qconfig_t * tmq_qconf_get_with_default(char *qname, int *p_is_defaulted);
0373 extern int tmq_build_q_def(char *qname, int *p_is_defaulted, char *out_buf, size_t out_bufsz);
0374 extern tmq_memmsg_t *tmq_get_msglist(char *qname);
0375
0376 extern int tmq_update_q_stats(char *qname, long succ_diff, long fail_diff);
0377 extern void tmq_get_q_stats(char *qname, long *p_msgs, long *p_locked);
0378 extern int q_msg_sort(tmq_memmsg_t *q1, tmq_memmsg_t *q2);
0379
0380 extern int tmq_cor_msg_add(tmq_memmsg_t *mmsg);
0381 extern void tmq_cor_msg_del(tmq_memmsg_t *mmsg);
0382 extern tmq_corhash_t * tmq_cor_find(tmq_qhash_t *qhash, char *corrid_str);
0383
0384
0385 extern void tmq_rbt_combine_cur(ndrx_rbt_node_t *existing, const ndrx_rbt_node_t *newdata, void *arg);
0386 extern void tmq_rbt_combine_fut(ndrx_rbt_node_t *existing, const ndrx_rbt_node_t *newdata, void *arg);
0387 extern void tmq_rbt_combine_cor(ndrx_rbt_node_t *existing, const ndrx_rbt_node_t *newdata, void *arg);
0388 extern int tmq_rbt_cmp_cur(const ndrx_rbt_node_t *a, const ndrx_rbt_node_t *b, void *arg);
0389 extern int tmq_rbt_cmp_cor(const ndrx_rbt_node_t *a, const ndrx_rbt_node_t *b, void *arg);
0390 extern int tmq_rbt_cmp_fut (const ndrx_rbt_node_t *a, const ndrx_rbt_node_t *b, void *arg);
0391
0392 extern int tmq_fwd_busy_cnt(char *qname, fwd_stats_t **p_stats);
0393 extern void tmq_fwd_busy_inc(fwd_stats_t *p_stats);
0394 extern void tmq_fwd_busy_dec(fwd_stats_t *p_stats);
0395 extern int tmq_fwd_stat_init(void);
0396
0397 extern void tmq_fwd_sync_add(fwd_msg_t *fwd);
0398 extern void tmq_fwd_sync_del(fwd_msg_t *fwd);
0399 extern int tmq_fwd_sync_cmp(fwd_msg_t *fwd);
0400 extern void ndrx_forward_chkrun(tmq_memmsg_t *msg);;
0401
0402 extern void tmq_fwd_sync_wait(fwd_msg_t *fwd);
0403 extern void tmq_fwd_sync_notify(fwd_msg_t *fwd);
0404
0405
0406 extern int ndrx_infl_addmsg(tmq_qconfig_t * qconf, tmq_qhash_t *qhash, tmq_memmsg_t *mmsg);
0407 extern int ndrx_infl_mov2infl(tmq_memmsg_t *mmsg);
0408 extern int ndrx_infl_mov2cur(tmq_memmsg_t *mmsg);
0409 extern int ndrx_infl_delmsg(tmq_memmsg_t *mmsg);
0410 extern int ndrx_infl_fut2cur(tmq_qhash_t *qhash);
0411
0412 #ifdef __cplusplus
0413 }
0414 #endif
0415
0416 #endif
0417
0418