Back to home page

Enduro/X

 
 

    


0001 /**
0002  * @brief System V XATMI service administrative message dispatching
0003  *   to System V main thread via queue and including a wakeup call.
0004  *   When main process needs to terminate the admin thread, it just sends
0005  *   specially coded command, so that msgrcv() gets it and exits cleanly.
0006  *
0007  * @file sys_svqadmin.c
0008  */
0009 /* -----------------------------------------------------------------------------
0010  * Enduro/X Middleware Platform for Distributed Transaction Processing
0011  * Copyright (C) 2009-2016, ATR Baltic, Ltd. All Rights Reserved.
0012  * Copyright (C) 2017-2023, Mavimax, Ltd. All Rights Reserved.
0013  * This software is released under one of the following licenses:
0014  * AGPL (with Java and Go exceptions) or Mavimax's license for commercial use.
0015  * See LICENSE file for full text.
0016  * -----------------------------------------------------------------------------
0017  * AGPL license:
0018  *
0019  * This program is free software; you can redistribute it and/or modify it under
0020  * the terms of the GNU Affero General Public License, version 3 as published
0021  * by the Free Software Foundation;
0022  *
0023  * This program is distributed in the hope that it will be useful, but WITHOUT ANY
0024  * WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR A
0025  * PARTICULAR PURPOSE. See the GNU Affero General Public License, version 3
0026  * for more details.
0027  *
0028  * You should have received a copy of the GNU Affero General Public License along 
0029  * with this program; if not, write to the Free Software Foundation, Inc.,
0030  * 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
0031  *
0032  * -----------------------------------------------------------------------------
0033  * A commercial use license is available from Mavimax, Ltd
0034  * contact@mavimax.com
0035  * -----------------------------------------------------------------------------
0036  */
0037 
0038 /*---------------------------Includes-----------------------------------*/
0039 
0040 #include <stdlib.h>
0041 #include <stdio.h>
0042 #include <fcntl.h>           /* For O_* constants */
0043 #include <sys/ipc.h>
0044 #include <sys/msg.h>
0045 #include <pthread.h>
0046 #include <signal.h>
0047 
0048 #include <ndrstandard.h>
0049 
0050 #include <nstopwatch.h>
0051 #include <nstd_tls.h>
0052 #include <exhash.h>
0053 #include <ndebug.h>
0054 #include <sys_unix.h>
0055 #include <sys_svq.h>
0056 #include <ndrxdiag.h>
0057 #include "atmi_int.h"
0058 
0059 /*---------------------------Externs------------------------------------*/
0060 /*---------------------------Macros-------------------------------------*/
0061 /*---------------------------Enums--------------------------------------*/
0062 /*---------------------------Typedefs-----------------------------------*/
0063 
/**
 * Internal shutdown message sent to the admin thread.
 * ndrx_svqadmin_deinit() posts it to the admin queue and the admin thread
 * leaves its receive loop when it sees a message of exactly this size
 * carrying NDRX_COM_SVQ_PRIV in command_id.
 * If changing, see command_reply_t.
 */
typedef struct
{
    long mtype; /**< mandatory for System V queues                  */
    short command_id; /**< must be the same as for command_reply_t  */
} ndrx_thstop_command_call_t;
0073 
0074 /*---------------------------Globals------------------------------------*/
0075 /*---------------------------Statics------------------------------------*/
0076 
/** Admin queue descriptor read by the admin thread; set by ndrx_svqadmin_init() */
exprivate mqd_t M_adminq = (mqd_t)EXFAIL;
/** Handle of the admin thread, joined in ndrx_svqadmin_deinit() */
exprivate pthread_t M_evthread;
/**
 * EXTRUE until ndrx_svqadmin_init() has registered the fork handlers;
 * ndrx_svqadmin_deinit() treats EXTRUE as "admin thread not initialized".
 */
exprivate int M_first = EXTRUE; /**< Did we have an init?               */
/*---------------------------Prototypes---------------------------------*/

