Back to home page

Enduro/X

 
 

    


0001 /**
0002  * @brief Command's `pq' - Print Queue backend
0003  *   This includes backend processing (gathering statistics by sanity callback)
0004  *   This is server side statistics grabber. command "pq"
0005  *   We should implemnet also in xadmin local command "pql" which would print
0006  *   statistics for all prefixed queues (so that we have full view of all
0007  *   queues in system)
0008  *   The ndrxd monitor for us is needed, because of we might want to start
0009  *   services in future automatically of water high or water low...
0010  *
0011  * @file cmd_pq.c
0012  */
0013 /* -----------------------------------------------------------------------------
0014  * Enduro/X Middleware Platform for Distributed Transaction Processing
0015  * Copyright (C) 2009-2016, ATR Baltic, Ltd. All Rights Reserved.
0016  * Copyright (C) 2017-2023, Mavimax, Ltd. All Rights Reserved.
0017  * This software is released under one of the following licenses:
0018  * AGPL (with Java and Go exceptions) or Mavimax's license for commercial use.
0019  * See LICENSE file for full text.
0020  * -----------------------------------------------------------------------------
0021  * AGPL license:
0022  *
0023  * This program is free software; you can redistribute it and/or modify it under
0024  * the terms of the GNU Affero General Public License, version 3 as published
0025  * by the Free Software Foundation;
0026  *
0027  * This program is distributed in the hope that it will be useful, but WITHOUT ANY
0028  * WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR A
0029  * PARTICULAR PURPOSE. See the GNU Affero General Public License, version 3
0030  * for more details.
0031  *
0032  * You should have received a copy of the GNU Affero General Public License along 
0033  * with this program; if not, write to the Free Software Foundation, Inc.,
0034  * 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
0035  *
0036  * -----------------------------------------------------------------------------
0037  * A commercial use license is available from Mavimax, Ltd
0038  * contact@mavimax.com
0039  * -----------------------------------------------------------------------------
0040  */
0041 #include <string.h>
0042 #include <stdio.h>
0043 #include <stdlib.h>
0044 #include <memory.h>
0045 #include <exhash.h>
0046 #include <errno.h>
0047 #include <ndrstandard.h>
0048 
0049 #include <ndebug.h>
0050 #include <userlog.h>
0051 #include <ndrxd.h>
0052 #include <ndrxdcmn.h>
0053 #include <atmi_shm.h>
0054 
0055 #include "cmd_processor.h"
0056 #include <bridge_int.h>
0057 #ifdef EX_USE_SYSVQ
0058 #include <sys/msg.h>
0059 #endif
0060 /*---------------------------Externs------------------------------------*/
0061 /*---------------------------Macros-------------------------------------*/
0062 /*---------------------------Enums--------------------------------------*/
0063 /*---------------------------Typedefs-----------------------------------*/
0064 /*---------------------------Globals------------------------------------*/
0065 /*---------------------------Statics------------------------------------*/
0066 /*---------------------------Prototypes---------------------------------*/
0067 
0068 
0069 /**
0070  * Modify reply according the data.
0071  * @param call
0072  * @param pm
0073  */
0074 expublic void pq_reply_mod(command_reply_t *reply, size_t *send_size, mod_param_t *params)
0075 {
0076     command_reply_pq_t * pq_info = (command_reply_pq_t *)reply;
0077     bridgedef_svcs_t *svc = (bridgedef_svcs_t *)params->mod_param1;
0078     
0079     reply->msg_type = NDRXD_CALL_TYPE_PQ;
0080     /* calculate new send size */
0081     *send_size += (sizeof(command_reply_pq_t) - sizeof(command_reply_t));
0082 
0083     /* Copy data to reply structure */
0084     NDRX_STRCPY_SAFE(pq_info->service, svc->svc_nm);
0085     memcpy(pq_info->pq_info, svc->pq_info, sizeof(svc->pq_info));
0086     
0087     NDRX_LOG(log_debug, "magic: %ld", pq_info->rply.magic);
0088 }
0089 
0090 /**
0091  * Callback to report startup progress
0092  * @param call
0093  * @param pm
0094  * @return
0095  */
0096 exprivate void pq_progress(command_call_t * call, bridgedef_svcs_t *q_info)
0097 {
0098     int ret=EXSUCCEED;
0099     mod_param_t params;
0100     
0101     NDRX_LOG(log_debug, "pq_progress enter");
0102     memset(&params, 0, sizeof(mod_param_t));
0103 
0104     /* pass to reply process model node */
0105     params.mod_param1 = (void *)q_info;
0106 
0107     if (EXSUCCEED!=simple_command_reply(call, ret, NDRXD_CALL_FLAGS_RSPHAVE_MORE,
0108                             /* hook up the reply */
0109                             &params, pq_reply_mod, 0L, 0, NULL))
0110     {
0111         userlog("Failed to send progress back to [%s]", call->reply_queue);
0112     }
0113     
0114 
0115     NDRX_LOG(log_debug, "pq_progress exit");
0116 }
0117 
0118 /**
0119  * Call to psc command
0120  * @param args
0121  * @return
0122  */
0123 expublic int cmd_pq (command_call_t * call, char *data, size_t len, int context)
0124 {
0125     int ret=EXSUCCEED;
0126     bridgedef_svcs_t *cur, *tmp;
0127     
0128     pq_run_santiy(EXFALSE);
0129 
0130     EXHASH_ITER(hh, G_bridge_svc_hash, cur, tmp)
0131     {
0132         pq_progress(call, cur);
0133     }
0134 
0135     if (EXSUCCEED!=simple_command_reply(call, ret, 0L, NULL, NULL, 0L, 0, NULL))
0136     {
0137         userlog("Failed to send reply back to [%s]", call->reply_queue);
0138     }
0139     NDRX_LOG(log_warn, "cmd_pq returns with status %d", ret);
0140     
0141 out:
0142     return ret;
0143 }
0144 
0145 /**
0146  * Append queue statistics.
0147  * @param run_hist - shift the history
0148  * @return SUCCEED/FAIL
0149  */
0150 expublic int pq_run_santiy(int run_hist)
0151 {
0152     int ret = EXSUCCEED;
0153     bridgedef_svcs_t *cur, *tmp;
0154     int i;
0155     char q[NDRX_MAX_Q_SIZE+1];
0156     struct mq_attr att;
0157     int avg;
0158     int curmsgs;
0159     ndrx_shm_resid_t *srvlist = NULL;
0160     int len;
0161     /* the services are here: G_bridge_svc_hash - we must loop around 
0162      * and get every local queue stats, if queue fails to open, then assume 0
0163      * before that we must shift the array...
0164      */
0165     EXHASH_ITER(hh, G_bridge_svc_hash, cur, tmp)
0166     {
0167         /* shift the stats (if needed) */
0168         if (run_hist)
0169         {
0170             for (i=PQ_LEN-1; i>1; i--) /* fix for rpi... start with PQ_LEN-1!*/
0171             {   
0172                 cur->pq_info[i]=cur->pq_info[i-1];
0173             }
0174         }
0175         
0176 #if defined(EX_USE_POLL) || defined(EX_USE_SYSVQ)
0177         /* For poll mode, we need a list of servers, so that we can 
0178          * request stats for all servers:
0179          */
0180         curmsgs = 0;
0181         
0182         if (EXSUCCEED==ndrx_shm_get_srvs(cur->svc_nm, &srvlist, &len))
0183         {
0184             for (i=0; i<len; i++)
0185             {
0186                 /* TODO: For System V we could do a direct queue lookup by qid..! */
0187 #if defined(EX_USE_POLL)
0188                 snprintf(q, sizeof(q), NDRX_SVC_QFMT_SRVID, G_sys_config.qprefix, 
0189                         cur->svc_nm, srvlist[i].resid);
0190                 
0191                 if (EXSUCCEED==ndrx_get_q_attr(q, &att))
0192                 {
0193                     curmsgs+= att.mq_curmsgs;
0194                 }
0195 #else
0196                 /* System V approach for queues... quick & easy */
0197                 struct msqid_ds buf;
0198                 
0199                 if (EXSUCCEED==msgctl(srvlist[i].resid, IPC_STAT, &buf))
0200                 {
0201                     curmsgs+= buf.msg_qnum;
0202                 }
0203                 else
0204                 {
0205                     NDRX_LOG(log_warn, "Failed to get qid %d stats: %s",
0206                             srvlist[i].resid, strerror(errno));
0207                 }
0208 #endif
0209             }
0210             
0211             NDRX_FREE(srvlist);
0212         }
0213 #else
0214         /* now write at POS 0, latest reading of service */
0215         snprintf(q, sizeof(q), NDRX_SVC_QFMT, G_sys_config.qprefix, cur->svc_nm);
0216         
0217         if (EXSUCCEED!=ndrx_get_q_attr(q, &att))
0218         {
0219             curmsgs = 0; /* assume 0 */
0220         }
0221         else
0222         {
0223             curmsgs = att.mq_curmsgs;
0224         }
0225 #endif
0226         
0227         cur->pq_info[1] = curmsgs;
0228         
0229         /* Now calculate the average */
0230         avg = 0;
0231         for (i=1; i<PQ_LEN; i++) 
0232         {   
0233             avg+=cur->pq_info[i];
0234         }
0235         cur->pq_info[0] = avg/(PQ_LEN-1);
0236         
0237     }
0238     
0239 out:
0240     return ret;
0241 }
0242 
0243 /* vim: set ts=4 sw=4 et smartindent: */