Back to home page

Enduro/X

 
 

    


0001 /**
0002  * @brief Enduro/X Queue processing (ATMI/NDRXD queues).
0003  *   Addtional processing:
0004  *   Internal message queue used for cases when target
0005  *   service queues are full. So that if we try to send, we do not
0006  *   block the whole bridge, but instead we leave the message in internal linked
0007  *   list and try later. If tries are exceeded time-out value, then we just drop
0008  *   the message.
0009  *
0010  * @file queue.c
0011  */
0012 /* -----------------------------------------------------------------------------
0013  * Enduro/X Middleware Platform for Distributed Transaction Processing
0014  * Copyright (C) 2009-2016, ATR Baltic, Ltd. All Rights Reserved.
0015  * Copyright (C) 2017-2023, Mavimax, Ltd. All Rights Reserved.
0016  * This software is released under one of the following licenses:
0017  * AGPL (with Java and Go exceptions) or Mavimax's license for commercial use.
0018  * See LICENSE file for full text.
0019  * -----------------------------------------------------------------------------
0020  * AGPL license:
0021  *
0022  * This program is free software; you can redistribute it and/or modify it under
0023  * the terms of the GNU Affero General Public License, version 3 as published
0024  * by the Free Software Foundation;
0025  *
0026  * This program is distributed in the hope that it will be useful, but WITHOUT ANY
0027  * WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR A
0028  * PARTICULAR PURPOSE. See the GNU Affero General Public License, version 3
0029  * for more details.
0030  *
0031  * You should have received a copy of the GNU Affero General Public License along 
0032  * with this program; if not, write to the Free Software Foundation, Inc.,
0033  * 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
0034  *
0035  * -----------------------------------------------------------------------------
0036  * A commercial use license is available from Mavimax, Ltd
0037  * contact@mavimax.com
0038  * -----------------------------------------------------------------------------
0039  */
0040 #include <stdio.h>
0041 #include <stdlib.h>
0042 #include <string.h>
0043 #include <errno.h>
0044 #include <regex.h>
0045 #include <utlist.h>
0046 
0047 #include <ndebug.h>
0048 #include <atmi.h>
0049 #include <atmi_int.h>
0050 #include <atmi_shm.h>
0051 #include <typed_buf.h>
0052 #include <ndrstandard.h>
0053 #include <ubf.h>
0054 #include <Exfields.h>
0055 #include <gencall.h>
0056 #include <assert.h>
0057 
0058 #include <exnet.h>
0059 #include <ndrxdcmn.h>
0060 #include "bridge.h"
0061 #include "../libatmisrv/srv_int.h"
0062 #include "userlog.h"
0063 #include "cgreen/assertions.h"
0064 /*---------------------------Externs------------------------------------*/
0065 /*---------------------------Macros-------------------------------------*/
0066 /*---------------------------Enums--------------------------------------*/
0067 /*---------------------------Typedefs-----------------------------------*/
0068 /*---------------------------Globals------------------------------------*/
0069 /*---------------------------Statics------------------------------------*/
0070 /*---------------------------Prototypes---------------------------------*/
0071 
0072 exprivate int br_got_message_from_q_th(void *ptr, int *p_finish_off);
0073 
0074 /**
0075  * Send stuff directly to NDRXD
0076  */
0077 expublic int br_submit_to_ndrxd(command_call_t *call, int len)
0078 {
0079     int ret=EXSUCCEED;
0080     char *qstr = ndrx_get_G_atmi_conf()->ndrxd_q_str;
0081     
0082     if (EXSUCCEED!=(ret=ndrx_generic_q_send(qstr, 
0083             (char *)call, len, TPNOBLOCK, 0)))
0084     {
0085         NDRX_LOG(log_error, "Failed to send message to ndrxd!");
0086         br_process_error((char *)call, len, ret, NULL, PACK_TYPE_TONDRXD, qstr, 
0087                 NULL);
0088     }
0089     
0090 out:
0091     return EXSUCCEED;    
0092 }
0093 
0094 /**
0095  * Submit to service. We should do this via Q
0096  */
0097 expublic int br_submit_to_service(tp_command_call_t *call, int len)
0098 {
0099     int ret=EXSUCCEED;
0100     char svc_q[NDRX_MAX_Q_SIZE+1];
0101     int is_bridge = EXFALSE;
0102 
0103     /* Resolve the service in SHM 
0104      *   sprintf(svc_q, NDRX_SVC_QFMT, G_server_conf.q_prefix, call->name); */
0105 
0106     if (EXSUCCEED!=ndrx_shm_get_svc(call->name, svc_q, &is_bridge, NULL))
0107     {
0108         NDRX_LOG(log_error, "Failed to get local service [%s] for bridge call!",
0109                 call->name);
0110         userlog("Failed to get local service [%s] for bridge call!", call->name);
0111         br_process_error((char *)call, len, ret, NULL, PACK_TYPE_TOSVC, NULL, NULL);
0112         EXFAIL_OUT(ret);
0113     }
0114 
0115     
0116     NDRX_LOG(log_debug, "Calling service: %s", svc_q);
0117     if (EXSUCCEED!=(ret=ndrx_generic_q_send(svc_q, (char *)call, len, TPNOBLOCK, 0)))
0118     {
0119         NDRX_LOG(log_error, "Failed to send message to local ATMI service!");
0120         br_process_error((char *)call, len, ret, NULL, PACK_TYPE_TOSVC, svc_q, NULL);
0121     }
0122     /* TODO: Check the result, if called failed, then reply back with error? */
0123     
0124 out:
0125     return EXSUCCEED;    
0126 }
0127 
0128 /**
0129  * Send message to broadcast service
0130  * @param call
0131  * @param len
0132  * @param from_q
0133  * @return 
0134  */
0135 expublic int br_submit_to_service_notif(tp_notif_call_t *call, int len)
0136 {
0137     int ret=EXSUCCEED;
0138     char svcnm[MAXTIDENT];
0139     int is_bridge = EXFALSE;
0140     char svc_q[NDRX_MAX_Q_SIZE+1];
0141     
0142     snprintf(svcnm, sizeof(svcnm), NDRX_SVC_TPBROAD, tpgetnodeid());
0143 
0144     if (EXSUCCEED!=ndrx_shm_get_svc(svcnm, svc_q, &is_bridge, NULL))
0145     {
0146         NDRX_LOG(log_error, "Failed to get local service [%s] for bridge call!",
0147                 svcnm);
0148         userlog("Failed to get local service [%s] for bridge call!", svcnm);
0149         
0150         br_process_error((char *)call, len, ret, NULL, PACK_TYPE_TOSVC, NULL, NULL);
0151         
0152         EXFAIL_OUT(ret);
0153     }
0154     
0155     NDRX_LOG(log_debug, "Calling broadcast server: %s with flags %ld", 
0156             svc_q, call->flags);
0157     
0158     /* TODO: Shouldn't we add TPNOBLOCK?? */
0159     if (EXSUCCEED!=(ret=ndrx_generic_q_send(svc_q, (char *)call, len, 
0160             /* call->flags ??*/TPNOBLOCK, 0)))
0161     {
0162         NDRX_LOG(log_error, "Failed to send message to ndrxd!");
0163         br_process_error((char *)call, len, ret, NULL, PACK_TYPE_TOSVC, svc_q, NULL);
0164     }
0165     
0166     /* TODO: Check the result, if called failed, then reply back with error? */
0167     
0168 out:
0169     return EXSUCCEED;    
0170 }
0171 
0172 
0173 /**
0174  * Submit reply to service. We should do this via Q for ATMI command
0175  * @param call
0176  * @param len
0177  * @param from_q
0178  * @return 
0179  */
0180 expublic int br_submit_reply_to_q(tp_command_call_t *call, int len)
0181 {
0182     char reply_to[NDRX_MAX_Q_SIZE+1];
0183     int ret=EXSUCCEED;
0184     
0185     /* TODO??: We have problem here, because of missing reply_to */
0186     if (EXSUCCEED!=fill_reply_queue(call->callstack, call->reply_to, reply_to))
0187     {
0188         NDRX_LOG(log_error, "Failed to send message to ndrxd!");
0189         goto out;
0190     }
0191     
0192     NDRX_LOG(log_debug, "Reply to Q: %s", reply_to);
0193     if (EXSUCCEED!=(ret=ndrx_generic_q_send(reply_to, (char *)call, len, TPNOBLOCK, 0)))
0194     {
0195         NDRX_LOG(log_error, "Failed to send message to %s!", reply_to);
0196         br_process_error((char *)call, len, ret, NULL, PACK_TYPE_TORPLYQ, reply_to, NULL);
0197         goto out;
0198     }
0199     
0200 out:
0201     return EXSUCCEED;    
0202 }
0203 
0204 /**
0205  * Thread entry wrapper...
0206  * Sending message to network
0207  * @param buf double ptr to dispatch msg
0208  * @param len
0209  * @param msg_type
0210  * @return 
0211  */
0212 expublic int br_got_message_from_q(char **buf, int len, char msg_type)
0213 {
0214     int ret = EXSUCCEED;
0215     xatmi_brmessage_t *thread_data;
0216     char *fn = "br_got_message_from_q";
0217     
0218     NDRX_LOG(log_debug, "%s: threaded mode - dispatching to worker", fn);
0219     
0220     thread_data = NDRX_FPMALLOC(sizeof(xatmi_brmessage_t), 0);
0221 
0222     if (NULL==thread_data)
0223     {
0224         int err = errno;
0225         NDRX_LOG(log_error, "Failed to allocate xatmi_brmessage_t: %s", 
0226                 strerror(err));
0227         
0228         userlog("Failed to allocate xatmi_brmessage_t: %s", 
0229                 strerror(err));
0230         EXFAIL_OUT(ret);
0231     }
0232     
0233     thread_data->buf = *buf;
0234     *buf = NULL;
0235     thread_data->len = len;
0236     thread_data->msg_type = msg_type;
0237     
0238     /* limit the sending rate... to avoid internal buffers... */
0239     if (EXSUCCEED!=ndrx_thpool_add_work2(G_bridge_cfg.thpool_tonet, 
0240             (void*)br_got_message_from_q_th, 
0241             (void *)thread_data, 0, G_bridge_cfg.threadpoolbufsz))
0242     {
0243         EXFAIL_OUT(ret);
0244     }
0245     
0246 out:
0247             
0248     if (EXSUCCEED!=ret)
0249     {
0250         if (NULL!=thread_data)
0251         {
0252             if (NULL!=thread_data->buf)
0253             {
0254                 NDRX_SYSBUF_FREE(thread_data->buf);
0255             }
0256             NDRX_FPFREE(thread_data);
0257         }
0258     }
0259     return ret;
0260 }
0261 
0262 
0263 /**
0264  * At this point we got message from Q, so we should forward it to network.
0265  * Q basically is XATMI sub-system
0266  * Thread processing
0267  * @param buf
0268  * @param len
0269  * @param msg_type
0270  * @return 
0271  */
0272 exprivate int br_got_message_from_q_th(void *ptr, int *p_finish_off)
0273 {
0274     int ret=EXSUCCEED;
0275     /* Get threaded data */
0276     xatmi_brmessage_t *p_xatmimsg = (xatmi_brmessage_t *)ptr;
0277     char *buf = p_xatmimsg->buf;
0278     int len = p_xatmimsg->len;
0279     char msg_type = p_xatmimsg->msg_type;
0280     
0281     BR_THREAD_ENTRY;
0282     
0283     NDRX_DUMP(log_debug, "Got message from Q:", buf, len);
0284     
0285     if (msg_type==BR_NET_CALL_MSG_TYPE_ATMI)
0286     {
0287         tp_command_generic_t *gen_command = (tp_command_generic_t *)buf;
0288         NDRX_LOG(log_debug, "Got from Q ATMI message: %d", 
0289                 gen_command->command_id);
0290         
0291         switch (gen_command->command_id)
0292         {
0293 
0294             case ATMI_COMMAND_TPCALL:
0295             case ATMI_COMMAND_CONNECT:
0296                 
0297                 NDRX_LOG(log_debug, "TPCALL/CONNECT from Q");
0298                 /* Adjust the clock */
0299                 br_clock_adj((tp_command_call_t *)buf, EXTRUE);
0300                 /* Send stuff to network, adjust clock.*/
0301                 ret=br_send_to_net(buf, len, BR_NET_CALL_MSG_TYPE_ATMI, 
0302                         gen_command->command_id);
0303                 
0304                 if (EXSUCCEED!=ret)
0305                 {
0306                     /* Generate TPNOENT */
0307                 }
0308                 
0309                 break;
0310                 
0311             /* tpreply & conversation goes via reply Q */
0312             case ATMI_COMMAND_TPREPLY:
0313             case ATMI_COMMAND_CONVDATA:
0314             case ATMI_COMMAND_CONNRPLY:
0315             case ATMI_COMMAND_DISCONN:
0316             case ATMI_COMMAND_CONNUNSOL:
0317             case ATMI_COMMAND_CONVACK:
0318             case ATMI_COMMAND_SHUTDOWN:
0319                 
0320                 NDRX_LOG(log_debug, "TPREPLY/CONVERSATION from Q");
0321                 
0322                 /* Adjust the clock */
0323                 br_clock_adj((tp_command_call_t *)buf, EXTRUE);
0324                 
0325                 ret=br_send_to_net(buf, len, BR_NET_CALL_MSG_TYPE_ATMI, 
0326                         gen_command->command_id);
0327                 
0328                 if (EXSUCCEED!=ret)
0329                 {
0330                     NDRX_LOG(log_error, "Failed to send reply to "
0331                             "net - nothing todo");
0332                     ret=EXSUCCEED;
0333                 }
0334                
0335                 break;
0336             case ATMI_COMMAND_TPFORWARD:
0337                 /* not used */
0338                 break;
0339 
0340             /* maybe move to non-threaded env...
0341              * as threads will be stopped by threadpool stop
0342              */
0343             case ATMI_COMMAND_SELF_SD:
0344                 G_shutdown_nr_got++;
0345             
0346                 NDRX_LOG(log_warn, "Got shutdown req %d of %d", 
0347                         G_shutdown_nr_got, G_shutdown_nr_wait);
0348 
0349                 break;
0350             case  ATMI_COMMAND_BROADCAST:
0351             case  ATMI_COMMAND_TPNOTIFY:
0352                 
0353                 NDRX_LOG(log_info, "Sending tpnotify/broadcast:");
0354                 
0355                 /* Translate to notification... */
0356                 ret=br_send_to_net(buf, len, BR_NET_CALL_MSG_TYPE_NOTIF, 
0357                         gen_command->command_id);
0358                 
0359                 if (EXSUCCEED!=ret)
0360                 {
0361                     NDRX_LOG(log_error, "Failed to send reply to "
0362                             "net - nothing todo");
0363                     ret=EXSUCCEED;
0364                 }       
0365                 break;
0366         }
0367     }
0368     else if (msg_type==BR_NET_CALL_MSG_TYPE_NDRXD)
0369     {
0370         command_call_t *call = (command_call_t *)buf;
0371         
0372         /* request for clock infos */
0373         if (NDRXD_COM_BRCONINFO_RQ==call->command)
0374         {
0375             NDRX_LOG(log_debug, "Clock infos request");
0376             
0377             if (EXSUCCEED!=br_coninfo(call))
0378             {
0379                 EXFAIL_OUT(ret);
0380             }
0381         }
0382         else
0383         {
0384             NDRX_LOG(log_debug, "Got from Q NDRXD message");
0385             /* I guess we can forward these directly but firstly check the type
0386              * we do not want to send any spam to other machine, do we?
0387              * Hmm but lets try out?
0388              */
0389             if (EXSUCCEED!=br_send_to_net(buf, len, msg_type, call->command))
0390             {
0391                 EXFAIL_OUT(ret);
0392             }
0393         }
0394     }
0395 out:
0396                 
0397     NDRX_SYSBUF_FREE(p_xatmimsg->buf);
0398     NDRX_FPFREE(p_xatmimsg);
0399     
0400     return ret;
0401 }
0402 /* vim: set ts=4 sw=4 et smartindent: */