Back to home page

Enduro/X

 
 

    


0001 /**
0002  * @brief Bridge commons
0003  *
0004  * @file bridge.h
0005  */
0006 /* -----------------------------------------------------------------------------
0007  * Enduro/X Middleware Platform for Distributed Transaction Processing
0008  * Copyright (C) 2009-2016, ATR Baltic, Ltd. All Rights Reserved.
0009  * Copyright (C) 2017-2023, Mavimax, Ltd. All Rights Reserved.
0010  * This software is released under one of the following licenses:
0011  * AGPL (with Java and Go exceptions) or Mavimax's license for commercial use.
0012  * See LICENSE file for full text.
0013  * -----------------------------------------------------------------------------
0014  * AGPL license:
0015  *
0016  * This program is free software; you can redistribute it and/or modify it under
0017  * the terms of the GNU Affero General Public License, version 3 as published
0018  * by the Free Software Foundation;
0019  *
0020  * This program is distributed in the hope that it will be useful, but WITHOUT ANY
0021  * WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR A
0022  * PARTICULAR PURPOSE. See the GNU Affero General Public License, version 3
0023  * for more details.
0024  *
0025  * You should have received a copy of the GNU Affero General Public License along 
0026  * with this program; if not, write to the Free Software Foundation, Inc.,
0027  * 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
0028  *
0029  * -----------------------------------------------------------------------------
0030  * A commercial use license is available from Mavimax, Ltd
0031  * contact@mavimax.com
0032  * -----------------------------------------------------------------------------
0033  */
0034 
0035 #ifndef BRIDGE_H
0036 #define BRIDGE_H
0037 
0038 #ifdef  __cplusplus
0039 extern "C" {
0040 #endif
0041 
0042 /*---------------------------Includes-----------------------------------*/
0043 #include <sys_unix.h>
0044 #include <exthpool.h>
0045 #include <pthread.h>
0046 #include <thlock.h>
0047 /*---------------------------Externs------------------------------------*/
0048 /*---------------------------Macros-------------------------------------*/
0049     
0050 #define BR_QRETRIES_DEFAULT             999999  /**< Default number of retries (presumably for bridge_cfg_t.qretries -- confirm) */
0051 #define BR_DEFAULT_THPOOL_SIZE          2       /**< Default thread pool size (presumably for bridge_cfg_t.threadpoolsize -- confirm) */
0052 #define BR_THREAD_ENTRY if (!G_thread_init) /* Lazy per-thread tpinit(): G_thread_init is declared __thread, so each thread runs this once */ \
0053          { \
0054                 if (EXSUCCEED==tpinit(NULL))\
0055                 { \
0056                     G_thread_init=EXTRUE; /* tpinit() succeeded: skip it on later expansions in this thread */ \
0057                 } \
0058                 else \
0059                 { \
0060                     EXFAIL_OUT(ret);\
0061                 } \
0062          } /* Needs a variable `ret` in the expanding function (passed to EXFAIL_OUT, defined elsewhere; presumably it sets ret and jumps to a cleanup label -- confirm). NOTE(review): expands to a bare if-block, not do{}while(0); do not use it as the unbraced body of an if/else. */
0063 #define DEFAULT_QUEUE_SIZE          100    /**< Default max number of queued messages */
0064 #define DEFAULT_QUEUE_MAXSLEEP      150    /**< Max number of milliseconds to sleep */
0065 #define DEFAULT_QUEUE_MINSLEEP      40     /**< Minimum sleep between attempts */
0066     
0067 #define     PACK_TYPE_TONDRXD   1   /**< Send message to NDRXD                */
0068 #define     PACK_TYPE_TOSVC     2   /**< Send to service, use timer (their)   */
0069 #define     PACK_TYPE_TORPLYQ   3   /**< Send to reply q, use timer (internal)*/
0070     
0071 /* List of queue actions: */
0072 #define QUEUE_ACTION_BLOCK         0   /**< Block the traffic                 */
0073 #define QUEUE_ACTION_DROP          1   /**< Drop the msg                      */
0074 #define QUEUE_ACTION_IGNORE        2   /**< Ignore the condition              */
0075     
0076 /* List of action flags: */
0077 #define QUEUE_FLAG_ACTION_BLKIGN        1 /**< Global queue full - block, svc queue full - ignore */
0078 #define QUEUE_FLAG_ACTION_BLKDROP       2 /**< Global queue full - block, svc queue full - drop   */
0079 #define QUEUE_FLAG_ACTION_DROPDROP      3 /**< Global queue full - drop, svc queue full - drop    */
0080     
0081 #define BR_MAX_ROUNDTRIP        200 /**< Default max allowed roundtrip for time sync: 200 ms        */
0082 #define BR_PERIODIC_CLOCK_SND   600 /**< Send clocks every 10 minutes (600 seconds)                 */
0083 #define BR_ADMININFO_TOUT       3   /**< Allow 3 seconds on full reply queue for metrics            */
0084 /*---------------------------Enums--------------------------------------*/
0085 /*---------------------------Typedefs-----------------------------------*/
0086 
0087 /**
0088  * Bridge configuration handler; the global instance is declared as G_bridge_cfg
0089  */
0090 typedef struct
0091 {
0092     int nodeid;                   /**< External node id */
0093     exnetcon_t net;               /**< Network handler, might be client or server */
0094     exnetcon_t *con;              /**< Real working connection  */
0095     char svc[XATMI_SERVICE_NAME_LENGTH+1];  /**< Service name used by this bridge */
0096     
0097     long long timediff;                 /**< Bridge time correction           */
0098     ndrx_stopwatch_t timediff_ourt;     /**< Our stopwatch value              */
0099     NDRX_SPIN_LOCKDECL(timediff_lock);   /**< Lock for fast read/write update of the time diff */
0100     long timediff_roundtrip;       /**< Roundtrip in ms for time data ping echo */
0101     long max_roundtrip;            /**< Max allowed roundtrip for time diff   */
0102     
0103     int common_format;            /**< Common platform format */
0104     int qretries;                 /**< Queue resubmit retries */
0105     int qsize;                    /**< Number of messages stored in memory before blocking */
0106     int qsizesvc;                 /**< Single service queue size                */
0107     int qttl;                     /**< Number of milliseconds for message to live in queue */
0108     int qmaxsleep;                /**< Max number of milliseconds to sleep between attempts */
0109     int qminsleep;                /**< Min number of milliseconds to sleep between attempts */
0110     
0111     int qfullaction;              /**< Action for full queue (presumably a QUEUE_ACTION_* value -- confirm) */
0112     int qfullactionsvc;           /**< Action when one service queue is full (presumably QUEUE_ACTION_* -- confirm) */
0113     
0114     int threadpoolbufsz;          /**< Threadpool buffer size, lock after full */
0115     int threadpoolsize;           /**< Thread pool size */
0116     int check_interval;           /**< Connection checking interval (unit not visible here) */
0117     threadpool thpool_tonet;      /**< Thread pool (by name: for sending to network -- confirm) */
0118     
0119     int is_server;                /**< Role flag, server vs client (NOTE(review): presumably non-zero = server -- confirm) */
0120     /* Support #502: a deadlock occurred when on both nodes all threads attempted to send
0121      * and nobody performed receive, so all sockets became full (presumably why the pool below is separate) */
0122     threadpool thpool_fromnet;    /**< Thread pool (by name: for receiving from network -- confirm) */
0123     
0124     threadpool thpool_queue;    /**< Queue runner */
0125     
0126 } bridge_cfg_t;
0127 
0128 typedef struct in_msg in_msg_t; /**< One queued message; linked into the list held by in_msg_hash_t.msgs */
0129 struct in_msg
0130 {
0131     int pack_type;                /**< Package type (presumably a PACK_TYPE_* value -- confirm) */
0132     char destqstr[NDRX_MAX_Q_SIZE+1];  /**< Destination queue to which the msg is sent */
0133     char *buffer;                 /**< Message data (ownership not visible in this header) */
0134     int len;                      /**< Length of the data in buffer (presumably bytes -- confirm) */
0135     int tries;                    /**< Number of attempts for sending msg to Q */
0136     ndrx_stopwatch_t addedtime;   /**< Time in Q                               */
0137     ndrx_stopwatch_t updatetime;  /**< Last time when msg was processed        */
0138     int next_try_ms;              /**< When the next attempt is scheduled      */
0139     in_msg_t *prev, *next;        /**< Doubly-linked list links                */
0140 };
0141 
0142 
0143 typedef struct in_msg_hash in_msg_hash_t; /**< Per-queue entry holding the messages pending for one posix queue */
0144 struct in_msg_hash
0145 {
0146     char qstr[NDRX_MAX_Q_SIZE+1];/**< Posix queue name string                */
0147     int nrmsg;                   /**< Current number of messages per posix q */
0148     in_msg_t  *msgs;             /**< DL list of messages                    */
0149     EX_hash_handle hh;           /**< Hash handle (presumably makes the entry hashable, keyed by qstr -- confirm) */
0150 };
0151 
0152 /**
0153  * Message received from XATMI, for sending to network, by thread
0154  */
0155 typedef struct
0156 {
0157     char *buf;      /**< Message data (ownership not visible in this header) */
0158     int len;        /**< Length of buf (presumably bytes -- confirm)         */
0159     char msg_type;  /**< Message type code (values are defined elsewhere)    */
0160     
0161 } xatmi_brmessage_t;
0162 
0163 /**
0164  * Message received from network and submitted to thread
0165  */
0166 typedef struct
0167 {
0168     char *buf;               /**< Message data (ownership not visible in this header) */
0169     int len;                 /**< Length of buf (presumably bytes -- confirm)         */
0170     exnetcon_t *net;         /**< Network connection (presumably the one the message arrived on -- confirm) */
0171     cmd_br_net_call_t *call; /**< Intermediate field */
0172 } net_brmessage_t;
0173 
0174 /*---------------------------Globals------------------------------------*/
0175 extern bridge_cfg_t G_bridge_cfg;             /**< Bridge configuration instance (defined elsewhere) */
0176 extern __thread int G_thread_init;            /**< Per-thread flag, set to EXTRUE by BR_THREAD_ENTRY after tpinit() succeeds */
0177 extern pthread_mutex_t ndrx_G_global_br_lock; /**< Global bridge mutex (NOTE(review): what it guards is not visible in this header) */
0178 /*---------------------------Statics------------------------------------*/
0179 /*---------------------------Prototypes---------------------------------*/
0180 extern int br_submit_to_ndrxd(command_call_t *call, int len); /* cf. PACK_TYPE_TONDRXD */
0181 extern int br_submit_to_service(tp_command_call_t *call, int len); /* cf. PACK_TYPE_TOSVC */
0182 extern int br_submit_to_service_notif(tp_notif_call_t *call, int len);
0183 extern int br_got_message_from_q(char **buf, int len, char msg_type);
0184 extern int br_submit_reply_to_q(tp_command_call_t *call, int len); /* cf. PACK_TYPE_TORPLYQ */
0185 /* Network side (grouping inferred from names only -- confirm in the implementation) */
0186 extern int br_process_msg(exnetcon_t *net, char **buf, int len);
0187 extern int br_send_to_net(char *buf, int len, char msg_type, int command_id);
0188 /* Clock handling and connection info (grouping inferred from names only) */
0189 extern int br_calc_clock_diff(command_call_t *call);
0190 extern int br_coninfo(command_call_t *call);
0191 extern int br_send_clock(int mode, cmd_br_time_sync_t *rcv);
0192 extern void br_clock_adj(tp_command_call_t *call, int is_out);
0193 
0194 extern int br_tpcall_pushstack(tp_command_call_t *call);
0195 extern int br_get_conv_cd(char msg_type, char *buf, int *p_pool);
0196 extern int br_chk_limit(void);
0197 /* Network-input setup / shutdown (grouping inferred from names only) */
0198 extern int br_netin_setup(void);
0199 extern void br_netin_shutdown(void);
0200 /* Error processing; takes the queue entry types in_msg_t / in_msg_hash_t declared in this header */
0201 extern int br_process_error(char *buf, int len, int err, in_msg_t* from_q, 
0202         int pack_type, char *destqstr, in_msg_hash_t * qhash);
0203 /* Temporary queue (grouping inferred from names only) */
0204 extern void br_tempq_init(void);
0205 extern int br_add_to_q(char *buf, int len, int pack_type, char *destq);
0206 
0207 #ifdef  __cplusplus
0208 }
0209 #endif
0210 
0211 #endif  /* BRIDGE_H */
0212 
0213 /* vim: set ts=4 sw=4 et smartindent: */