Back to home page

Enduro/X

 
 

    


0001 /**
0002  * @brief Remove service queue.
0003  *   We should open the service queue, receive all messages and reply back with failure.
0004  *   This function shall be called after the service shared memory block is uninstalled!
0005  *
0006  * @file svc_q_remove.c
0007  */
0008 /* -----------------------------------------------------------------------------
0009  * Enduro/X Middleware Platform for Distributed Transaction Processing
0010  * Copyright (C) 2009-2016, ATR Baltic, Ltd. All Rights Reserved.
0011  * Copyright (C) 2017-2023, Mavimax, Ltd. All Rights Reserved.
0012  * This software is released under one of the following licenses:
0013  * AGPL (with Java and Go exceptions) or Mavimax's license for commercial use.
0014  * See LICENSE file for full text.
0015  * -----------------------------------------------------------------------------
0016  * AGPL license:
0017  *
0018  * This program is free software; you can redistribute it and/or modify it under
0019  * the terms of the GNU Affero General Public License, version 3 as published
0020  * by the Free Software Foundation;
0021  *
0022  * This program is distributed in the hope that it will be useful, but WITHOUT ANY
0023  * WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR A
0024  * PARTICULAR PURPOSE. See the GNU Affero General Public License, version 3
0025  * for more details.
0026  *
0027  * You should have received a copy of the GNU Affero General Public License along 
0028  * with this program; if not, write to the Free Software Foundation, Inc.,
0029  * 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
0030  *
0031  * -----------------------------------------------------------------------------
0032  * A commercial use license is available from Mavimax, Ltd
0033  * contact@mavimax.com
0034  * -----------------------------------------------------------------------------
0035  */
0036 #include <string.h>
0037 #include <stdio.h>
0038 #include <stdlib.h>
0039 #include <errno.h>
0040 #include <memory.h>
0041 #include <sys/types.h>
0042 #include <dirent.h>
0043 #include <sys/stat.h>
0044 #include <utlist.h>
0045 #include <fcntl.h>
0046 
0047 #include <ndrstandard.h>
0048 #include <ndrxd.h>
0049 #include <atmi_int.h>
0050 #include <nstopwatch.h>
0051 
0052 #include <ndebug.h>
0053 #include <cmd_processor.h>
0054 #include <signal.h>
0055 #include <gencall.h>
0056 
0057 #include "userlog.h"
0058 
0059 /*---------------------------Externs------------------------------------*/
0060 /*---------------------------Macros-------------------------------------*/
/* Keep that stuff for 10 minutes (value is 10*60, i.e. expressed in seconds)!
 * Might consider to move to config param!
 * The expansion is parenthesized so that the macro behaves as a single value
 * inside larger expressions (e.g. x / REMOVE_SVC_READVERTISE_LISTS).
 */
