![]() |
|
|||
0001 /** 0002 * @brief System V specifics for 0003 * 0004 * @file sanity_sysv.c 0005 */ 0006 /* ----------------------------------------------------------------------------- 0007 * Enduro/X Middleware Platform for Distributed Transaction Processing 0008 * Copyright (C) 2009-2016, ATR Baltic, Ltd. All Rights Reserved. 0009 * Copyright (C) 2017-2023, Mavimax, Ltd. All Rights Reserved. 0010 * This software is released under one of the following licenses: 0011 * AGPL (with Java and Go exceptions) or Mavimax's license for commercial use. 0012 * See LICENSE file for full text. 0013 * ----------------------------------------------------------------------------- 0014 * AGPL license: 0015 * 0016 * This program is free software; you can redistribute it and/or modify it under 0017 * the terms of the GNU Affero General Public License, version 3 as published 0018 * by the Free Software Foundation; 0019 * 0020 * This program is distributed in the hope that it will be useful, but WITHOUT ANY 0021 * WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR A 0022 * PARTICULAR PURPOSE. See the GNU Affero General Public License, version 3 0023 * for more details. 0024 * 0025 * You should have received a copy of the GNU Affero General Public License along 0026 * with this program; if not, write to the Free Software Foundation, Inc., 0027 * 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA 0028 * 0029 * ----------------------------------------------------------------------------- 0030 * A commercial use license is available from Mavimax, Ltd 0031 * contact@mavimax.com 0032 * ----------------------------------------------------------------------------- 0033 */ 0034 #include <string.h> 0035 #include <stdio.h> 0036 #include <stdlib.h> 0037 #include <errno.h> 0038 #include <memory.h> 0039 #include <sys/types.h> 0040 #include <dirent.h> 0041 #include <sys/stat.h> 0042 #include <utlist.h> 0043 #include <fcntl.h> 0044 0045 #include <ndrstandard.h> 0046 #include <ndrxd.h> 0047 #include <atmi_int.h> 0048 #include <nstopwatch.h> 0049 0050 #include <ndebug.h> 0051 #include <cmd_processor.h> 0052 #include <signal.h> 0053 #include <bridge_int.h> 0054 #include <atmi_shm.h> 0055 #include <userlog.h> 0056 #include <sys_unix.h> 0057 #include <sys_svq.h> 0058 #include <fcntl.h> 0059 0060 /*---------------------------Externs------------------------------------*/ 0061 /*---------------------------Macros-------------------------------------*/ 0062 /*---------------------------Enums--------------------------------------*/ 0063 /*---------------------------Typedefs-----------------------------------*/ 0064 /*---------------------------Globals------------------------------------*/ 0065 /*---------------------------Statics------------------------------------*/ 0066 /*---------------------------Prototypes---------------------------------*/ 0067 0068 /** 0069 * Remove any pending calls to request address. 0070 * Basically we construct here mqd_t and let common function to flush the calls. 0071 * The common function doesn't do any lockings, thus this approach is fine. 0072 * @param qid queue id 0073 * @param qstr queue string 0074 * @return EXSUCCEED/EXFAIL 0075 */ 0076 exprivate int flush_rqaddr(int qid, char *qstr) 0077 { 0078 int ret = EXSUCCEED; 0079 mqd_t mqd = NULL; 0080 int err; 0081 0082 mqd = NDRX_CALLOC(1, sizeof(struct ndrx_svq_info)); 0083 0084 if (NULL==mqd) 0085 { 0086 err = errno; 0087 0088 NDRX_LOG(log_error, "Failed to malloc %d bytes", 0089 (int)sizeof(struct ndrx_svq_info)); 0090 userlog("Failed to malloc %d bytes", 0091 (int)sizeof(struct ndrx_svq_info)); 0092 EXFAIL_OUT(ret); 0093 } 0094 0095 mqd->qid = qid; 0096 NDRX_STRCPY_SAFE(mqd->qstr, qstr); 0097 0098 /* set attr as non blocked */ 0099 mqd->attr.mq_flags |= O_NONBLOCK; 0100 0101 /* lets flush the queue now. */ 0102 if (EXSUCCEED!=remove_service_q(NULL, EXFAIL, mqd, qstr)) 0103 { 0104 NDRX_LOG(log_error, "Failed to flush [%s]/%d", qstr, qid); 0105 EXFAIL_OUT(ret); 0106 } 0107 out: 0108 0109 if (NULL!=mqd) 0110 { 0111 NDRX_FREE(mqd); 0112 } 0113 return ret; 0114 } 0115 0116 /** 0117 * System V sanity checks. 0118 * Includes following steps: 0119 * - get a copy of SV5 queue maps 0120 * - scan the shared memory of services, and mark the used sv5 qids. 0121 * - then scan the list of used qids for the request addresses 0122 * - remove any qid, that is not present in shm (the removal shall be done 0123 * in sv5 library with the write lock present and checking the ctime again 0124 * so that we have a real sync). Check the service rqaddr by NDRX_SVQ_MAP_RQADDR 0125 * 0126 * Well if we are about to remove stale request addresses, we could report them 0127 * from the server processes. And thus locate if none of available server processes 0128 * belongs to request address, then queue is unlinked. This will protect us from 0129 * unlinking queues to which working zero service servers are located, like 0130 * tpbridge... 0131 * @param finalchk do not use TTL for non service linked request address removal 0132 * 0133 * @return SUCCEED/FAIL 0134 */ 0135 expublic int do_sanity_check_sysv(int finalchk) 0136 { 0137 int ret=EXSUCCEED; 0138 ndrx_svq_status_t *svq = NULL; 0139 int len; 0140 int reslen; 0141 int i, j; 0142 int have_value_3; 0143 int pos_3; 0144 bridgedef_svcs_t *cur, *tmp; 0145 ndrx_shm_resid_t *srvlist = NULL; 0146 pm_node_t *p_pm; 0147 int last; 0148 string_hash_t *strh=NULL, *strh_el = NULL; 0149 0150 NDRX_LOG(log_debug, "Into System V sanity checks, finalchk: %d", finalchk); 0151 0152 /* Get the list of queues 0153 * if no ttl, then give a -1 which will make all queues scheduled for 0154 * removal 0155 */ 0156 svq = ndrx_svqshm_statusget(&reslen, (finalchk?-1:G_app_config->rqaddrttl)); 0157 0158 if (NULL==svq) 0159 { 0160 NDRX_LOG(log_error, "Failed to get System V shared memory status!"); 0161 userlog("Failed to get System V shared memory status!"); 0162 EXFAIL_OUT(ret); 0163 } 0164 0165 DL_FOREACH(G_process_model, p_pm) 0166 { 0167 /* Find live count of p_pm by resid... 0168 * maybe do this once? 0169 * if we are stopping... assume the RQ addr is still ok to avoid false 0170 * warnings. 0171 */ 0172 if (NDRXD_PM_RUNNING_OK==p_pm->state || NDRXD_PM_STOPPING == p_pm->state) 0173 { 0174 if (NULL==ndrx_string_hash_add_cnt(&strh, p_pm->rqaddress)) 0175 { 0176 NDRX_LOG(log_error, "Failed to add rqaddr [%s] to hashmap", 0177 p_pm->rqaddress); 0178 EXFAIL_OUT(ret); 0179 } 0180 } 0181 } 0182 0183 /* Now scan the used services shared memory and updated the 0184 * status copy accordingly 0185 * WELL! We must loop over the local NDRXD list of the services 0186 * and then update the status. Then we can avoid the locking of 0187 * the shared memory. 0188 */ 0189 0190 /* We assume shm is OK! */ 0191 0192 NDRX_LOG(log_debug, "Marking resources against services, reslen: %d", 0193 reslen); 0194 0195 EXHASH_ITER(hh, G_bridge_svc_hash, cur, tmp) 0196 { 0197 if (EXSUCCEED==ndrx_shm_get_srvs(cur->svc_nm, &srvlist, &len)) 0198 { 0199 NDRX_LOG(log_debug, "Checking service [%s]", cur->svc_nm); 0200 0201 /* Check the cluster nodes too, so if it is in network 0202 * then no need to unlink... 0203 */ 0204 for (i=0; i<len; i++) 0205 { 0206 ndrx_svqshm_get_status(svq, srvlist[i].resid, &pos_3, &have_value_3); 0207 0208 if (have_value_3) 0209 { 0210 int used_cnt = 0; 0211 0212 NDRX_LOG(log_debug, "Service [%s] have resource %d at idx %d", 0213 cur->svc_nm, srvlist[i].resid, i); 0214 svq[pos_3].flags |= NDRX_SVQ_MAP_HAVESVC; 0215 0216 0217 if (svq[pos_3].flags & NDRX_SVQ_MAP_SCHEDRM) 0218 { 0219 /* validate the count against the live servers processes, 0220 * if we found the real count of RQADDR is too short, 0221 * we must uninstall that number of services. 0222 * Note that Q has TTL already expired, thus all srvinfos 0223 * shall be already seen by ndrxd. 0224 * So can the PM... 0225 */ 0226 0227 strh_el = ndrx_string_hash_get(strh, svq[pos_3].qstr); 0228 0229 if (NULL!=strh_el) 0230 { 0231 used_cnt = strh_el->cnt; 0232 } 0233 0234 if (srvlist[i].cnt!=used_cnt) 0235 { 0236 NDRX_LOG(log_error, "Service [%s] rqaddr [%s] resource [%d] cnt=%d actual=%d", 0237 cur->svc_nm, svq[pos_3].qstr, srvlist[i].resid, srvlist[i].cnt, used_cnt); 0238 userlog("Service [%s] rqaddr [%s] resource [%d] cnt=%d actual=%d", 0239 cur->svc_nm, svq[pos_3].qstr, srvlist[i].resid, srvlist[i].cnt, used_cnt); 0240 } 0241 0242 for (j=0; j<(srvlist[i].cnt-used_cnt); j++) 0243 { 0244 ndrxd_shm_uninstall_svc(cur->svc_nm, &last, srvlist[i].resid); 0245 } 0246 } 0247 0248 } 0249 else 0250 { 0251 NDRX_LOG(log_error, "!!! Service [%s] have NO resource %d at idx %d", 0252 cur->svc_nm, srvlist[i].resid, i); 0253 0254 /* Shouldn't we housekeep the service as no Qs are available 0255 * for serving... 0256 * Thing #2: Seems like linux does not randomize msgid identifiers 0257 * thus it is possible here, that if server for whom we are removing 0258 * the identifier is booting in background and at this moment is installing 0259 * the service queues + services, then there is race condition on this 0260 * that here we actually remove the service. 0261 * Probably we could lock the queues and check status again? 0262 * and if that finds out that Q is missing, then with locked queues 0263 * we continue to remove the service. We can use read locks here on SystemV 0264 * but still needs to think about the locking, as then all the 0265 * else were of the SystemV queues any removal and decision 0266 * must be made in the locks otherwise we can remove something 0267 * created in racy manner. 0268 * Also... it look like on Linux msgid are reused very quickly. 0269 */ 0270 for (j=0; j<srvlist[i].cnt; j++) 0271 { 0272 ndrxd_shm_uninstall_svc(cur->svc_nm, &last, srvlist[i].resid); 0273 } 0274 } 0275 0276 } 0277 } /* local servs */ 0278 0279 if (NULL!=srvlist) 0280 { 0281 NDRX_FREE(srvlist); 0282 srvlist = NULL; 0283 } 0284 } 0285 0286 /* Scan for queues which are not any more is service list, 0287 * the queue was service, and time have expired for TTL, thus such queues 0288 * are subject for unlinking... 0289 * perform that in sync way... 0290 */ 0291 NDRX_LOG(log_debug, "Flush RQADDR queues without services and TTL expired."); 0292 for (i=0; i<reslen; i++) 0293 { 0294 int cont = EXFALSE; 0295 0296 /* 0297 NDRX_LOG(log_debug, "DEBUG! %d = ISUSED= %d WASUSED=%d EXPIRED=%d SCHEDRM=%d RQADDR=%d HAVESVC=%d [%s]/%d", 0298 i, 0299 svq[i].flags & NDRX_SVQ_MAP_ISUSED, 0300 svq[i].flags & NDRX_SVQ_MAP_WASUSED, 0301 svq[i].flags & NDRX_SVQ_MAP_EXPIRED, 0302 svq[i].flags & NDRX_SVQ_MAP_SCHEDRM, 0303 svq[i].flags & NDRX_SVQ_MAP_RQADDR, 0304 svq[i].flags & NDRX_SVQ_MAP_HAVESVC, 0305 svq[i].qstr, svq[i].qid); 0306 */ 0307 0308 if ((svq[i].flags & NDRX_SVQ_MAP_RQADDR) 0309 && !(svq[i].flags & NDRX_SVQ_MAP_HAVESVC) 0310 && (svq[i].flags & NDRX_SVQ_MAP_SCHEDRM)) 0311 { 0312 0313 /* Check process model, to see if any active server have this 0314 * request address 0315 */ 0316 DL_FOREACH(G_process_model, p_pm) 0317 { 0318 if (PM_RUNNING(p_pm->state) 0319 && 0==strcmp(p_pm->rqaddress, svq[i].qstr)) 0320 { 0321 NDRX_LOG(log_debug, "Server [%s]/%d is using rqddr [%s] - chk next", 0322 p_pm->binary_name, p_pm->srvid, svq[i].qstr); 0323 cont = EXTRUE; 0324 break; 0325 } 0326 } 0327 0328 if (cont) 0329 { 0330 continue; 0331 } 0332 0333 NDRX_LOG(log_info, "qid %d is subject for delete ttl %d qstr=[%s]", 0334 svq[i].qid, G_app_config->rqaddrttl, svq[i].qstr); 0335 0336 /* well time checking & flushing we shall do here 0337 * due to locking issues... not the way as bellow described... 0338 * There shall be no new message in RQADDR due to stale servers 0339 */ 0340 if (EXSUCCEED!=flush_rqaddr(svq[i].qid, svq[i].qstr)) 0341 { 0342 NDRX_LOG(log_error, "Failed to flush RQADDR [%s]/%d", 0343 svq[i].qstr, svq[i].qid); 0344 userlog("Failed to flush RQADDR [%s]/%d", 0345 svq[i].qstr, svq[i].qid); 0346 } 0347 0348 /* Well at this point we shall 0349 * remove call expublic int remove_service_q(char *svc, short srvid, 0350 * mqd_t in_qd, char *in_qstr)!!! 0351 * because we need to flush the queue of messages... 0352 * only here we have a issue with lockings 0353 * we will be in write mode to MAPs, but in mean time we want to 0354 * perform read ops to the SHM... 0355 * Thus probably we need some globals in svqshm that indicate 0356 * that we have already exclusive lock! 0357 * 0358 * But as remove_service_q is not using much of the systemv maps 0359 * processing, then we could just simple callback from ndrx_svqshm_ctl 0360 * with qid and queue string. then callback would build simple 0361 * mqd_t and pass it to remove_service_q for message zapping. 0362 */ 0363 if (EXSUCCEED!=ndrx_svqshm_ctl(NULL, svq[i].qid, 0364 IPC_RMID, EXFAIL, NULL)) 0365 { 0366 NDRX_LOG(log_error, "Failed to unlink qid %d", svq[i].qid); 0367 EXFAIL_OUT(ret); 0368 } 0369 } 0370 } 0371 0372 out: 0373 0374 if (NULL!=svq) 0375 { 0376 NDRX_FREE(svq); 0377 } 0378 0379 if (NULL!=srvlist) 0380 { 0381 NDRX_FREE(srvlist); 0382 } 0383 0384 ndrx_string_hash_free(strh); 0385 0386 return ret; 0387 } 0388 0389 /** 0390 * Perform final checks on exit - remove all service queues... 0391 * @return 0392 */ 0393 expublic int ndrxd_sysv_finally(void) 0394 { 0395 int ret = EXSUCCEED; 0396 0397 ret = do_sanity_check_sysv(EXTRUE); 0398 0399 return ret; 0400 } 0401 0402 /* vim: set ts=4 sw=4 et smartindent: */
[ Source navigation ] | [ Diff markup ] | [ Identifier search ] | [ general search ] |
This page was automatically generated by the 2.1.0 LXR engine. The LXR team |
![]() ![]() |