Back to home page

Enduro/X

 
 

    


0001 /**
0002  * @brief Enduro/X server main entry point
0003  *
0004  * @file svqdispatch.c
0005  */
0006 /* -----------------------------------------------------------------------------
0007  * Enduro/X Middleware Platform for Distributed Transaction Processing
0008  * Copyright (C) 2009-2016, ATR Baltic, Ltd. All Rights Reserved.
0009  * Copyright (C) 2017-2023, Mavimax, Ltd. All Rights Reserved.
0010  * This software is released under one of the following licenses:
0011  * AGPL (with Java and Go exceptions) or Mavimax's license for commercial use.
0012  * See LICENSE file for full text.
0013  * -----------------------------------------------------------------------------
0014  * AGPL license:
0015  *
0016  * This program is free software; you can redistribute it and/or modify it under
0017  * the terms of the GNU Affero General Public License, version 3 as published
0018  * by the Free Software Foundation;
0019  *
0020  * This program is distributed in the hope that it will be useful, but WITHOUT ANY
0021  * WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR A
0022  * PARTICULAR PURPOSE. See the GNU Affero General Public License, version 3
0023  * for more details.
0024  *
0025  * You should have received a copy of the GNU Affero General Public License along 
0026  * with this program; if not, write to the Free Software Foundation, Inc.,
0027  * 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
0028  *
0029  * -----------------------------------------------------------------------------
0030  * A commercial use license is available from Mavimax, Ltd
0031  * contact@mavimax.com
0032  * -----------------------------------------------------------------------------
0033  */
0034 
0035 /*---------------------------Includes-----------------------------------*/
0036 #include <ndrx_config.h>
0037 #include <stdio.h>
0038 #include <stdlib.h>
0039 #include <sys_mqueue.h>
0040 #include <errno.h>
0041 #include <sys/stat.h>
0042 #include <setjmp.h>
0043 
0044 #include <ndrstandard.h>
0045 #include <ndebug.h>
0046 #include <utlist.h>
0047 #include <string.h>
0048 #include <fcntl.h>
0049 
0050 #include "srv_int.h"
0051 #include "xa_cmn.h"
0052 #include "atmi_tls.h"
0053 #include <atmi_int.h>
0054 #include <typed_buf.h>
0055 #include <nstopwatch.h>
0056 #include <atmi_shm.h>
0057 #include <gencall.h>
0058 #include <tperror.h>
0059 #include <userlog.h>
0060 #include <atmi.h>
0061 /*---------------------------Externs------------------------------------*/
0062 /* THIS IS HOOK FOR TESTING!!
      * When set (non-NULL), sv_open_queue() calls this function for every service
      * that is opened with the service-operation semaphore held, right after the
      * lock has been acquired and before the queue is opened.
      */
0063 expublic void (*___G_test_delayed_startup)(void) = NULL;
0064 /*---------------------------Macros-------------------------------------*/
0065 /*---------------------------Enums--------------------------------------*/
0066 /*---------------------------Typedefs-----------------------------------*/
0067 
0068 /**
0069  * Work item submitted to a dispatch thread.
      * NOTE(review): the structure is not used in this part of the file; the field
      * descriptions below follow the equally named sv_serve_call() parameters -
      * confirm against the code which fills it.
0070  */
0071 typedef struct
0072 {
0073     char *call_buf; /**< call buffer */
0074     long call_len;  /**< call buffer len */
0075     int call_no;    /**< call service number */
0076 } thread_dispatch_t;
0077 
0078 /*---------------------------Globals------------------------------------*/
0079 expublic int G_shutdown_req = 0; /* presumably non-zero once shutdown is requested - not used in this part of the file, confirm */
0080 
0081 /* Only for poll() mode: */
0082 expublic int G_shutdown_nr_wait = 0;   /* Number of self shutdown messages to wait */
0083 expublic int G_shutdown_nr_got = 0;    /* Number of self shutdown messages got  */
0084 /*---------------------------Statics------------------------------------*/
      /* M_autojoin is checked by sv_serve_call()/sv_serve_connect() before joining the
       * global transaction of the incoming call; changed with ndrx_sv_set_autojoin().
       */
0085 exprivate int M_autojoin = EXTRUE;      /**< perform automatic tx join */
0086 /*---------------------------Prototypes---------------------------------*/
0087 
0088 /**
0089  * Configure autojoin flag
0090  * @param[in] new_flag new setting EXFALSE - do not do autojoin on call
0091  * @return new_flag original value
0092  */
0093 expublic int ndrx_sv_set_autojoin(int new_flag)
0094 {
0095     int autojoin = M_autojoin;
0096     M_autojoin=new_flag;
0097     return autojoin;
0098 }
0099 
0100 /**
0101  * Perform late transaction join
0102  * return EXSUCCEED/EXFAIL
0103  */
0104 expublic int ndrx_sv_latejoin(void)
0105 {
0106     int ret = EXSUCCEED;
0107     tp_command_call_t * call = ndrx_get_G_last_call();
0108 
0109     if (EXEOS!=call->tmxid[0] && EXSUCCEED!=_tp_srv_join_or_new_from_call(call, EXFALSE))
0110     {
0111         NDRX_LOG(log_error, "Failed to start/join global tx [%s]!", call->tmxid);
0112         userlog("Failed to start/join global tx [%s]!", call->tmxid);
0113         EXFAIL_OUT(ret);
0114     }
0115 out:
0116     return ret;
0117 }
0118 
0119 
0120 /**
0121  * Open queues for listening.
0122  * @return 
0123  */
0124 expublic int sv_open_queue(void)
0125 {
0126     int ret=EXSUCCEED;
0127     int i;
0128     svc_entry_fn_t *entry;
0129     struct ndrx_epoll_event ev;
0130     int use_sem = EXFALSE;
0131     
0132     
0133     /* Register for (e-)polling 
0134      * moved up here for system v resources have be known when installing to SHM
0135      */
0136     G_server_conf.epollfd = ndrx_epoll_create(G_server_conf.max_events);
0137     if (EXFAIL==G_server_conf.epollfd)
0138     {
0139         ndrx_TPset_error_fmt(TPEOS, "ndrx_epoll_create(%d) fail: %s",
0140                                 G_server_conf.adv_service_count,
0141                                 ndrx_poll_strerror(ndrx_epoll_errno()));
0142         ret=EXFAIL;
0143         goto out;
0144     }
0145     
0146     for (i=0; i<G_server_conf.adv_service_count; i++)
0147     {
0148         entry = G_server_conf.service_array[i];
0149 
0150         NDRX_LOG(log_debug, "About to listen on: %s", entry->listen_q);
0151 
0152         /* TODO: unlink the queue? If specific Q? admin or reply? */
0153 
0154         
0155         /* ###################### CRITICAL SECTION ############################### */
0156         
0157         /* Acquire semaphore here.... */
0158         if (G_shm_srv && EXEOS!=entry->svc_nm[0]) 
0159         {
0160             use_sem = EXTRUE;
0161         }
0162         else
0163         {
0164             /* Bug #610 */
0165             use_sem = EXFALSE;
0166         }
0167         
0168         if (use_sem && EXSUCCEED!=ndrx_lock_svc_op(__func__))
0169         {
0170             NDRX_LOG(log_error, "Failed to lock sempahore");
0171             ret=EXFAIL;
0172             goto out;
0173         }
0174         
0175         if (NULL!=___G_test_delayed_startup && use_sem)
0176         {
0177             ___G_test_delayed_startup();
0178         }
0179         
0180         /* Open the queue */
0181         /* open service Q, also give some svc name here!  */
0182         
0183         if (ndrx_epoll_shallopenq(i))
0184         {
0185             
0186 #if defined(EX_USE_POLL) || defined(EX_USE_SYSVQ)
0187             /* for poll mode, we must ensure that queue does not exists before start
0188              */
0189             if (EXSUCCEED!=ndrx_mq_unlink(entry->listen_q))
0190             {
0191                 NDRX_LOG(log_debug, "debug: Failed to unlink [%s]: %s", entry->listen_q, 
0192                         ndrx_poll_strerror(ndrx_epoll_errno()));
0193             }
0194 #endif
0195             /* normal operations, each service have it's own queue... */
0196             entry->q_descr = ndrx_mq_open_at (entry->listen_q, O_RDWR | O_CREAT |
0197                     O_NONBLOCK, S_IWUSR | S_IRUSR, NULL);
0198             
0199             if ((mqd_t)EXFAIL!=entry->q_descr)
0200             {
0201                 /* re-define service, used for particular systems... like system v */
0202                 entry->q_descr=ndrx_epoll_service_add(entry->svc_nm, i, entry->q_descr);
0203             }
0204         }
0205         else
0206         {
0207             /* System V mode, where services does not require separate queue  */
0208             entry->q_descr = ndrx_epoll_service_add(entry->svc_nm, 
0209                     i, (mqd_t)EXFAIL);
0210         }
0211         
0212         /*
0213          * Check are we ok or failed?
0214          */
0215         if ((mqd_t)EXFAIL==entry->q_descr)
0216         {
0217             /* Release semaphore! */
0218             if (use_sem) 
0219                 ndrx_unlock_svc_op(__func__);
0220             
0221             ndrx_TPset_error_fmt(TPEOS, "Failed to open queue: %s: %s",
0222                                         entry->listen_q, strerror(errno));
0223             ret=EXFAIL;
0224             goto out;
0225         }
0226         
0227         /* Register stuff in shared memory! */
0228         if (use_sem)
0229         {
0230 #ifdef EX_USE_SYSVQ
0231             ret=ndrx_shm_install_svc(entry->svc_nm, 0, ndrx_epoll_resid_get());
0232 #else
0233             ret=ndrx_shm_install_svc(entry->svc_nm, 0, G_server_conf.srv_id);
0234 #endif
0235         }
0236 
0237         /* Release semaphore! */
0238         if (use_sem) ndrx_unlock_svc_op(__func__);
0239         /* ###################### CRITICAL SECTION, END ########################## */
0240         
0241         if (EXSUCCEED!=ret)
0242         {
0243             NDRX_LOG(log_error, "Service shared memory full - currently ignore error!");
0244             ret=EXSUCCEED;
0245         }
0246         
0247         /* Save the time when stuff is open! */
0248         ndrx_stopwatch_reset(&entry->qopen_time);
0249 
0250         NDRX_LOG(log_debug, "Got file descriptor: %d", entry->q_descr);
0251     }
0252 
0253     /* allocate events */
0254     G_server_conf.events = (struct ndrx_epoll_event *)NDRX_CALLOC(sizeof(struct ndrx_epoll_event),
0255                                             G_server_conf.max_events);
0256     if (NULL==G_server_conf.events)
0257     {
0258         ndrx_TPset_error_fmt(TPEOS, "Failed to allocate epoll events: %s", 
0259                 strerror(errno));
0260         ret=EXFAIL;
0261         goto out;
0262     }
0263 
0264     /* Bind to epoll queue descriptors */    
0265 
0266     memset(&ev, 0, sizeof(ev));
0267 
0268     for (i=0; i<G_server_conf.adv_service_count; i++)
0269     {
0270         ev.events = EX_EPOLL_FLAGS;
0271 #ifdef EX_USE_EPOLL
0272         ev.data.fd = G_server_conf.service_array[i]->q_descr;
0273 #else
0274         /* this is ok, more accurate */
0275         ev.data.mqd = G_server_conf.service_array[i]->q_descr;
0276 #endif
0277         
0278         if (EXFAIL==ndrx_epoll_ctl_mq(G_server_conf.epollfd, EX_EPOLL_CTL_ADD,
0279                                 G_server_conf.service_array[i]->q_descr, &ev))
0280         {
0281             ndrx_TPset_error_fmt(TPEOS, "ndrx_epoll_ctl failed: %s", 
0282                     ndrx_poll_strerror(ndrx_epoll_errno()));
0283             ret=EXFAIL;
0284             goto out;
0285         }
0286     }
0287 
0288 out:
0289     return ret;
0290 }
0291 
/**
 * Copy the called service name (call->name) to svcinfo.name.
 * Unless G_server_conf.ddr_keep_grp is set, the copy is terminated at the
 * first NDRX_SYS_SVC_PFXC character which is not the first character of the
 * name, i.e. the routing group part is stripped off.
 * The expansion site must have `svcinfo' and `call' in scope.
 */