/* thread entry point, defined at the end of the file */
exprivate void * ndrx_svqadmin_run(void* arg);
0083 
0084 /**
0085  * Prepare for forking. this will stop the admin thread
0086  */
0087 expublic void ndrx_svqadmin_fork_prepare(void)
0088 {
0089     NDRX_LOG(log_debug, "Terminating admin thread before forking...");
0090     if (EXSUCCEED!=ndrx_svqadmin_deinit())
0091     {
0092         NDRX_LOG(log_error, "admin_fork_prepare() failed");
0093     }
0094 }
0095 
0096 /**
0097  * Restart admin thread for 
0098  */
0099 exprivate void ndrx_svqadmin_fork_resume(void)
0100 {
0101     int ret;
0102     pthread_attr_t pthread_custom_attr;
0103     pthread_attr_init(&pthread_custom_attr);
0104     
0105     ndrx_platf_stack_set(&pthread_custom_attr);
0106     
0107     if (EXSUCCEED!=(ret=pthread_create(&M_evthread, &pthread_custom_attr, 
0108             &ndrx_svqadmin_run, NULL)))
0109     {
0110         NDRX_PLATF_DIAG(NDRX_DIAG_PTHREAD_CREATE, errno, "admin thread");
0111     }
0112 }
0113 
0114 /**
0115  * Perform init on Admin queue
0116  * @param adminq admin queue already open
0117  * @return EXSUCCEED/EXFAIL
0118  */
0119 expublic int ndrx_svqadmin_init(mqd_t adminq)
0120 {
0121     int ret = EXSUCCEED;
0122     pthread_attr_t pthread_custom_attr;
0123     pthread_attr_init(&pthread_custom_attr);
0124     
0125     ndrx_platf_stack_set(&pthread_custom_attr);
0126     
0127     M_adminq = adminq;
0128     
0129     if (EXSUCCEED!=(ret=pthread_create(&M_evthread, &pthread_custom_attr, 
0130             &ndrx_svqadmin_run, NULL)))
0131     {
0132         NDRX_PLATF_DIAG(NDRX_DIAG_PTHREAD_CREATE, errno, "admin thread");
0133         EXFAIL_OUT(ret);
0134     }
0135     
0136     /* register fork handlers... */
0137     if (M_first)
0138     {
0139         if (EXSUCCEED!=(ret=ndrx_atfork(ndrx_svqadmin_fork_prepare, 
0140             ndrx_svqadmin_fork_resume, NULL)))
0141         {
0142             NDRX_LOG(log_error, "Failed to register fork handlers: %s", strerror(ret));
0143             userlog("Failed to register fork handlers: %s", strerror(ret));
0144             EXFAIL_OUT(ret);
0145         }
0146     M_first = EXFALSE;
0147     }
0148     
0149 out:
0150     return ret;
0151 }
0152 
0153 /**
0154  * Terminate the admin thread
0155  * @return EXSUCCEED/EXFAIL
0156  */
0157 expublic int ndrx_svqadmin_deinit(void)
0158 {
0159     int ret = EXSUCCEED;
0160     ndrx_thstop_command_call_t thstop;
0161     
0162     if (M_first)
0163     {
0164         NDRX_LOG(log_debug, "Admin thread not initialized");
0165         goto out;
0166     }
0167     
0168     thstop.mtype =1;
0169     thstop.command_id=NDRX_COM_SVQ_PRIV;
0170     
0171     NDRX_LOG(log_debug, "Requesting admin thread shutdown...");
0172     if (EXSUCCEED!=msgsnd(M_adminq->qid, &thstop, 
0173             NDRX_SVQ_INLEN(sizeof(ndrx_thstop_command_call_t)), 0))
0174     {
0175         int err = errno;
0176         NDRX_LOG(log_error, "Failed to send term msg: %s", strerror(err));
0177         userlog("Failed to send term msg: %s", strerror(err));
0178         EXFAIL_OUT(ret);
0179     }
0180     
0181     /* join the admin thread / wait for terminate */
0182     pthread_join(M_evthread, NULL);
0183     
0184     NDRX_LOG(log_debug, "Admin thread terminated...");
0185     
0186 out:
0187     return ret;
0188 }
0189 
0190 /**
0191  * Run the admin queue. How about messages, we shall allocate them
0192  * as they will be free'd by the main thread, not?
0193  * @param arg
0194  * @return 
0195  */
0196 exprivate void * ndrx_svqadmin_run(void* arg)
0197 {
0198     int ret = EXSUCCEED;
0199     int qid;
0200     int sz, len;
0201     int err;
0202     ndrx_thstop_command_call_t *p_cmd;
0203     char *buf = NULL;
0204     sigset_t set;
0205     
0206     /* init the TLS thread */
0207     _Nunset_error();
0208     
0209     /* mask all signals, as this thread shall not break any ongoing processes
0210      * with signal scheduling
0211      */
0212     if (EXSUCCEED!=sigfillset(&set))
0213     {
0214         err = errno;
0215         NDRX_LOG(log_error, "Failed to fill signal array: %s", strerror(err));
0216         userlog("Failed to fill signal array: %s", strerror(err));
0217         EXFAIL_OUT(ret);
0218     }
0219     
0220     if (EXSUCCEED!=pthread_sigmask(SIG_BLOCK, &set, NULL))
0221     {
0222         err = errno;
0223         NDRX_LOG(log_error, "Failed to block all signals but %d for even thread: %s", 
0224                 NDRX_SVQ_SIG, strerror(err));
0225         userlog("Failed to block all signals but %d for even thread: %s", 
0226                 NDRX_SVQ_SIG, strerror(err));
0227         EXFAIL_OUT(ret);
0228     }
0229     
0230     /* Wait for message to arrive
0231      * and post to main thread if have any..
0232      */
0233     while (1)
0234     {
0235         qid = M_adminq->qid;
0236      
0237         /* Allocate the message size */
0238         sz = NDRX_MSGSIZEMAX;
0239         
0240         /*
0241         if (NULL==(M_buf = NDRX_MALLOC(sz)))
0242         {
0243             int err = errno;
0244             NDRX_LOG(log_error, "Failed to malloc %d bytes: %s", sz, strerror(err));
0245             userlog("Failed to malloc %d bytes: %s", sz, strerror(err));
0246             EXFAIL_OUT(ret);
0247         }
0248          */
0249         
0250         if (NULL==buf)
0251         {
0252             size_t tmp_buf_len;
0253             NDRX_SYSBUF_MALLOC_OUT(buf, tmp_buf_len, ret);
0254         }
0255         
0256         NDRX_LOG(log_debug, "About to wait for service admin message qid=%d", qid);
0257        
0258         /* read the message, well we could read it directly from MQD 
0259          * then we do not need any locks..
0260          */
0261         len = msgrcv(qid, buf, NDRX_SVQ_INLEN(sz), 0, 0);
0262         err = errno;
0263         
0264         NDRX_LOG(log_debug, "Admin msgrcv: qid=%d len=%d", qid, len);
0265         
0266         if (EXFAIL==len)
0267         {
0268             /* check the error code, if queue removed, then no problem just
0269              * terminate and that's it
0270              */
0271             if (EIDRM==err)
0272             {
0273                 NDRX_LOG(log_debug, "Admin queue removed, terminating thread...");
0274                 goto out;
0275             }
0276             else
0277             {
0278                 NDRX_LOG(log_debug, "Failed to receive message on admin q: %s",
0279                         strerror(err));
0280                 userlog("Failed to receive message on admin q: %s",
0281                         strerror(err));
0282                 EXFAIL_OUT(ret);
0283             }
0284         }
0285         else
0286         {
0287             
0288             p_cmd = (ndrx_thstop_command_call_t *)buf;
0289             
0290             if (NDRX_SVQ_OUTLEN(len) == sizeof(ndrx_thstop_command_call_t)
0291                     && NDRX_COM_SVQ_PRIV==p_cmd->command_id)
0292             {
0293                 NDRX_LOG(log_info, "Admin thread shutdown requested...");
0294                 break;
0295             }
0296             else
0297             {
0298                 /* push admin queue event... */
0299                 ndrx_svq_ev_t *ev = NDRX_FPMALLOC(sizeof(ndrx_svq_ev_t), 0);
0300 
0301                 if (NULL==ev)
0302                 {
0303                     err = errno;
0304                     NDRX_LOG(log_error, "Failed to malloc event %d bytes: %s",
0305                             sizeof(ndrx_svq_ev_t), strerror(err));
0306                     userlog("Failed to malloc event %d bytes: %s",
0307                             sizeof(ndrx_svq_ev_t), strerror(err));
0308                     EXFAIL_OUT(ret);
0309                 }
0310 
0311                 ev->data = buf;
0312                 ev->datalen = NDRX_SVQ_OUTLEN(len);
0313                 ev->ev = NDRX_SVQ_EV_DATA;
0314                 ev->next = NULL;
0315                 ev->prev = NULL;
0316                 NDRX_LOG(log_debug, "Putting admin event...");
0317                 ret = ndrx_svq_mqd_put_event(ndrx_svq_mainq_get(), &ev);
0318                 
0319                 /* shall be cleared if sent OK */
0320                 if (NULL!=ev)
0321                 {
0322                     NDRX_SYSBUF_FREE(buf);
0323                     NDRX_FPFREE(ev);
0324                 }
0325                 
0326                 /* Release pointer, as it was delivered to poller..
0327                  * so that we do not memory leaks at shutdown...
0328                  */
0329                 buf = NULL;
0330                 
0331                 if (EXSUCCEED!=ret)
0332                 {
0333                     NDRX_LOG(log_error, "Failed to put admin event");
0334                     userlog("Failed to put admin event");
0335                     EXFAIL_OUT(ret);
0336                 }
0337 
0338                 NDRX_LOG(log_debug, "After admin event...");
0339             }
0340             
0341         }
0342     }
0343 out:
0344     if (EXSUCCEED!=ret)
0345     {
0346         NDRX_LOG(log_error, "Admin thread failed! Abort as cannot "
0347                 "guarantee application stability!");
0348         userlog("Admin thread failed! Abort as cannot "
0349                 "guarantee application stability!");
0350         /*abort();*/
0351         exit(EXFAIL);
0352     }
0353 
0354     if (NULL!=buf)
0355     {
0356         NDRX_SYSBUF_FREE(buf);
0357     }
0358 
0359     return NULL;
0360 }
0361 
0362 /* vim: set ts=4 sw=4 et smartindent: */