Back to home page

Enduro/X

 
 

    


0001 /**
0002  * @brief API to NDRXD admin server
0003  *   TODO: Might think to convert all ndrxd API operations to adminQ!
0004  *   Due to possible collisions with async replies in Q....
0005  *
0006  * @file ndrxdapi.c
0007  */
0008 /* -----------------------------------------------------------------------------
0009  * Enduro/X Middleware Platform for Distributed Transaction Processing
0010  * Copyright (C) 2009-2016, ATR Baltic, Ltd. All Rights Reserved.
0011  * Copyright (C) 2017-2023, Mavimax, Ltd. All Rights Reserved.
0012  * This software is released under one of the following licenses:
0013  * AGPL (with Java and Go exceptions) or Mavimax's license for commercial use.
0014  * See LICENSE file for full text.
0015  * -----------------------------------------------------------------------------
0016  * AGPL license:
0017  *
0018  * This program is free software; you can redistribute it and/or modify it under
0019  * the terms of the GNU Affero General Public License, version 3 as published
0020  * by the Free Software Foundation;
0021  *
0022  * This program is distributed in the hope that it will be useful, but WITHOUT ANY
0023  * WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR A
0024  * PARTICULAR PURPOSE. See the GNU Affero General Public License, version 3
0025  * for more details.
0026  *
0027  * You should have received a copy of the GNU Affero General Public License along 
0028  * with this program; if not, write to the Free Software Foundation, Inc.,
0029  * 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
0030  *
0031  * -----------------------------------------------------------------------------
0032  * A commercial use license is available from Mavimax, Ltd
0033  * contact@mavimax.com
0034  * -----------------------------------------------------------------------------
0035  */
0036 #include <string.h>
0037 #include <stdio.h>
0038 #include <stdlib.h>
0039 #include <memory.h>
0040 
0041 #include <ndrstandard.h>
0042 #include <ndebug.h>
0043 
0044 #include "srv_int.h"
0045 #include "tperror.h"
0046 #include "userlog.h"
0047 #include <atmi_int.h>
0048 #include <ndrxdcmn.h>
0049 #include <unistd.h>
0050 #include <atmi.h>
0051 /*---------------------------Externs------------------------------------*/
0052 /*---------------------------Macros-------------------------------------*/
0053 /*---------------------------Enums--------------------------------------*/
0054 /*---------------------------Typedefs-----------------------------------*/
0055 /*---------------------------Globals------------------------------------*/
/* Monitor timer for the bridge listing request: it is reset in
 * ndrxd_get_bridges() and checked in get_bridges_rply_request(), so that
 * we give up when ndrxd reply is not received for too long
 * while tons of event postages keep arriving. */
ndrx_stopwatch_t M_getbrs_timer;

/* Optional additional callback, invoked by report_to_ndrxd() after the status
 * has been sent successfully. NULL means that no callback is installed. */
