Back to home page

Enduro/X

 
 

    


0001 /**
0002  * @brief Q for EnduroX, shared header between XA driver and TMQ server
0003  *
0004  * @file tmqueue.h
0005  */
0006 /* -----------------------------------------------------------------------------
0007  * Enduro/X Middleware Platform for Distributed Transaction Processing
0008  * Copyright (C) 2009-2016, ATR Baltic, Ltd. All Rights Reserved.
0009  * Copyright (C) 2017-2023, Mavimax, Ltd. All Rights Reserved.
0010  * This software is released under one of the following licenses:
0011  * AGPL (with Java and Go exceptions) or Mavimax's license for commercial use.
0012  * See LICENSE file for full text.
0013  * -----------------------------------------------------------------------------
0014  * AGPL license:
0015  *
0016  * This program is free software; you can redistribute it and/or modify it under
0017  * the terms of the GNU Affero General Public License, version 3 as published
0018  * by the Free Software Foundation;
0019  *
0020  * This program is distributed in the hope that it will be useful, but WITHOUT ANY
0021  * WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR A
0022  * PARTICULAR PURPOSE. See the GNU Affero General Public License, version 3
0023  * for more details.
0024  *
0025  * You should have received a copy of the GNU Affero General Public License along 
0026  * with this program; if not, write to the Free Software Foundation, Inc.,
0027  * 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
0028  *
0029  * -----------------------------------------------------------------------------
0030  * A commercial use license is available from Mavimax, Ltd
0031  * contact@mavimax.com
0032  * -----------------------------------------------------------------------------
0033  */
0034 
0035 #ifndef TMQUEUE_H
0036 #define TMQUEUE_H
0037 
0038 #ifdef  __cplusplus
0039 extern "C" {
0040 #endif
0041 
0042 /*---------------------------Includes-----------------------------------*/
0043 #include <xa_cmn.h>
0044 #include <atmi.h>
0045 #include <exhash.h>
0046 #include <exthpool.h>
0047     
0048 /*---------------------------Externs------------------------------------*/
0049 /*---------------------------Macros-------------------------------------*/
0050 
/*
 * A message id (MSGID) comes in two forms:
 * 1. Native - 32 byte binary byte array
 * 2. String - Base64
 */
#define TMQ_DEFAULT_Q       "@"     /**< Symbol denoting the default Q          */

/* Record magics, version character included */
#define TMQ_MAGIC           "ETQ3"  /**< Magic of a tmq record                  */
#define TMQ_MAGIC2          "END3"  /**< Magic of a tmq record, end marker      */
#define TMQ_MAGIC_LEN       4       /**< Length of the message magic            */

/* Record magics, version character left out */
#define TMQ_MAGICBASE       "ETQ"   /**< Magic without version                  */
#define TMQ_MAGICBASE2      "END"   /**< Magic without version, end marker      */
#define TMQ_MAGICBASE_LEN   3       /**< Length of the magic without version    */

/* Storage command codes */
#define TMQ_STORCMD_NEWMSG  'N'     /**< Command code - new message             */
#define TMQ_STORCMD_UPD     'U'     /**< Command code - update message          */
#define TMQ_STORCMD_DEL     'D'     /**< Command code - delete message          */
#define TMQ_STORCMD_UNLOCK  'L'     /**< Command code - unlock message          */
#define TMQ_STORCMD_DUM     'M'     /**< Command code - dummy message, used
                                         for transaction identification         */
0071     
0072 
/*
 * Message status codes (single character each)
 */
#define TMQ_STATUS_ACTIVE     'A'           /**< Message is active              */
#define TMQ_STATUS_DONE       'D'           /**< Message is done                */
#define TMQ_STATUS_EXPIRED    'E'           /**< Message is expired or its tries
                                                 are exceeded                   */
#define TMQ_STATUS_SUSPENDED  'S'           /**< Message is suspended           */

#define TMQ_SYS_ASYNC_CPLT    0x00000001    /**< Complete message in async mode */
0083     
/**
 * List of tmq specific internal errors
 * @defgroup tmq_errors
 * @{
 */
#define TMQ_ERR_VERSION         1           /**< Version error                  */
#define TMQ_ERR_EOF             2           /**< File is truncated              */
#define TMQ_ERR_CORRUPT         3           /**< File contents are corrupted    */
/** @} */ /* end of tmq_errors */

#define TMQ_HOUSEKEEP_DEFAULT   (90*60)     /**< Housekeep default, 1 hour 30 min */

#define TMQ_INT_DIAG_EJOIN      0x00000001  /**< Got transaction join error     */
0100 /*---------------------------Enums--------------------------------------*/
0101 /*---------------------------Typedefs-----------------------------------*/
0102 
0103 /**
0104  * Common command header
0105  */
0106 typedef struct
0107 {
0108     char magic[4];          /**< File magic   1                             */
0109     short srvid;
0110     short nodeid;
0111     /* TODO: Consider adding rmid, if running different queue spaces in 
0112      * the same folder
0113      */
0114     char qname[TMQNAMELEN+1];
0115     char qspace[XATMI_SERVICE_NAME_LENGTH+1];
0116     char command_code;      /**< command code, see TMQ_CMD*                 */
0117     char msgid[TMMSGIDLEN]; /**< message_id                                 */
0118     long flags;             /**< Copy of message flags                      */
0119     char reserved[64];      /**< Reversed space for future upgrades         */
0120     char magic2[4];         /**< File magic                                 */
0121 } tmq_cmdheader_t;
0122 
0123 /** 
0124  * Command: qmessage 
0125  */
0126 typedef struct
0127 {
0128     /** Lets have first 512 bytes of dynamic infos:
0129      * so that update fits in one sector update, if in future
0130      * we perform optimizations:
0131      */
0132     tmq_cmdheader_t hdr;
0133     uint64_t lockthreadid;  /**< Locked thread id                           */
0134     char status;            /**< Status of the message                      */
0135     long trycounter;        /**< try counter                                */
0136     long msgtstamp;         /**< epoch up to second                         */
0137     long msgtstamp_usec;    /**< 1/10^6 sec                                 */
0138     int msgtstamp_cntr;     /**< Message counter for same time interval     */
0139     long trytstamp;         /**< epoch up to second                         */
0140     long trytstamp_usec;    /**< 1/10^6 sec                                 */
0141     
0142     TPQCTL qctl;            /**< Queued message                             */
0143     /* Message log (stored only in file)                                    */
0144     long len;               /**< msg len                                    */
0145     char msg[0];            /**< the memory segment for structure shall be large 
0146                              * enough to handle the message len 
0147                              * indexed by the array                         */
0148 } tmq_msg_t;
0149 
0150 /**
0151  * Command: delmsg
0152  */
0153 typedef struct
0154 {
0155     tmq_cmdheader_t hdr;
0156 } tmq_msg_del_t;
0157 
0158 /**
0159  * Dummy command
0160  * Transaction marker
0161  */
0162 typedef struct
0163 {
0164     tmq_cmdheader_t hdr;
0165 } tmq_msg_dum_t;
0166 
0167 
0168 /**
0169  * Command: unlock
0170  */
0171 typedef struct
0172 {
0173     tmq_cmdheader_t hdr;
0174     
0175 } tmq_msg_unl_t;
0176 
0177 /** 
0178  * Command: updcounter
0179  */
0180 typedef struct
0181 {
0182     tmq_cmdheader_t hdr;
0183     char status;   /* Status of the message */
0184     long trycounter;        /* try counter */
0185     long trytstamp;
0186     long trytstamp_usec;
0187     
0188 } tmq_msg_upd_t;
0189 
/**
 * Copy status, try counter and try timestamp (seconds and microseconds)
 * from SRC to DEST, logging every "old -> new" transition at debug level.
 *
 * The macro expands to a sequence of statements terminated by ';' (this
 * form is kept for compatibility with existing call sites), therefore it
 * must not be used as the unbraced body of an if/else/loop. Both arguments
 * are evaluated several times, so they must be free of side effects.
 *
 * @param DEST pointer to the object to update; needs the members status,
 *  trycounter, trytstamp and trytstamp_usec
 * @param SRC pointer to the object to copy from; same members required
 */
#define UPD_MSG(DEST, SRC)  NDRX_LOG(log_debug, "status [%c] -> [%c]",\
                    (DEST)->status, (SRC)->status);\
            (DEST)->status = (SRC)->status;\
            NDRX_LOG(log_debug, "trycounter [%ld] -> [%ld]",\
                    (DEST)->trycounter, (SRC)->trycounter);\
            (DEST)->trycounter = (SRC)->trycounter;\
            NDRX_LOG(log_debug, "trytstamp [%ld] -> [%ld]",\
                    (DEST)->trytstamp, (SRC)->trytstamp);\
            (DEST)->trytstamp = (SRC)->trytstamp;\
            NDRX_LOG(log_debug, "trytstamp_usec [%ld] -> [%ld]",\
                    (DEST)->trytstamp_usec, (SRC)->trytstamp_usec);\
            (DEST)->trytstamp_usec = (SRC)->trytstamp_usec;