#define CPY_SERVICE_NAME do {\
        NDRX_STRCPY_SAFE(svcinfo.name, call->name);\
        if (!G_server_conf.ddr_keep_grp)\
        {\
            char *grp_start=strchr(svcinfo.name, NDRX_SYS_SVC_PFXC);\
            /* a match at the very start is kept, only later ones cut the name */\
            if (NULL!=grp_start && grp_start!=svcinfo.name)\
            {\
                *grp_start=EXEOS;\
            }\
        }\
    } while (0)
0308 
0309 /**
0310  * Serve service call
      * The received message is checked for expiry, the request buffer is prepared,
      * the global transaction is joined (or an auto transaction is started) and
      * the service function registered for call_no is invoked.
      * Expired calls are dropped without a reply. If buffer preparation,
      * transaction join/start or buffer conversion fails, a failure reply is
      * sent with reply_with_failure().
      * @param service set to call_no-ATMI_SRV_Q_ADJUST before the service function is invoked
      * @param status set to EXSUCCEED on entry, changed to EXFAIL when the call is
      *  dropped, fails in processing or the service returns with failure
0311  * @param call_buf call buffer (the message is read as tp_command_call_t)
0312  * @param call_len call buffer len (not used by this function)
0313  * @param call_no call service number (index in G_server_conf.service_array)
0314  * @return EXSUCCEED, or the non-success result of ndrx_mbuf_prepare_incoming()
0315  */
0316 expublic int sv_serve_call(int *service, int *status,
0317                     char **call_buf, long call_len, int call_no)
0318 {
0319     int ret=EXSUCCEED;
0320     char *request_buffer = NULL;
0321     long req_len = 0;
0322     int reply_type;
0323     tp_command_call_t *call = (tp_command_call_t*)*call_buf;
0324     buffer_obj_t *outbufobj=NULL; /* Have a reference to allocated buffer */
0325     long call_age;
0326     int generate_rply = EXFALSE;
0327     tp_command_call_t * last_call=NULL;
0328     long error_code = TPESVCERR; /**< Default error in case if cannot process */
0329     *status=EXSUCCEED;
0330     G_atmi_tls->atmisrv_reply_type = 0;
0331     
0332     call_age = ndrx_stopwatch_get_delta_sec(&call->timer);
0333 
0334     NDRX_LOG(log_debug, "got call, cd: %d timestamp: %d callseq: %u, "
0335             "svc: %s, flags: %ld, call age: %ld, data_len: %ld, caller: %s "
0336                         " reply_to: %s, clttout: %d",
0337                         call->cd, call->timestamp, call->callseq, 
0338             call->name, call->flags, call_age, call->data_len,
0339                         call->my_id, call->reply_to, call->clttout);
0340     
         /* Call is at least as old as the client timeout and TPNOTIME is not set:
          * drop it. generate_rply stays EXFALSE, so no reply is sent.
          */
0341     if (call->clttout > 0 && call_age >= call->clttout && 
0342             !(call->flags & TPNOTIME))
0343     {
0344         NDRX_LOG(log_error, "Received expired call - drop, cd: %d timestamp: %d callseq: %u, "
0345             "svc: %s, flags: %ld, call age: %ld, data_len: %ld, caller: %s "
0346                         " reply_to: %s, clttout: %d",
0347                         call->cd, call->timestamp, call->callseq, 
0348             call->name, call->flags, call_age, call->data_len,
0349                         call->my_id, call->reply_to, call->clttout);
0350         userlog("Received expired call - drop, cd: %d timestamp: %d callseq: %u, "
0351             "svc: %s, flags: %ld, call age: %ld, data_len: %ld, caller: %s "
0352                         " reply_to: %s, clttout: %d",
0353                         call->cd, call->timestamp, call->callseq, 
0354             call->name, call->flags, call_age, call->data_len,
0355                         call->my_id, call->reply_to, call->clttout);
0356         *status=EXFAIL;
0357         goto out;
0358     }
0359     
         /* Fill request_buffer/req_len from the received data; a failure is
          * answered with TPEITYPE.
          */
0360     ret = ndrx_mbuf_prepare_incoming (call->data, call->data_len, 
0361                     &request_buffer, &req_len, 0, 0);
0362 
0363     if (EXSUCCEED!=ret)
0364     {
0365         *status=EXFAIL;
0366         error_code = TPEITYPE;
0367         generate_rply = EXTRUE;
0368         goto out;
0369     }
0370     else
0371     {
0372         /* this must succeed */
0373         outbufobj=ndrx_find_buffer(request_buffer);
0374         
0375         /* how about NULL buffer? */
0376         outbufobj->autoalloc = 1; /* We have stuff autoallocated! */
0377         NDRX_LOG(log_debug, "Buffer=%p autoalloc=%hd", 
0378                 outbufobj->buf, outbufobj->autoalloc);
0379     }
0380     
0381     /* Now we should call the service by it self, also we should check was reply back or not */
0382 
         /* The branch below runs in no-longjmp mode or on the direct return (0) of
          * setjmp(). A non-zero setjmp() result is handled by the else branch at the
          * end; presumably it is delivered by longjmp() from the reply/forward
          * code - confirm.
          */
0383     if (G_libatmisrv_flags & ATMI_SRVLIB_NOLONGJUMP ||
0384             /* move to atmi_tls: */
0385             0==(reply_type=setjmp(G_atmi_tls->call_ret_env)))
0386     {
0387         TPSVCINFO svcinfo;
0388         memset(&svcinfo, 0, sizeof(TPSVCINFO));
0389 
0390         svcinfo.data = request_buffer;
0391         svcinfo.len = req_len;
0392 
0393         CPY_SERVICE_NAME;
0394         
0395         svcinfo.flags = call->flags;
0396         svcinfo.cd = call->cd;
0397         
0398         /* set the client id to caller */
0399         NDRX_STRCPY_SAFE(svcinfo.cltid.clientdata, (char *)call->my_id);
0400         last_call = ndrx_get_G_last_call();
0401         memcpy(last_call, call, sizeof(tp_command_call_t));
0402                              /* save last call info to ATMI library
0403                               * (this does excludes data by default) */
0404         
0405         /* Register global tx */
0406         if (EXEOS!=call->tmxid[0])
0407         {
                 /* the join is skipped when autojoin is disabled (see ndrx_sv_set_autojoin()) */
0408             if (M_autojoin && EXSUCCEED!=_tp_srv_join_or_new_from_call(call, EXFALSE))
0409             {
0410                 NDRX_LOG(log_error, "Failed to start/join global tx [%s]!", call->tmxid);
0411                 userlog("Failed to start/join global tx [%s]!", call->tmxid);
0412 
0413                 /* TODO: We have died here... so the dispatcher must
0414                  * return TPFAIL, and we should notify master, that this RM is
0415                  * failed!!!!
0416                  */
0417                 *status=EXFAIL;
0418                 generate_rply = EXTRUE;
0419                 error_code = TPETRAN;
0420                 goto out;
0421             }
0422             
0423             if (last_call->sysflags & SYS_FLAG_AUTOTRAN)
0424             {
0425                 NDRX_LOG(log_debug, "Marking as transaction initiator");
0426                 /* set us as a masters...
0427                  * of transaction due to fact that we received the call
0428                  */
0429                 G_atmi_tls->G_atmi_xa_curtx.txinfo->tranid_flags|=XA_TXINFO_INITIATOR;
0430             }
0431         }
0432         else if (G_server_conf.service_array[call_no]->autotran)
0433         {
0434             NDRX_LOG(log_debug, "Starting auto transaction");
0435             if (EXFAIL==tpbegin(G_server_conf.service_array[call_no]->trantime, 0))
0436             {
0437                 NDRX_LOG(log_error, "Failed to start autotran (trantime=%lu): %s", 
0438                         G_server_conf.service_array[call_no]->trantime, tpstrerror(tperrno));
0439                 userlog("Failed to start autotran (trantime=%lu): %s", 
0440                         G_server_conf.service_array[call_no]->trantime, tpstrerror(tperrno));
0441 
0442                 *status=EXFAIL;
0443                 generate_rply = EXTRUE; /**< error 14 */
0444                 error_code = TPETRAN;
0445                 goto out;
0446             }
0447             
0448             /* auto tran is started */
0449             last_call->sysflags|=SYS_FLAG_AUTOTRAN;
0450         }
0451         
0452         /* If we run in abort only mode and do some forwards & etc.
0453          * Then we should keep the abort status.
0454          Moved to tmisabortonly! flag field.
0455         if (ndrx_get_G_atmi_xa_curtx()->txinfo && (ndrx_get_last_call()->sysflags & SYS_XA_ABORT_ONLY))
0456         {
0457             NDRX_LOG(log_warn, "Marking current global tx as ABORT_ONLY");
0458             ndrx_get_G_atmi_xa_curtx()->txinfo->tmisabortonly = TRUE;
0459         }
0460          * */
0461         
0462         /* call the function */
0463         *service=call_no-ATMI_SRV_Q_ADJUST;
0464         if (G_shm_srv)
0465         {
                 /* threaded servers count the running calls under the spin lock,
                  * non-threaded servers just mark the service busy
                  */
0466             if (G_server_conf.is_threaded)
0467             {
0468                 NDRX_SPIN_LOCK_V(G_server_conf.mt_lock);
0469                 G_shm_srv->svc_status[*service]++;
0470                 NDRX_SPIN_UNLOCK_V(G_server_conf.mt_lock);
0471                 /* put reply address - not supported.
0472                 NDRX_STRCPY_SAFE(G_shm_srv->last_reply_q, call->reply_to);
0473                  * */
0474             }
0475             else
0476             {
0477                 G_shm_srv->svc_status[*service] = NDRXD_SVC_STATUS_BUSY;
0478                 /* put reply address */
0479                 NDRX_STRCPY_SAFE(G_shm_srv->last_reply_q, call->reply_to);
0480             }
0481         }
0482         
0483         /* We need to convert buffer here (if function set...) */
0484         if (NULL!=request_buffer &&
0485                 G_server_conf.service_array[call_no]->xcvtflags && 
0486                 
0487                 /* convert in case when really needed */
0488                 ( 
0489                   /* UBF2JSON */
0490                   (BUF_TYPE_UBF == outbufobj->type_id && 
0491                     SYS_SRV_CVT_UBF2JSON & G_server_conf.service_array[call_no]->xcvtflags)
0492                 ||
0493                   (BUF_TYPE_JSON == outbufobj->type_id && SYS_SRV_CVT_JSON2UBF & 
0494                     G_server_conf.service_array[call_no]->xcvtflags)
0495                 
0496                   /* VIEW2JSON */
0497                 || (BUF_TYPE_VIEW == outbufobj->type_id && 
0498                     SYS_SRV_CVT_VIEW2JSON & G_server_conf.service_array[call_no]->xcvtflags)
0499                 ||
0500                   (BUF_TYPE_JSON == outbufobj->type_id && SYS_SRV_CVT_JSON2VIEW & 
0501                     G_server_conf.service_array[call_no]->xcvtflags)
0502                 
0503                 )
0504             )
0505         {
0506             /* 
0507              * Mark that buffer is converted...
0508              * So that later we can convert back...
0509              */
0510             last_call->sysflags|= G_server_conf.service_array[call_no]->xcvtflags;
0511             call->sysflags |= G_server_conf.service_array[call_no]->xcvtflags;
0512             
0513             if (EXSUCCEED!=typed_xcvt(&outbufobj, call->sysflags, EXFALSE))
0514             {
0515                 NDRX_LOG(log_debug, "Failed to convert buffer service "
0516                             "format: %llx", last_call->sysflags);
0517                 userlog("Failed to convert buffer service "
0518                             "format: %llx", last_call->sysflags);
0519                 *status=EXFAIL;
                     /* error_code is left at its default (TPESVCERR) here */
0520                 generate_rply = EXTRUE;
0521                 goto out;
0522             }
0523             else
0524             {
0525                 svcinfo.data = outbufobj->buf;
0526                 svcinfo.len = outbufobj->size;
0527             }
0528         }
0529         
0530         last_call->autobuf = outbufobj;
0531         
0532         /* For golang integration we need to know at service the function name */
0533         NDRX_STRCPY_SAFE(svcinfo.fname, G_server_conf.service_array[call_no]->fn_nm);
0534         
             /* NOTE(review): every failure path above leaves via "goto out", so
              * *status appears to be still EXSUCCEED at this point - confirm
              */
0535         if (EXFAIL!=*status) /* Dot not invoke if failed! */
0536         {
0537             G_server_conf.service_array[call_no]->p_func(&svcinfo);
0538         }
0539         
             /* Control gets here only when the service function returned normally */
0540         if (G_libatmisrv_flags & ATMI_SRVLIB_NOLONGJUMP &&
0541                 /* Server did return:  */
0542                 (G_atmi_tls->atmisrv_reply_type & RETURN_TYPE_TPRETURN || 
0543                  G_atmi_tls->atmisrv_reply_type & RETURN_TYPE_TPFORWARD
0544                 )
0545             )
0546         {
0547             /* System does normal function return... */
0548             NDRX_LOG(log_debug, "Got back from reply/forward (%d) w/o long jump",
0549                                         G_atmi_tls->atmisrv_reply_type);
0550             if (G_atmi_tls->atmisrv_reply_type & RETURN_FAILED || 
0551                     G_atmi_tls->atmisrv_reply_type & RETURN_SVC_FAIL)
0552             {
0553                 *status=EXFAIL;
0554             }
0555         }
0556         else if (G_libatmisrv_flags & ATMI_SRVLIB_NOLONGJUMP &&
0557                 G_atmi_tls->atmisrv_reply_type & RETURN_TYPE_THREAD)
0558         {
0559             NDRX_LOG(log_info, "tpcontinue() issued from integra (no longjmp)!");
0560         }
0561         else
0562         {
0563             NDRX_LOG(log_warn, "No return from service!");
0564             
0565             /* if no return in the end... we must abort... 
0566              * TODO: might want to use the same ndrx_xa_join_fail() / disassoc?
0567              */
0568             if (tpgetlev() && last_call->sysflags & SYS_FLAG_AUTOTRAN)
0569             {
0570                 NDRX_LOG(log_error, "ERROR: Auto-tran started [%s], but no tpreturn() - ABORTING...", 
0571                         G_atmi_tls->G_atmi_xa_curtx.txinfo->tmxid);
0572                 userlog("ERROR: Auto-tran started [%s], but no tpreturn() - ABORTING...", 
0573                         G_atmi_tls->G_atmi_xa_curtx.txinfo->tmxid);
0574                 if (EXSUCCEED!=ndrx_tpabort(0, EXTRUE))
0575                 {
0576                     NDRX_LOG(log_error, "Auto abort failed: %s", tpstrerror(tperrno));
0577                     userlog("Auto abort failed: %s", tpstrerror(tperrno));
0578                 }
0579             }
0580 
0581             if (!(svcinfo.flags & TPNOREPLY))
0582             {
0583                 /* if we are here, then there was no reply! */
0584                 NDRX_LOG(log_error, "PROTO error - no reply from service [%s]",
0585                                                 call->name);
0586                 /* reply with failure back */
                     /* NOTE(review): generate_rply is not set on this path, so this
                      * function itself sends nothing; presumably the caller acts
                      * on *status - confirm
                      */
0587                 *status=EXFAIL;
0588                 goto out;
0589             }
0590         }
0591     }
0592     else
0593     {
             /* non-zero setjmp() result: reply_type holds the return type flags */
0594         NDRX_LOG(log_debug, "Got back from reply/forward (%d)",
0595                                         reply_type);
0596         if (reply_type & RETURN_FAILED || reply_type & RETURN_SVC_FAIL)
0597         {
0598             *status=EXFAIL;
0599         }
0600     }
0601     
0602 out:
0603 
0604     if (generate_rply)
0605     {        
0606         /* Reply back with failure... */
0607         reply_with_failure(TPNOBLOCK, call, NULL, NULL, error_code);   
0608     }
0609 
0610     /* free_up_buffers(); - services assumes that memory is alloced for all the time
0611      * i.e. they do manual management of memory: tpfree.
0612      */ 
0613     /* We shall find auto allocated buffer and remove it! */
0614 
0615     /* for integra continue leave the buffers in place.. 
0616      * We will do this per call return buffer bases & if buffer is auto.
0617     if (!(G_libatmisrv_flags & ATMI_SRVLIB_NOLONGJUMP &&
0618                 G_atmisrv_reply_type & RETURN_TYPE_THREAD))
0619     {
0620         free_auto_buffers();
0621     }
0622     */
0623     
0624     return ret;
0625 }
0626 
0627 /**
0628  * Serve service call
0629  * @param call_buf original call buffer
0630  * @param call_len original call buffer len
0631  * @param call_no call service number
0632  * @return
0633  */
0634 expublic int sv_serve_connect(int *service, int *status, 
0635         char **call_buf, long call_len, int call_no)
0636 {
0637     int ret=EXSUCCEED;
0638     char *request_buffer = NULL;
0639     long req_len = 0;
0640     int reply_type;
0641     tp_command_call_t *call = (tp_command_call_t*)*call_buf;
0642     *status=EXSUCCEED;
0643     long call_age;
0644     tp_command_call_t * last_call = ndrx_get_G_last_call();
0645     int generate_rply = EXFALSE;
0646     int error_code = TPESVCERR; /**< Default error in case if cannot process */
0647     buffer_obj_t *outbufobj=NULL; /* Have a reference to allocated buffer */
0648 
0649     *status=EXSUCCEED;
0650     
0651     G_atmi_tls->atmisrv_reply_type = 0;
0652     
0653     NDRX_LOG(log_debug, "got connect, cd: %d timestamp: %d callseq: %u, clttout",
0654                         call->cd, call->timestamp, call->callseq, call->clttout);
0655     
0656     call_age = ndrx_stopwatch_get_delta_sec(&call->timer);
0657     
0658     if (call->clttout > 0 && call_age >= call->clttout && 
0659             !(call->flags & TPNOTIME))
0660     {
0661         NDRX_LOG(log_error, "Received connect already expired! "
0662                 "call age = %ld s, client timeout = %d s, caller: %s",
0663                 call_age, call->clttout, call->my_id);
0664 
0665         userlog("Received connect already expired! "
0666                 "call age = %ld s, client timeout = %d s, caller: %s",
0667                 call_age, call->clttout, call->my_id);
0668         *status=EXFAIL;
0669         goto out;
0670     }
0671     
0672     /* Prepare the call buffer */
0673     /* TODO: Check buffer type - if invalid this should raise segfault! */
0674     /* We can have data len 0! */
0675     
0676     ret = ndrx_mbuf_prepare_incoming (call->data, call->data_len, 
0677                     &request_buffer, &req_len, 0, 0);
0678     
0679     if (EXSUCCEED!=ret)
0680     {
0681         *status=EXFAIL;
0682         error_code = TPEITYPE;
0683         generate_rply = EXTRUE;
0684         goto out;
0685     }
0686     else
0687     {
0688         /* this must succeed */
0689         outbufobj=ndrx_find_buffer(request_buffer);
0690         
0691         /* how about NULL buffer? */
0692         outbufobj->autoalloc = 1; /* We have stuff autoallocated! */
0693         NDRX_LOG(log_debug, "Buffer=%p autoalloc=%hd", 
0694                 outbufobj->buf, outbufobj->autoalloc);
0695     }
0696 
0697     /* Now we should call the service by it self, also we should check was reply back or not */
0698 
0699     if (G_libatmisrv_flags & ATMI_SRVLIB_NOLONGJUMP || 
0700             0==(reply_type=setjmp(G_atmi_tls->call_ret_env)))
0701     {
0702         TPSVCINFO svcinfo;
0703         memset(&svcinfo, 0, sizeof(TPSVCINFO));
0704 
0705         svcinfo.data = request_buffer;
0706         /* Bug #789 */
0707         svcinfo.len = req_len;
0708 
0709         CPY_SERVICE_NAME;        
0710         svcinfo.flags = call->flags;
0711         svcinfo.cd = call->cd;
0712         /* set the client id to caller */
0713         NDRX_STRCPY_SAFE(svcinfo.cltid.clientdata, (char *)call->my_id);
0714         
0715         
0716         *last_call = *call; /* save last call info to ATMI library
0717                               * (this does excludes data by default) */
0718     /* Bug #799, mv 2023.02.28 */
0719     last_call->autobuf = outbufobj;
0720 
0721         /* Because we are in conversation, we should make a special cd
0722          * for case when we are as server
0723          * This will be cd + MAX, meaning, that we have called.
0724          */
0725         svcinfo.cd+=NDRX_CONV_UPPER_CNT;
0726         last_call->cd+=NDRX_CONV_UPPER_CNT;
0727         NDRX_LOG(log_debug, "Read cd=%d making as %d (+%d - we are server!)",
0728                                         call->cd, svcinfo.cd, NDRX_CONV_UPPER_CNT);
0729 
0730         /* Register global tx */
0731         if (EXEOS!=call->tmxid[0])
0732         {
0733             if (M_autojoin && EXSUCCEED!=_tp_srv_join_or_new_from_call(call, EXFALSE))
0734             {
0735                 NDRX_LOG(log_error, "Failed to start/join global tx [%s]!", call->tmxid);
0736                 userlog("Failed to start/join global tx [%s]!", call->tmxid);
0737 
0738                 /* TODO: We have died here... so the dispatcher must
0739                  * return TPFAIL, and we should notify master, that this RM is
0740                  * failed!!!!
0741                  */
0742                 *status=EXFAIL;
0743                 error_code = TPETRAN;
0744                 generate_rply = EXTRUE;
0745                 goto out;
0746             }
0747         }
0748         else if (G_server_conf.service_array[call_no]->autotran)
0749         {
0750             NDRX_LOG(log_debug, "Starting auto transaction");
0751             if (EXFAIL==tpbegin(G_server_conf.service_array[call_no]->trantime, 0))
0752             {
0753                 NDRX_LOG(log_error, "Failed to start autotran (trantime=%lu): %s", 
0754                         G_server_conf.service_array[call_no]->trantime, tpstrerror(tperrno));
0755                 userlog("Failed to start autotran (trantime=%lu): %s", 
0756                         G_server_conf.service_array[call_no]->trantime, tpstrerror(tperrno));
0757 
0758                 *status=EXFAIL;
0759                 generate_rply = EXTRUE; /**< error 14 */
0760                 error_code = TPETRAN;
0761                 goto out;
0762             }
0763             
0764             /* auto tran is started */
0765             last_call->sysflags|=SYS_FLAG_AUTOTRAN;
0766         }
0767         
0768         /* At this point we should build up conversation queues
0769          * Open for read their queue, and open for write our queue to listen
0770          * on.
0771          */
0772         if (EXFAIL==accept_connection())
0773         {
0774             ret=EXFAIL;
0775             
0776             *status=EXFAIL;
0777             generate_rply=EXTRUE;
0778             goto out;
0779         }        
0780         
0781         /* If we run in abort only mode and do some forwards & etc.
0782          * Then we should keep the abort status.
0783          Moved to tmisabortonly! flag field.
0784         if (ndrx_get_G_atmi_xa_curtx()->txinfo && (ndrx_get_last_call()->sysflags & SYS_XA_ABORT_ONLY))
0785         {
0786             NDRX_LOG(log_warn, "Marking current global tx as ABORT_ONLY");
0787             ndrx_get_G_atmi_xa_curtx()->txinfo->tmisabortonly = TRUE;
0788         }
0789          */
0790  
0791 
0792         /* call the function */
0793         *service=call_no-ATMI_SRV_Q_ADJUST;
0794         if (G_shm_srv)
0795         {
0796             if (G_server_conf.is_threaded)
0797             {
0798                 NDRX_SPIN_LOCK_V(G_server_conf.mt_lock);
0799                 G_shm_srv->svc_status[*service]++;
0800                 NDRX_SPIN_UNLOCK_V(G_server_conf.mt_lock);
0801                 /* put reply address  - not supported..
0802                 NDRX_STRCPY_SAFE(G_shm_srv->last_reply_q, call->reply_to);
0803                  * */
0804             }
0805             else
0806             {
0807                 G_shm_srv->svc_status[*service] = NDRXD_SVC_STATUS_BUSY;
0808                 /* put reply address */
0809                 NDRX_STRCPY_SAFE(G_shm_srv->last_reply_q, call->reply_to);
0810             }
0811         }
0812         /* For golang integration we need to know at service the function name */
0813         NDRX_STRCPY_SAFE(svcinfo.fname, G_server_conf.service_array[call_no]->fn_nm);
0814         G_server_conf.service_array[call_no]->p_func(&svcinfo);
0815         
0816         /*Needs some patch for go-lang that we do not use long jumps...
0817          * + we we need to get the status back...
0818          */
0819         if (G_libatmisrv_flags & ATMI_SRVLIB_NOLONGJUMP &&
0820                 /* Server did return:  */
0821                 (G_atmi_tls->atmisrv_reply_type & RETURN_TYPE_TPRETURN || 
0822                  G_atmi_tls->atmisrv_reply_type & RETURN_TYPE_TPFORWARD
0823                 )
0824             )
0825         {
0826             NDRX_LOG(log_debug, "Got back from reply/forward (%d) (no longjmp)",
0827                                         G_atmi_tls->atmisrv_reply_type);
0828         
0829             if (G_atmi_tls->atmisrv_reply_type & RETURN_FAILED || 
0830                     G_atmi_tls->atmisrv_reply_type & RETURN_SVC_FAIL)
0831             {
0832                 *status=EXFAIL;
0833             }
0834         }
0835         else if (G_libatmisrv_flags & ATMI_SRVLIB_NOLONGJUMP &&
0836                 G_atmi_tls->atmisrv_reply_type & RETURN_TYPE_THREAD)
0837         {
0838             NDRX_LOG(log_warn, "tpcontinue() issued from integra (no longjmp)!");
0839         }
0840         else
0841         {
0842             NDRX_LOG(log_warn, "No return from service!");
0843             
0844             if (tpgetlev() && last_call->sysflags & SYS_FLAG_AUTOTRAN)
0845             {
0846                 NDRX_LOG(log_error, "ERROR: Auto-tran started [%s], but no tpreturn() - ABORTING...", 
0847                         G_atmi_tls->G_atmi_xa_curtx.txinfo->tmxid);
0848                 userlog("ERROR: Auto-tran started [%s], but no tpreturn() - ABORTING...", 
0849                         G_atmi_tls->G_atmi_xa_curtx.txinfo->tmxid);
0850                 
0851                 if (EXSUCCEED!=ndrx_tpabort(0, EXTRUE))
0852                 {
0853                     NDRX_LOG(log_error, "Auto abort failed: %s", tpstrerror(tperrno));
0854                     userlog("Auto abort failed: %s", tpstrerror(tperrno));
0855                 }
0856             }
0857             
0858             /* force close queues and remove us from conv... */
0859             normal_connection_shutdown(ndrx_get_G_accepted_connection(), EXFALSE, 
0860                     "missing tpreturn, forced cleanup");
0861             
0862             if (!(svcinfo.flags & TPNOREPLY))
0863             {
0864                 /* if we are here, then there was no reply! */
0865                 NDRX_LOG(log_error, "PROTO error - no reply from service [%s]",
0866                                                 call->name);
0867                 /* reply with failure back */
0868                 *status=EXFAIL;
0869             }
0870         }
0871     }
0872     else
0873     {
0874         NDRX_LOG(log_debug, "Got back from reply/forward (%d)",
0875                                         reply_type);
0876         
0877         if (reply_type & RETURN_FAILED || reply_type & RETURN_SVC_FAIL)
0878         {
0879             *status=EXFAIL;
0880         }
0881         
0882     }
0883     
0884 out:
0885 
0886     /* reply with error if needed */
0887     if (generate_rply)
0888     {
0889         ndrx_reject_connection(error_code);
0890     }
0891 
0892     /* free_up_buffers(); - processes manages memory manually!!! */
0893 /*
0894     mvitolin 26/01/2017 - let the tpreturn free the auto buffer
0895     if (NULL!=request_buffer)
0896     {
0897         _tpfree(request_buffer, NULL);
0898     }
0899 */
0900     return ret;
0901 }
0902 
0903 /**
0904  * Decode received request & do the operation
0905  * @param call_buf call buffer
0906  * @param call_len call buffer len
0907  * @param call_no call service number
0908  * @return
0909  */
0910 expublic int sv_server_request(char **call_buf, long call_len, int call_no)
0911 {
0912     int ret=EXSUCCEED;
0913     tp_command_generic_t *gen_command = (tp_command_generic_t *)*call_buf;
0914     ndrx_stopwatch_t timer;
0915     /* take time */
0916     ndrx_stopwatch_reset(&timer);
0917     int service = EXFAIL;
0918     int status;
0919     unsigned result;
0920     
0921     /*if we are bridge, then no more processing required!*/
0922     if (G_server_conf.flags & SRV_KEY_FLAGS_BRIDGE)
0923     {
0924         if (NULL!=G_server_conf.p_qmsg)
0925         {
0926             if (EXSUCCEED!=G_server_conf.p_qmsg(call_buf, call_len, BR_NET_CALL_MSG_TYPE_ATMI))
0927             {
0928                 NDRX_LOG(log_error, "Failed to process ATMI request on bridge!");
0929                 EXFAIL_OUT(ret);
0930             }
0931         }
0932         else
0933         {
0934             NDRX_LOG(log_error, "This is no p_qmsg for bridge!");
0935             
0936         }
0937         /* go out, nothing to do new... */
0938         goto out;
0939     }
0940     
0941 
0942     NDRX_LOG(log_debug, "Got command: %hd", gen_command->command_id);
0943 
0944     if (G_shm_srv)
0945     {
0946         if (G_server_conf.is_threaded)
0947         {
0948             NDRX_SPIN_LOCK_V(G_server_conf.mt_lock);
0949             G_shm_srv->status++;
0950             G_shm_srv->last_command_id = gen_command->command_id;
0951             NDRX_SPIN_UNLOCK_V(G_server_conf.mt_lock);
0952         }
0953         else
0954         {
0955             G_shm_srv->status = NDRXD_SVC_STATUS_BUSY;
0956             G_shm_srv->last_command_id = gen_command->command_id;
0957         }
0958     }
0959 
0960     switch (gen_command->command_id)
0961     {
0962         case ATMI_COMMAND_TPCALL:
0963 
0964             ret=sv_serve_call(&service, &status, call_buf, call_len, call_no);
0965 
0966             break;
0967         case ATMI_COMMAND_CONNECT:
0968             /* We have connection for conversation */
0969             ret=sv_serve_connect(&service, &status, call_buf, call_len, call_no);
0970             break;
0971         case ATMI_COMMAND_SELF_SD:
0972             
0973             G_shutdown_nr_got++;
0974             
0975             NDRX_LOG(log_warn, "Got shutdown req %d of %d", 
0976                     G_shutdown_nr_got, G_shutdown_nr_wait);
0977             goto out;
0978             
0979             break;
0980         case ATMI_COMMAND_CONNRPLY:
0981             {
0982                 tp_command_call_t *call = (tp_command_call_t*)*call_buf;
0983 
0984                 NDRX_LOG(log_error, "Dropping unsolicited/event reply "
0985                                     "cd: %d callseq: %u timestamp: %d",
0986                                     call->cd, call->callseq, call->timestamp);
0987 
0988                 userlog("Dropping unsolicited/event reply "
0989                                         "cd: %d callseq: %u timestamp: %d",
0990                                         call->cd, call->callseq, call->timestamp);
0991                 /* Register as completed (if not cancelled) */
0992                 cancel_if_expected(call);
0993             }
0994             break;
0995         case ATMI_COMMAND_TPREPLY:
0996             {
0997                 tp_command_call_t *call = (tp_command_call_t*)*call_buf;
0998                 NDRX_LOG(log_error, "Dropping unsolicited reply "
0999                                     "cd: %d callseq: %u timestamp: %d",
1000                                      call->cd, call->callseq, call->timestamp);
1001                 
1002                 NDRX_DUMP(log_error, "Command content", *call_buf, call_len);
1003                 userlog("Dropping unsolicited reply "
1004                                     "cd: %d callseq: %u timestamp: %d",
1005                                     call->cd, call->callseq, call->timestamp);
1006                 ndrx_dump_call_struct(log_error, call);
1007             }
1008             break;
1009         case ATMI_COMMAND_TPNOTIFY:
1010         case ATMI_COMMAND_BROADCAST:
1011             {
1012                 /* Got broadcast message, just use ATMI lib internal
1013                  * dispatcher...
1014                  */
1015                 tp_notif_call_t *notif = (tp_notif_call_t*)*call_buf;
1016                 char *request_buffer = NULL;
1017                 long req_len = 0;
1018                 
1019                 NDRX_LOG(log_info, "Doing local %s...",
1020                      (ATMI_COMMAND_TPNOTIFY==gen_command->command_id?"tpnotify":"tpbroadcast"));
1021                 
1022                 /* How about prepare incoming buffer? */
1023                 
1024                 if (EXSUCCEED==ndrx_mbuf_prepare_incoming(notif->data,
1025                         notif->data_len,
1026                         &request_buffer,
1027                         &req_len,
1028                         0L, 0L)    
1029                         )
1030                 {
1031                     NDRX_LOG(log_debug, "ATMI Command id: %d", 
1032                             gen_command->command_id);
1033                     if (ATMI_COMMAND_TPNOTIFY==gen_command->command_id)
1034                     {
1035                         TPMYID myid;
1036                         CLIENTID *clt;
1037                     
1038                         /* NDRX_LOG(log_debug, "Doing notify..."); */
1039                         clt = (CLIENTID *)notif->destclient; /* same char arr.. */
1040                         
1041                         if (EXSUCCEED!=ndrx_myid_parse(notif->destclient, &myid, EXFALSE))
1042                         {
1043                             NDRX_LOG(log_error, "Failed to parse client: [%s]",
1044                                     notif->destclient);
1045                             EXFAIL_OUT(ret);
1046                         }
1047                         
1048                         ret=ndrx_tpnotify((CLIENTID *)notif->destclient, /* basically clientid */
1049                                 &myid, 
1050                                 NULL, 
1051                                 request_buffer, 
1052                                 req_len, 
1053                                 notif->flags,
1054                                 myid.nodeid,
1055                                 (notif->nodeid_isnull?NULL:notif->nodeid),
1056                                 (notif->usrname_isnull?NULL:notif->usrname),
1057                                 (notif->cltname_isnull?NULL:notif->cltname),
1058                                  0L);
1059                     }
1060                     else
1061                     {
1062                         NDRX_LOG(log_debug, "Doing tpbroadcast... flags = %ld, is regexp=%d", 
1063                                 notif->flags, notif->flags&TPREGEXMATCH);
1064                         
1065                         NDRX_LOG(log_debug, "notif->nodeid_isnull: %d (%s)", 
1066                                 notif->nodeid_isnull, notif->nodeid);
1067                         NDRX_LOG(log_debug, "notif->usrname_isnull: %d (%s)", 
1068                                 notif->usrname_isnull, notif->usrname);
1069                         NDRX_LOG(log_debug, "notif->cltname_isnull: %d (%s)", 
1070                                 notif->cltname_isnull, notif->cltname);
1071                         
1072                         ret=ndrx_tpbroadcast_local((notif->nodeid_isnull?NULL:notif->nodeid),
1073                                 (notif->usrname_isnull?NULL:notif->usrname),
1074                                 (notif->cltname_isnull?NULL:notif->cltname),
1075                                 request_buffer, req_len, notif->flags, EXTRUE);
1076                     }
1077                     
1078                     if (NULL!=request_buffer)
1079                     {
1080                         tpfree(request_buffer);
1081                     }
1082                     
1083                     if (EXSUCCEED!=ret)
1084                     {
1085                         NDRX_LOG(log_error, "Local notification/broadcast failed");
1086                     }
1087                 }
1088                 
1089             }
1090             break;
1091         default:
1092             NDRX_LOG(log_error, "Unknown command ID: %hd", gen_command->command_id);
1093             
1094             /* Dump the message to log... */
1095             NDRX_DUMP(log_error, "Command content", *call_buf,  call_len);
1096             
1097             EXFAIL_OUT(ret);
1098             break;
1099     }
1100 
1101     result = ndrx_stopwatch_get_delta(&timer);
1102     
1103     /* Update stats, if ptr available */
1104     if (EXFAIL!=service && G_shm_srv)
1105     {
1106         if (G_server_conf.is_threaded)
1107         {
1108             NDRX_SPIN_LOCK_V(G_server_conf.mt_lock);
1109         }
1110         
1111         /* reset back to avail. */
1112         if (G_server_conf.is_threaded)
1113         {
1114             G_shm_srv->svc_status[service]--;
1115             G_shm_srv->status--;
1116         }
1117         else
1118         {
1119             G_shm_srv->svc_status[service] = NDRXD_SVC_STATUS_AVAIL;
1120             G_shm_srv->status = NDRXD_SVC_STATUS_AVAIL;
1121         }
1122 
1123         /* update timing */
1124         /* min, if this is first time, then update directly */
1125         if (0==G_shm_srv->svc_succeed[service] && 0==G_shm_srv->svc_fail[service])
1126         {
1127             G_shm_srv->min_rsp_msec[service]=result;
1128         }
1129         else if (result<G_shm_srv->min_rsp_msec[service])
1130         {
1131             G_shm_srv->min_rsp_msec[service]=result;
1132         }
1133         
1134         /* max */
1135         if (result>G_shm_srv->max_rsp_msec[service])
1136         {
1137             G_shm_srv->max_rsp_msec[service]=result;
1138         }
1139         
1140         G_shm_srv->last_rsp_msec[service]=result;
1141 
1142         if (status==EXSUCCEED)
1143         {
1144             /* Loop over to zero */
1145             if (INT_MAX==G_shm_srv->svc_succeed[service])
1146             {
1147                 G_shm_srv->svc_succeed[service] = 0;
1148             }
1149             
1150             G_shm_srv->svc_succeed[service]++;
1151             
1152         }
1153         else
1154         {
1155             /* Loop over to zero */
1156             if (INT_MAX==G_shm_srv->svc_fail[service])
1157             {
1158                 G_shm_srv->svc_fail[service] = 0;
1159             }
1160             
1161             G_shm_srv->svc_fail[service]++;
1162            
1163         }
1164         
1165         if (G_server_conf.is_threaded)
1166         {
1167             NDRX_SPIN_UNLOCK_V(G_server_conf.mt_lock);
1168         }
1169         
1170         if (status!=EXSUCCEED)
1171         {
1172             /* If we are in global transaction,
1173              * then we shall notify the master RM of failure
1174              * or this will be done by caller. The master buffer will be marked
1175              * as abort only, because response is not ok.
1176              * 
1177              * But if we have called with tpacall() and no reply needed.
1178              * And we fail, then we need to mark the transaction as bad...
1179              * RM should know that....
1180              */
1181             
1182             /* TODO: But note... We might still need the transaction flags
1183              * Maybe we already know that transaction is abort only...!
1184              *  */
1185             _tp_srv_tell_tx_fail();
1186         }
1187         
1188         /* If we were in global tx, then we have to disassociate us from tx...
1189          * this is done in return/forward stmt.
1190          * but check again if we did return with out return?
1191          */
1192         if (ndrx_get_G_atmi_xa_curtx()->txinfo)
1193         {
1194             int end_fail=EXFALSE;
1195             _tp_srv_disassoc_tx(EXTRUE, &end_fail);
1196         }
1197     }
1198 
1199 out:
1200     return ret;
1201 }
1202 
1203 /**
1204  * Multi-threaded dispatch thread entry, extract thread data and
1205  * run off the standard sv_server_request
1206  * @param ptr data ptr
1207  * @param p_finish_off stop?
1208  * @return EXSUCCEED
1209  */
1210 expublic int sv_server_request_th(void *ptr, int *p_finish_off)
1211 {
1212     int ret;
1213     thread_dispatch_t *work = (thread_dispatch_t *)ptr;
1214     
1215     NDRX_LOG(log_debug, "Dispatch thread got: %ld", work->call_len);
1216     ret=sv_server_request(&work->call_buf, work->call_len, work->call_no);
1217     
1218     if (NULL!=work->call_buf)
1219     {
1220         NDRX_SYSBUF_FREE(work->call_buf);
1221     }
1222     
1223     NDRX_FPFREE(work);
1224     
1225     return ret;
1226 }
1227 
1228 /**
1229  * Perform shutdown call, only from main thread
1230  * @param requester debug string of who requested the shutdown
1231  * @param shutdown_req ptr to global indicating the down...
1232  */
1233 expublic void ndrx_sv_do_shutdown(char *requester, int *shutdown_req)
1234 {
1235     int i;
1236     NDRX_LOG(log_info, "Shutdown processed by [%s]", requester);
1237     tp_command_generic_t shut_msg; /* shutdown msg */
1238     
1239     *shutdown_req=EXTRUE;
1240     
1241 #ifdef EX_USE_POLL
1242     /* TODO: We shall send request to all open service queues
1243      * to do the shutdown. This is only for poll() mode.
1244      */
1245     memset(&shut_msg, 0, sizeof(shut_msg));
1246 
1247     shut_msg.command_id = ATMI_COMMAND_SELF_SD;
1248 
1249     /* Send over all open service queues: */
1250 
1251     for (i=ATMI_SRV_Q_ADJUST; i<G_server_conf.adv_service_count; i++)
1252     {
1253         if (EXSUCCEED!=ndrx_generic_qfd_send(G_server_conf.service_array[i]->q_descr, 
1254                 (char *)&shut_msg, sizeof(shut_msg), 0))
1255         {
1256             NDRX_LOG(log_debug, "Failed to send self notification to %s q",
1257                     G_server_conf.service_array[i]->listen_q);
1258         }
1259         else
1260         {
1261             G_shutdown_nr_wait++;
1262         }
1263     }/* for */
1264 
1265     NDRX_LOG(log_warn, "Send %d self notifications to "
1266             "service queues for shutdown...", G_shutdown_nr_wait);
1267 #endif
1268 }
1269 
1270 /**
1271  * Process admin request
1272  * @param buf
1273  * @param len
1274  * @param shutdown_req
1275  * @return SUCCEED/FAIL
1276  */
1277 expublic int process_admin_req(char **buf, long len, int *shutdown_req)
1278 {
1279     int ret=EXSUCCEED;
1280     
1281     command_call_t * call = (command_call_t *)*buf;
1282 
1283     /* So what, do shutdown, right? */
1284     if (NDRXD_COM_SRVSTOP_RQ==call->command)
1285     {
1286         NDRX_LOG(log_info, "Shutdown requested by [%s]", 
1287                                         call->reply_queue);
1288         if (NULL!=G_server_conf.p_shutdowncb)
1289         {
1290             G_server_conf.p_shutdowncb(shutdown_req);
1291         }
1292         else
1293         {
1294             ndrx_sv_do_shutdown("direct call", shutdown_req);
1295         }
1296     }
1297     else if (NDRXD_COM_SRVINFO_RQ==call->command)
1298     {
1299         NDRX_LOG(log_warn, "Server info requested by [%s]",
1300                                         call->reply_queue);
1301         /* Send details to ndrxd */
1302         report_to_ndrxd();
1303     }
1304     else if (NDRXD_COM_NXDUNADV_RQ==call->command)
1305     {
1306         command_dynadvertise_t *call_srv = (command_dynadvertise_t *)call;
1307         
1308         NDRX_LOG(log_warn, "Server requested unadvertise service [%s] by [%s]",
1309                                         call_srv->svc_nm, call->reply_queue);
1310         
1311         /*
1312          * There are too much service structures to protect, thus better
1313          * do not support dynamic advertise (for now...)
1314          */
1315         if (G_server_conf.is_threaded)
1316         {
1317             NDRX_LOG(log_error, "Got command from ndrxd: %d - ndrxd unadvertise (svcnm=[%s]), "
1318                     "but this MT server, unsupported - ignore", 
1319                     call->command, call_srv->svc_nm);
1320             
1321             userlog("Got command from ndrxd: %d - ndrxd unadvertise (svcnm=[%s]), "
1322                     "but this MT server, unsupported - ignore", 
1323                     call->command, call_srv->svc_nm);
1324         }
1325         else
1326         {
1327             /* Send details to ndrxd */
1328             dynamic_unadvertise(call_srv->svc_nm, NULL, NULL);
1329         }
1330     }
1331     else if (NDRXD_COM_NXDREADV_RQ==call->command)
1332     {
1333         command_dynadvertise_t *call_srv = (command_dynadvertise_t *)call;
1334         
1335         NDRX_LOG(log_warn, "Server requested readvertise service [%s] by [%s]",
1336                                         call_srv->svc_nm, call->reply_queue);
1337         
1338         /*
1339          * There are too much service structures to protect, thus better
1340          * do not support dynamic advertise (for now...)
1341          */
1342         if (G_server_conf.is_threaded)
1343         {
1344             NDRX_LOG(log_error, "Got command from ndrxd: %d - ndrxd re-advertise (svcnm=[%s]), "
1345                     "but this MT server, unsupported - ignore", 
1346                     call->command, call_srv->svc_nm);
1347             
1348             userlog("Got command from ndrxd: %d - ndrxd re-advertise (svcnm=[%s]), "
1349                     "but this MT server, unsupported - ignore", 
1350                     call->command, call_srv->svc_nm);
1351         }
1352         else
1353         {
1354             dynamic_readvertise(call_srv->svc_nm);
1355         }
1356     }
1357     else if (NDRXD_COM_SRVPING_RQ==call->command)
1358     {
1359         command_srvping_t *call_srv = (command_srvping_t *)call;
1360         if (call_srv->srvid == G_server_conf.srv_id)
1361         {
1362             NDRX_LOG(log_debug, "Got ping request: %d seq", 
1363                                                 call_srv->seq);
1364             pingrsp_to_ndrxd(call_srv);
1365         }
1366     }
1367     else
1368     {
1369         /*if we are bridge, then no more processing required!*/
1370         if (G_server_conf.flags & SRV_KEY_FLAGS_BRIDGE)
1371         {
1372             if (NULL!=G_server_conf.p_qmsg)
1373             {
1374                 if (EXSUCCEED!=G_server_conf.p_qmsg(buf, len, BR_NET_CALL_MSG_TYPE_NDRXD))
1375                 {
1376                     NDRX_LOG(log_error, "Failed to process ATMI request on bridge!");
1377                     EXFAIL_OUT(ret);
1378                 }
1379             }
1380             else
1381             {
1382                 NDRX_LOG(log_error, "This is no p_qmsg for brdige!");
1383                 goto out;
1384             }
1385         }
1386     }
1387     
1388 out:
1389     return ret;
1390 }
1391 
1392 /**
1393  * Enter server in waiting state
1394  * @return
1395  */
1396 expublic int sv_wait_for_request(void)
1397 {
1398     int ret=EXSUCCEED;
1399     int nfds, n, len, j, call_no;
1400     unsigned prio;
1401     int again;
1402     int tout;
1403     pollextension_rec_t *ext;
1404     int evfd;
1405     mqd_t evmqd;
1406     ndrx_stopwatch_t   dbg_time;   /* Generally this is used for debug. */
1407     ndrx_stopwatch_t   periodic_cb;
1408     char *msg_buf = NULL;
1409     size_t msgsize_max = NDRX_MSGSIZEMAX;
1410     
1411     
1412     ndrx_stopwatch_reset(&dbg_time);
1413     ndrx_stopwatch_reset(&periodic_cb);
1414     
1415     /* THIS IS MAIN SERVER LOOP! */
1416     while(EXSUCCEED==ret && (!G_shutdown_req /*|| 
1417             if shutdown request then wait for all queued jobs to finish. 
1418             G_shutdown_nr_got <  G_shutdown_nr_wait - why? */))
1419     {
1420         /* moved here as tout might change by callback api */
1421         if (G_server_conf.periodcb_sec)
1422         {
1423             tout = G_server_conf.periodcb_sec*1000
1424                 - ndrx_stopwatch_get_delta(&periodic_cb);
1425 
1426             if (tout<0)
1427             {
1428                 tout=0;
1429             }
1430 
1431         }
1432         else
1433         {
1434             tout=EXFAIL; /* Timeout disabled */
1435         }
1436         
1437         /* Support for periodical invocation of custom function! */   
1438         /* Invoke before poll function */
1439         if (G_server_conf.p_b4pollcb
1440                 && EXSUCCEED!=G_server_conf.p_b4pollcb())
1441         {
1442             ret=EXFAIL;
1443             goto out;
1444         }
1445         
1446         /* We want this debug but if using callbacks, then it might get too many in trace files
1447          * so we just put timer there if tout used, and no timer if no tout use.
1448          * If tout used, then 60 sec will be ok for dbug
1449          */
1450         if (EXFAIL==tout || ndrx_stopwatch_get_delta_sec(&dbg_time) >= 60)
1451         {
1452             NDRX_LOG(log_debug, "About to poll - timeout=%d millisec", 
1453                                                 tout);
1454             if (EXFAIL!=tout)
1455             {
1456                 ndrx_stopwatch_reset(&dbg_time);
1457             }
1458         }
1459         
1460         /* some epoll backends can return already buffer received
1461          * for example System V Queues
1462          * for others it is just -1
1463          * 
1464          * So the plan is following for SystemV:
1465          * - first service Q is open as real queue and main dispatcher
1466          * - the other queues are just open as some dummy pointers with
1467          *  service name recorded inside and we return it as mqd_t
1468          *  so that pointers can be compared
1469          * - when epoll_wait gets the message, we trace down the pointer
1470          *  by the service name in the message and then we return then
1471          *  we will events correspondingly.
1472          */
1473         /*len = sizeof(msg_buf);*/
1474         
1475         if (NULL==msg_buf)
1476         {
1477             NDRX_SYSBUF_MALLOC_WERR_OUT(msg_buf, len, ret);
1478         }
1479         else
1480         {
1481             len = msgsize_max;
1482         }
1483         
1484         if (0==tout)
1485         {
1486             /* process the period callback... */
1487             nfds=EXFAIL;
1488         }
1489         else
1490         {
1491             /* wait for messages or timeout... */
1492             nfds = ndrx_epoll_wait(G_server_conf.epollfd, G_server_conf.events, 
1493                 G_server_conf.max_events, tout, &msg_buf, &len);
1494 
1495             /* Print stuff if there is no timeout set or there is some value out there */
1496             NDRX_LOG(log_debug, "Poll says: %d len: %d tout: %d", nfds, len, tout);
1497         }
1498         
1499         /* If there are zero FDs &  */
1500         if (EXFAIL==nfds && 0!=tout)
1501         {
1502             int err = errno;
1503             ndrx_TPset_error_fmt(TPEOS, "epoll_pwait failed: %s", 
1504                     ndrx_poll_strerror(ndrx_epoll_errno()));
1505             
1506             if (EINTR==err)
1507             {
1508                 continue;
1509             }
1510             
1511             EXFAIL_OUT(ret);
1512         }
1513 
1514         /* We should use timer here because, if there are service requests at
1515          * constant time (less than poll time), then callback will be never called! 
1516          * perform stopwatch only when timed out, otherwise 
1517          */
1518         else if (EXFAIL!=tout && 
1519                 ndrx_stopwatch_get_delta_sec(&periodic_cb) >= G_server_conf.periodcb_sec)
1520         {
1521             if (NULL!=G_server_conf.p_periodcb &&
1522                     EXSUCCEED!=(ret=G_server_conf.p_periodcb()))
1523             {
1524                 NDRX_LOG(log_error, "Periodical callback function "
1525                         "failed!!! With ret=%d", ret);
1526                 goto out;
1527             }
1528             
1529             ndrx_stopwatch_reset(&periodic_cb);
1530         }
1531         
1532         /*
1533          * TODO: We should have algorithm which checks request in round robin way.
1534          * So that if there is big load for first service, and there is outstanding calls to second,
1535          * then we should process also the second.
1536          * Maybe not? Maybe kernel already does that?
1537          */
1538         for (n = 0; n < nfds; n++)
1539         {
1540             int is_mq_only = EXFAIL;
1541             evfd = G_server_conf.events[n].data.fd;
1542             evmqd = G_server_conf.events[n].data.mqd;
1543 
1544 /* so this way will work for POLL & Kqueue */
1545 #if !defined(EX_USE_FDPOLL) && !defined(EX_USE_EPOLL)
1546             NDRX_LOG(log_debug, "not epoll()");
1547             /* for non linux, we need to distinguish between fd & mq */
1548             is_mq_only = G_server_conf.events[n].is_mqd;
1549 #endif
1550 
1551             NDRX_LOG(log_debug, "Receiving %d, user data: %d, fd: %d, evmqd: %d, "
1552                     "is_mq_only: %d, G_pollext=%p",
1553                     n, G_server_conf.events[n].data.u32, evfd, evmqd, 
1554                     is_mq_only, ndrx_G_pollext);
1555             
1556             if (0==evfd && 0==evmqd)
1557             {
1558                 /* not sure why epoll returns these 0 */
1559                 continue;
1560             }
1561             
1562             /* Check poller extension */
1563             if (NULL!=ndrx_G_pollext && (EXFAIL==is_mq_only || EXFALSE==is_mq_only) )
1564             {
1565                 ext=ext_find_poller(evfd);
1566                 
1567                 if (NULL!=ext)
1568                 {
1569                     NDRX_LOG(log_info, "FD found in extension list, invoking");
1570                     
1571                     ret = ext->p_pollevent(evfd, G_server_conf.events[n].events, ext->ptr1);
1572                     if (EXSUCCEED!=ret)
1573                     {
1574                         NDRX_LOG(log_error, "p_pollevent at 0x%lx failed!!!",
1575                                 ext->p_pollevent);
1576                         goto out;
1577                     }
1578                     else
1579                     {
1580                         continue;
1581                     }
1582                 }
1583             }
1584             
1585             /* ignore fds  */
1586             if (EXFALSE==is_mq_only)
1587             {
1588                 continue;
1589             }
1590             
1591             if (EXFAIL==len && EXFAIL==(len=ndrx_mq_receive (evmqd,
1592                 (char *)msg_buf, msgsize_max, &prio)))
1593             {
1594                 if (EAGAIN==errno)
1595                 {
1596                     /* In this case we ignore the error and try on next q
1597                      * This may happen in cases when in load balance mode
1598                      * someone else took the message off.
1599                      */
1600                     NDRX_LOG(log_debug, "EAGAIN");
1601                     continue; /* try to get next message */
1602                 }
1603                 else
1604                 {
1605                     ret=EXFAIL;
1606                     ndrx_TPset_error_fmt(TPEOS, "ndrx_mq_receive failed: %s", 
1607                             strerror(errno));
1608                 }
1609             }
1610             else
1611             {   
1612                 /* OK, we got the message and now we can call the service */
1613                 /*G_server_conf.service_array[n]->p_func((TPSVCINFO *)msg_buf);*/
1614 
1615                 /* We generally ignore the returned error code here! */
1616 
1617                 /* TODO: We could use a hashtable for this lookup */
1618                 /* figure out target structure */
1619                 call_no=EXFAIL;
1620                 for (j=0; j<G_server_conf.adv_service_count; j++)
1621                 {
1622                     if (evmqd==G_server_conf.service_array[j]->q_descr)
1623                     {
1624                         call_no = j;
1625                         break;
1626                     }
1627                 }
1628                 
1629                 NDRX_LOG(log_debug, "Got request on logical channel %d, fd: %d",
1630                             call_no, evmqd);
1631 
1632                 if (ATMI_SRV_ADMIN_Q==call_no)
1633                 {
1634                     NDRX_LOG(log_debug, "Got admin request");
1635                     ret=process_admin_req(&msg_buf, len, &G_shutdown_req);
1636                 }
1637                 else
1638                 {   
1639                     /* This normally should not happen! */
1640                     if (EXFAIL==call_no)
1641                     {
1642                         ndrx_TPset_error_fmt(TPESYSTEM, "No service entry for "
1643                                 "call descriptor %d", evmqd);
1644                         ret=EXFAIL;
1645                         goto out;
1646                     }
1647                     
1648                     if (!G_server_conf.is_threaded)
1649                     {
1650                         /* Save on the big message copy... */
1651                         /* in case of MINDISPATCHTHREADS, buf needs to be dynamic
1652                          * allocated (and re-used if one threaded used)
1653                          * if using > 1 thread, then sv_server_request shall be
1654                          * processed by thread_pool and reset the msg_buf to NULL
1655                          * so that we allocate new one.
1656                          */
1657                         sv_server_request(&msg_buf, len, call_no);
1658                     }
1659                     else
1660                     {
1661                         thread_dispatch_t *work;
1662                         
1663                         work = NDRX_FPMALLOC(sizeof(thread_dispatch_t), 0);
1664 
1665                         if (NULL==work)
1666                         {
1667                             int err = errno;
1668                             NDRX_LOG(log_error, "Failed to allocate thread_dispatch_t: %s", 
1669                                     strerror(err));
1670                             userlog("Failed to allocate thread_dispatch_t: %s", 
1671                                     strerror(err));
1672                             EXFAIL_OUT(ret);
1673                         }
1674                         
1675                         work->call_buf=msg_buf;
1676                         msg_buf=NULL;
1677                         work->call_len = len;
1678                         work->call_no = call_no;
1679                         
1680                         /* forward to dispatch thread */
1681                         NDRX_LOG(log_debug, "Dispatching to worker thread... %d", len);
1682                         if (EXSUCCEED!=ndrx_thpool_add_work(G_server_conf.dispthreads, 
1683                             (void*)sv_server_request_th, 
1684                             (void *)work))
1685                         {
1686                             EXFAIL_OUT(ret);
1687                         }
1688                         
1689                         /* wait for one free slot before continue with next 
1690                          * so that we do not consume all the messages in the
1691                          * job queue, instead leave them in system queues
1692                          */
1693                         ndrx_thpool_wait_one(G_server_conf.dispthreads);
1694                     }
1695                 } /* if normal request */
1696             }
1697         } /* for */
1698     }
1699 out:
1700 
1701     /* free up system buffer, if not re-used */
1702     if (NULL!=msg_buf)
1703     {
1704         NDRX_SYSBUF_FREE(msg_buf);
1705     }
1706 
1707     return ret;
1708 }
1709 /* vim: set ts=4 sw=4 et smartindent: */