Back to home page

Enduro/X

 
 

    


0001 /**
0002  * @brief This is client image grabber & mapping
0003  *   TODO: XATMI client contexts start from 1. Thus avoid the 0 contexts
0004  *   as we are setting them to -1. Thus the -1 we will allow to update to 1.
0005  *
0006  * @file client.c
0007  */
0008 /* -----------------------------------------------------------------------------
0009  * Enduro/X Middleware Platform for Distributed Transaction Processing
0010  * Copyright (C) 2009-2016, ATR Baltic, Ltd. All Rights Reserved.
0011  * Copyright (C) 2017-2023, Mavimax, Ltd. All Rights Reserved.
0012  * This software is released under one of the following licenses:
0013  * AGPL (with Java and Go exceptions) or Mavimax's license for commercial use.
0014  * See LICENSE file for full text.
0015  * -----------------------------------------------------------------------------
0016  * AGPL license:
0017  *
0018  * This program is free software; you can redistribute it and/or modify it under
0019  * the terms of the GNU Affero General Public License, version 3 as published
0020  * by the Free Software Foundation;
0021  *
0022  * This program is distributed in the hope that it will be useful, but WITHOUT ANY
0023  * WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR A
0024  * PARTICULAR PURPOSE. See the GNU Affero General Public License, version 3
0025  * for more details.
0026  *
0027  * You should have received a copy of the GNU Affero General Public License along 
0028  * with this program; if not, write to the Free Software Foundation, Inc.,
0029  * 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
0030  *
0031  * -----------------------------------------------------------------------------
0032  * A commercial use license is available from Mavimax, Ltd
0033  * contact@mavimax.com
0034  * -----------------------------------------------------------------------------
0035  */
0036 #include <stdio.h>
0037 #include <stdlib.h>
0038 #include <string.h>
0039 #include <errno.h>
0040 #include <regex.h>
0041 #include <utlist.h>
0042 #include <unistd.h>
0043 #include <signal.h>
0044 
0045 #include <ndebug.h>
0046 #include <atmi.h>
0047 #include <atmi_int.h>
0048 #include <typed_buf.h>
0049 #include <ndrstandard.h>
0050 #include <ubf.h>
0051 #include <Exfields.h>
0052 #include <Excompat.h>
0053 #include <ubfutil.h>
0054 #include <sys_unix.h>
0055 #include <utlist.h>
0056 
0057 #include <tpadmsv.h>
0058 #include "expr.h"
0059 #include "cpm.h"
0060 /*---------------------------Externs------------------------------------*/
0061 /*---------------------------Macros-------------------------------------*/
0062 /*---------------------------Enums--------------------------------------*/
0063 /*---------------------------Typedefs-----------------------------------*/
0064 
0065 /**
0066  * Image of the client information
0067  */
0068 typedef struct ndrx_adm_client ndrx_adm_client_t;
0069 struct ndrx_adm_client
0070 {
0071     char clientid[128+1];     /**< myid                                 */
0072     char name[MAXTIDENT+1];   /**< process name                         */
0073     char lmid[MAXTIDENT+1];   /**< cluster node id                      */
0074     /** may be: ACT, SUS (not used), DEA - dead */
0075     char state[15+1];         /**< state of the client live/dead by pid */
0076     long pid;                 /**< process PID                          */
0077     long curconv;             /**< number of conversations process into */
0078     long contextid;           /**< Multi-threading context id           */
0079     long curtime;             /**< Current time when process started    */
0080     int cursor_loaded;        /**< Is loaded into cursor?               */
0081     
0082     EX_hash_handle hh;         /**< makes this structure hashable       */
0083     ndrx_adm_client_t *next;   /**< Add entries to LL, in case of DEA   */
0084 
0085 };
0086 
0087 /**
0088  * Client class infos mapping table
0089  */
0090 expublic ndrx_adm_elmap_t ndrx_G_client_map[] =
0091 {  
0092     /* Driving of the Preparing: */
0093      {TA_LMID,          TPADM_EL(ndrx_adm_client_t, lmid)} /* key */
0094     ,{TA_CLIENTID,       TPADM_EL(ndrx_adm_client_t, clientid)} /* key */
0095     ,{TA_CLTNAME,       TPADM_EL(ndrx_adm_client_t, name)}
0096     ,{TA_STATE,         TPADM_EL(ndrx_adm_client_t, state)}
0097     ,{TA_PID,           TPADM_EL(ndrx_adm_client_t, pid)}
0098     ,{TA_CURCONV,       TPADM_EL(ndrx_adm_client_t, curconv)}
0099     ,{TA_CONTEXTID,     TPADM_EL(ndrx_adm_client_t, contextid)} /* key */
0100     ,{TA_CURTIME,       TPADM_EL(ndrx_adm_client_t, curtime)}
0101     ,{BBADFLDID}
0102 };
0103 
0104 /*---------------------------Globals------------------------------------*/
0105 /*---------------------------Statics------------------------------------*/
0106 
0107 /**
0108  * Pre hash of the results to merge the queue and CPM/SHM infos
0109  */
0110 exprivate ndrx_adm_client_t *M_prehash_act = NULL;
0111 exprivate ndrx_adm_client_t *M_ll_dea = NULL;   /**< List of dead SHM/CPM */
0112 /*---------------------------Prototypes---------------------------------*/
0113 
0114 /**
0115  * Build up the cursor
0116  * Scan the queues and add the elements
0117  * @param clazz class name
0118  * @param cursnew this is new cursor domain model
0119  * @param flags not used
0120  */
0121 expublic int ndrx_adm_client_get(char *clazz, ndrx_adm_cursors_t *cursnew, long flags)
0122 {
0123     int ret = EXSUCCEED;
0124     int typ;
0125     string_list_t* qlist = NULL;
0126     string_list_t* elt = NULL;
0127     ndrx_qdet_t qdet;
0128     TPMYID myid;
0129     ndrx_adm_client_t clt;
0130     ndrx_adm_client_t *p_clt, *p_clt2, *cel, *clet;
0131     int cltshm_attached = EXFALSE;
0132     ndrx_sem_t *sem = NULL;
0133     ndrx_shm_t *shm = NULL;
0134     ndrx_clt_shm_t *el;
0135     long l1, l2;
0136     int idx=0;
0137     int i;
0138     cursnew->map = ndrx_G_client_map;
0139     /* setup the list */
0140     ndrx_growlist_init(&cursnew->list, 100, sizeof(ndrx_adm_client_t));
0141     
0142     qlist = ndrx_sys_mqueue_list_make(G_atmi_env.qpath, &ret);
0143     
0144     if (EXSUCCEED!=ret)
0145     {
0146         NDRX_LOG(log_error, "posix queue listing failed!");
0147         EXFAIL_OUT(ret);
0148     }
0149      
0150     /* seems we need to go over twice, because we might get conversational q first */
0151     LL_FOREACH(qlist,elt)
0152     {
0153         
0154         /* parse the queue..., extract clients.. */
0155         
0156         /* if not print all, then skip this queue */
0157         if (0!=strncmp(elt->qname, 
0158                 G_atmi_env.qprefix_match, G_atmi_env.qprefix_match_len))
0159         {
0160             continue;
0161         }
0162         /* extract clients... get
0163         typ = ndrx_q_type_get(elt->qname);
0164         */
0165         typ = ndrx_q_type_get(elt->qname);
0166         
0167         
0168         if (NDRX_QTYPE_CLTRPLY==typ)
0169         {
0170             if (EXSUCCEED==ndrx_qdet_parse_cltqstr(&qdet, elt->qname))
0171             {
0172                 memset(&clt, 0, sizeof(clt));
0173                 clt.pid = qdet.pid;
0174                 NDRX_STRCPY_SAFE(clt.clientid, elt->qname);
0175                 clt.contextid = qdet.contextid;
0176                 NDRX_STRCPY_SAFE(clt.name, qdet.binary_name);
0177                 snprintf(clt.lmid, sizeof(clt.lmid), "%ld", tpgetnodeid());
0178                 
0179                 if (EXSUCCEED==kill(qdet.pid, 0))
0180                 {
0181                     NDRX_STRCPY_SAFE(clt.state, "ACT");
0182                 }
0183                 else
0184                 {
0185                     NDRX_STRCPY_SAFE(clt.state, "DEA");
0186                 }
0187             
0188                 if (EXSUCCEED!=ndrx_growlist_add(&cursnew->list, (void *)&clt, idx))
0189                 {
0190                     NDRX_LOG(log_error, "Growlist failed - out of memory?");
0191                     EXFAIL_OUT(ret);
0192                 }
0193                 
0194                 NDRX_LOG(log_debug, "client [%s] state %s added (Q)", 
0195                         clt.clientid, clt.state);
0196                 idx++;
0197             }
0198         }        
0199     } /* LL_FOREACH(qlist,elt) */
0200     
0201     /* scan for conv Qs and update the stats... */
0202     LL_FOREACH(qlist,elt)
0203     {
0204         /* parse the queue..., extract clients.. */
0205         
0206         /* if not print all, then skip this queue */
0207         if (0!=strncmp(elt->qname, 
0208                 G_atmi_env.qprefix_match, G_atmi_env.qprefix_match_len))
0209         {
0210             continue;
0211         }
0212         /* extract clients... get
0213         typ = ndrx_q_type_get(elt->qname);
0214         */
0215         typ = ndrx_q_type_get(elt->qname);
0216         
0217         if (NDRX_QTYPE_CONVINIT==typ)
0218         {
0219             /* parse: NDRX_CONV_INITATOR_Q_PFX and search for client if so */
0220             if (EXSUCCEED==ndrx_cvnq_parse_client(elt->qname, &myid))
0221             {
0222                 /* search for client... */
0223                 for (i=0; i<=cursnew->list.maxindexused; i++)
0224                 {
0225                     p_clt = (ndrx_adm_client_t *) (cursnew->list.mem + i*sizeof(ndrx_adm_client_t));
0226                     
0227                     /* reset binary_name to common len... */
0228                     myid.binary_name[MAXTIDENT] = EXEOS;
0229                     if (p_clt->pid == myid.pid
0230                             && p_clt->contextid == myid.contextid
0231                             && 0==strcmp(p_clt->name, myid.binary_name)
0232                             && myid.nodeid == atoi(p_clt->lmid)
0233                             )
0234                     {
0235                         p_clt->curconv++;
0236                         NDRX_LOG(log_debug, "client [%s] currconv incremented to %ld", 
0237                                 p_clt->name, p_clt->curconv);
0238                         break;
0239                     }
0240                 }
0241                 
0242             } /* If q parse OK */
0243         } /* NDRX_CLT_QREPLY_PFX==typ */
0244         
0245     } /* LL_FOREACH(qlist,elt) */
0246         
0247     
0248     /* open the SHM & scan for clients -> think about opening at startup?
0249      * may be we could conserve some resources.
0250      * On the other hand cpmsrv can live it's life and manage it's shm
0251      * as it wishes?
0252      */
0253     
0254     if (EXSUCCEED==ndrx_cltshm_init(EXTRUE))
0255     {
0256         char *p;
0257         cltshm_attached = EXTRUE;
0258         
0259         sem = ndrx_cltshm_sem_get();
0260         shm = ndrx_cltshm_mem_get();
0261 
0262         /* scan for the elements */
0263         if (EXSUCCEED!=ndrx_sem_rwlock(sem, 0, NDRX_SEM_TYP_READ))
0264         {
0265             EXFAIL_OUT(ret);
0266         }
0267         
0268         NDRX_LOG(log_debug, "Build up the hash of SHM");
0269         for (i=0; i<G_atmi_env.max_clts; i++)
0270         {
0271             el = NDRX_CPM_INDEX(shm->mem, i);
0272             
0273             if (el->flags & NDRX_CPM_MAP_WASUSED)
0274             {
0275                 int err;
0276                 p_clt = NDRX_CALLOC(1, sizeof(ndrx_adm_client_t));
0277                 
0278                 err = errno;
0279                 
0280                 if (NULL==p_clt)
0281                 {
0282                     NDRX_LOG(log_error, "Failed to calloc of %d bytes failed: %s",
0283                             sizeof(ndrx_adm_client_t), strerror(err));
0284                     userlog("Failed to calloc of %d bytes failed: %s",
0285                             sizeof(ndrx_adm_client_t), strerror(err));
0286 
0287                     /* Have unlock Support #443 */
0288                     ndrx_sem_rwunlock(sem, 0, NDRX_SEM_TYP_READ);
0289 
0290                     EXFAIL_OUT(ret);
0291                 }
0292                 
0293                 p_clt->pid = el->pid;
0294                 
0295                 snprintf(p_clt->clientid, sizeof(p_clt->clientid),
0296                         "%ld/%s", tpgetnodeid(), el->key);
0297                 
0298                 p = strchr(p_clt->clientid, NDRX_CPM_SEP);
0299                 if (NULL!=p)
0300                 {
0301                     *p = '/';
0302                 }
0303                 
0304                 /* set start time */
0305                 p_clt->curtime = (long)el->stattime;
0306                 p_clt->curconv = EXFAIL;
0307                 p_clt->contextid = EXFAIL;
0308                 snprintf(p_clt->lmid, sizeof(clt.lmid), "%ld", tpgetnodeid());
0309                 
0310                 /* set binary name */
0311                 NDRX_STRCPY_SAFE(p_clt->name, el->procname);
0312 
0313                 if (el->flags & NDRX_CPM_MAP_ISUSED && ndrx_sys_is_process_running_by_pid(el->pid))
0314                 {
0315                     NDRX_STRCPY_SAFE(p_clt->state, "ACT");
0316                     EXHASH_ADD_LONG(M_prehash_act, pid, p_clt);
0317                     NDRX_LOG(log_debug, "Hashed pid=%d - %s", p_clt->pid, p_clt->clientid);
0318                 }
0319                 else
0320                 {
0321                     /* as cannot merge by PID: */
0322                     NDRX_STRCPY_SAFE(p_clt->state, "DEA");
0323                     LL_APPEND(M_ll_dea, p_clt);
0324                 }
0325                 
0326             } /* if used */
0327         }
0328 
0329         /* Bug #443, use correct operation type... */
0330         ndrx_sem_rwunlock(sem, 0, NDRX_SEM_TYP_READ);
0331     }
0332     
0333     /* merge the clients.. */
0334     NDRX_LOG(log_debug, "Merge clients Q + SHM");
0335     
0336     for (i=0; i<=cursnew->list.maxindexused; i++)
0337     {
0338         /* check the hash, update */
0339         p_clt = (ndrx_adm_client_t *) (cursnew->list.mem + i*sizeof(ndrx_adm_client_t));
0340         
0341         EXHASH_FIND_LONG(M_prehash_act, &(p_clt->pid), p_clt2);
0342         
0343         if (NULL!=p_clt2)
0344         {
0345             NDRX_LOG(log_debug, "PID %d matched with SHM", (int)p_clt->pid);
0346             /* update the client */
0347             snprintf(p_clt->clientid, sizeof(p_clt->clientid), "%s/%ld",
0348                     p_clt2->clientid, p_clt->contextid);
0349             
0350             /* update startup time */
0351             p_clt->curtime = p_clt2->curtime;
0352             p_clt2->cursor_loaded = EXTRUE;
0353         }
0354     }
0355     
0356     /* load the hash elems with out other data to cursor */
0357     NDRX_LOG(log_debug, "Add HASH/SHM data");
0358     EXHASH_ITER(hh, M_prehash_act, cel, clet)
0359     {
0360         if (!cel->cursor_loaded)
0361         {
0362             /* Add to cursor as much data as we have */
0363             memcpy(&clt, cel, sizeof(clt));
0364             if (EXSUCCEED!=ndrx_growlist_add(&cursnew->list, (void *)&clt, idx))
0365             {
0366                 NDRX_LOG(log_error, "Growlist failed - out of memory?");
0367                 EXFAIL_OUT(ret);
0368             }
0369 
0370             NDRX_LOG(log_debug, "client [%s] state %s added (SHM)", clt.clientid, clt.state);
0371             idx++;
0372         }
0373     }
0374 
0375     LL_FOREACH(M_ll_dea, cel)
0376     {
0377         memcpy(&clt, cel, sizeof(clt));
0378         if (EXSUCCEED!=ndrx_growlist_add(&cursnew->list, (void *)&clt, idx))
0379         {
0380             NDRX_LOG(log_error, "Growlist failed - out of memory?");
0381             EXFAIL_OUT(ret);
0382         }
0383 
0384         NDRX_LOG(log_debug, "client [%s] state %s added (SHM)", clt.clientid, clt.state);
0385         idx++;
0386     }
0387     
0388 out:
0389     
0390     if (cltshm_attached)
0391     {
0392         ndrx_cltshm_detach();
0393     }
0394     
0395     if (NULL!=qlist)
0396     {
0397         ndrx_string_list_free(qlist);
0398     }
0399 
0400     if (EXSUCCEED!=ret)
0401     {
0402         ndrx_growlist_free(&cursnew->list);
0403     }
0404 
0405     /* clean up the hash list */
0406     EXHASH_ITER(hh, M_prehash_act, cel, clet)
0407     {
0408         EXHASH_DEL(M_prehash_act, cel);
0409         NDRX_FREE(cel);
0410     }
0411 
0412     /* clean up the hash list */
0413     LL_FOREACH_SAFE(M_ll_dea, cel, clet)
0414     {
0415         LL_DELETE(M_ll_dea, cel);
0416         NDRX_FREE(cel);
0417     }
0418 
0419     return ret;
0420 }
0421 
0422 /* vim: set ts=4 sw=4 et smartindent: */