Back to home page

Enduro/X

 
 

    


0001 /**
0002  * @brief System V specifics for ndrxd sanity checks
0003  *
0004  * @file sanity_sysv.c
0005  */
0006 /* -----------------------------------------------------------------------------
0007  * Enduro/X Middleware Platform for Distributed Transaction Processing
0008  * Copyright (C) 2009-2016, ATR Baltic, Ltd. All Rights Reserved.
0009  * Copyright (C) 2017-2023, Mavimax, Ltd. All Rights Reserved.
0010  * This software is released under one of the following licenses:
0011  * AGPL (with Java and Go exceptions) or Mavimax's license for commercial use.
0012  * See LICENSE file for full text.
0013  * -----------------------------------------------------------------------------
0014  * AGPL license:
0015  *
0016  * This program is free software; you can redistribute it and/or modify it under
0017  * the terms of the GNU Affero General Public License, version 3 as published
0018  * by the Free Software Foundation;
0019  *
0020  * This program is distributed in the hope that it will be useful, but WITHOUT ANY
0021  * WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR A
0022  * PARTICULAR PURPOSE. See the GNU Affero General Public License, version 3
0023  * for more details.
0024  *
0025  * You should have received a copy of the GNU Affero General Public License along 
0026  * with this program; if not, write to the Free Software Foundation, Inc.,
0027  * 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
0028  *
0029  * -----------------------------------------------------------------------------
0030  * A commercial use license is available from Mavimax, Ltd
0031  * contact@mavimax.com
0032  * -----------------------------------------------------------------------------
0033  */
0034 #include <string.h>
0035 #include <stdio.h>
0036 #include <stdlib.h>
0037 #include <errno.h>
0038 #include <memory.h>
0039 #include <sys/types.h>
0040 #include <dirent.h>
0041 #include <sys/stat.h>
0042 #include <utlist.h>
0043 #include <fcntl.h>
0044 
0045 #include <ndrstandard.h>
0046 #include <ndrxd.h>
0047 #include <atmi_int.h>
0048 #include <nstopwatch.h>
0049 
0050 #include <ndebug.h>
0051 #include <cmd_processor.h>
0052 #include <signal.h>
0053 #include <bridge_int.h>
0054 #include <atmi_shm.h>
0055 #include <userlog.h>
0056 #include <sys_unix.h>
0057 #include <sys_svq.h>
0058 #include <fcntl.h>
0059 
0060 /*---------------------------Externs------------------------------------*/
0061 /*---------------------------Macros-------------------------------------*/
0062 /*---------------------------Enums--------------------------------------*/
0063 /*---------------------------Typedefs-----------------------------------*/
0064 /*---------------------------Globals------------------------------------*/
0065 /*---------------------------Statics------------------------------------*/
0066 /*---------------------------Prototypes---------------------------------*/
0067 
0068 /**
0069  * Remove any pending calls to request address.
0070  * Basically we construct here mqd_t and let common function to flush the calls.
0071  * The common function doesn't do any lockings, thus this approach is fine.
0072  * @param qid queue id
0073  * @param qstr queue string
0074  * @return EXSUCCEED/EXFAIL
0075  */
0076 exprivate int flush_rqaddr(int qid, char *qstr)
0077 {
0078     int ret = EXSUCCEED;
0079     mqd_t mqd = NULL;
0080     int err;
0081     
0082     mqd = NDRX_CALLOC(1, sizeof(struct ndrx_svq_info));
0083     
0084     if (NULL==mqd)
0085     {
0086         err = errno;
0087         
0088         NDRX_LOG(log_error, "Failed to malloc %d bytes", 
0089                 (int)sizeof(struct ndrx_svq_info));
0090         userlog("Failed to malloc %d bytes", 
0091                 (int)sizeof(struct ndrx_svq_info));
0092         EXFAIL_OUT(ret);
0093     }
0094     
0095     mqd->qid = qid;
0096     NDRX_STRCPY_SAFE(mqd->qstr, qstr);
0097     
0098     /* set attr as non blocked */
0099     mqd->attr.mq_flags |= O_NONBLOCK;
0100     
0101     /* lets flush the queue now. */
0102     if (EXSUCCEED!=remove_service_q(NULL, EXFAIL, mqd, qstr))
0103     {
0104         NDRX_LOG(log_error, "Failed to flush [%s]/%d", qstr, qid);
0105         EXFAIL_OUT(ret);
0106     }
0107 out:
0108     
0109     if (NULL!=mqd)
0110     {
0111         NDRX_FREE(mqd);
0112     }
0113     return ret;
0114 }
0115 
0116 /**
0117  * System V sanity checks.
0118  * Includes following steps:
0119  * - get a copy of SV5 queue maps
0120  * - scan the shared memory of services, and mark the used sv5 qids.
0121  * - then scan the list of used qids for the request addresses
0122  * - remove any qid, that is not present in shm (the removal shall be done
0123  *  in sv5 library with the write lock present and checking the ctime again
0124  *  so that we have a real sync). Check the service rqaddr by NDRX_SVQ_MAP_RQADDR
0125  * 
0126  * Well if we are about to remove stale request addresses, we could report them
0127  * from the server processes. And thus locate if none of available server processes
0128  * belongs to request address, then queue is unlinked. This will protect us from
0129  * unlinking queues to which working zero service servers are located, like
0130  * tpbridge...
0131  * @param finalchk do not use TTL for non service linked request address removal
0132  * 
0133  * @return SUCCEED/FAIL
0134  */
0135 expublic int do_sanity_check_sysv(int finalchk)
0136 {
0137     int ret=EXSUCCEED;
0138     ndrx_svq_status_t *svq = NULL;
0139     int len;
0140     int reslen;
0141     int i, j;
0142     int have_value_3;
0143     int pos_3;
0144     bridgedef_svcs_t *cur, *tmp;
0145     ndrx_shm_resid_t *srvlist = NULL;
0146     pm_node_t *p_pm;
0147     int last;
0148     string_hash_t *strh=NULL, *strh_el = NULL;
0149     
0150     NDRX_LOG(log_debug, "Into System V sanity checks, finalchk: %d", finalchk);
0151     
0152     /* Get the list of queues 
0153      * if no ttl, then give a -1 which will make all queues scheduled for
0154      * removal
0155      */
0156     svq = ndrx_svqshm_statusget(&reslen, (finalchk?-1:G_app_config->rqaddrttl));
0157     
0158     if (NULL==svq)
0159     {
0160         NDRX_LOG(log_error, "Failed to get System V shared memory status!");
0161         userlog("Failed to get System V shared memory status!");
0162         EXFAIL_OUT(ret);
0163     }
0164     
0165     DL_FOREACH(G_process_model, p_pm)
0166     {
0167         /* Find live count of p_pm by resid... 
0168          * maybe do this once?
0169          * if we are stopping... assume the RQ addr is still ok to avoid false
0170          * warnings.
0171          */
0172         if (NDRXD_PM_RUNNING_OK==p_pm->state || NDRXD_PM_STOPPING == p_pm->state)
0173         {
0174             if (NULL==ndrx_string_hash_add_cnt(&strh, p_pm->rqaddress))
0175             {
0176                 NDRX_LOG(log_error, "Failed to add rqaddr [%s] to hashmap",
0177                         p_pm->rqaddress);
0178                 EXFAIL_OUT(ret);
0179             }
0180         }
0181     }
0182     
0183     /* Now scan the used services shared memory and updated the 
0184      * status copy accordingly
0185      * WELL! We must loop over the local NDRXD list of the services
0186      * and then update the status. Then we can avoid the locking of
0187      * the shared memory.
0188      */
0189     
0190     /* We assume shm is OK! */
0191     
0192     NDRX_LOG(log_debug, "Marking resources against services, reslen: %d", 
0193             reslen);
0194     
0195     EXHASH_ITER(hh, G_bridge_svc_hash, cur, tmp)
0196     {
0197         if (EXSUCCEED==ndrx_shm_get_srvs(cur->svc_nm, &srvlist, &len))
0198         {
0199             NDRX_LOG(log_debug, "Checking service [%s]", cur->svc_nm);
0200             
0201             /* Check the cluster nodes too, so if it is in network
0202              * then no need to unlink...
0203              */
0204             for (i=0; i<len; i++)
0205             {
0206                 ndrx_svqshm_get_status(svq, srvlist[i].resid, &pos_3, &have_value_3);
0207                 
0208                 if (have_value_3)
0209                 {
0210                     int used_cnt = 0;
0211                     
0212                     NDRX_LOG(log_debug, "Service [%s] have resource %d at idx %d", 
0213                             cur->svc_nm, srvlist[i].resid, i);
0214                     svq[pos_3].flags |= NDRX_SVQ_MAP_HAVESVC;
0215                     
0216                     
0217                     if (svq[pos_3].flags & NDRX_SVQ_MAP_SCHEDRM)
0218                     {
0219                         /* validate the count against the live servers processes,
0220                          * if we found the real count of RQADDR is too short,
0221                          * we must uninstall that number of services.
0222                          * Note that Q has TTL already expired, thus all srvinfos
0223                          * shall be already seen by ndrxd.
0224                          * So can the PM...
0225                          */
0226 
0227                         strh_el = ndrx_string_hash_get(strh, svq[pos_3].qstr);
0228 
0229                         if (NULL!=strh_el)
0230                         {
0231                             used_cnt = strh_el->cnt;
0232                         }
0233 
0234                         if (srvlist[i].cnt!=used_cnt)
0235                         {
0236                             NDRX_LOG(log_error, "Service [%s] rqaddr [%s] resource [%d] cnt=%d actual=%d",
0237                                     cur->svc_nm, svq[pos_3].qstr, srvlist[i].resid, srvlist[i].cnt, used_cnt);
0238                             userlog("Service [%s] rqaddr [%s] resource [%d] cnt=%d actual=%d",
0239                                     cur->svc_nm, svq[pos_3].qstr, srvlist[i].resid, srvlist[i].cnt, used_cnt);
0240                         }
0241 
0242                         for (j=0; j<(srvlist[i].cnt-used_cnt); j++)
0243                         {
0244                             ndrxd_shm_uninstall_svc(cur->svc_nm, &last, srvlist[i].resid);
0245                         }
0246                     }
0247                    
0248                 }
0249                 else
0250                 {
0251                     NDRX_LOG(log_error, "!!! Service [%s] have NO resource %d at idx %d", 
0252                             cur->svc_nm, srvlist[i].resid, i);
0253                     
0254                     /* Shouldn't we housekeep the service as no Qs are available
0255                      * for serving...
0256                      * Thing #2: Seems like linux does not randomize msgid identifiers
0257                      * thus it is possible here, that if server for whom we are removing
0258                      * the identifier is booting in background and at this moment is installing
0259                      * the service queues + services, then there is race condition on this
0260                      * that here we actually remove the service.
0261                      * Probably we could lock the queues and check status again?
0262                      * and if that finds out that Q is missing, then with locked queues
0263                      * we continue to remove the service. We can use read locks here on SystemV
0264                      * but still needs to think about the locking, as then all the
0265                      * else were of the SystemV queues any removal and decision
0266                      * must be made in the locks otherwise we can remove something
0267                      * created in racy manner.
0268                      * Also... it look like on Linux msgid are reused very quickly.
0269                      */
0270                     for (j=0; j<srvlist[i].cnt; j++)
0271                     {
0272                         ndrxd_shm_uninstall_svc(cur->svc_nm, &last, srvlist[i].resid);
0273                     }
0274                 }
0275                 
0276             }         
0277         } /* local servs */
0278         
0279         if (NULL!=srvlist)
0280         {
0281             NDRX_FREE(srvlist);
0282             srvlist = NULL;
0283         }
0284     }
0285     
0286     /* Scan for queues which are not any more is service list, 
0287      * the queue was service, and time have expired for TTL, thus such queues
0288      * are subject for unlinking...
0289      * perform that in sync way...
0290      */
0291     NDRX_LOG(log_debug, "Flush RQADDR queues without services and TTL expired.");
0292     for (i=0; i<reslen; i++)
0293     {
0294         int cont = EXFALSE;
0295         
0296         /*
0297         NDRX_LOG(log_debug, "DEBUG! %d = ISUSED= %d WASUSED=%d  EXPIRED=%d SCHEDRM=%d RQADDR=%d HAVESVC=%d [%s]/%d", 
0298                 i, 
0299                 svq[i].flags & NDRX_SVQ_MAP_ISUSED, 
0300                 svq[i].flags & NDRX_SVQ_MAP_WASUSED, 
0301                 svq[i].flags & NDRX_SVQ_MAP_EXPIRED, 
0302                 svq[i].flags & NDRX_SVQ_MAP_SCHEDRM, 
0303                 svq[i].flags & NDRX_SVQ_MAP_RQADDR, 
0304                 svq[i].flags & NDRX_SVQ_MAP_HAVESVC, 
0305                 svq[i].qstr, svq[i].qid);
0306         */
0307         
0308         if ((svq[i].flags & NDRX_SVQ_MAP_RQADDR)
0309                 && !(svq[i].flags & NDRX_SVQ_MAP_HAVESVC)
0310                 && (svq[i].flags & NDRX_SVQ_MAP_SCHEDRM))
0311         {
0312             
0313             /* Check process model, to see if any active server have this
0314              * request address
0315              */
0316             DL_FOREACH(G_process_model, p_pm)
0317             {
0318                 if (PM_RUNNING(p_pm->state)
0319                         && 0==strcmp(p_pm->rqaddress, svq[i].qstr))
0320                 {
0321                     NDRX_LOG(log_debug, "Server [%s]/%d is using rqddr [%s] - chk next",
0322                         p_pm->binary_name, p_pm->srvid, svq[i].qstr);
0323                     cont = EXTRUE;
0324                     break;
0325                 }
0326             }
0327             
0328             if (cont)
0329             {
0330                 continue;
0331             }
0332             
0333             NDRX_LOG(log_info, "qid %d is subject for delete ttl %d qstr=[%s]", 
0334                     svq[i].qid, G_app_config->rqaddrttl, svq[i].qstr);
0335             
0336             /* well time checking & flushing we shall do here
0337              * due to locking issues... not the way as bellow described...
0338              * There shall be no new message in RQADDR due to stale servers
0339              */
0340             if (EXSUCCEED!=flush_rqaddr(svq[i].qid, svq[i].qstr))
0341             {
0342                 NDRX_LOG(log_error, "Failed to flush RQADDR [%s]/%d", 
0343                         svq[i].qstr, svq[i].qid);
0344                 userlog("Failed to flush RQADDR [%s]/%d", 
0345                         svq[i].qstr, svq[i].qid);
0346             }
0347             
0348             /* Well at this point we shall
0349              * remove call expublic int remove_service_q(char *svc, short srvid, 
0350              * mqd_t in_qd, char *in_qstr)!!!
0351              * because we need to flush the queue of messages...
0352              * only here we have a issue with lockings
0353              * we will be in write mode to MAPs, but in mean time we want to
0354              * perform read ops to the SHM...
0355              * Thus probably we need some globals in svqshm that indicate
0356              * that we have already exclusive lock!
0357              * 
0358              * But as remove_service_q is not using much of the systemv maps
0359              * processing, then we could just simple callback from ndrx_svqshm_ctl
0360              * with qid and queue string. then callback would build simple
0361              * mqd_t and pass it to remove_service_q for message zapping.
0362              */
0363             if (EXSUCCEED!=ndrx_svqshm_ctl(NULL, svq[i].qid, 
0364                     IPC_RMID, EXFAIL, NULL))
0365             {
0366                 NDRX_LOG(log_error, "Failed to unlink qid %d", svq[i].qid);
0367                 EXFAIL_OUT(ret);
0368             }
0369         }
0370     }
0371     
0372 out:
0373     
0374     if (NULL!=svq)
0375     {
0376         NDRX_FREE(svq);
0377     }
0378 
0379     if (NULL!=srvlist)
0380     {
0381         NDRX_FREE(srvlist);
0382     }
0383 
0384     ndrx_string_hash_free(strh);
0385 
0386     return ret;
0387 }
0388 
0389 /**
0390  * Perform final checks on exit - remove all service queues...
0391  * @return 
0392  */
0393 expublic int ndrxd_sysv_finally(void)
0394 {
0395     int ret = EXSUCCEED;
0396     
0397     ret = do_sanity_check_sysv(EXTRUE);
0398     
0399     return ret;
0400 }
0401 
0402 /* vim: set ts=4 sw=4 et smartindent: */