Back to home page

Enduro/X

 
 

    


0001 /**
0002  * @brief Q for EnduroX, shared header between XA driver and TMQ server
0003  *
0004  * @file tmqueue.h
0005  */
0006 /* -----------------------------------------------------------------------------
0007  * Enduro/X Middleware Platform for Distributed Transaction Processing
0008  * Copyright (C) 2009-2016, ATR Baltic, Ltd. All Rights Reserved.
0009  * Copyright (C) 2017-2023, Mavimax, Ltd. All Rights Reserved.
0010  * This software is released under one of the following licenses:
0011  * AGPL (with Java and Go exceptions) or Mavimax's license for commercial use.
0012  * See LICENSE file for full text.
0013  * -----------------------------------------------------------------------------
0014  * AGPL license:
0015  *
0016  * This program is free software; you can redistribute it and/or modify it under
0017  * the terms of the GNU Affero General Public License, version 3 as published
0018  * by the Free Software Foundation;
0019  *
0020  * This program is distributed in the hope that it will be useful, but WITHOUT ANY
0021  * WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR A
0022  * PARTICULAR PURPOSE. See the GNU Affero General Public License, version 3
0023  * for more details.
0024  *
0025  * You should have received a copy of the GNU Affero General Public License along 
0026  * with this program; if not, write to the Free Software Foundation, Inc.,
0027  * 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
0028  *
0029  * -----------------------------------------------------------------------------
0030  * A commercial use license is available from Mavimax, Ltd
0031  * contact@mavimax.com
0032  * -----------------------------------------------------------------------------
0033  */
0034 
0035 #ifndef TMQUEUE_H
0036 #define TMQUEUE_H
0037 
0038 #ifdef  __cplusplus
0039 extern "C" {
0040 #endif
0041 
0042 /*---------------------------Includes-----------------------------------*/
0043 #include <xa_cmn.h>
0044 #include <atmi.h>
0045 #include <exhash.h>
0046 #include <exthpool.h>
0047     
0048 /*---------------------------Externs------------------------------------*/
0049 /*---------------------------Macros-------------------------------------*/
0050 
0051 /* Basically we have a two forms of MSGID
0052  * 1. Native, 32 byte binary byte array
0053  * 2. String, Base64, 
0054  */
0055 #define TMQ_DEFAULT_Q           "@"         /**< Symbol for default Q       */
0056 
0057 #define TMQ_MAGIC               "ETQ3"      /**< magic of tmq record        */
0058 #define TMQ_MAGIC2              "END3"      /**< magic of tmq record, end   */
0059 #define TMQ_MAGIC_LEN           4           /**< the len of message magic   */
0060 
0061 #define TMQ_MAGICBASE           "ETQ"       /**< magic without version      */
0062 #define TMQ_MAGICBASE2          "END"       /**< magic without version, end */
0063 #define TMQ_MAGICBASE_LEN       3           /**< magic without version      */
0064     
0065 #define TMQ_STORCMD_NEWMSG          'N'     /**< Command code - new message */
0066 #define TMQ_STORCMD_UPD             'U'     /**< Command code - update msg  */
0067 #define TMQ_STORCMD_DEL             'D'     /**< Command code - delete msg  */
0068 #define TMQ_STORCMD_UNLOCK          'L'     /**< Command code - unlock msg  */
0069 #define TMQ_STORCMD_DUM             'M'     /**< Command code - dummy msg
0070                                             for transaction identification  */
0071     
0072 
0073 /**
0074  * Status codes of the message
0075  */
0076 #define TMQ_STATUS_ACTIVE       'A'         /**< Message is active          */
0077 #define TMQ_STATUS_DONE         'D'         /**< Message is done            */
0078 #define TMQ_STATUS_EXPIRED      'E'         /**< Message is expired  or tries exceeded  */
0079 #define TMQ_STATUS_SUSPENDED    'S'         /**< Message is suspended       */
0080     
0081     
0082 #define TMQ_SYS_ASYNC_CPLT    0x00000001    /**< Complete message in async mode */
0083     
0084 /**
0085  * List of tmq specific internal errors
0086  * @defgroup tmq_errors
0087  * @{
0088  */
0089 
0090 #define TMQ_ERR_VERSION          1          /**< Version error              */
0091 #define TMQ_ERR_EOF              2          /**< File is truncated          */
0092 #define TMQ_ERR_CORRUPT          3          /**< File contents are corrupted*/
0093     
0094 /** @} */ /* end of tmq_errors */
0095     
0096     
0097 #define TMQ_HOUSEKEEP_DEFAULT   (90*60)     /**< houskeep 1 hour 30 min     */
0098 
0099 #define TMQ_INT_DIAG_EJOIN      0x00000001  /**< Got join transaction join error */
0100 
0101 #define TMQ_FSCACHE_LEN         sizeof(tmq_msg_t) /**< part to cache, for faster counter updates */
0102 /*---------------------------Enums--------------------------------------*/
0103 /*---------------------------Typedefs-----------------------------------*/
0104 
0105 /**
0106  * Common command header
0107  */
0108 typedef struct
0109 {
0110     char magic[4];          /**< File magic   1                             */
0111     short srvid;
0112     short nodeid;
0113     /* TODO: Consider adding rmid, if running different queue spaces in 
0114      * the same folder
0115      */
0116     char qname[TMQNAMELEN+1];
0117     char qspace[XATMI_SERVICE_NAME_LENGTH+1];
0118     char command_code;      /**< command code, see TMQ_CMD*                 */
0119     char msgid[TMMSGIDLEN]; /**< message_id                                 */
0120     long flags;             /**< Copy of message flags                      */
0121     char reserved[64];      /**< Reversed space for future upgrades         */
0122     char magic2[4];         /**< File magic                                 */
0123 } tmq_cmdheader_t;
0124 
0125 /** 
0126  * Command: qmessage 
0127  */
0128 typedef struct
0129 {
0130     /** Lets have first 512 bytes of dynamic infos:
0131      * so that update fits in one sector update, if in future
0132      * we perform optimizations:
0133      */
0134     tmq_cmdheader_t hdr;
0135     uint64_t lockthreadid;  /**< Locked thread id                           */
0136     char status;            /**< Status of the message                      */
0137     long trycounter;        /**< try counter                                */
0138     long msgtstamp;         /**< epoch up to second                         */
0139     long msgtstamp_usec;    /**< 1/10^6 sec                                 */
0140     int msgtstamp_cntr;     /**< Message counter for same time interval     */
0141     long trytstamp;         /**< epoch up to second                         */
0142     long trytstamp_usec;    /**< 1/10^6 sec                                 */
0143     
0144     TPQCTL qctl;            /**< Queued message                             */
0145     /* Message log (stored only in file)                                    */
0146     long len;               /**< msg len                                    */
0147     char msg[0];            /**< the memory segment for structure shall be large 
0148                              * enough to handle the message len 
0149                              * indexed by the array                         */
0150 } tmq_msg_t;
0151 
0152 /**
0153  * Command: delmsg
0154  */
0155 typedef struct
0156 {
0157     tmq_cmdheader_t hdr;
0158 } tmq_msg_del_t;
0159 
0160 /**
0161  * Dummy command
0162  * Transaction marker
0163  */
0164 typedef struct
0165 {
0166     tmq_cmdheader_t hdr;
0167 } tmq_msg_dum_t;
0168 
0169 
0170 /**
0171  * Command: unlock
0172  */
0173 typedef struct
0174 {
0175     tmq_cmdheader_t hdr;
0176     
0177 } tmq_msg_unl_t;
0178 
0179 /** 
0180  * Command: updcounter
0181  */
0182 typedef struct
0183 {
0184     tmq_cmdheader_t hdr;
0185     char status;   /* Status of the message */
0186     long trycounter;        /* try counter */
0187     long trytstamp;
0188     long trytstamp_usec;
0189     
0190 } tmq_msg_upd_t;
0191 
0192 #define UPD_MSG(DEST, SRC)  NDRX_LOG(log_debug, "status [%c] -> [%c]",\
0193                     DEST->status, SRC->status);\
0194             DEST->status = SRC->status;\
0195             NDRX_LOG(log_debug, "trycounter [%ld] -> [%ld]",\
0196                     DEST->trycounter, SRC->trycounter);\
0197             DEST->trycounter = SRC->trycounter;\
0198             NDRX_LOG(log_debug, "trycounter [%ld] -> [%ld]",\
0199                     DEST->trytstamp, SRC->trytstamp);\
0200             DEST->trytstamp = SRC->trytstamp;\
0201             NDRX_LOG(log_debug, "trycounter_usec [%ld] -> [%ld]",\
0202                     DEST->trytstamp, SRC->trytstamp);\
0203             DEST->trytstamp_usec = SRC->trytstamp_usec;
0204 
0205 /**
0206  * Data block
0207  */
0208 union tmq_block {
0209     tmq_cmdheader_t hdr;
0210     tmq_msg_t msg;
0211     tmq_msg_del_t del;
0212     tmq_msg_upd_t upd;
0213     tmq_msg_dum_t dum;
0214 };
0215 
0216 /**
0217  * Update block (either update or delete)
0218  */
0219 union tmq_upd_block {
0220     tmq_cmdheader_t hdr;
0221     tmq_msg_del_t del;
0222     tmq_msg_upd_t upd;
0223     tmq_msg_dum_t dum;
0224 };
0225 
0226 /*---------------------------Globals------------------------------------*/
0227 
0228 extern char ndrx_G_qspace[];    /**< Name of the queue space            */
0229 extern char ndrx_G_qspacesvc[]; /**< real service name                  */
0230 
0231 /*---------------------------Statics------------------------------------*/
0232 /*---------------------------Prototypes---------------------------------*/
0233  
0234 /* util, shared between driver & daemon */
0235 extern int tmq_setup_cmdheader_newmsg(tmq_cmdheader_t *hdr, char *qname, 
0236         short nodeid, short srvid, char *qspace, long flags);
0237 extern void tmq_msgid_gen(char *msgid);
0238 extern char * tmq_msgid_serialize(char *msgid_in, char *msgid_str_out);
0239 extern char * tmq_msgid_deserialize(char *msgid_str_in, char *msgid_out);
0240 extern void tmq_msgid_get_info(char *msgid, short *p_nodeid, short *p_srvid);
0241 extern char * tmq_corrid_serialize(char *corrid_in, char *corrid_str_out);
0242 extern int tmq_finalize_files(UBFH *p_ub);
0243 extern void tmq_set_tmqueue(
0244     int setting
0245     , int (*p_tmq_setup_cmdheader_dum)(tmq_cmdheader_t *hdr, char *qname, 
0246         short nodeid, short srvid, char *qspace, long flags)
0247     , int (*p_tmq_dum_add)(char *tmxid)
0248     , int (*p_tmq_unlock_msg)(union tmq_upd_block *b)
0249     , void (**p_tmq_chkdisk_th)(void *ptr, int *p_finish_off)
0250     , int (*p_tmq_msgid_exists)(char *msgid_str)
0251     , void (*p_tpexit)(void));
0252     
0253 /* From storage driver: */
0254 extern size_t tmq_get_block_len(char *data);
0255 extern int tmq_storage_write_cmd_newmsg(tmq_msg_t *msg, int *int_diag);
0256 extern int tmq_storage_write_cmd_block(char *p_block, char *descr, char *cust_tmxid, 
0257         int *int_diag);
0258 extern int tmq_storage_get_blocks(int (*process_block)(char *tmxid, 
0259         union tmq_block **p_block, int state, int seqno), short nodeid, short srvid);
0260 
0261 /* transaction management: */
0262 extern int ndrx_xa_qminiservce(UBFH *p_ub, char cmd);
0263 
0264 extern int tmq_setup_cmdheader_dum(tmq_cmdheader_t *hdr, char *qname, 
0265         short nodeid, short srvid, char *qspace, long flags);
0266 
0267 extern int tmq_lock_msg(char *msgid);
0268 
0269 extern long tmq_chkdisk_stopwatch_get_delta_sec(void);
0270 extern void tmq_chkdisk_stopwatch_reset(void);
0271 
0272 #ifdef  __cplusplus
0273 }
0274 #endif
0275 
0276 #endif  /* TMQUEUE_H */
0277 
0278 /* vim: set ts=4 sw=4 et smartindent: */