Back to home page

Enduro/X

 
 

    


0001 /**
0002  * @brief Transaction Managed Queue - daemon header
0003  *
0004  * @file tmqd.h
0005  */
0006 /* -----------------------------------------------------------------------------
0007  * Enduro/X Middleware Platform for Distributed Transaction Processing
0008  * Copyright (C) 2009-2016, ATR Baltic, Ltd. All Rights Reserved.
0009  * Copyright (C) 2017-2023, Mavimax, Ltd. All Rights Reserved.
0010  * This software is released under one of the following licenses:
0011  * AGPL (with Java and Go exceptions) or Mavimax's license for commercial use.
0012  * See LICENSE file for full text.
0013  * -----------------------------------------------------------------------------
0014  * AGPL license:
0015  *
0016  * This program is free software; you can redistribute it and/or modify it under
0017  * the terms of the GNU Affero General Public License, version 3 as published
0018  * by the Free Software Foundation;
0019  *
0020  * This program is distributed in the hope that it will be useful, but WITHOUT ANY
0021  * WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR A
0022  * PARTICULAR PURPOSE. See the GNU Affero General Public License, version 3
0023  * for more details.
0024  *
0025  * You should have received a copy of the GNU Affero General Public License along 
0026  * with this program; if not, write to the Free Software Foundation, Inc.,
0027  * 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
0028  *
0029  * -----------------------------------------------------------------------------
0030  * A commercial use license is available from Mavimax, Ltd
0031  * contact@mavimax.com
0032  * -----------------------------------------------------------------------------
0033  */
0034 
0035 #ifndef TMQD_H
0036 #define TMQD_H
0037 
0038 #ifdef  __cplusplus
0039 extern "C" {
0040 #endif
0041 
0042 /*---------------------------Includes-----------------------------------*/
0043 #include <sys_unix.h>
0044 #include <xa_cmn.h>
0045 #include <atmi.h>
0046 #include <exhash.h>
0047 #include <exthpool.h>
0048 #include <rbtree.h>
0049 #include "tmqueue.h"
0050     
0051 /*---------------------------Externs------------------------------------*/
0052 extern pthread_t G_forward_thread;
0053 extern int volatile G_forward_req_shutdown;          /**< Is shutdown request? */
0054 extern int volatile ndrx_G_forward_req_shutdown_ack; /**< Is shutdown acked?   */
0055 /*---------------------------Macros-------------------------------------*/
0056 #define SCAN_TIME_DFLT          10  /**< Every 10 sec try to complete TXs */
0057 #define MAX_TRIES_DFTL          100 /**< Try count for transaction completion */
0058 #define THREADPOOL_DFLT         10  /**< Default number of threads spawned   */
0059 #define TXTOUT_DFLT             30  /**< Default XA transaction timeout      */
0060 #define SES_TOUT_DFLT           180  /**< Default XA transaction timeout      */
0061 
0062 #define TMQ_MODE_FIFO           'F' /**< fifo q mode                        */
0063 #define TMQ_MODE_LIFO           'L' /**< lifo q mode                        */
0064 
0065 /* Autoq flags: */
0066 #define TMQ_AUTOQ_MANUAL        'N' /**< Not automatic Q                    */
0067 #define TMQ_AUTOQ_AUTO          'Y' /**< Automatic                          */
0068 #define TMQ_AUTOQ_AUTOTX        'T' /**< Automatic, transactional           */
0069 #define TMQ_AUTOQ_ALLFLAGS      "NYT" /**< list of all flasg                */
0070 #define TMQ_AUTOQ_ISAUTO(X) ((TMQ_AUTOQ_AUTO==X) || (TMQ_AUTOQ_AUTOTX==X))
0071 
0072 
0073 #define TMQ_TXSTATE_ACTIVE      0   /**< Active message                     */
0074 #define TMQ_TXSTATE_PREPARED    1   /**< Prepared msg                       */
0075 #define TMQ_TXSTATE_COMMITTED   2   /**< Committed msg                      */
0076 
0077 #define TMQ_QUEUE_SERVICE       "@" /**< Special name when service matches queue name */
0078 
0079 #define TMQ_ARGS_COMMIT         "Cc"    /**< Sync after commit              */
0080 
0081 #define TMQ_SYNC_NONE           0       /**< NO sync needed                 */
0082 #define TMQ_SYNC_TPACALL        1       /**< Sync on tpacall                */
0083 #define TMQ_SYNC_TPCOMMIT       2       /**< Sync on tpcommit () if auto=T  */
0084 
0085 #define NDRX_TMQ_LOC_UNKNOWN    0x0000  /**< Unknown location               */
0086 #define NDRX_TMQ_LOC_INFL       0x0001  /**< Inflight queue                 */
0087 #define NDRX_TMQ_LOC_FUTQ       0x0002  /**< Future queue                   */
0088 #define NDRX_TMQ_LOC_CURQ       0x0004  /**< Current queue                  */
0089 #define NDRX_TMQ_LOC_CORQ       0x0008  /**< Correlator queue               */
0090 #define NDRX_TMQ_LOC_MSGIDHASH  0x0010  /**< Message id hash                */
0091 /**
0092  * Extract tmq_memmsg_t from the correltion tree node 
0093  */
0094 #define TMQ_COR_GETMSG(ptr) ((tmq_memmsg_t *)((char *)ptr - EXOFFSET(tmq_memmsg_t, cor)))
0095 
0096 /*---------------------------Enums--------------------------------------*/
0097 /*---------------------------Typedefs-----------------------------------*/
0098 
0099 /**
0100  * TM ndrx_config.handler
0101  */
0102 typedef struct
0103 {
0104     long dflt_timeout;  /**< service call transaction timeout (forwarder)    */
0105     
0106     long ses_timeout;  /**< global session timeout, 
0107                         * how long uncompleted transaction may hang          */
0108     
0109     int scan_time;      /**< Number of seconds retries                       */
0110     
0111     int tout_check_time; /**< seconds used for detecting transaction timeout */
0112     
0113     char qconfig[PATH_MAX+1]; /**< Queue config file                        */
0114     int threadpoolsize;       /**< thread pool size                         */
0115     threadpool thpool;        /**< threads for service                      */
0116     
0117     int notifpoolsize;        /**< Notify thread pool size                  */
0118     threadpool notifthpool;   /**< Notify (loop back) threads for service (callbacks from TMSRV) */
0119     
0120     int fwdpoolsize;          /**< forwarder thread pool size               */
0121     threadpool fwdthpool;     /**< threads for forwarder                    */
0122     
0123     threadpool shutdownseq;   /**< Shutdown sequencer, we have simpler just to use pool instead of threads  */
0124     
0125     long fsync_flags;         /**< special flags for disk sync              */
0126     
0127     int no_chkrun;           /**< If set to true, do not trigger queue run  */
0128 
0129     long vnodeid;            /**< Node id, command id used for failovers    */
0130 
0131     int chkdisk_time;        /**< Time by which disk checking shall be excuted (enabled if > 0) */
0132     
0133 } tmqueue_cfg_t;
0134 
0135 /** correlator message queue hash */
0136 typedef struct tmq_cormsg tmq_corhash_t;
0137 
0138 /** hash of queues */
0139 typedef struct tmq_qhash tmq_qhash_t;
0140 
0141 /** queue configuration */
0142 typedef struct tmq_qconfig tmq_qconfig_t;
0143 
0144 /**
0145  * Memory based message.
0146  * Includes all the links to keep track of the message in the queues.
0147  * such as:
0148  *  - red-black tree of current/future run messages
0149  *  - linked list of in-flight messages (message locked, not available for dequeue)
0150  *  - red-black tree of correlator based messages
0151  * Structure is built every time message is enqueued or read from disk.
0152  */
0153 typedef struct tmq_memmsg tmq_memmsg_t;
0154 struct tmq_memmsg
0155 {
0156     ndrx_rbt_node_t cur;    /**< handle in future or now list */
0157     ndrx_rbt_node_t cor;    /**< handle in correlator list    */
0158 
0159     char msgid_str[TMMSGIDLEN_STR+1]; /**< we might store msgid in string format... */
0160     char corrid_str[TMCORRIDLEN_STR+1]; /**< hash for correlator              */
0161     /** We should have hash handler of message hash                           */
0162     EX_hash_handle hh; /**< makes this structure hashable (for msgid)         */
0163 
0164     /** handlers for in-flight q (used for current message listing)           */
0165     tmq_memmsg_t *next;
0166     tmq_memmsg_t *prev;
0167 
0168     /** backlink to correlator q, so that we know where to remove             */
0169     tmq_corhash_t *corhash;
0170 
0171     tmq_qconfig_t *qconf;   /**< configuration used for this message Q        */
0172     tmq_qhash_t *qhash;     /**< queue entry                                  */
0173 
0174     tmq_msg_t *msg;         /**< disk message                                 */
0175     
0176 /**
0177  * NDRX_TMQ_LOC_UNK  0x0000
0178  * NDRX_TMQ_LOC_INFL 0x0001
0179  * NDRX_TMQ_LOC_FUT  0x0002
0180  * NDRX_TMQ_LOC_CUR  0x0004
0181  * NDRX_TMQ_LOC_COR  0x0008
0182  * 
0183  * flags|=NDRX_TMQ_LOC_INFL;
0184  * 
0185  * flags&=~NDRX_TMQ_LOC_CUR;
0186  * flags&=~NDRX_TMQ_LOC_COR;
0187  * flags&=~NDRX_TMQ_LOC_FUT;
0188  * 
0189  * 
0190  * delete:
0191  *  if flags & NDRX_TMQ_LOC_INFL=> delete from NDRX_TMQ_LOC_INFL
0192  *  if flags & NDRX_TMQ_LOC_FUT=> delete from fut;
0193  * 
0194  * 
0195  */
0196     /* where are we? */
0197     short qstate;
0198 
0199 };
0200 
0201 /**
0202  * Messages correlated
0203  */
0204 struct tmq_cormsg
0205 {
0206     char corrid_str[TMCORRIDLEN_STR+1]; /**< hash for correlator               */
0207     /** queue by correlation, CDL, next2, prev2 
0208     tmq_memmsg_t *corq; */
0209 
0210     ndrx_rbt_tree_t corq; /**< queue uses standard sorting (insert time)      */
0211 
0212     EX_hash_handle hh; /**< makes this structure hashable        */
0213 };
0214 
0215 /**
0216  * List of queues (for queued messages)
0217  */
0218 struct tmq_qhash
0219 {
0220     char qname[TMQNAMELEN+1];
0221     long succ;      /**< Succeeded auto messages                 */
0222     long fail;      /**< failed auto messages                    */
0223     
0224     long numenq;    /**< Enqueued messages (even locked)         */
0225     long numdeq;    /**< Dequeued messages (removed, including aborts)     */
0226 
0227     EX_hash_handle hh; /**< makes this structure hashable        */
0228 
0229     ndrx_rbt_tree_t q;     /**< Currently available messages        */
0230     ndrx_rbt_tree_t q_fut; /**< future messages (not yet ready for proc) */
0231 
0232     /** 
0233      * in-flight messages (in process) 
0234      * this is linked list head
0235      */
0236     tmq_memmsg_t *q_infligh;
0237 
0238     tmq_corhash_t *corhash; /**< hash of correlators                */
0239 };
0240 
0241 /**
0242  * Queue configuration.
0243  * There will be special Q: "@DEFAULT" which contains the settings for default
0244  * (unknown queue)
0245  */
0246 struct tmq_qconfig
0247 {
0248     char qname[TMQNAMELEN+1];
0249     char svcnm[XATMI_SERVICE_NAME_LENGTH+1]; /**< optional service name to call */
0250     char autoq;      /**< Is this automatic queue                       */
0251     int tries;       /**< Retry count for sending                       */
0252     int waitinit;    /**< How long to wait for initial sending (sec)    */
0253     int waitretry;   /**< How long to wait between retries (sec)        */
0254     int waitretrymax;/**< Max wait  (sec)                               */
0255     int memonly;    /**< is queue memory only                           */
0256     char mode;      /**< queue mode fifo/lifo                           */
0257     int txtout;     /**< transaction timeout (override if > -1)         */
0258     char errorq[TMQNAMELEN];     /**< Error queue name, optional        */
0259     int workers;   /**< Max number of busy forward workers              */
0260     int sync;      /**< Sync forward sending                            */
0261     
0262     EX_hash_handle hh; /**< makes this structure hashable               */
0263 };
0264 
0265 /**
0266  * List of Qs to forward
0267  */
0268 typedef struct fwd_qlist fwd_qlist_t;
0269 struct fwd_qlist
0270 {
0271     char qname[TMQNAMELEN+1];
0272     long succ; /**< Succeeded auto messages                 */
0273     long fail; /**< failed auto messages                    */
0274     
0275     long numenq; /**< Succeeded auto messages               */
0276     long numdeq; /**< failed auto messages                  */
0277     int workers;    /**< number of configured workers       */
0278     int sync;       /**< is queue synchronized?             */
0279     fwd_qlist_t *next;
0280     fwd_qlist_t *prev;
0281 };
0282 
0283 
0284 typedef struct fwd_msg fwd_msg_t;
0285 
0286 /**
0287  * Forward statistics
0288  */
0289 typedef struct {
0290     
0291     char qname[TMQNAMELEN+1];
0292     int busy;
0293     NDRX_SPIN_LOCKDECL(busy_spin); /**< add/cmp/del ops         */
0294     
0295     /*
0296      * - have have spinlock for adding/checking/removing msg from list.
0297      * - have a mutex for workers to sleep & wait for signal/broadcast
0298      * - have a cond variable for sleeping on
0299      */
0300     NDRX_SPIN_LOCKDECL(sync_spin); /**< add/cmp/del ops         */
0301     MUTEX_LOCKDECLN(sync_mut);  /**< wait mut                   */
0302     pthread_cond_t   sync_cond; /**< wait cond                  */
0303             
0304     fwd_msg_t *sync_head;       /**< head msg if used for sync  */
0305     
0306     EX_hash_handle hh; /**< makes this structure hashable       */
0307     
0308 } fwd_stats_t;
0309 
0310 /**
0311  * Forward message entry
0312  */
0313 struct fwd_msg {
0314     fwd_stats_t *stats; /**< ptr to stats block of the queue    */
0315     tmq_msg_t   *msg;   /**< message entry to forward           */
0316     int     sync;       /**< do we run in sync mode?            */
0317     unsigned long seq;  /**< sequence number                    */
0318     fwd_msg_t *prev;
0319     fwd_msg_t *next;
0320     
0321 };
0322 
0323 /*---------------------------Globals------------------------------------*/
0324 extern tmqueue_cfg_t G_tmqueue_cfg;
0325 extern tmq_memmsg_t *G_msgid_hash;
0326 extern void (*G_tmq_chkdisk_th)(void *ptr, int *p_finish_off);
0327 /*---------------------------Statics------------------------------------*/
0328 /*---------------------------Prototypes---------------------------------*/
0329 
0330 /* init */
0331 extern void tmq_thread_init(void);
0332 extern void tmq_thread_uninit(void);
0333 extern void tmq_thread_shutdown(void *ptr, int *p_finish_off);
0334 
0335 
0336 /* Q api */
0337 extern int tmq_mqlq(UBFH *p_ub, int cd);
0338 extern int tmq_mqlc(UBFH *p_ub, int cd);
0339 extern int tmq_mqlm(UBFH *p_ub, int cd);
0340 
0341 extern int tmq_mqrc(UBFH *p_ub);
0342 extern int tmq_mqch(UBFH *p_ub);
0343 
0344 extern int tmq_enqueue(UBFH *p_ub, int *int_diag);
0345 extern int tmq_dequeue(UBFH **pp_ub, int *int_diag);
0346 
0347 /* Background API */
0348 extern int background_read_log(void);
0349 extern void forward_shutdown_wake(void);
0350 extern int forward_process_init(void);
0351 extern void forward_lock(void);
0352 extern void forward_unlock(void);
0353 extern void thread_shutdown(void *ptr, int *p_finish_off);
0354 
0355 /* Q space api: */
0356 extern int tmq_reload_conf(char *cf);
0357 extern int tmq_qconf_addupd(char *qconfstr, char *name);
0358 extern int tmq_dum_add(char *tmxid);
0359 extern int tmq_msg_add(tmq_msg_t **msg, int is_recovery, TPQCTL *diag, int *int_diag);
0360 extern int tmq_unlock_msg(union tmq_upd_block *b);
0361 extern tmq_msg_t * tmq_msg_dequeue(char *qname, long flags, int is_auto, long *diagnostic, 
0362             char *diagmsg, size_t diagmsgsz, char *corrid_str, int *int_diag);
0363 extern tmq_msg_t * tmq_msg_dequeue_by_msgid(char *msgid, long flags, long *diagnostic, 
0364         char *diagmsg, size_t diagmsgsz, int *int_diag);
0365 extern int tmq_unlock_msg_by_msgid(char *msgid, int chkrun);
0366 
0367 extern int tmq_msgid_exists(char *msgid_str);
0368 
0369 extern int tmq_load_msgs(void);
0370 extern fwd_qlist_t *tmq_get_qlist(int auto_only, int incl_def);
0371 extern int tmq_qconf_get_with_default_static(char *qname, tmq_qconfig_t *qconf_out);
0372 extern tmq_qconfig_t * tmq_qconf_get_with_default(char *qname, int *p_is_defaulted);
0373 extern int tmq_build_q_def(char *qname, int *p_is_defaulted, char *out_buf, size_t out_bufsz);
0374 extern tmq_memmsg_t *tmq_get_msglist(char *qname);
0375     
0376 extern int tmq_update_q_stats(char *qname, long succ_diff, long fail_diff);
0377 extern void tmq_get_q_stats(char *qname, long *p_msgs, long *p_locked);
0378 extern int q_msg_sort(tmq_memmsg_t *q1, tmq_memmsg_t *q2);
0379 
0380 extern int tmq_cor_msg_add(tmq_memmsg_t *mmsg);
0381 extern void tmq_cor_msg_del(tmq_memmsg_t *mmsg);
0382 extern tmq_corhash_t * tmq_cor_find(tmq_qhash_t *qhash, char *corrid_str);
0383 
0384 /* Red-black tree support: */
0385 extern void tmq_rbt_combine_cur(ndrx_rbt_node_t *existing, const ndrx_rbt_node_t *newdata, void *arg);
0386 extern void tmq_rbt_combine_fut(ndrx_rbt_node_t *existing, const ndrx_rbt_node_t *newdata, void *arg);
0387 extern void tmq_rbt_combine_cor(ndrx_rbt_node_t *existing, const ndrx_rbt_node_t *newdata, void *arg);
0388 extern int tmq_rbt_cmp_cur(const ndrx_rbt_node_t *a, const ndrx_rbt_node_t *b, void *arg);
0389 extern int tmq_rbt_cmp_cor(const ndrx_rbt_node_t *a, const ndrx_rbt_node_t *b, void *arg);
0390 extern int tmq_rbt_cmp_fut (const ndrx_rbt_node_t *a, const ndrx_rbt_node_t *b, void *arg);
0391 
0392 extern int tmq_fwd_busy_cnt(char *qname, fwd_stats_t **p_stats);
0393 extern void tmq_fwd_busy_inc(fwd_stats_t *p_stats);
0394 extern void tmq_fwd_busy_dec(fwd_stats_t *p_stats);
0395 extern int tmq_fwd_stat_init(void);
0396 
0397 extern void tmq_fwd_sync_add(fwd_msg_t *fwd);
0398 extern void tmq_fwd_sync_del(fwd_msg_t *fwd);
0399 extern int tmq_fwd_sync_cmp(fwd_msg_t *fwd);
0400 extern void ndrx_forward_chkrun(tmq_memmsg_t *msg);;
0401 
0402 extern void tmq_fwd_sync_wait(fwd_msg_t *fwd);
0403 extern void tmq_fwd_sync_notify(fwd_msg_t *fwd);
0404 
0405 /* inflight routines */
0406 extern int ndrx_infl_addmsg(tmq_qconfig_t * qconf, tmq_qhash_t *qhash, tmq_memmsg_t *mmsg);
0407 extern int ndrx_infl_mov2infl(tmq_memmsg_t *mmsg);
0408 extern int ndrx_infl_mov2cur(tmq_memmsg_t *mmsg);
0409 extern int ndrx_infl_delmsg(tmq_memmsg_t *mmsg);
0410 extern int ndrx_infl_fut2cur(tmq_qhash_t *qhash);
0411 
0412 #ifdef  __cplusplus
0413 }
0414 #endif
0415 
0416 #endif  /* TMQD_H */
0417 
0418 /* vim: set ts=4 sw=4 et smartindent: */