Back to home page

Enduro/X

 
 

    


0001 /**
0002  * @brief Bridge server
0003  *   This is special kind of EnduroX Server.
0004  *   It does configures the library to work in bridged mode.
0005  *   It parses command line by looking of following config:
0006  *   In client mode (-tA = type [A]ctive, connect to server)
0007  *   -n <Node id of server> -i <IP of the server> -p <Port of the server> -tA
0008  *   In server mode (-tP = type [P]asive, wait for call)
0009  *   -n <node id of server> -i <Bind address usually 0.0.0.0> -p <Port to bind on> -tP
0010  *   Notes for multi thread:
0011  *   - Outgoing message fully received by xatmi main thread,
0012  *   and is submitted to thread pool. The treads perform async send to network.
0013  *   - Incoming message from network also are fully received from main thread.
0014  *   Once read fully, submitted to thread pool for further processing.
0015  *   In case if thread count is set to 0, then do not use threading model, just
0016  *   just do direct calls if send & receive.
0017  *
0018  * @file bridgesvc.c
0019  */
0020 /* -----------------------------------------------------------------------------
0021  * Enduro/X Middleware Platform for Distributed Transaction Processing
0022  * Copyright (C) 2009-2016, ATR Baltic, Ltd. All Rights Reserved.
0023  * Copyright (C) 2017-2023, Mavimax, Ltd. All Rights Reserved.
0024  * This software is released under one of the following licenses:
0025  * AGPL (with Java and Go exceptions) or Mavimax's license for commercial use.
0026  * See LICENSE file for full text.
0027  * -----------------------------------------------------------------------------
0028  * AGPL license:
0029  *
0030  * This program is free software; you can redistribute it and/or modify it under
0031  * the terms of the GNU Affero General Public License, version 3 as published
0032  * by the Free Software Foundation;
0033  *
0034  * This program is distributed in the hope that it will be useful, but WITHOUT ANY
0035  * WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR A
0036  * PARTICULAR PURPOSE. See the GNU Affero General Public License, version 3
0037  * for more details.
0038  *
0039  * You should have received a copy of the GNU Affero General Public License along 
0040  * with this program; if not, write to the Free Software Foundation, Inc.,
0041  * 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
0042  *
0043  * -----------------------------------------------------------------------------
0044  * A commercial use license is available from Mavimax, Ltd
0045  * contact@mavimax.com
0046  * -----------------------------------------------------------------------------
0047  */
0048 #include <ndrx_config.h>
0049 #include <stdio.h>
0050 #include <stdlib.h>
0051 #include <string.h>
0052 #include <errno.h>
0053 #include <regex.h>
0054 #include <utlist.h>
0055 #include <unistd.h>    /* for getopt */
0056 #include <poll.h>
0057 
0058 #include <ndebug.h>
0059 #include <atmi.h>
0060 #include <atmi_int.h>
0061 #include <typed_buf.h>
0062 #include <ndrstandard.h>
0063 #include <ubf.h>
0064 #include <Exfields.h>
0065 
0066 #include <exnet.h>
0067 #include <ndrxdcmn.h>
0068 
0069 #include "bridge.h"
0070 #include "../libatmisrv/srv_int.h"
0071 /*---------------------------Externs------------------------------------*/
0072 /*---------------------------Macros-------------------------------------*/
0073 /*---------------------------Enums--------------------------------------*/
0074 /*---------------------------Typedefs-----------------------------------*/
0075 /*---------------------------Globals------------------------------------*/
0076 expublic bridge_cfg_t G_bridge_cfg;
0077 expublic __thread int G_thread_init = EXFALSE; /* Was thread init done?     */
0078 /*---------------------------Statics------------------------------------*/
0079 exprivate int M_init_ok = EXFALSE;
0080 /*---------------------------Prototypes---------------------------------*/
0081 
0082 /**
0083  * Send status to NDRXD
0084  * @param is_connected TRUE connected, FALSE not connected
0085  * @return SUCCEED/FAIL 
0086  */
0087 exprivate int br_send_status(int is_connected)
0088 {
0089     int ret = EXSUCCEED;
0090     
0091     bridge_info_t gencall;
0092 
0093     /* Reset gencall */
0094     memset(&gencall, 0, sizeof(gencall));
0095     gencall.nodeid = G_bridge_cfg.nodeid;
0096     
0097     NDRX_LOG(log_debug, "Reporting to ndrxd (is_connected=%d)", is_connected);
0098     
0099     /* Send command call to ndrxd */
0100     ret=cmd_generic_call((is_connected?NDRXD_COM_BRCON_RQ:NDRXD_COM_BRDISCON_RQ),
0101                     NDRXD_SRC_BRIDGE,
0102                     NDRXD_CALL_TYPE_BRIDGEINFO,
0103                     (command_call_t *)&gencall, sizeof(bridge_info_t),
0104                     ndrx_get_G_atmi_conf()->reply_q_str,
0105                     ndrx_get_G_atmi_conf()->reply_q,
0106                     (mqd_t)EXFAIL,   /* do not keep open ndrxd q open */
0107                     ndrx_get_G_atmi_conf()->ndrxd_q_str,
0108                     0, NULL,
0109                     NULL,
0110                     NULL,
0111                     NULL,
0112                     EXFALSE);
0113     
0114     return ret;
0115 }
0116 /**
0117  * Connection have been established
0118  */
0119 expublic int br_connected(exnetcon_t *net)
0120 {
0121     int ret=EXSUCCEED;
0122     
0123     /* once created this shall not change... */
0124     G_bridge_cfg.con = net;
0125     NDRX_LOG(log_debug, "Net=%p", G_bridge_cfg.net);
0126     
0127     /* Send our clock to other node. */
0128     if (EXSUCCEED==br_send_clock(NDRX_BRCLOCK_MODE_ASYNC, NULL))
0129     {
0130         ret=br_send_status(EXTRUE);
0131     }
0132     return ret;
0133 }
0134 
0135 /**
0136  * Bridge is disconnected.
0137  */
0138 expublic int br_disconnected(exnetcon_t *net)
0139 {
0140     int ret=EXSUCCEED;
0141     
0142     /*
0143      * - leave object in place...
0144     G_bridge_cfg.con = NULL;
0145     */
0146     ret=br_send_status(EXFALSE);
0147     
0148     return ret;  
0149 }
0150 
0151 /**
0152  * Send zero length message to socket, to keep some activity
0153  * Processed by sender worker thread
0154  * @param ptr ptr to network structure
0155  * @param p_finish_off not used
0156  * @return EXSUCCEED
0157  */
0158 exprivate int br_snd_zero_len_th(void *ptr, int *p_finish_off)
0159 {
0160     exnetcon_t *net = (exnetcon_t *)ptr;
0161     
0162     /* Lock to network */
0163     exnet_rwlock_read(net);
0164 
0165     if (exnet_is_connected(net))
0166     {
0167         if (EXSUCCEED!=exnet_send_sync(net, NULL, 0, NULL, 0, 0, 0))
0168         {
0169             NDRX_LOG(log_debug, "Failed to send zero length message!");
0170         }
0171     }
0172 
0173     /* unlock the network */
0174     exnet_rwlock_unlock(net); 
0175     
0176     return EXSUCCEED;
0177 }
0178 
0179 /**
0180  * Processed by main thread / dispatch to worker pool
0181  * @return EXUSCCEED
0182  */
0183 exprivate int br_snd_zero_len(exnetcon_t *net)
0184 {
0185     /* no need to send to network if already has to-net jobs... 
0186      * thus avoid any blocking conditions on outgoing...
0187      */
0188     ndrx_thpool_add_work2(G_bridge_cfg.thpool_tonet, (void *)br_snd_zero_len_th, 
0189             (void *)net, NDRX_THPOOL_ONEJOB, 0);
0190     return EXSUCCEED;
0191 }
0192 
0193 /**
0194  * Send periodic clocks
0195  * @return EXUSCCEED
0196  */
0197 exprivate int br_snd_clock_sync(exnetcon_t *net)
0198 {
0199     br_send_clock(NDRX_BRCLOCK_MODE_REQ, NULL);
0200     return EXSUCCEED;
0201 }
0202 
0203 /**
0204  * Report status to ndrxd by callback, so that when ndrxd is being restarted
0205  * we get back correct bridge state.
0206  * @return SUCCEED/FAIL
0207  */
0208 expublic int br_report_to_ndrxd_cb(void)
0209 {
0210     int ret = EXSUCCEED;
0211     
0212     NDRX_LOG(log_warn, "br_report_to_ndrxd_cb: Reporting to ndrxd bridge status: %s", 
0213             (G_bridge_cfg.con?"Connected":"Disconnected"));
0214     if (G_bridge_cfg.con)
0215     {
0216         ret=br_send_status(EXTRUE);
0217     }
0218     else
0219     {
0220         ret=br_send_status(EXFALSE);
0221     }
0222     
0223     return ret;
0224 }
0225 
0226 /**
0227  * Bridge service entry
0228  * @param p_svc - data & len used only...!
0229  */
0230 void TPBRIDGE (TPSVCINFO *p_svc)
0231 {
0232     /* Dummy service, calls will never reach this point. */
0233 }
0234 
0235 /**
0236  * Terminate the thread...
0237  */
0238 void ndrx_thpool_thread_done(void)
0239 {
0240     tpterm();
0241 }
0242 
0243 /**
0244  * This is thread init
0245  * @param argc
0246  * @param argv
0247  * @return EXSUCCEED/EXFAIL
0248  */
0249 int ndrx_thpool_thread_init(int argc, char **argv)
0250 {
0251     int ret = EXSUCCEED;
0252     
0253     if (EXSUCCEED!=tpinit(NULL))
0254     {
0255         NDRX_LOG(log_error, "Failed to tpinit for thread...");
0256         EXFAIL_OUT(ret);
0257     }
0258     
0259 out:
0260     return ret;
0261 }
0262 
0263 /**
0264  * Do initialization
0265  * For bridge we could make a special rq address, for example "@TPBRIDGENNN"
0266  * we will an API for ndrx_reqaddrset("...") which would configure the libnstd
0267  * properly.
0268  */
0269 int NDRX_INTEGRA(tpsvrinit)(int argc, char **argv)
0270 {
0271     int ret=EXSUCCEED;
0272     int c;
0273     char *p;
0274     int flags = SRV_KEY_FLAGS_BRIDGE; /* This is bridge */
0275     int thpoolcfg = 0;
0276     int qaction=EXFAIL;
0277     NDRX_LOG(log_debug, "tpsvrinit called");
0278     
0279     /* Reset network structs */
0280     exnet_reset_struct(&G_bridge_cfg.net);
0281     
0282     G_bridge_cfg.nodeid = EXFAIL;
0283     G_bridge_cfg.qsize = EXFAIL;
0284     G_bridge_cfg.qsizesvc = EXFAIL;
0285     G_bridge_cfg.qttl= EXFAIL;
0286     G_bridge_cfg.qmaxsleep= EXFAIL;
0287     G_bridge_cfg.qminsleep= EXFAIL;
0288     G_bridge_cfg.timediff = 0;
0289     G_bridge_cfg.threadpoolsize = BR_DEFAULT_THPOOL_SIZE; /* will be reset to default */
0290     G_bridge_cfg.qretries = BR_QRETRIES_DEFAULT;
0291     G_bridge_cfg.check_interval=EXFAIL;
0292     G_bridge_cfg.threadpoolbufsz=EXFAIL;
0293     G_bridge_cfg.qfullaction = EXFAIL;
0294     G_bridge_cfg.qfullactionsvc = EXFAIL;
0295     G_bridge_cfg.net.periodic_clock_time = BR_PERIODIC_CLOCK_SND; /* Send clock sync periodically */
0296     /* Bug #689 */
0297     G_bridge_cfg.max_roundtrip = BR_MAX_ROUNDTRIP;
0298 
0299     /* init the spinlock... */
0300     NDRX_SPIN_INIT_V(G_bridge_cfg.timediff_lock);
0301 
0302     /* Parse command line  */
0303     while ((c = getopt(argc, argv, "frn:i:p:t:T:z:c:g:s:P:R:a:6h:Q:q:L:M:B:m:A:k:K:")) != -1)
0304     {
0305         /* NDRX_LOG(log_debug, "%c = [%s]", c, optarg); - on solaris gets cores? */
0306         switch(c)
0307         {
0308             case 'k':
0309                 G_bridge_cfg.max_roundtrip = atol(optarg);
0310                 break;
0311             case 'K':
0312                 G_bridge_cfg.net.periodic_clock_time = atoi(optarg);
0313                 break;
0314             case '6':
0315                 NDRX_LOG(log_debug, "Using IPv6 addresses");
0316                 G_bridge_cfg.net.is_ipv6=EXTRUE;
0317                 break;
0318             case 'r':
0319                 NDRX_LOG(log_debug, "Will send refersh to node.");
0320                 /* Will send refersh */
0321                 flags|=SRV_KEY_FLAGS_SENDREFERSH;
0322                 break;
0323             case 'n':
0324                 G_bridge_cfg.nodeid=(short)atoi(optarg);
0325                 NDRX_LOG(log_debug, "Node ID, -n = [%hd]", G_bridge_cfg.nodeid);
0326                 break;
0327             case 'Q':
0328                 G_bridge_cfg.qsize=atoi(optarg);
0329                 NDRX_LOG(log_debug, "Temporary queue size, -Q = [%d]", G_bridge_cfg.qsize);
0330                 break;
0331             case 'q':
0332                 G_bridge_cfg.qsizesvc=atoi(optarg);
0333                 NDRX_LOG(log_debug, "Temporary service queue size, -q = [%d]", 
0334                         G_bridge_cfg.qsizesvc);
0335                 break;
0336             case 'A':
0337                 qaction=(int)atoi(optarg);
0338                 NDRX_LOG(log_debug, "Temp queue action, -A = [%d]", 
0339                         qaction);
0340                 break;
0341             case 'L':
0342                 G_bridge_cfg.qttl=atoi(optarg);
0343                 NDRX_LOG(log_debug, "Temporary queue TTL, -L = [%d] ms", G_bridge_cfg.qttl);
0344                 break;
0345             case 'M':
0346                 G_bridge_cfg.qmaxsleep=atoi(optarg);
0347                 NDRX_LOG(log_debug, "Temporary queue Max Sleep, -M = [%d] ms", 
0348                         G_bridge_cfg.qmaxsleep);
0349                 break;
0350                 
0351             case 'm':
0352                 G_bridge_cfg.qminsleep=atoi(optarg);
0353                 NDRX_LOG(log_debug, "Temporary queue Min Sleep, -m = [%d] ms", 
0354                         G_bridge_cfg.qmaxsleep);
0355                 break;
0356             
0357             case 'B':
0358                 G_bridge_cfg.threadpoolbufsz=atoi(optarg);
0359                 NDRX_LOG(log_debug, "Thread pool buffer size, -B = [%d]", 
0360                         G_bridge_cfg.threadpoolbufsz);
0361                 break;
0362             case 'i':
0363                 if (EXEOS!=G_bridge_cfg.net.addr[0])
0364                 {
0365                     NDRX_LOG(log_error, "ERROR! Connection address already set "
0366                             "to: [%s] cannot process -i", G_bridge_cfg.net.addr);
0367                     EXFAIL_OUT(ret);
0368                 }
0369                 
0370                 NDRX_STRCPY_SAFE(G_bridge_cfg.net.addr, optarg);
0371                 NDRX_LOG(log_debug, "IP server/binding address, -i = [%s]", 
0372                         G_bridge_cfg.net.addr);
0373                 G_bridge_cfg.net.is_numeric = EXTRUE;
0374                 break;
0375                 
0376             case 'h':
0377                 
0378                 if (EXEOS!=G_bridge_cfg.net.addr[0])
0379                 {
0380                     NDRX_LOG(log_error, "ERROR! Connection address already set "
0381                             "to: [%s] connect process -h", G_bridge_cfg.net.addr);
0382                     EXFAIL_OUT(ret);
0383                 }
0384                 
0385                 NDRX_STRCPY_SAFE(G_bridge_cfg.net.addr, optarg);
0386                 NDRX_LOG(log_debug, "DNS host name, -h = [%s]", 
0387                         G_bridge_cfg.net.addr);
0388                 G_bridge_cfg.net.is_numeric = EXFALSE;
0389                 break;
0390             case 'p':
0391                 NDRX_STRCPY_SAFE(G_bridge_cfg.net.port, optarg);
0392         /* port will be promoted to integer... */
0393                 NDRX_LOG(log_debug, "Port no, -p = [%s]", G_bridge_cfg.net.port);
0394                 break;
0395             case 'T':
0396                 G_bridge_cfg.net.rcvtimeout = atoi(optarg);
0397                 NDRX_LOG(log_debug, "Receive time-out (-T): %d", 
0398                         G_bridge_cfg.net.rcvtimeout);
0399                 break;
0400             case 'b':
0401                 G_bridge_cfg.net.backlog = atoi(optarg);
0402                 NDRX_LOG(log_debug, "Backlog (-b): %d", 
0403                         G_bridge_cfg.net.backlog);
0404                 break;
0405             case 'c':
0406                 G_bridge_cfg.check_interval = atoi(optarg);
0407                 NDRX_LOG(log_debug, "check (-c): %d", 
0408                         G_bridge_cfg.check_interval);
0409                 break;
0410             case 'z':
0411                 G_bridge_cfg.net.periodic_zero = atoi(optarg);
0412                 NDRX_LOG(log_debug, "periodic_zero (-z): %d", 
0413                                 G_bridge_cfg.net.periodic_zero);
0414                 break;
0415             case 'a':
0416                 G_bridge_cfg.net.recv_activity_timeout = atoi(optarg);
0417                 NDRX_LOG(log_debug, "recv_activity_timeout (-a): %d", 
0418                                 G_bridge_cfg.net.recv_activity_timeout);
0419                 break;
0420             case 'f':
0421                 G_bridge_cfg.common_format = EXTRUE;
0422                 NDRX_LOG(log_debug, "Using common network protocol.");
0423                 break;
0424             case 'g':
0425                 NDRX_LOG(log_warn, "-g not supported any more");
0426                 break;
0427             case 's':
0428                 NDRX_LOG(log_warn, "-s not supported any more");
0429                 break;
0430             case 'P': 
0431                 /* half is used for download, and other half for upload */
0432                 thpoolcfg = atol(optarg);
0433                 G_bridge_cfg.threadpoolsize = thpoolcfg / 2;
0434                 break;
0435             case 'R': 
0436                 G_bridge_cfg.qretries = atoi(optarg);
0437                 break;
0438             case 't': 
0439                 
0440                 if ('P'==*optarg)
0441                 {
0442                     NDRX_LOG(log_debug, "Server mode enabled - "
0443                             "will wait for call");
0444                     G_bridge_cfg.net.is_server = EXTRUE;
0445                 }
0446                 else
0447                 {
0448                     NDRX_LOG(log_debug, "Client mode enabled - "
0449                             "will connect to server");
0450                     G_bridge_cfg.net.is_server = EXFALSE;
0451                 }
0452                 break;
0453             default:
0454                 /*return FAIL;*/
0455                 break;
0456         }
0457     }
0458     
0459     if (G_bridge_cfg.qsize <= 0 && NULL!=(p=getenv(CONF_NDRX_MSGMAX)))
0460     {
0461         NDRX_LOG(log_debug, "Reading queue size from [%s]", CONF_NDRX_MSGMAX);
0462         G_bridge_cfg.qsize = atoi(p);
0463     }
0464     
0465     if (G_bridge_cfg.qsize <= 0)
0466     {
0467         NDRX_LOG(log_debug, "Defaulting queue size");
0468         G_bridge_cfg.qsize = DEFAULT_QUEUE_SIZE;
0469     }
0470     
0471     if (G_bridge_cfg.qsizesvc <= 0)
0472     {
0473         NDRX_LOG(log_debug, "Defaulting service queue size");
0474         G_bridge_cfg.qsizesvc = G_bridge_cfg.qsize / 2;
0475         
0476         if ( G_bridge_cfg.qsizesvc < 1)
0477         {
0478             G_bridge_cfg.qsizesvc=1;
0479         }
0480     }
0481     
0482     /* there is no sense to have service queue bigger than global limit */
0483     if (G_bridge_cfg.qsize < G_bridge_cfg.qsizesvc)
0484     {
0485         NDRX_LOG(log_error, "Error: Global temp queue size shorter than "
0486                 "service queue size: -Q (%d) <  -q (%d)", 
0487                 G_bridge_cfg.qsize, G_bridge_cfg.qsizesvc);
0488         EXFAIL_OUT(ret);
0489     }
0490     
0491     if (G_bridge_cfg.qmaxsleep <= 0)
0492     {
0493         NDRX_LOG(log_debug, "Defaulting queue max sleep");
0494         G_bridge_cfg.qmaxsleep = DEFAULT_QUEUE_MAXSLEEP;
0495     }
0496     
0497     if (G_bridge_cfg.qminsleep <= 0)
0498     {
0499         NDRX_LOG(log_debug, "Defaulting queue min sleep");
0500         G_bridge_cfg.qminsleep = DEFAULT_QUEUE_MINSLEEP;
0501     }
0502     
0503     if (G_bridge_cfg.qttl < 0)
0504     {
0505         NDRX_LOG(log_debug, "Setting TTL to NDRX_TOUT value");
0506         /* convert from seconds to ms */
0507         G_bridge_cfg.qttl = tptoutget()*1000;
0508     }
0509     
0510     if (G_bridge_cfg.threadpoolsize < 1)
0511     {
0512         NDRX_LOG(log_warn, "Thread pool size (-P) have invalid value "
0513                 "(%d) defaulting to %d", 
0514                 thpoolcfg, BR_DEFAULT_THPOOL_SIZE*2);
0515         G_bridge_cfg.threadpoolsize = BR_DEFAULT_THPOOL_SIZE;
0516     }
0517     
0518     if (G_bridge_cfg.threadpoolbufsz < 0)
0519     {
0520         G_bridge_cfg.threadpoolbufsz = G_bridge_cfg.threadpoolsize/2;
0521     }
0522     
0523     if (G_bridge_cfg.check_interval < 1)
0524     {
0525         G_bridge_cfg.check_interval=5;
0526     }
0527     
0528     /* configure action */
0529     switch (qaction)
0530     {
0531         case EXFAIL:
0532         case QUEUE_FLAG_ACTION_BLKIGN:
0533             G_bridge_cfg.qfullaction = QUEUE_ACTION_BLOCK;
0534             G_bridge_cfg.qfullactionsvc = QUEUE_ACTION_IGNORE;
0535             NDRX_LOG(log_warn, "Queue action: temp full - block, svc full - ignore");
0536             break;
0537         case QUEUE_FLAG_ACTION_BLKDROP:
0538             G_bridge_cfg.qfullaction = QUEUE_ACTION_BLOCK;
0539             G_bridge_cfg.qfullactionsvc = QUEUE_ACTION_DROP;
0540             NDRX_LOG(log_warn, "Queue action: temp full - block, svc full - drop");
0541             break;
0542         case QUEUE_FLAG_ACTION_DROPDROP:
0543             G_bridge_cfg.qfullaction = QUEUE_ACTION_DROP;
0544             G_bridge_cfg.qfullactionsvc = QUEUE_ACTION_DROP;
0545             NDRX_LOG(log_warn, "Queue action: temp full - drop, svc full - drop");
0546             break;
0547         default:
0548             
0549             NDRX_LOG(log_error, "Invalid -A value: %d, supported: %d %d %d default: %d",
0550                     QUEUE_FLAG_ACTION_BLKIGN, QUEUE_FLAG_ACTION_BLKDROP, 
0551                     QUEUE_FLAG_ACTION_DROPDROP,
0552                     QUEUE_FLAG_ACTION_DROPDROP);
0553             EXFAIL_OUT(ret);
0554             
0555             break;
0556     }
0557     
0558     NDRX_LOG(log_warn, "Temporary queue size set to: %d", G_bridge_cfg.qsize);
0559     NDRX_LOG(log_warn, "Temporary service queue size set to: %d", G_bridge_cfg.qsizesvc);
0560     NDRX_LOG(log_warn, "Temporary queue ttl set to: %d", G_bridge_cfg.qttl);
0561     NDRX_LOG(log_warn, "Temporary queue max sleep set to: %d", G_bridge_cfg.qmaxsleep);
0562     NDRX_LOG(log_warn, "Temporary queue min sleep set to: %d", G_bridge_cfg.qminsleep);
0563     NDRX_LOG(log_warn, "Threadpool job queue size: %d", G_bridge_cfg.threadpoolbufsz);
0564     NDRX_LOG(log_warn, "Check interval is: %d seconds", G_bridge_cfg.check_interval);
0565     
0566     if (0>G_bridge_cfg.net.recv_activity_timeout)
0567     {
0568         G_bridge_cfg.net.recv_activity_timeout = G_bridge_cfg.net.periodic_zero*2;
0569     }
0570     
0571     NDRX_LOG(log_warn, "Threadpool size set to: from-net=%d to-net=%d (cfg=%d)",
0572             G_bridge_cfg.threadpoolsize, G_bridge_cfg.threadpoolsize, thpoolcfg);
0573     
0574     NDRX_LOG(log_warn, "Periodic zero: %d sec, reset on no received: %d sec",
0575             G_bridge_cfg.net.periodic_zero, G_bridge_cfg.net.recv_activity_timeout);
0576     
0577     /* Check configuration */
0578     if (EXFAIL==G_bridge_cfg.nodeid)
0579     {
0580         NDRX_LOG(log_error, "Flag -n not set!");
0581         EXFAIL_OUT(ret);
0582     }
0583     
0584     if (EXEOS==G_bridge_cfg.net.addr[0])
0585     {
0586         NDRX_LOG(log_error, "Flag -i not set!");
0587         EXFAIL_OUT(ret);
0588     }
0589     
0590     if (EXEOS==G_bridge_cfg.net.port[0])
0591     {
0592         NDRX_LOG(log_error, "Flag -p not set!");
0593         EXFAIL_OUT(ret);
0594     }
0595     
0596     if (EXFAIL==G_bridge_cfg.net.is_server)
0597     {
0598         NDRX_LOG(log_error, "Flag -T not set!");
0599         EXFAIL_OUT(ret);
0600     }
0601     
0602     br_tempq_init();
0603     
0604     /* Install call-backs */
0605     exnet_install_cb(&G_bridge_cfg.net, br_process_msg, br_connected, 
0606             br_disconnected, br_snd_zero_len, br_snd_clock_sync);
0607     
0608     ndrx_set_report_to_ndrxd_cb(br_report_to_ndrxd_cb);
0609     
0610     /* Then configure the lib - we will have only one client session! */
0611     G_bridge_cfg.net.len_pfx=NET_LEN_PFX_LEN;
0612     G_bridge_cfg.net.max_cons=1;
0613     
0614     if (EXSUCCEED!=exnet_configure(&G_bridge_cfg.net))
0615     {
0616         NDRX_LOG(log_error, "Failed to configure network lib!");
0617         EXFAIL_OUT(ret);
0618     }
0619     
0620     /* Set server flags  */
0621     tpext_configbrige(G_bridge_cfg.nodeid, flags, br_got_message_from_q);
0622     
0623     snprintf(G_bridge_cfg.svc, sizeof(G_bridge_cfg.svc), NDRX_SVC_BRIDGE, 
0624             G_bridge_cfg.nodeid);
0625     
0626     if (EXSUCCEED!=tpadvertise(G_bridge_cfg.svc, TPBRIDGE))
0627     {
0628         NDRX_LOG(log_error, "Failed to advertise %s service!", G_bridge_cfg.svc);
0629         EXFAIL_OUT(ret);
0630     }
0631     
0632     if (NULL==(G_bridge_cfg.thpool_tonet = ndrx_thpool_init(G_bridge_cfg.threadpoolsize, 
0633             &ret, ndrx_thpool_thread_init, ndrx_thpool_thread_done, 0, NULL)) || EXSUCCEED!=ret)
0634     {
0635         NDRX_LOG(log_error, "Failed to initialize to-net thread pool (cnt: %d)!", 
0636                 G_bridge_cfg.threadpoolsize);
0637         EXFAIL_OUT(ret);
0638     }
0639     
0640     if (NULL==(G_bridge_cfg.thpool_queue = ndrx_thpool_init(1, 
0641             &ret, ndrx_thpool_thread_init, ndrx_thpool_thread_done, 0, NULL)) || EXSUCCEED!=ret)
0642     {
0643         NDRX_LOG(log_error, "Failed to initialize queue-runner thread pool (cnt: %d)!", 
0644                 1);
0645         EXFAIL_OUT(ret);
0646     }
0647     
0648     if (NULL==(G_bridge_cfg.thpool_fromnet = ndrx_thpool_init(G_bridge_cfg.threadpoolsize, 
0649             &ret, ndrx_thpool_thread_init, ndrx_thpool_thread_done, 0, NULL)) || EXSUCCEED!=ret)
0650     {
0651         NDRX_LOG(log_error, "Failed to initialize from-net thread pool (cnt: %d)!",
0652                 G_bridge_cfg.threadpoolsize);
0653         EXFAIL_OUT(ret);
0654     }
0655     
0656     if (EXSUCCEED!=br_netin_setup())
0657     {
0658         NDRX_LOG(log_error, "Failed to init network management (net-in) thread");
0659         EXFAIL_OUT(ret);
0660     }
0661     
0662     NDRX_LOG(log_info, "Queue re-submit retries set to: %d", G_bridge_cfg.qretries);
0663     
0664     M_init_ok = EXTRUE;
0665     
0666 out:
0667     return ret;
0668 }
0669 
0670 /**
0671  * Shutdown the thread
0672  * @param arg
0673  * @param p_finish_off
0674  */
0675 expublic void tp_thread_shutdown(void *ptr, int *p_finish_off)
0676 {
0677     NDRX_LOG(log_info, "tp_thread_shutdown - enter");
0678     tpterm();
0679     *p_finish_off = EXTRUE;
0680     NDRX_LOG(log_info, "tp_thread_shutdown - ok");
0681 }
0682 
0683 /**
0684  * Do de-initialization
0685  */
0686 void NDRX_INTEGRA(tpsvrdone)(void)
0687 {
0688     NDRX_LOG(log_debug, "tpsvrdone called");
0689     
0690     /* shutdown network runner... */
0691     br_netin_shutdown();
0692     
0693     /* Bug #170
0694      * Shutdown threads and only then close connection
0695      * Otherwise we remove connection and threads are generating core at shutdown.
0696      * as network object is gone..
0697      */
0698     if (M_init_ok)
0699     {   
0700         /* Wait for threads to finish */
0701         ndrx_thpool_wait(G_bridge_cfg.thpool_tonet);
0702         ndrx_thpool_destroy(G_bridge_cfg.thpool_tonet);
0703         
0704         ndrx_thpool_wait(G_bridge_cfg.thpool_fromnet);
0705         ndrx_thpool_destroy(G_bridge_cfg.thpool_fromnet);
0706         
0707         ndrx_thpool_wait(G_bridge_cfg.thpool_queue);
0708         ndrx_thpool_destroy(G_bridge_cfg.thpool_queue);
0709     }
0710     
0711     /* close if not server connection...  */
0712     if (NULL!=G_bridge_cfg.con && (&G_bridge_cfg.net)!=G_bridge_cfg.con)   
0713     {
0714         /* we do not have a locks... */
0715         exnet_rwlock_read(G_bridge_cfg.con);
0716         exnet_close_shut(G_bridge_cfg.con);
0717     }
0718     
0719     /* If we were server, then close server socket too */
0720     if (G_bridge_cfg.net.is_server)
0721     {
0722         /* we do not have a locks... */
0723         exnet_rwlock_read(&G_bridge_cfg.net);
0724         exnet_close_shut(&G_bridge_cfg.net);
0725     }
0726     
0727     /* erase addresses... */
0728     exnet_unconfigure(&G_bridge_cfg.net);
0729     
0730     /* terminate spinlock.. */
0731     NDRX_SPIN_DESTROY_V(G_bridge_cfg.timediff_lock);
0732 }
0733 
0734 /* vim: set ts=4 sw=4 et smartindent: */