Back to home page

Enduro/X

 
 

    


0001 /**
0002  * @brief Contains network processing part of the bridge.
0003  *
0004  * @file network.c
0005  */
0006 /* -----------------------------------------------------------------------------
0007  * Enduro/X Middleware Platform for Distributed Transaction Processing
0008  * Copyright (C) 2009-2016, ATR Baltic, Ltd. All Rights Reserved.
0009  * Copyright (C) 2017-2023, Mavimax, Ltd. All Rights Reserved.
0010  * This software is released under one of the following licenses:
0011  * AGPL (with Java and Go exceptions) or Mavimax's license for commercial use.
0012  * See LICENSE file for full text.
0013  * -----------------------------------------------------------------------------
0014  * AGPL license:
0015  *
0016  * This program is free software; you can redistribute it and/or modify it under
0017  * the terms of the GNU Affero General Public License, version 3 as published
0018  * by the Free Software Foundation;
0019  *
0020  * This program is distributed in the hope that it will be useful, but WITHOUT ANY
0021  * WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR A
0022  * PARTICULAR PURPOSE. See the GNU Affero General Public License, version 3
0023  * for more details.
0024  *
0025  * You should have received a copy of the GNU Affero General Public License along 
0026  * with this program; if not, write to the Free Software Foundation, Inc.,
0027  * 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
0028  *
0029  * -----------------------------------------------------------------------------
0030  * A commercial use license is available from Mavimax, Ltd
0031  * contact@mavimax.com
0032  * -----------------------------------------------------------------------------
0033  */
0034 #include <stdio.h>
0035 #include <stdlib.h>
0036 #include <string.h>
0037 #include <errno.h>
0038 #include <regex.h>
0039 #include <utlist.h>
0040 
0041 #include <ndebug.h>
0042 #include <atmi.h>
0043 #include <atmi_int.h>
0044 #include <typed_buf.h>
0045 #include <ndrstandard.h>
0046 #include <ubf.h>
0047 #include <Exfields.h>
0048 #include <userlog.h>
0049 
0050 #include <exnet.h>
0051 #include <exproto.h>
0052 #include <ndrxdcmn.h>
0053 
0054 #include "bridge.h"
0055 #include "../libatmisrv/srv_int.h"
0056 
0057 /*---------------------------Externs------------------------------------*/
0058 /*---------------------------Macros-------------------------------------*/
0059 /*---------------------------Enums--------------------------------------*/
0060 /*---------------------------Typedefs-----------------------------------*/
0061 /*---------------------------Globals------------------------------------*/
0062 /*---------------------------Statics------------------------------------*/
0063 /*---------------------------Prototypes---------------------------------*/
0064 exprivate int br_process_msg_th(void *ptr, int *p_finish_off);
0065 
0066 /**
0067  * Process message from network (wrapper) for dispatching to thread.
0068  * @param net
0069  * @param buf NOTE! we reset the incoming to NULL, as we forward data to thread
0070  * @param len
0071  * @return 
0072  */
0073 expublic int br_process_msg(exnetcon_t *net, char **buf, int len)
0074 {
0075     int ret = EXSUCCEED;
0076     net_brmessage_t *thread_data;
0077     char *fn = "br_process_msg";
0078     
0079     thread_data = NDRX_FPMALLOC(sizeof(net_brmessage_t), 0);
0080     
0081     if (NULL==thread_data)
0082     {
0083         int err = errno;
0084         NDRX_LOG(log_error, "Failed to allocate net_brmessage_t: %s", 
0085                 strerror(err));
0086         
0087         userlog("Failed to allocate net_brmessage_t: %s", 
0088                 strerror(err));
0089         EXFAIL_OUT(ret);
0090     }
0091     
0092     NDRX_LOG(log_debug, "%s: multi thread mode - dispatching to worker", fn);
0093     
0094     thread_data->buf = *buf;
0095     *buf = NULL;/* indicate the we finish off the data buffer */
0096     
0097     thread_data->len = len;
0098     thread_data->net = net;
0099     
0100     if (EXSUCCEED!=ndrx_thpool_add_work2(G_bridge_cfg.thpool_fromnet, (void*)br_process_msg_th, 
0101             (void *)thread_data, 0, G_bridge_cfg.threadpoolbufsz))
0102     {
0103         EXFAIL_OUT(ret);
0104     }
0105 out:
0106             
0107     if (EXSUCCEED!=ret)
0108     {
0109         if (NULL!=thread_data)
0110         {
0111             if (NULL!=thread_data->buf)
0112             {
0113                 NDRX_SYSBUF_FREE(thread_data->buf);
0114             }
0115             NDRX_FPFREE(thread_data);
0116         }
0117     }
0118 
0119     return ret;
0120 }
0121 
0122 /**
0123  * Dump the tpcall
0124  * @param buf message received from net
0125  */
0126 exprivate void br_dump_tp_command_call(char *buf)
0127 {
0128     tp_command_call_t *extra_debug = (tp_command_call_t *)buf;
0129     /* Have some more debug out there: */
0130 
0131     NDRX_LOG(log_debug, "timer = (%ld %ld) %d", 
0132             extra_debug->timer.t.tv_sec,
0133             extra_debug->timer.t.tv_nsec,
0134             ndrx_stopwatch_get_delta_sec(&extra_debug->timer) );
0135 
0136     NDRX_LOG(log_debug, "callseq  %u",    extra_debug->callseq);
0137     NDRX_LOG(log_debug, "msgseq   %u",    extra_debug->msgseq);
0138     NDRX_LOG(log_debug, "cd       %d",    extra_debug->cd);
0139     NDRX_LOG(log_debug, "my_id    [%s]",  extra_debug->my_id);
0140     NDRX_LOG(log_debug, "reply_to [%s]",  extra_debug->reply_to);
0141     NDRX_LOG(log_debug, "name     [%s]",  extra_debug->name);
0142 }
0143 
0144 /**
0145  * Dump the tpcall
0146  * @param buf message received from net
0147  */
0148 exprivate void br_dump_tp_notif_call(char *buf)
0149 {
0150     tp_notif_call_t *extra_debug = (tp_notif_call_t *)buf;
0151     /* Have some more debug out there: */
0152 
0153     NDRX_LOG(log_debug, "timer = (%ld %ld) %d", 
0154             extra_debug->timer.t.tv_sec,
0155             extra_debug->timer.t.tv_nsec,
0156             ndrx_stopwatch_get_delta_sec(&extra_debug->timer) );
0157 
0158     NDRX_LOG(log_debug, "callseq          %u" ,   extra_debug->callseq);
0159     NDRX_LOG(log_debug, "msgseq           %u",    extra_debug->msgseq);
0160     NDRX_LOG(log_debug, "cd               %d",    extra_debug->cd);
0161     NDRX_LOG(log_debug, "my_id            [%s]",  extra_debug->my_id);
0162     NDRX_LOG(log_debug, "reply_to         [%s]",  extra_debug->reply_to);
0163     NDRX_LOG(log_debug, "destclient       [%s]",  extra_debug->destclient);
0164     NDRX_LOG(log_debug, "cltname          [%s]",  extra_debug->cltname);
0165     NDRX_LOG(log_debug, "cltname_isnull   [%d]",  extra_debug->cltname_isnull);
0166     NDRX_LOG(log_debug, "nodeid           [%s]",  extra_debug->nodeid);
0167     NDRX_LOG(log_debug, "nodeid_isnull    [%d]",  extra_debug->nodeid_isnull);
0168     NDRX_LOG(log_debug, "usrname          [%s]",  extra_debug->usrname);
0169     NDRX_LOG(log_debug, "usrname_isnull   [%d]",  extra_debug->usrname_isnull);
0170 }
0171 
0172 /**
0173  * Bridge have received message.
0174  * Got message from Network.
0175  * But we have a problem here, as multiple threads are doing receive, there
0176  * is possibility that conversational is also received in out of order...
0177  * and submitting to specific thread pool will not solve the issue.
0178  * 
0179  * The other option is to do ack on every conversational message.
0180  * 
0181  * Or do message sequencing and reordering at the client end (i.e. consume
0182  * the out of order messages in the process memory, sort the messages by sequence
0183  * number, when message is received or we are going to look for the message, then
0184  * check the queue in memory. If we have something there and looks like correct
0185  * message sequence number, then use it.
0186  * 
0187  * The queue will be dumped after the conversation is closed.
0188  * 
0189  * @param net
0190  * @param buf
0191  * @param len
0192  * @return 
0193  */
0194 exprivate int br_process_msg_th(void *ptr, int *p_finish_off)
0195 {
0196     int ret=EXSUCCEED;
0197     char *tmp=NULL;
0198     /* Also we could thing something better! which does not eat so much stack*/
0199     char *tmp_clr=NULL;
0200     net_brmessage_t *p_netmsg = (net_brmessage_t *)ptr;
0201     
0202     p_netmsg->call = (cmd_br_net_call_t *)p_netmsg->buf;
0203     
0204     
0205     if (G_bridge_cfg.common_format)
0206     {
0207         size_t tmp_buf_len;
0208         long tmp_len = p_netmsg->len;
0209         
0210         NDRX_SYSBUF_MALLOC_OUT(tmp, tmp_buf_len, ret);
0211         
0212         NDRX_LOG(log_debug, "Convert message from network... (tmp buf = %p, size: %ld)", 
0213                 tmp, NDRX_MSGSIZEMAX);
0214         
0215         if (EXFAIL==exproto_proto2ex(p_netmsg->buf, tmp_len,  tmp, &tmp_len, 
0216                 NDRX_MSGSIZEMAX))
0217         {
0218             NDRX_LOG(log_error, "Failed to convert incoming message!");
0219             EXFAIL_OUT(ret);
0220         }
0221         
0222         /* If allocated previously  */
0223         NDRX_SYSBUF_FREE(p_netmsg->buf);
0224         
0225         p_netmsg->buf = tmp;
0226         p_netmsg->len = tmp_len;
0227         
0228         /* Switch ptr to converted one.! */
0229         p_netmsg->call = (cmd_br_net_call_t *)tmp;
0230         /* Should ignore len field...! */
0231         p_netmsg->call->len = tmp_len - EXOFFSET(cmd_br_net_call_t, buf);
0232         
0233         /*
0234         NDRX_LOG(log_debug, "Got c len=%ld bytes (br refresh %d) - internal %ld", 
0235                 tmp_len, sizeof(bridge_refresh_t), call->len);
0236         */
0237         NDRX_DUMP(log_debug, "Got converted packet: ", p_netmsg->call->buf, 
0238                 p_netmsg->call->len);
0239     }
0240     
0241     NDRX_LOG(log_debug, "Got message from net.");
0242     
0243     
0244     if (BR_NET_CALL_MAGIC!=p_netmsg->call->br_magic)
0245     {
0246         NDRX_LOG(log_error, "Got bridge message, but invalid magic: got"
0247                 " %p, expected: %p", p_netmsg->call->br_magic, BR_NET_CALL_MAGIC);
0248         userlog("Got bridge message, but invalid magic: got"
0249                 " %p, expected: %p", p_netmsg->call->br_magic, BR_NET_CALL_MAGIC);
0250         goto out;
0251     }
0252     
0253     if (BR_NET_CALL_MSG_TYPE_ATMI==p_netmsg->call->msg_type ||
0254             BR_NET_CALL_MSG_TYPE_NOTIF==p_netmsg->call->msg_type)
0255     {
0256         tp_command_generic_t *gen_command = (tp_command_generic_t *)p_netmsg->call->buf;
0257 
0258         NDRX_LOG(log_debug, "ATMI message, command id=%d", 
0259                 gen_command->command_id);
0260         
0261         switch (gen_command->command_id)
0262         {
0263             
0264             case ATMI_COMMAND_TPCALL:
0265             case ATMI_COMMAND_CONNECT:
0266                 NDRX_LOG(log_debug, "tpcall or connect");
0267                 br_dump_tp_command_call(p_netmsg->call->buf);
0268                 /* If this is a call, then we should append caller address */
0269                 if (EXSUCCEED!=br_tpcall_pushstack((tp_command_call_t *)gen_command))
0270                 {
0271                     EXFAIL_OUT(ret);
0272                 }
0273                 /* Call service */
0274                 NDRX_LOG(log_debug, "About to call service...");
0275                 ret=br_submit_to_service((tp_command_call_t *)gen_command, 
0276                         p_netmsg->call->len);
0277                 break;
0278              
0279             /* tpreply & conversation goes via reply Q */
0280             case ATMI_COMMAND_TPREPLY:
0281             case ATMI_COMMAND_CONVDATA:
0282             case ATMI_COMMAND_CONNRPLY:
0283             case ATMI_COMMAND_DISCONN:
0284             case ATMI_COMMAND_CONNUNSOL:
0285             case ATMI_COMMAND_CONVACK:
0286             case ATMI_COMMAND_SHUTDOWN:
0287                 br_dump_tp_command_call(p_netmsg->call->buf);
0288                 /* TODO: So this is reply... we should pop the stack and decide 
0289                  * where to send the message, either to service replyQ
0290                  * or other node 
0291                  */
0292                 NDRX_LOG(log_debug, "Reply back to caller/bridge");
0293                 ret = br_submit_reply_to_q((tp_command_call_t *)gen_command, 
0294                         p_netmsg->call->len);
0295                 break;
0296             case ATMI_COMMAND_TPFORWARD:
0297                 br_dump_tp_command_call(p_netmsg->call->buf);
0298                 /* not used */
0299                 break;
0300             case ATMI_COMMAND_BROADCAST:
0301             case ATMI_COMMAND_TPNOTIFY:
0302             {
0303                 tp_notif_call_t * p_notif = (tp_notif_call_t *)gen_command;
0304                 /* Call the reply Q
0305                  * If this is broadcast, then we send it to broadcast server
0306                  * If this is notification, then send to client proc only.
0307                  */
0308                 NDRX_LOG(log_debug, "Sending tpnotify to client queue... "
0309                         "(flags got: %ld, regex: %d)",
0310                         p_notif->flags, p_notif->flags & TPREGEXMATCH);
0311                 br_dump_tp_notif_call(p_netmsg->call->buf);
0312                 
0313 /*
0314                 ret = br_submit_reply_to_q_notif((tp_notif_call_t *)gen_command, 
0315                         p_netmsg->call->len, NULL);
0316   */            
0317                 ret = br_submit_to_service_notif((tp_notif_call_t *)gen_command, 
0318                         p_netmsg->call->len);
0319                 
0320             }   
0321                 break;
0322         }
0323     }
0324     else if (BR_NET_CALL_MSG_TYPE_NDRXD==p_netmsg->call->msg_type)
0325     {
0326         command_call_t *icall = (command_call_t *)p_netmsg->call->buf;
0327         int call_len = p_netmsg->call->len;
0328         
0329         /* TODO: Might want to check the buffers sizes to minimum */
0330         NDRX_LOG(log_debug, "NDRX message, call_len=%d", call_len);
0331         
0332         switch (icall->command)
0333         {
0334             case NDRXD_COM_BRCLOCK_RQ:
0335                 ret = br_calc_clock_diff(icall);
0336                 break;
0337             case NDRXD_COM_BRREFERSH_RQ:
0338                 ret = br_submit_to_ndrxd(icall, call_len);
0339                 break;
0340             default:
0341                 NDRX_LOG(log_debug, "Unsupported bridge command: %d",
0342                             icall->command);
0343                 break;
0344         }
0345          
0346     }
0347 out:
0348               
0349     if (NULL!=p_netmsg->buf)
0350     {
0351         NDRX_SYSBUF_FREE(p_netmsg->buf);
0352     }
0353 
0354     NDRX_FPFREE(p_netmsg);
0355 
0356     return ret;
0357 }
0358 
0359 /**
0360  * Send message to other bridge.
0361  * Might want to use async call, as there Net stack could be full & blocked.
0362  * @param buf
0363  * @param len
0364  * @return 
0365  */
0366 expublic int br_send_to_net(char *buf, int len, char msg_type, int command_id)
0367 {
0368     int ret=EXSUCCEED;
0369     char *fn = "br_send_to_net";
0370     char *tmp2 = NULL;
0371     size_t tmp2_len;
0372     char **snd;
0373     long snd_len;
0374     int use_hdr = EXFALSE;
0375     char smallbuf[sizeof(cmd_br_net_call_t) + sizeof(char *)];
0376     cmd_br_net_call_t *call = (cmd_br_net_call_t *)smallbuf;
0377     
0378     NDRX_LOG(log_debug, "%s: sending %d bytes", fn, len);
0379     
0380     if (NULL==G_bridge_cfg.con)
0381     {
0382         NDRX_LOG(log_error, "Bridge is not connected!!!");
0383         EXFAIL_OUT(ret);
0384     }
0385     
0386     /*do some optimisation memset(tmp, 0, sizeof(tmp)); */
0387     call->br_magic = BR_NET_CALL_MAGIC;
0388     call->msg_type = msg_type;
0389     call->command_id = command_id;
0390     call->len = len;
0391     
0392     if (G_bridge_cfg.common_format)
0393     {
0394         /* get away from this memcpy somehow? 
0395          * Enduro/X 8.0 - no mem copy anymore!
0396          */
0397         memcpy(call->buf, &buf, sizeof(char *));
0398             
0399         NDRX_LOG(log_debug, "Convert message to network...");
0400         /* do some more optimization: memset(tmp2, 0, sizeof(tmp2)); */
0401         NDRX_SYSBUF_MALLOC_OUT(tmp2, tmp2_len, ret);
0402         
0403         snd = &tmp2;
0404         snd_len = 0;
0405         
0406         /* Set the output buffer size border. */
0407         if (EXSUCCEED!=exproto_ex2proto((char *)call, len + sizeof(*call), tmp2, 
0408                 &snd_len, tmp2_len))
0409         {
0410             ret=EXFAIL;
0411             goto out;
0412         }
0413         
0414     }
0415     else
0416     {
0417         /* faster route, no copy */
0418         use_hdr=EXTRUE;
0419     }
0420     
0421     
0422     /* Might want to move this stuff to Q */
0423     
0424     /* the connection object is created by main thread
0425      * and calls are dispatched by main thread too. Thus 
0426      * existence of con must be atomic.
0427      *  */
0428     if (NULL!=G_bridge_cfg.con)
0429     {
0430         /* Lock to network */
0431         exnet_rwlock_read(G_bridge_cfg.con);
0432                 
0433         if (exnet_is_connected(G_bridge_cfg.con))
0434         {
0435             if (use_hdr)
0436             {
0437                 if (EXSUCCEED!=exnet_send_sync(G_bridge_cfg.con, (char *)call, 
0438                         sizeof(cmd_br_net_call_t), (char *)buf, len, 0, 0))
0439                 {
0440                     NDRX_LOG(log_error, "Failed to submit message to network");
0441                     ret=EXFAIL;
0442                 }
0443             }
0444             else
0445             {
0446                 /* slower, prev memcopy */
0447                 if (EXSUCCEED!=exnet_send_sync(G_bridge_cfg.con, NULL, 0,
0448                         (char *)*snd, snd_len, 0, 0))
0449                 {
0450                     NDRX_LOG(log_error, "Failed to submit message to network");
0451                     ret=EXFAIL;
0452                 }
0453             }
0454         }
0455         else
0456         {
0457             NDRX_LOG(log_error, "Node disconnected - cannot send");
0458             ret=EXFAIL;
0459         }
0460         
0461         /* unlock the network */
0462         exnet_rwlock_unlock(G_bridge_cfg.con); 
0463         
0464         if (EXSUCCEED!=ret)
0465         {
0466             goto out;
0467         }
0468         
0469     }
0470     else
0471     {
0472         NDRX_LOG(log_error, "Node disconnected - cannot send");
0473         EXFAIL_OUT(ret);
0474     }
0475     
0476 out:
0477                 
0478     if (NULL!=tmp2)
0479     {
0480         NDRX_SYSBUF_FREE(tmp2);
0481     }
0482 
0483     return ret;
0484 }
0485 /* vim: set ts=4 sw=4 et smartindent: */