Back to home page

Enduro/X

 
 

    


0001 /**
0002  * @brief Incoming network poller
0003  *
0004  * @file net-in.c
0005  */
0006 /* -----------------------------------------------------------------------------
0007  * Enduro/X Middleware Platform for Distributed Transaction Processing
0008  * Copyright (C) 2009-2016, ATR Baltic, Ltd. All Rights Reserved.
0009  * Copyright (C) 2017-2023, Mavimax, Ltd. All Rights Reserved.
0010  * This software is released under one of the following licenses:
0011  * AGPL (with Java and Go exceptions) or Mavimax's license for commercial use.
0012  * See LICENSE file for full text.
0013  * -----------------------------------------------------------------------------
0014  * AGPL license:
0015  *
0016  * This program is free software; you can redistribute it and/or modify it under
0017  * the terms of the GNU Affero General Public License, version 3 as published
0018  * by the Free Software Foundation;
0019  *
0020  * This program is distributed in the hope that it will be useful, but WITHOUT ANY
0021  * WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR A
0022  * PARTICULAR PURPOSE. See the GNU Affero General Public License, version 3
0023  * for more details.
0024  *
0025  * You should have received a copy of the GNU Affero General Public License along 
0026  * with this program; if not, write to the Free Software Foundation, Inc.,
0027  * 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
0028  *
0029  * -----------------------------------------------------------------------------
0030  * A commercial use license is available from Mavimax, Ltd
0031  * contact@mavimax.com
0032  * -----------------------------------------------------------------------------
0033  */
0034 #include <stdio.h>
0035 #include <stdlib.h>
0036 #include <string.h>
0037 #include <errno.h>
0038 #include <regex.h>
0039 #include <utlist.h>
0040 #include <poll.h>
0041 #include <fcntl.h>
0042 
0043 #include <ndebug.h>
0044 #include <atmi.h>
0045 #include <atmi_int.h>
0046 #include <typed_buf.h>
0047 #include <ndrstandard.h>
0048 #include <ubf.h>
0049 #include <Exfields.h>
0050 
0051 #include <exnet.h>
0052 #include <ndrxdcmn.h>
0053 
0054 #include "bridge.h"
0055 #include "../libatmisrv/srv_int.h"
0056 #include "exsha1.h"
0057 #include <ndrxdiag.h>
0058 /*---------------------------Externs------------------------------------*/
0059 /*---------------------------Macros-------------------------------------*/
0060 
0061 /** The index of the "read" end of the pipe */
0062 #define READ 0
0063 /** The index of the "write" end of the pipe */
0064 #define WRITE 1
0065 
0066 #define MAX_POLL_FD     100 /**< number of slots in the on-stack pollfd array used by br_netin_run() */
0067 /*---------------------------Enums--------------------------------------*/
0068 /*---------------------------Typedefs-----------------------------------*/
0069 /*---------------------------Globals------------------------------------*/
0070 /*---------------------------Statics------------------------------------*/
0071 
0072 exprivate int M_shutdownpipe[2];          /**< pipe used to wake the net-in thread for shutdown */
0073 exprivate int M_shutdown_req = EXFALSE;   /**< is shutdown requested? (set by pipenotf() and br_netin_shutdown()) */
0074 exprivate pthread_t M_netin_thread;       /**< net-in thread handle, joined in br_netin_shutdown() */
0075 exprivate int M_init_ok=EXFALSE;          /**< set to EXTRUE once br_netin_setup() has started the thread */
0076 /*---------------------------Prototypes---------------------------------*/
0077 exprivate void * br_netin_run(void *arg); /* forward declaration: referenced by br_netin_setup() before its definition */
0078 
0079 
0080 /**
0081  * Receive pipe notificatino (currently only for shutdown)
0082  * @return 0
0083  */
0084 exprivate int pipenotf(int fd, uint32_t events, void *ptr1)
0085 {
0086     M_shutdown_req=EXTRUE;
0087     return EXSUCCEED;
0088 }
0089 /**
0090  * Basic setup for polling API on the separate thread 
0091  */
0092 expublic int br_netin_setup(void)
0093 {
0094     int ret = EXSUCCEED;
0095     int err;
0096     
0097     /* disable polling API... */
0098     ndrx_ext_pollsync(EXFALSE);
0099     
0100     /* O_NONBLOCK */
0101     if (EXFAIL==pipe(M_shutdownpipe))
0102     {
0103         err = errno;
0104         NDRX_LOG(log_error, "pipe failed: %s", strerror(err));
0105         EXFAIL_OUT(ret);
0106     }
0107 
0108     if (EXFAIL==fcntl(M_shutdownpipe[READ], F_SETFL, 
0109                 fcntl(M_shutdownpipe[READ], F_GETFL) | O_NONBLOCK))
0110     {
0111         err = errno;
0112         NDRX_LOG(log_error, "fcntl READ pipe set O_NONBLOCK failed: %s", 
0113                 strerror(err));
0114         EXFAIL_OUT(ret);
0115     }
0116     
0117     /* create the thread which would do the required polling? */
0118     if (EXSUCCEED!=tpext_addpollerfd(M_shutdownpipe[READ], POLLIN|POLLERR,
0119         NULL, pipenotf))
0120     {
0121         NDRX_LOG(log_error, "tpext_addpollerfd failed for pipenotf!");
0122         ret=EXFAIL;
0123         goto out;
0124     }
0125     
0126     /* create the thread */
0127     pthread_attr_t pthread_custom_attr;
0128     pthread_attr_init(&pthread_custom_attr);
0129     
0130     /* set some small stacks size, 1M should be fine! */
0131     ndrx_platf_stack_set(&pthread_custom_attr);
0132     if (EXSUCCEED!=pthread_create(&M_netin_thread, &pthread_custom_attr, 
0133             br_netin_run, NULL))
0134     {
0135         NDRX_PLATF_DIAG(NDRX_DIAG_PTHREAD_CREATE, errno, "br_netin_setup");
0136         EXFAIL_OUT(ret);
0137     }    
0138     
0139     M_init_ok=EXTRUE;
0140 out:
0141     return ret;
0142 }
0143 
0144 /**
0145  * Ask for net-in thread to shutdown
0146  */
0147 expublic void br_netin_shutdown(void)
0148 {
0149     char b=EXTRUE;
0150     M_shutdown_req=EXTRUE;
0151     
0152     if (M_init_ok)
0153     {
0154         if (write(M_shutdownpipe[WRITE], &b, sizeof(b)) <= 0)
0155         {
0156             NDRX_LOG(log_error, "Failed to send shutdown notification: %s", 
0157                     strerror(errno));
0158         }
0159         pthread_join(M_netin_thread, NULL);
0160     }
0161     
0162 }
0163 
0164 /**
0165  * Task before going into poll
0166  */
0167 exprivate int b4poll(void)
0168 {
0169     /* stop processing if we are waiting on incoming queues... grown */
0170     br_chk_limit();
0171     
0172     return exnet_b4_poll_cb();
0173 }
0174 
0175 
0176 /**
0177  * Network thread entry
0178  * Loop over the incoming traffic and process it accordingly..
0179  * After this we can simplify the bridgesvc, so that it only performs
0180  * network-away sending....
0181  * And networking incoming may check the incoming service overflows
0182  * and that would not stop the bridge from sending away
0183  */
0184 exprivate void * br_netin_run(void *arg)
0185 {
0186     pollextension_rec_t *el;
0187     struct pollfd fds[MAX_POLL_FD]; /* if we have more, then we can crash, as really currenlty max is 2 conns */
0188     ndrx_stopwatch_t   periodic_cb;
0189     int i, j;
0190     int err, ret = EXSUCCEED;
0191     pollextension_rec_t *ext;
0192     
0193     NDRX_LOG(log_error, "br_netin_run starting...");
0194     
0195     
0196     /* Allocate network buffer */
0197     if (EXSUCCEED!=exnet_net_init(&G_bridge_cfg.net))
0198     {
0199         NDRX_LOG(log_error, "Failed to allocate data buffer!");
0200         EXFAIL_OUT(ret);
0201     }
0202     
0203     /* add custom pipe for shutdown? */
0204     
0205     if (EXSUCCEED!=tpinit(NULL))
0206     {
0207         NDRX_LOG(log_error, "Failed to tpinit() net-in thread - terminate");
0208         userlog("Failed to tpinit() net-in thread - terminate");
0209         EXFAIL_OUT(ret);
0210     }
0211     
0212     /* run simpler poller (as we have few socket only...) 
0213      * loop over the list of file descriptors and use for creating a poll
0214      * API
0215      */
0216     
0217     ndrx_stopwatch_reset(&periodic_cb);
0218     
0219     while (!M_shutdown_req)
0220     {
0221         /* before */
0222         if (EXSUCCEED!=b4poll())
0223         {
0224             NDRX_LOG(log_always, "Bridge b4poll failed - terminating!");
0225             userlog("Bridge b4poll failed - terminating!");
0226             EXFAIL_OUT(ret);
0227         }
0228 
0229         /* the list  
0230         loop over the G_pollext - get the count and allocate the buffer ... */
0231         i=0;
0232         DL_FOREACH(ndrx_G_pollext, el)
0233         {
0234             fds[i].fd = el->fd;
0235             fds[i].events = POLLIN|POLLERR;
0236             fds[i].revents=0;
0237             i++;
0238         }
0239         
0240         ret=poll(fds, i, G_bridge_cfg.check_interval*1000);
0241         
0242         if (EXFAIL==ret)
0243         {
0244             /* if timed out ... no problem.. */
0245             err=errno;
0246             NDRX_LOG(log_error, "in-in poll failed: %s - terminate", strerror(err));
0247             userlog("in-in poll failed: %s - terminate", strerror(err));
0248             EXFAIL_OUT(ret);
0249         }
0250         
0251         if (ndrx_stopwatch_get_delta_sec(&periodic_cb) >= G_bridge_cfg.check_interval)
0252         {
0253             if (EXSUCCEED!=exnet_periodic())
0254             {
0255                 NDRX_LOG(log_always, "Bridge periodic check failed - terminating!");
0256                 userlog("Bridge periodic check failed - terminating!");
0257                 EXFAIL_OUT(ret);
0258             }
0259             
0260             ndrx_stopwatch_reset(&periodic_cb);
0261         }
0262         
0263         for (j=0; j<i; j++)
0264         {
0265             if (!fds[j].revents)
0266             {
0267                 continue;
0268             }
0269             ext=ext_find_poller(fds[j].fd);
0270                 
0271             if (NULL!=ext)
0272             {
0273                 NDRX_LOG(log_info, "FD %d found in extension list, invoking", ext->fd);
0274 
0275                 ret = ext->p_pollevent(fds[j].fd, fds[j].revents, ext->ptr1);
0276                 
0277                 if (EXSUCCEED!=ret)
0278                 {
0279                     NDRX_LOG(log_error, "p_pollevent at 0x%lx failed (fd=%d)! - terminating",
0280                             ext->p_pollevent, ext->fd);
0281                     userlog("p_pollevent at 0x%lx failed (fd=%d)! - terminating",
0282                             ext->p_pollevent, ext->fd);
0283                     EXFAIL_OUT(ret);
0284                 }
0285                 else
0286                 {
0287                     continue;
0288                 }
0289             }
0290             else
0291             {
0292                 NDRX_LOG(log_error, "NULL Extension callback - terminating");
0293                 userlog("NULL Extension callback - terminating");
0294                 EXFAIL_OUT(ret);
0295             }
0296         }
0297     }
0298     
0299 out:
0300     tpterm();
0301 
0302 
0303     /* clean shutdown requested... */
0304     if (!M_shutdown_req && EXSUCCEED!=ret)
0305     {
0306         tpexit();
0307     }
0308 
0309     return NULL;
0310 }
0311 
0312 /* vim: set ts=4 sw=4 et smartindent: */