0202 
0203 /**
0204  * Data block
0205  */
0206 union tmq_block {
0207     tmq_cmdheader_t hdr;
0208     tmq_msg_t msg;
0209     tmq_msg_del_t del;
0210     tmq_msg_upd_t upd;
0211     tmq_msg_dum_t dum;
0212 };
0213 
0214 /**
0215  * Update block (either update or delete)
0216  */
0217 union tmq_upd_block {
0218     tmq_cmdheader_t hdr;
0219     tmq_msg_del_t del;
0220     tmq_msg_upd_t upd;
0221     tmq_msg_dum_t dum;
0222 };
0223 
0224 /*---------------------------Globals------------------------------------*/
0225 
/** Name of the queue space */
extern char ndrx_G_qspace[];
/** Real service name */
extern char ndrx_G_qspacesvc[];
0228 
0229 /*---------------------------Statics------------------------------------*/
0230 /*---------------------------Prototypes---------------------------------*/
0231  
0232 /* util, shared between driver & daemon */
0233 extern int tmq_setup_cmdheader_newmsg(tmq_cmdheader_t *hdr, char *qname, 
0234         short nodeid, short srvid, char *qspace, long flags);
0235 extern void tmq_msgid_gen(char *msgid);
0236 extern char * tmq_msgid_serialize(char *msgid_in, char *msgid_str_out);
0237 extern char * tmq_msgid_deserialize(char *msgid_str_in, char *msgid_out);
0238 extern void tmq_msgid_get_info(char *msgid, short *p_nodeid, short *p_srvid);
0239 extern char * tmq_corrid_serialize(char *corrid_in, char *corrid_str_out);
0240 extern int tmq_finalize_files(UBFH *p_ub);
0241 extern void tmq_set_tmqueue(
0242     int setting
0243     , int (*p_tmq_setup_cmdheader_dum)(tmq_cmdheader_t *hdr, char *qname, 
0244         short nodeid, short srvid, char *qspace, long flags)
0245     , int (*p_tmq_dum_add)(char *tmxid)
0246     , int (*p_tmq_unlock_msg)(union tmq_upd_block *b)
0247     , void (**p_tmq_chkdisk_th)(void *ptr, int *p_finish_off)
0248     , int (*p_tmq_msgid_exists)(char *msgid_str)
0249     , void (*p_tpexit)(void));
0250     
0251 /* From storage driver: */
0252 extern size_t tmq_get_block_len(char *data);
0253 extern int tmq_storage_write_cmd_newmsg(tmq_msg_t *msg, int *int_diag);
0254 extern int tmq_storage_write_cmd_block(char *p_block, char *descr, char *cust_tmxid, 
0255         int *int_diag);
0256 extern int tmq_storage_get_blocks(int (*process_block)(char *tmxid, 
0257         union tmq_block **p_block, int state, int seqno), short nodeid, short srvid);
0258 
0259 /* transaction management: */
0260 extern int ndrx_xa_qminiservce(UBFH *p_ub, char cmd);
0261 
0262 extern int tmq_setup_cmdheader_dum(tmq_cmdheader_t *hdr, char *qname, 
0263         short nodeid, short srvid, char *qspace, long flags);
0264 
0265 extern int tmq_lock_msg(char *msgid);
0266 
0267 extern long tmq_chkdisk_stopwatch_get_delta_sec(void);
0268 extern void tmq_chkdisk_stopwatch_reset(void);
0269 
0270 #ifdef  __cplusplus
0271 }
0272 #endif
0273 
0274 #endif  /* TMQUEUE_H */
0275 
0276 /* vim: set ts=4 sw=4 et smartindent: */