Back to home page

Enduro/X

 
 

    


0001 /**
0002  * @brief Dynamic advertise & unadvertise routines
0003  *
0004  * @file dynadv.c
0005  */
0006 /* -----------------------------------------------------------------------------
0007  * Enduro/X Middleware Platform for Distributed Transaction Processing
0008  * Copyright (C) 2009-2016, ATR Baltic, Ltd. All Rights Reserved.
0009  * Copyright (C) 2017-2023, Mavimax, Ltd. All Rights Reserved.
0010  * This software is released under one of the following licenses:
0011  * AGPL (with Java and Go exceptions) or Mavimax's license for commercial use.
0012  * See LICENSE file for full text.
0013  * -----------------------------------------------------------------------------
0014  * AGPL license:
0015  *
0016  * This program is free software; you can redistribute it and/or modify it under
0017  * the terms of the GNU Affero General Public License, version 3 as published
0018  * by the Free Software Foundation;
0019  *
0020  * This program is distributed in the hope that it will be useful, but WITHOUT ANY
0021  * WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR A
0022  * PARTICULAR PURPOSE. See the GNU Affero General Public License, version 3
0023  * for more details.
0024  *
0025  * You should have received a copy of the GNU Affero General Public License along 
0026  * with this program; if not, write to the Free Software Foundation, Inc.,
0027  * 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
0028  *
0029  * -----------------------------------------------------------------------------
0030  * A commercial use license is available from Mavimax, Ltd
0031  * contact@mavimax.com
0032  * -----------------------------------------------------------------------------
0033  */
0034 
0035 /*---------------------------Includes-----------------------------------*/
0036 #include <ndrx_config.h>
0037 #include <stdio.h>
0038 #include <stdlib.h>
0039 #include <errno.h>
0040 #include <sys/stat.h>
0041 #include <unistd.h>
0042 #include <fcntl.h>
0043 
0044 #include <ndrstandard.h>
0045 #include <ndebug.h>
0046 #include <utlist.h>
0047 #include <string.h>
0048 #include "srv_int.h"
0049 #include "tperror.h"
0050 #include <atmi_int.h>
0051 #include <atmi_shm.h>
0052 #include <sys_unix.h>
0053 #include <ndrx_ddr.h>
0054 /*---------------------------Externs------------------------------------*/
0055 /*---------------------------Macros-------------------------------------*/
0056 #define READVERTISE_SLEEP_SRV       2   /* Sleep X sec for re-advertise */
0057 /*---------------------------Enums--------------------------------------*/
0058 /*---------------------------Typedefs-----------------------------------*/
0059 /*---------------------------Globals------------------------------------*/
0060 /*---------------------------Statics------------------------------------*/
0061 /*---------------------------Prototypes---------------------------------*/
0062 
0063 /**
0064  * 1. Take a copy of service,
0065  * 2. un-advertise
0066  * 3. advertise + save time when q is open
0067  * 
0068  * This should be used in case when ndrxd removed the queue, un in same time
0069  * we did advertise!
0070  * 
0071  * @param svcname service name to readvertise 
0072  * @return EXSUCCEED/EXFAIL
0073  */
0074 expublic int dynamic_readvertise(char *svcname)
0075 {
0076     int ret=EXSUCCEED;
0077     svc_entry_fn_t *entry=NULL;
0078     char *fn="dynamic_readvertise";
0079     int found = EXFALSE;
0080     int mod;
0081     
0082     NDRX_LOG(log_warn, "%s: enter, svcname = [%s]", fn, svcname);
0083     
0084     if ( (entry = (svc_entry_fn_t*)NDRX_MALLOC(sizeof(svc_entry_fn_t))) == NULL)
0085     {
0086             NDRX_LOG(log_error, "Failed to allocate %d bytes while parsing -s",
0087                                 sizeof(svc_entry_fn_t));
0088             ret=EXFAIL;
0089             goto out;
0090     }
0091     
0092     memset(entry, 0, sizeof(*entry));
0093     
0094     /* Un-advertise it firstly */
0095     if (EXSUCCEED!=dynamic_unadvertise(svcname, &found, entry) || !found)
0096     {
0097         NDRX_LOG(log_error, "Failed to unadvertise: [%s]", svcname);
0098         ret=EXFAIL;
0099         goto out;
0100     }
0101     
0102     /* So that we do not get infinite loop, in case 
0103      * if many servers does restarts at the same time! 
0104      */
0105     mod = ndrx_rand() % 4;
0106     /* Have some sleep, as limit */
0107     NDRX_LOG(log_warn, "Sleeping %d seconds for re-advertise!", 
0108             READVERTISE_SLEEP_SRV+mod);
0109     
0110     sleep(READVERTISE_SLEEP_SRV+mod);
0111     
0112     /* Now advertise back on! */
0113     entry->q_descr = (mqd_t)EXFAIL;
0114     if (EXSUCCEED!=dynamic_advertise(entry, svcname, entry->p_func, entry->fn_nm))
0115     {
0116         NDRX_LOG(log_error, "Failed to advertise: [%s]", svcname);
0117         ret=EXFAIL;
0118         goto out;
0119     }
0120     
0121 out:
0122 
0123     if (EXSUCCEED!=ret && NULL!=entry)
0124         NDRX_FREE(entry);
0125 
0126     NDRX_LOG(log_warn, "%s: return, ret = %d", fn, ret);
0127     return ret;
0128 }
0129 
0130 /**
0131  * Dynamic unadvertise
0132  * @param svcname
0133  * @return EXSUCCEED/EXFAIL
0134  */
0135 expublic int dynamic_unadvertise(char *svcname, int *found, svc_entry_fn_t *copy)
0136 {
0137     int ret=EXSUCCEED;
0138     int pos;
0139     svc_entry_fn_t *ent=NULL;
0140     int service;
0141     int len;
0142     
0143     char *thisfn="_dynamic_unadvertise";
0144     
0145     for (pos=0; pos<G_server_conf.adv_service_count; pos++)
0146     {
0147         if (0==strcmp(svcname, G_server_conf.service_array[pos]->svc_nm))
0148         {
0149             ent = G_server_conf.service_array[pos];
0150             NDRX_LOG(log_warn, "Service [%s] found in array at %d", svcname, pos);
0151             break;
0152         }
0153     }
0154     
0155     if (ent)
0156     {
0157         /*Return some stuff back there usable by dynamic_readvertise*/
0158         if (NULL!=copy)
0159         {
0160             memcpy(copy, ent, sizeof(svc_entry_fn_t));
0161         }
0162         
0163         if (NULL!=found)
0164         {
0165             *found = EXTRUE;
0166         }
0167         
0168         NDRX_LOG(log_error, "Q File descriptor: %d - removing from polling struct", 
0169                 ent->q_descr);
0170         
0171         if (EXFAIL==ndrx_epoll_ctl_mq(G_server_conf.epollfd, EX_EPOLL_CTL_DEL,
0172                             ent->q_descr, NULL))
0173         {
0174             ndrx_TPset_error_fmt(TPEOS, "ndrx_epoll_ctl failed to remove fd %d from epollfd: %s", 
0175                     ent->q_descr, ndrx_poll_strerror(ndrx_epoll_errno()));
0176             ret=EXFAIL;
0177             goto out;
0178         }
0179         
0180         /* Now close the FD, only if was open */
0181         if (ndrx_epoll_shallopenq(pos) &&
0182                 EXSUCCEED!=ndrx_mq_close(ent->q_descr))
0183         {
0184             ndrx_TPset_error_fmt(TPEOS, "ndrx_mq_close failed to close fd %d: %s", 
0185                     ent->q_descr, strerror(errno));
0186             ret=EXFAIL;
0187             goto out;
0188         }
0189         
0190         len = G_server_conf.adv_service_count;
0191         
0192         if (EXSUCCEED!=atmisrv_array_remove_element((void *)(G_server_conf.service_array), pos, 
0193                     len, sizeof(svc_entry_fn_t *)))
0194         {
0195             NDRX_LOG(log_error, "Failed to shift memory for "
0196                                 "G_server_conf.service_array!");
0197             ret=EXFAIL;
0198             goto out;
0199         }
0200         /* Now reduce the memory usage */
0201         G_server_conf.service_array=NDRX_REALLOC(G_server_conf.service_array, 
0202                                 (sizeof(svc_entry_fn_t *)*len-1));
0203 
0204         if (NULL==G_server_conf.service_array)
0205         {
0206             ndrx_TPset_error_fmt(TPEOS, "realloc failed: %s", strerror(errno));
0207             ret=EXFAIL;
0208             goto out;
0209         }
0210         
0211         /* Free up the memory used!?!  */
0212         NDRX_FREE(ent);
0213         ent=NULL;
0214 
0215         service = pos - ATMI_SRV_Q_ADJUST;
0216         if (EXSUCCEED!=atmisrv_array_remove_element((void *)G_shm_srv->svc_fail, service, 
0217                     MAX_SVC_PER_SVR, sizeof(*(G_shm_srv->svc_fail))))
0218         {
0219             NDRX_LOG(log_error, "Failed to shift memory for G_shm_srv->svc_succeed!");
0220             ret=EXFAIL;
0221             goto out;
0222         }
0223         
0224         if (EXSUCCEED!=unadvertse_to_ndrxd(svcname))
0225         {
0226             ret=EXFAIL;
0227             goto out;
0228         }
0229         
0230         G_server_conf.adv_service_count--;
0231 
0232         if (G_shm_srv)
0233         {
0234             /* Shift shared memory, adjust the stuff by: ATMI_SRV_Q_ADJUST*/
0235             if (EXSUCCEED!=atmisrv_array_remove_element((void *)(G_shm_srv->svc_succeed), service, 
0236                         MAX_SVC_PER_SVR, sizeof(*(G_shm_srv->svc_succeed))))
0237             {
0238                 NDRX_LOG(log_error, "Failed to shift memory for G_shm_srv->svc_succeed!");
0239                 ret=EXFAIL;
0240                 goto out;
0241             }
0242 
0243             if (EXSUCCEED!=atmisrv_array_remove_element((void *)&(G_shm_srv->min_rsp_msec), 
0244                      service, MAX_SVC_PER_SVR, sizeof(*(G_shm_srv->min_rsp_msec))))
0245             {
0246                 NDRX_LOG(log_error, "Failed to shift memory for "
0247                                                 "G_shm_srv->min_rsp_msec!");
0248                 ret=EXFAIL;
0249                 goto out;
0250             }
0251 
0252             if (EXSUCCEED!=atmisrv_array_remove_element((void *)(G_shm_srv->max_rsp_msec), 
0253                         service, MAX_SVC_PER_SVR, sizeof(*(G_shm_srv->max_rsp_msec))))
0254             {
0255                 NDRX_LOG(log_error, "Failed to shift memory for "
0256                                                 "G_shm_srv->max_rsp_msec!");
0257                 ret=EXFAIL;
0258                 goto out;
0259             }
0260 
0261             if (EXSUCCEED!=atmisrv_array_remove_element((void *)(G_shm_srv->last_rsp_msec), 
0262                         service,  MAX_SVC_PER_SVR, sizeof(*(G_shm_srv->last_rsp_msec))))
0263             {
0264                 NDRX_LOG(log_error, "Failed to shift memory for 1"
0265                                                 "G_shm_srv->last_rsp_msec!");
0266                 ret=EXFAIL;
0267                 goto out;
0268             }
0269 
0270             if (EXSUCCEED!=atmisrv_array_remove_element((void *)&(G_shm_srv->svc_status), 
0271                         service, MAX_SVC_PER_SVR, sizeof(*(G_shm_srv->svc_status))))
0272             {
0273                 NDRX_LOG(log_error, "Failed to shift memory for "
0274                                                 "G_shm_srv->svc_status!");
0275                 ret=EXFAIL;
0276                 goto out;
0277             }
0278         }
0279     }
0280     else 
0281     {
0282         ndrx_TPset_error_fmt(TPENOENT, "%s: service [%s] not advertised", thisfn, svcname);
0283         ret=EXFAIL;
0284         goto out;
0285     }
0286     
0287 out:
0288     return ret;
0289 }
0290 
0291 /**
0292  * We are going to dynamically advertise the service
0293  * @param svcname
0294  * @return EXSUCCEED/EXFAIL
0295  */
0296 expublic int dynamic_advertise(svc_entry_fn_t *entry_new, 
0297                     char *svc_nm, void (*p_func)(TPSVCINFO *), char *fn_nm)
0298 {
0299     int ret=EXSUCCEED;
0300     int pos, service;
0301     svc_entry_fn_t *entry_chk=NULL;
0302     struct ndrx_epoll_event ev;
0303     int sz;
0304     /* lookup dynamically ... OK ? */
0305     int autotran=0;
0306     unsigned long trantime=NDRX_DDR_TRANTIMEDFLT;
0307 
0308     for (pos=0; pos<G_server_conf.adv_service_count; pos++)
0309     {
0310         if (0==strcmp(svc_nm, G_server_conf.service_array[pos]->svc_nm))
0311         {
0312             entry_chk = G_server_conf.service_array[pos];
0313             break;
0314         }
0315     }
0316     
0317     /* Check for advertise existance */
0318     if (NULL!=entry_chk)
0319     {
0320         NDRX_LOG(log_info, "Service [%s] found in array at %d", 
0321                                 svc_nm, pos);
0322         
0323         if (entry_chk->p_func == p_func)
0324         {
0325             NDRX_LOG(log_info, "Advertised function ptr "
0326                                 "the same - return OK!");
0327             goto out;
0328         }
0329         else
0330         {
0331             ndrx_TPset_error_fmt(TPEMATCH, "Service [%s] already advertised by func. "
0332                     "ptr. %p, but now requesting advertise by func. ptr. %p!",
0333                     svc_nm, entry_chk->p_func, p_func);
0334             ret=EXFAIL;
0335             goto out;
0336         }
0337     }
0338     
0339     /* Check the service count already in system! */
0340     if (G_server_conf.adv_service_count+1>MAX_SVC_PER_SVR)
0341     {
0342         ndrx_TPset_error_fmt(TPELIMIT, "Service limit per process %d reached!", 
0343                 MAX_SVC_PER_SVR-ATMI_SRV_Q_ADJUST);
0344         EXFAIL_OUT(ret);
0345     }
0346     
0347     /* This will be the current, note that count is already lass then 1, 
0348      * and suites ok as index
0349      */
0350     service = G_server_conf.adv_service_count - ATMI_SRV_Q_ADJUST;
0351     
0352 #ifdef EX_USE_POLL
0353     snprintf(entry_new->listen_q, sizeof(entry_new->listen_q), NDRX_SVC_QFMT_SRVID, 
0354             G_server_conf.q_prefix, entry_new->svc_nm, (short)G_server_conf.srv_id);
0355 #else
0356     snprintf(entry_new->listen_q, sizeof(entry_new->listen_q), NDRX_SVC_QFMT, 
0357             G_server_conf.q_prefix, entry_new->svc_nm);
0358 #endif
0359     
0360     /* lookup the service settings in shm... */
0361     if (EXTRUE==(ret=ndrx_ddr_service_get(svc_nm, &autotran, &trantime)))
0362     {
0363         NDRX_LOG(log_debug, "Service [%s] found in <services> section autotran: %d trantime: %lu",
0364                 svc_nm, autotran, trantime);
0365         entry_new->autotran = autotran;
0366         entry_new->trantime = trantime;
0367         ret=EXSUCCEED;
0368     }
0369     
0370     if (EXFAIL==ret)
0371     {
0372         /* routing error is loaded.. */
0373         EXFAIL_OUT(ret);
0374     }
0375     
0376     /* We are good to go, open q? */
0377     
0378     NDRX_LOG(log_debug, "About to listen on: %s", entry_new->listen_q);
0379     
0380     /* ###################### CRITICAL SECTION ############################### */
0381     /* Acquire semaphore here.... */
0382     if (G_shm_srv && EXSUCCEED!=ndrx_lock_svc_op(__func__))
0383     {
0384         NDRX_LOG(log_error, "Failed to lock sempahore");
0385         ret=EXFAIL;
0386         goto out;
0387     }
0388 
0389     /* Open the queue */
0390     
0391     /* open service Q, also give some svc name here!  */
0392     if (ndrx_epoll_shallopenq(ATMI_SRV_Q_ADJUST+G_server_conf.adv_service_count))
0393     {
0394         entry_new->q_descr = ndrx_mq_open_at (entry_new->listen_q, 
0395                                 O_RDWR | O_CREAT | O_NONBLOCK, S_IWUSR | S_IRUSR, NULL);
0396         
0397         /*
0398          * Check are we ok or failed?
0399          */
0400         if ((mqd_t)EXFAIL==entry_new->q_descr)
0401         {
0402            /* Release semaphore! */
0403             if (G_shm_srv) ndrx_unlock_svc_op(__func__);
0404 
0405            ndrx_TPset_error_fmt(TPEOS, "Failed to open queue: %s: %s",
0406                                        entry_new->listen_q, strerror(errno));
0407            EXFAIL_OUT(ret);
0408         }
0409     }
0410     else
0411     {
0412         /* System V mode, where services does not require separate queue  
0413         entry_new->q_descr = ndrx_epoll_service_add(entry_new->svc_nm, 
0414                 G_server_conf.adv_service_count, (mqd_t)EXFAIL);
0415          * */
0416         entry_new->q_descr = (mqd_t)EXFAIL;
0417     }
0418     
0419     /* re-define service, used for particular systems... like system v */
0420     entry_new->q_descr=ndrx_epoll_service_add(entry_new->svc_nm, 
0421             G_server_conf.adv_service_count, entry_new->q_descr);
0422 
0423     if ((mqd_t)EXFAIL==entry_new->q_descr)
0424     {
0425         /* Release semaphore! */
0426          if (G_shm_srv) ndrx_unlock_svc_op(__func__);
0427          
0428         ndrx_TPset_error_fmt(TPEOS, "Failed to register poller "
0429                 "svc at idx %d: %s: %s",
0430                 G_server_conf.adv_service_count, entry_new->listen_q, strerror(errno));
0431         EXFAIL_OUT(ret);
0432     }
0433     
0434     /* Register stuff in shared memory! */
0435     if (G_shm_srv)
0436     {
0437 #ifdef EX_USE_SYSVQ
0438         ret=ndrx_shm_install_svc(entry_new->svc_nm, 0, ndrx_epoll_resid_get());
0439 #else
0440         ret=ndrx_shm_install_svc(entry_new->svc_nm, 0, G_server_conf.srv_id);
0441 #endif
0442     }
0443     
0444     /* Release semaphore! */
0445     if (G_shm_srv) ndrx_unlock_svc_op(__func__);
0446     /* ###################### CRITICAL SECTION, END ########################## */
0447     
0448     /* leftover from  ndrx_shm_install_svc() failure... */
0449     if (EXSUCCEED!=ret)
0450     {
0451         ndrx_TPset_error_fmt(TPELIMIT, "Service shared memory is full. "
0452                 "Try to increase NDRX_SVCMAX");
0453         EXFAIL_OUT(ret);
0454     }
0455     
0456     /* Save the time when stuff is open! */
0457     ndrx_stopwatch_reset(&entry_new->qopen_time);
0458 
0459     NDRX_LOG(log_debug, "Got file descriptor: %d, adding to e-poll!",
0460                             entry_new->q_descr);
0461     
0462     /* Put stuff in linear array */
0463     sz = sizeof(*G_server_conf.service_array)*(G_server_conf.adv_service_count+1);
0464     
0465     G_server_conf.service_array = NDRX_REALLOC(G_server_conf.service_array, sz);
0466     
0467     if (NULL==G_server_conf.service_array)
0468     {
0469         ndrx_TPset_error_fmt(TPEOS, "Failed to reallocate memory to %d bytes!", sz);
0470         ret=EXFAIL;
0471         goto out;
0472     }
0473     
0474     /* Fill up service array */
0475     G_server_conf.service_array[G_server_conf.adv_service_count] = entry_new;
0476             
0477     G_server_conf.adv_service_count++;
0478     
0479     memset(&ev, 0, sizeof(ev));
0480     ev.events = EX_EPOLL_FLAGS;
0481 #ifdef EX_USE_EPOLL
0482     ev.data.fd = entry_new->q_descr;
0483 #else
0484     ev.data.mqd = entry_new->q_descr;
0485 #endif
0486     
0487     if (EXFAIL==ndrx_epoll_ctl_mq(G_server_conf.epollfd, EX_EPOLL_CTL_ADD,
0488                             entry_new->q_descr, &ev))
0489     {
0490         G_server_conf.adv_service_count--;
0491         ndrx_TPset_error_fmt(TPEOS, "ndrx_epoll_ctl failed: %s", 
0492                 ndrx_poll_strerror(ndrx_epoll_errno()));
0493         ret=EXFAIL;
0494         goto out;
0495     }
0496     
0497     /* Set shared memory to available! */
0498     G_shm_srv->svc_status[service] = NDRXD_SVC_STATUS_AVAIL;
0499     
0500     /* Send to NDRXD that we have are OK! */
0501     if (EXSUCCEED!=advertse_to_ndrxd(entry_new))
0502     {
0503         NDRX_LOG(log_error, "Failed to send advertise message to NDRXD!");
0504         ret=EXFAIL;
0505         goto out;
0506     }
0507 
0508 out:
0509     
0510     if (EXSUCCEED!=ret && (mqd_t)EXFAIL!=entry_new->q_descr)
0511     {
0512         /* we remove from poller, have it added there or not
0513          * ignore error */
0514         ndrx_epoll_ctl_mq(G_server_conf.epollfd, EX_EPOLL_CTL_DEL, 
0515                 entry_new->q_descr, NULL);
0516         
0517         /* close the queue, if needed to open. */
0518         if (ndrx_epoll_shallopenq(ATMI_SRV_Q_ADJUST+G_server_conf.adv_service_count))
0519         {
0520             ndrx_mq_close(entry_new->q_descr);
0521         }
0522         
0523     }
0524     return ret;
0525 }
0526 /* vim: set ts=4 sw=4 et smartindent: */