int (*G_report_to_ndrxd_cb) (void) = NULL;
0061 
0062 /*---------------------------Statics------------------------------------*/
0063 /*---------------------------Prototypes---------------------------------*/
0064 
0065 /**
0066  * Install callback function which will be additionally called when 
0067  * libatmisrv is reporting service status to ndrxd.
0068  * @param report_to_ndrxd_callback - callback func. Can be NULL, to disable cback.
0069  */
0070 expublic void ndrx_set_report_to_ndrxd_cb(int (*report_to_ndrxd_cb) (void))
0071 {
0072     NDRX_LOG(log_warn, "Installing additional report_to_ndrxd() callback = %p", 
0073             report_to_ndrxd_cb);
0074     G_report_to_ndrxd_cb = report_to_ndrxd_cb;
0075 }
0076 
0077 /**
0078  * Report to ndrxd
0079  * TODO: Test recovery case, it might not report to ndrxd that process is bridge!
0080  * @return
0081  */
0082 expublic int report_to_ndrxd(void)
0083 {
0084     int ret=EXSUCCEED;
0085     char *buf = NULL;
0086     size_t buf_len;
0087     srv_status_t *status;
0088     int i, offset=0;
0089     svc_entry_fn_t *entry;
0090     size_t  send_size;
0091     static int first = EXTRUE;
0092     static int ppid = EXFAIL;
0093     char *p;
0094     
0095     NDRX_SYSBUF_MALLOC_WERR_OUT(buf, buf_len, ret);
0096     
0097     status = (srv_status_t *)buf;
0098     /* shall we do full memset? */
0099     memset(buf, 0, sizeof(srv_status_t));
0100     
0101     /* format out the status report */
0102     
0103     /* Feature #76, provide parent (script?) pid from env variables if available
0104      * if not available, then assume that current pid is a server process pid
0105      * and there are no wrappers used.
0106      * it is expected that multi-threaded calls will not be made here.
0107      * and evne if multi-thread is done, the worst thing that might happen
0108      * is reading env twice and writting ppid twice - no problem at all.
0109      */
0110     if (first)
0111     {
0112         p = getenv(CONF_NDRX_SVPPID);
0113         
0114         if (NULL!=p)
0115         {
0116             ppid = atoi(p);
0117         }
0118         
0119         if (ppid <= 0)
0120         {
0121             ppid = getpid();
0122         }
0123         first = EXFALSE;
0124     }
0125     
0126     status->srvinfo.pid = ppid;
0127     status->srvinfo.svpid = getpid();
0128     
0129     /* TODO: We need to add rqaddr / qid to the status message
0130      * so that ndrxd can install the qid in service shared
0131      * memory
0132      */
0133     status->srvinfo.state = NDRXD_PM_RUNNING_OK;
0134     status->srvinfo.srvid = G_server_conf.srv_id;
0135     status->srvinfo.flags = G_server_conf.flags;
0136     status->srvinfo.nodeid = G_server_conf.nodeid;
0137     status->srvinfo.procgrp_lp_no = G_server_conf.procgrp_lp_no;
0138     NDRX_STRCPY_SAFE(status->srvinfo.binary_name_real, G_server_conf.binary_name);
0139     NDRX_STRCPY_SAFE(status->srvinfo.rqaddress, G_server_conf.rqaddress);
0140 #ifdef EX_USE_SYSVQ
0141     status->srvinfo.resid = ndrx_epoll_resid_get();
0142 #else
0143     status->srvinfo.resid = G_server_conf.srv_id;
0144 #endif
0145 
0146     /* fill the service list */
0147     for (i=0; i<G_server_conf.adv_service_count; i++)
0148     {
0149         entry = G_server_conf.service_array[i];
0150         /* TODO: Still some admin stuff gets over? */
0151         if (entry->is_admin || EXEOS==entry->svc_nm[0])
0152         {
0153             offset++;
0154             continue; /* not interested in admin q */
0155         }
0156         NDRX_STRCPY_SAFE(status->svcs[i-offset].svc_nm, entry->svc_nm);
0157         NDRX_STRCPY_SAFE(status->svcs[i-offset].fn_nm, entry->fn_nm);
0158         status->svcs[i-offset].qopen_time = entry->qopen_time;
0159         status->svc_count++;
0160 
0161     }
0162 
0163     send_size = sizeof(srv_status_t)+sizeof(svc_inf_t)*status->svc_count;
0164     NDRX_LOG(log_debug, "About to send: %d bytes/%d svcs, srvid: %d",
0165                         send_size, status->svc_count, status->srvinfo.srvid);
0166 
0167     ret=cmd_generic_call(NDRXD_COM_SVCINFO_RQ, NDRXD_SRC_SERVER,
0168                         NDRXD_CALL_TYPE_PM_INFO,
0169                         (command_call_t *)status, send_size,
0170                         ndrx_get_G_atmi_conf()->reply_q_str,
0171                         ndrx_get_G_atmi_conf()->reply_q,
0172                         (mqd_t)EXFAIL,   /* do not keep open ndrxd q open */
0173                         ndrx_get_G_atmi_conf()->ndrxd_q_str,
0174                         0, NULL,
0175                         NULL,
0176                         NULL,
0177                         NULL,
0178                         EXFALSE);
0179     /* Bug #110 - provide bridge status after ndrxd recovery... */
0180     if (EXSUCCEED==ret && NULL!=G_report_to_ndrxd_cb)
0181     {
0182         NDRX_LOG(log_info, "Callback on report_to_ndrxd is set.");
0183         ret=G_report_to_ndrxd_cb();
0184     }
0185 out:
0186     
0187     if (NULL!=buf)
0188     {
0189         NDRX_SYSBUF_FREE(buf);
0190     }
0191 
0192     return ret;
0193 }
0194 
0195 
0196 /**
0197  * Send unsubscribe message to ndrxd
0198  * @return
0199  */
0200 expublic int unadvertse_to_ndrxd(char *svcname)
0201 {
0202     int ret=EXSUCCEED;
0203     char *buf = NULL;
0204     size_t buf_len;
0205     command_dynadvertise_t *unadv;
0206     size_t  send_size=sizeof(command_dynadvertise_t);
0207 
0208     NDRX_SYSBUF_MALLOC_WERR_OUT(buf, buf_len, ret);
0209     
0210     memset(buf, 0, sizeof(command_dynadvertise_t));
0211     
0212     unadv = (command_dynadvertise_t *)buf;
0213     
0214     /* format out the status report */
0215     unadv->srvid= G_server_conf.srv_id;
0216     NDRX_STRCPY_SAFE(unadv->svc_nm, svcname);
0217     
0218     ret=cmd_generic_call(NDRXD_COM_SRVUNADV_RQ, NDRXD_SRC_SERVER,
0219                         NDRXD_CALL_TYPE_PM_INFO,
0220                         (command_call_t *)unadv, send_size,
0221                         ndrx_get_G_atmi_conf()->reply_q_str,
0222                         ndrx_get_G_atmi_conf()->reply_q,
0223                         (mqd_t)EXFAIL,   /* do not keep open ndrxd q open */
0224                         ndrx_get_G_atmi_conf()->ndrxd_q_str,
0225                         0, NULL,
0226                         NULL,
0227                         NULL,
0228                         NULL,
0229                         EXFALSE);
0230     if (EXSUCCEED!=ret)
0231     {
0232         /*Just ignore the error*/
0233         if (!G_shm_srv || ENOENT==ret)
0234         {
0235             NDRX_LOG(log_error, "Not attached to ndrxd - ignore error");
0236             ret=EXSUCCEED;
0237         }    
0238         else
0239         {
0240           ndrx_TPset_error_fmt(TPESYSTEM, "Failed to send command %d to [%s]", 
0241                         NDRXD_COM_SRVUNADV_RQ, ndrx_get_G_atmi_conf()->ndrxd_q_str);  
0242         }
0243     }
0244 
0245 out:
0246     
0247     if (NULL!=buf)
0248     {
0249         NDRX_SYSBUF_FREE(buf);
0250     }
0251 
0252     return ret;
0253 }
0254 
0255 
0256 /**
0257  * Send advertise block to ndrxd.
0258  * @return
0259  */
0260 expublic int advertse_to_ndrxd(svc_entry_fn_t *entry)
0261 {
0262     int ret=EXSUCCEED;
0263     char *buf = NULL;
0264     size_t buf_len;
0265     command_dynadvertise_t *adv;
0266     size_t  send_size=sizeof(command_dynadvertise_t);
0267 
0268     NDRX_SYSBUF_MALLOC_WERR_OUT(buf, buf_len, ret);
0269     
0270     memset(buf, 0, sizeof(command_dynadvertise_t));
0271     adv = (command_dynadvertise_t *)buf;
0272     
0273     /* format out the status report */
0274     adv->srvid= G_server_conf.srv_id;
0275     NDRX_STRCPY_SAFE(adv->svc_nm, entry->svc_nm);
0276     NDRX_STRCPY_SAFE(adv->fn_nm, entry->fn_nm);
0277     /*Transfer the time there*/
0278     adv->qopen_time = entry->qopen_time;
0279     
0280     ret=cmd_generic_call(NDRXD_COM_SRVADV_RQ, NDRXD_SRC_SERVER,
0281                         NDRXD_CALL_TYPE_PM_INFO,
0282                         (command_call_t *)adv, send_size,
0283                         ndrx_get_G_atmi_conf()->reply_q_str,
0284                         ndrx_get_G_atmi_conf()->reply_q,
0285                         (mqd_t)EXFAIL,   /* do not keep open ndrxd q open */
0286                         ndrx_get_G_atmi_conf()->ndrxd_q_str,
0287                         0, NULL,
0288                         NULL,
0289                         NULL,
0290                         NULL,
0291                         EXFALSE);
0292     if (EXSUCCEED!=ret)
0293     {
0294         /*Just ignore the error*/
0295         if (!G_shm_srv || ENOENT==ret)
0296         {
0297             NDRX_LOG(log_error, "Not attached to ndrxd - ignore error");
0298             ret=EXSUCCEED;
0299         }    
0300         else
0301         {
0302           ndrx_TPset_error_fmt(TPESYSTEM, "Failed to send command %d to [%s]", 
0303                         NDRXD_COM_SRVUNADV_RQ, ndrx_get_G_atmi_conf()->ndrxd_q_str);  
0304         }
0305     }
0306 
0307 out:
0308     
0309     if (NULL!=buf)
0310     {
0311         NDRX_SYSBUF_FREE(buf);
0312     }
0313 
0314     return ret;
0315 }
0316 
0317 /**
0318  * We might get request during bufcall processing...
0319  * Also we need some timer, so that we can giveup finally...!
0320  * @param buf
0321  * @param len
0322  * @return 
0323  */
0324 exprivate int get_bridges_rply_request(char **buf, long len)
0325 {
0326     int ret=EXSUCCEED;
0327     
0328     /* we should re-queue back the stuff... */
0329     sleep(0); /* requeue stuff... */
0330     
0331     ret = process_admin_req(buf, len, &G_shutdown_req);
0332     
0333     if (ndrx_stopwatch_get_delta_sec(&M_getbrs_timer) > ndrx_get_G_atmi_env()->time_out)
0334     {
0335         NDRX_LOG(log_error, "Did not get reply from ndrxd int time for "
0336                 "bridge listing - FAIL!");
0337         ret=EXFAIL;
0338     }
0339     
0340     return ret;
0341 }
0342 
0343 /**
0344  * Get list of bridges connected to the domain.
0345  * err: We might get ping request during call.
0346  * this causes corruption of response.
0347  * !!!NOTE: Might want to store connected nodes in shared mem!!!!!!
0348  * ndrxd could update shared mem for bridges, for full refresh and for delete updates...
0349  * !!! looks like not used any more...!!!
0350  * @return
0351  */
0352 expublic int ndrxd_get_bridges(char *nodes_out)
0353 {
0354     int ret=EXSUCCEED;
0355     command_call_t req;
0356     size_t  send_size=sizeof(command_call_t);
0357     command_reply_getbrs_t rply;
0358     int rply_len= sizeof(rply);
0359     svc_entry_fn_t *entry = G_server_conf.service_array[ATMI_SRV_ADMIN_Q];
0360 
0361     ndrx_stopwatch_reset(&M_getbrs_timer);
0362     
0363     memset(&req, 0, sizeof(req));
0364     memset(&rply, 0, sizeof(rply));
0365     
0366     /* We should enter our reply Q in blocked mode (so that we get response from NDRXD)! */
0367     ndrx_q_setblock(entry->q_descr, EXTRUE);
0368 
0369     /*
0370     NDRX_LOG(log_debug, "ndrxd_get_bridges: call flags=0x%x", req.flags);
0371     */
0372     ret=cmd_generic_bufcall(NDRXD_COM_SRVGETBRS_RQ, NDRXD_SRC_SERVER,
0373                         NDRXD_CALL_TYPE_GENERIC,
0374                         &req, send_size,
0375                         entry->listen_q,
0376                         entry->q_descr,
0377                         (mqd_t)EXFAIL,   /* do not keep open ndrxd q open */
0378                         ndrx_get_G_atmi_conf()->ndrxd_q_str,
0379                         0, NULL,
0380                         NULL,
0381                         NULL,
0382                         NULL,
0383                         EXTRUE,
0384                         EXFALSE,
0385                         (char *)&rply,
0386                         &rply_len,
0387                         0,
0388                         get_bridges_rply_request);
0389     if (EXSUCCEED!=ret)
0390     {
0391         /*Just ignore the error*/
0392         if (!G_shm_srv)
0393         {
0394             NDRX_LOG(log_error, "Not attached to shm/ndrxd - ingore error");
0395             ret=EXSUCCEED;
0396         }    
0397         else
0398         {
0399           ndrx_TPset_error_fmt(TPESYSTEM, "Failed to send command %d to [%s]", 
0400                         NDRXD_COM_SRVUNADV_RQ, ndrx_get_G_atmi_conf()->ndrxd_q_str);  
0401         }
0402     }
0403     else
0404     {
0405         if (rply_len != sizeof(command_reply_getbrs_t))
0406         {
0407             NDRX_LOG(log_error, "Invalid reply, got len: %d expected: %d",
0408                     rply_len, sizeof(command_reply_getbrs_t));
0409             EXFAIL_OUT(ret);
0410         }
0411         else
0412         {
0413             strcpy(nodes_out, rply.nodes);
0414         }
0415     }
0416 
0417 out:
0418     /* Unblock the Q */
0419     ndrx_q_setblock(entry->q_descr, EXFALSE);
0420 
0421     return ret;
0422 }
0423 
0424 
0425 /**
0426  * Reply with ping response to ndrxd
0427  * @return
0428  */
0429 expublic int pingrsp_to_ndrxd(command_srvping_t *ping)
0430 {
0431     int ret=EXSUCCEED;
0432     
0433     /* if MT, wait for thread-pool one slot... 
0434      * So that ping ensures that server is functional
0435      */
0436     if (G_server_conf.is_threaded)
0437     {
0438         NDRX_LOG(log_debug, "Wait for one free MT thread before ping response");
0439         ndrx_thpool_wait_one(G_server_conf.dispthreads);
0440     }
0441     
0442     ret=cmd_generic_call(NDRXD_COM_SRVPING_RP, NDRXD_SRC_SERVER,
0443                         NDRXD_CALL_TYPE_PM_INFO,
0444                         (command_call_t *)ping, sizeof(*ping),
0445                         ndrx_get_G_atmi_conf()->reply_q_str,
0446                         ndrx_get_G_atmi_conf()->reply_q,
0447                         (mqd_t)EXFAIL,   /* do not keep open ndrxd q open */
0448                         ndrx_get_G_atmi_conf()->ndrxd_q_str,
0449                         0, NULL,
0450                         NULL,
0451                         NULL,
0452                         NULL,
0453                         EXFALSE);
0454     if (EXSUCCEED!=ret)
0455     {
0456         /*Just ignore the error*/
0457         if (!G_shm_srv)
0458         {
0459             NDRX_LOG(log_error, "Not attached to shm/ndrxd - ingore error");
0460             ret=EXSUCCEED;
0461         }    
0462         else
0463         {
0464             NDRX_LOG(log_error, "Failed to reply on ping! seq=%d", 
0465                     ping->seq);
0466             userlog("Failed to reply with ping to ndrxd. srvid=%d seq=%d", 
0467                     ping->srvid, ping->seq);
0468         }
0469     }
0470     
0471 out:
0472     return ret;
0473 }
0474 
0475 /* vim: set ts=4 sw=4 et smartindent: */