#define REMOVE_SVC_READVERTISE_LISTS            (10*60)
0064 /*---------------------------Enums--------------------------------------*/
0065 /*---------------------------Typedefs-----------------------------------*/
0066 typedef struct removed_svcs removed_svcs_t;
0067 struct removed_svcs
0068 {
0069     char svc[XATMI_SERVICE_NAME_LENGTH+1];
0070     ndrx_stopwatch_t remove_time;
0071     removed_svcs_t *prev, *next;
0072 };
0073 
0074 /*---------------------------Globals------------------------------------*/
0075 removed_svcs_t * M_removed = NULL;
0076 /*---------------------------Statics------------------------------------*/
0077 /*---------------------------Prototypes---------------------------------*/
0078 
0079 
0080 /**
0081  * Find entry for service
0082  * @param svc
0083  * @return 
0084  */
0085 exprivate removed_svcs_t * find_removed_entry(char *svc)
0086 {
0087     removed_svcs_t * ret = NULL;
0088     removed_svcs_t * tmp = NULL;
0089     
0090     DL_FOREACH(M_removed, tmp)
0091     {
0092         if (0==strcmp(tmp->svc, svc))
0093         {
0094             ret=tmp;
0095             break;
0096         }
0097     }
0098     
0099     return ret;
0100 }
0101 
0102 /**
0103  * Firstly we should open the Q.
0104  * Then reply with bad response to all msgs, then unlink it.
0105  * So firstly we try to get semaphore...
0106  * After we got it, we check do really Q needs to be removed!
0107  * @param[in] svc service name to unlink, conditional
0108  * @param[in] srvid Server id providing the the service, conditional
0109  * @param[in] in_qd Already open queue descriptor, used by System V zapping
0110  * @param[in] in_qstr for debug purposes queue strnig, either svc+srv 
0111  *  or in_qd +in_qstr
0112  * @return EXSUCCEED/EXFAIL
0113  */
0114 expublic int remove_service_q(char *svc, short srvid, mqd_t in_qd, char *in_qstr)
0115 {
0116     int ret=EXSUCCEED;
0117     char q_str[NDRX_MAX_Q_SIZE+1];
0118     mqd_t qd=(mqd_t)EXFAIL;
0119     char *msg_buf=NULL;
0120     size_t msg_buf_len;
0121     int len;
0122     unsigned prio;
0123     tp_command_generic_t *gen_command;
0124     char *fn = "remove_service_q";
0125     
0126     NDRX_LOG(log_debug, "Enter, svc = [%s], srvid = %hd", svc?svc:"(null)", srvid);
0127     
0128     NDRX_SYSBUF_MALLOC_OUT(msg_buf, msg_buf_len, ret);
0129     
0130     if (NULL!=in_qstr)
0131     {
0132         NDRX_STRCPY_SAFE(q_str, in_qstr);
0133     }
0134     else
0135     {
0136 #ifdef EX_USE_POLL
0137         snprintf(q_str, sizeof(q_str), NDRX_SVC_QFMT_SRVID, 
0138                 G_sys_config.qprefix, svc, srvid);
0139 #else
0140         snprintf(q_str, sizeof(q_str), NDRX_SVC_QFMT, G_sys_config.qprefix, svc);
0141 #endif
0142     }
0143     
0144     NDRX_LOG(log_debug, "Flushing + unlink the queue [%s]", q_str);
0145     
0146     /* Run in non-blocked mode */
0147     if ((mqd_t)EXFAIL!=in_qd)
0148     {
0149         NDRX_LOG(log_debug, "Re-use existing mqd=%p", in_qd);
0150         qd = in_qd;
0151     }
0152     else if ((mqd_t)EXFAIL==(qd = ndrx_mq_open_at(q_str, 
0153             O_RDWR|O_NONBLOCK,S_IWUSR | S_IRUSR, NULL)))
0154     {
0155         NDRX_LOG(log_error, "Failed to open queue: [%s] err: %s",
0156                                         q_str, strerror(errno));
0157         ret=EXFAIL;
0158         goto out;
0159 
0160     }
0161     
0162     /* for System V queue is unlinked as part of the sanity checks
0163      * also System V queues cannot be unlinked while other processes are connected
0164      * in such scenario they will. For SVAPOLL do this after messages are processed
0165      * hecause that will make qid as invalid arg.
0166      * do this for posix early, because this means that there is less chance
0167      * for race condition to occurr, that someone got the queue and is sending
0168      * and we are removing.
0169      */
0170 #if !defined(EX_USE_SYSVQ) && !defined(EX_USE_SVAPOLL)
0171     /* Unlink the queue, the actual queue will live out through next session!
0172      * i.e. all users should close it to dispose it! As by manpage! */
0173     if (EXSUCCEED!=ndrx_mq_unlink(q_str))
0174     {
0175         NDRX_LOG(log_error, "Failed to unlink q [%s]: %s",
0176                 q_str, strerror(errno));
0177     }
0178 #endif
0179 
0180     /* Read all messages from Q & reply with dummy/FAIL stuff back! */
0181     while ((len=ndrx_mq_receive (qd, msg_buf, msg_buf_len, &prio)) > 0)
0182     {
0183         NDRX_LOG(log_warn, "Got message, size: %d", len);
0184         gen_command = (tp_command_generic_t *)msg_buf;
0185         
0186         /* Not sure how this can affect ATMI_COMMAND_CONNECT or ATMI_COMMAND_CONNRPLY
0187          * Time will show this for us!
0188          */
0189         if (ATMI_COMMAND_TPCALL==gen_command->command_id)
0190         {
0191             tp_command_call_t *tp_call = (tp_command_call_t *)gen_command;
0192             
0193             /* Bug #425: Check is process wait for reply? */
0194             reply_with_failure(TPNOBLOCK, tp_call, NULL, NULL, TPENOENT);
0195             
0196         }
0197         else
0198         {
0199             NDRX_LOG(log_warn, "Skipping command: %d", gen_command->command_id);
0200         }
0201     }
0202 
0203     /* For SVAPOLL: we could move this to later stage after content zap
0204      * as svapoll uses systemv queues, and they become invalid after zap.
0205      * any processes waiting on this msg, shall get the error because
0206      * waiting on remove queue descriptor.
0207      */
0208 #ifdef EX_USE_SVAPOLL
0209     if (EXSUCCEED!=ndrx_mq_unlink(q_str))
0210     {
0211         NDRX_LOG(log_error, "Failed to unlink q [%s]: %s",
0212                 q_str, strerror(errno));
0213     }
0214 #endif
0215    
0216 
0217     NDRX_LOG(log_debug, "Done receive...");
0218     
0219 out:
0220     /* close only if we did open the queue */
0221     if ((mqd_t)EXFAIL==in_qd && (mqd_t)EXFAIL!=qd)
0222     {
0223         if (EXSUCCEED!=ndrx_mq_close(qd))
0224         {
0225             NDRX_LOG(log_warn, "Failed to close q: %d - %s", qd, 
0226                     strerror(errno));
0227         }
0228     }
0229 
0230     if (NULL!=msg_buf)
0231     {
0232         NDRX_SYSBUF_FREE(msg_buf);
0233     }
0234 
0235     NDRX_LOG(log_debug, "%s - return, ret = %d", fn, ret);
0236 
0237     return ret;
0238     
0239 }
0240 
0241 /* vim: set ts=4 sw=4 et smartindent: */