Back to home page

Enduro/X

 
 

    


0001 /**
0002  * @brief Shared memory for System V queues
0003  *   we need following ops here:
0004  *   - lookup for queue id, if specified create, then create on
0005  *   -
0006  *
0007  * @file sys_svqshm.c
0008  */
0009 /* -----------------------------------------------------------------------------
0010  * Enduro/X Middleware Platform for Distributed Transaction Processing
0011  * Copyright (C) 2009-2016, ATR Baltic, Ltd. All Rights Reserved.
0012  * Copyright (C) 2017-2023, Mavimax, Ltd. All Rights Reserved.
0013  * This software is released under one of the following licenses:
0014  * AGPL (with Java and Go exceptions) or Mavimax's license for commercial use.
0015  * See LICENSE file for full text.
0016  * -----------------------------------------------------------------------------
0017  * AGPL license:
0018  *
0019  * This program is free software; you can redistribute it and/or modify it under
0020  * the terms of the GNU Affero General Public License, version 3 as published
0021  * by the Free Software Foundation;
0022  *
0023  * This program is distributed in the hope that it will be useful, but WITHOUT ANY
0024  * WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR A
0025  * PARTICULAR PURPOSE. See the GNU Affero General Public License, version 3
0026  * for more details.
0027  *
0028  * You should have received a copy of the GNU Affero General Public License along 
0029  * with this program; if not, write to the Free Software Foundation, Inc.,
0030  * 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
0031  *
0032  * -----------------------------------------------------------------------------
0033  * A commercial use license is available from Mavimax, Ltd
0034  * contact@mavimax.com
0035  * -----------------------------------------------------------------------------
0036  */
0037 
0038 /*---------------------------Includes-----------------------------------*/
0039 
0040 #include <stdlib.h>
0041 #include <stdio.h>
0042 #include <fcntl.h>           /* For O_* constants */
0043 #include <ndrstandard.h>
0044 
0045 #include <sys/types.h>
0046 #include <sys/ipc.h>
0047 #include <sys/msg.h>
0048 
0049 #include <nstopwatch.h>
0050 #include <nstd_tls.h>
0051 #include <exhash.h>
0052 #include <ndebug.h>
0053 #include <sys_svq.h>
0054 #include "nstd_shm.h"
0055 #include <sys_svq.h>
0056 #include <ndrx_config.h>
0057 #include <lcfint.h>
0058 
0059 /*---------------------------Externs------------------------------------*/
0060 /*---------------------------Macros-------------------------------------*/
0061 
0062 /**
0063  * Perform init of the shared resources
0064  */
0065 #define INIT_ENTRY(X) \
0066 if (!ndrx_G_svqshm_init) \
0067     {\
0068         MUTEX_LOCK_V(ndrx_G_svqshm_init_lock);\
0069         \
0070         if (!ndrx_G_svqshm_init)\
0071         {\
0072             X = ndrx_svqshm_init(EXFALSE);\
0073         }\
0074         \
0075         MUTEX_UNLOCK_V(ndrx_G_svqshm_init_lock);\
0076         \
0077         if (EXFAIL==X)\
0078         {\
0079             NDRX_LOG(log_error, "Failed to create/attach System V Queues "\
0080                     "mapping shared memory blocks!");\
0081             EXFAIL_OUT(X);\
0082         }\
0083     }
0084 
0085 /**
0086  * Default time for queue to live after removal.
0087  * After this time in seconds, ndrxd will zap the queue. This is needed
0088  * for reason so that if any process in background re-opens the service on
0089  * some request address and yet are not reported (in progress) the list
0090  * of the services, the ctime is changed of the ipc q, and thus ndrxd will
0091  * not attempt to remove it. After that ndrxd receives the message and
0092  * now queue will not be removed, because of its presence in service lists
0093  */
0094 #define RM_TTL_DLFT             10
0095 
0096 #define SHM_ENT_NONE            0       /**< Entry not used */
0097 #define SHM_ENT_MATCH           1       /**< Entry matched  */
0098 #define SHM_ENT_OLD             2       /**< Entry was used */
0099 
0100 /*---------------------------Enums--------------------------------------*/
0101 /*---------------------------Typedefs-----------------------------------*/
0102 /*---------------------------Globals------------------------------------*/
0103 
0104 expublic volatile int ndrx_G_svqshm_init = EXFALSE;
0105 /* have some lock for this argument */
0106 exprivate MUTEX_LOCKDECL(ndrx_G_svqshm_init_lock);
0107 
0108 exprivate ndrx_shm_t M_map_p2s = {.fd=0, .path=""};   /**< Posix to System V mapping       */
0109 exprivate ndrx_shm_t M_map_s2p = {.fd=0, .path=""};   /**< System V to Posix mapping       */
0110 exprivate ndrx_sem_t M_map_sem = {.semid=0};/**< RW semaphore for SHM protection */
0111 
0112 /* Also we need some array of semaphores for RW locking */
0113 
0114 /*---------------------------Statics------------------------------------*/
0115 /*---------------------------Prototypes---------------------------------*/
0116 
0117 /**
0118  * Remove shared mem resources
0119  * @return EXSUCCEED/EXFAIL
0120  */
0121 expublic int ndrx_svqshm_down(int force)
0122 {
0123     int ret = EXSUCCEED;
0124     int i;
0125     ndrx_svq_map_t *svq, *el;
0126     /* have some init first, so that we attach existing resources
0127      * before killing them
0128      */
0129     INIT_ENTRY(ret);
0130     
0131 #ifdef EX_USE_SYSVQ
0132     /* Terminate polling threads... if any... 
0133     ndrx_atfork_prepare();
0134     - in case if using system pthread_atfork(), we shall call the
0135     cleanup routines by our selves
0136     */
0137     ndrx_svqadmin_fork_prepare();
0138     ndrx_svq_fork_prepare();
0139     
0140 #if 0
0141     - currently nothing there...
0142     /* WARNING ! RESUME AS CHILD (FREE UP QUEUE DEL REF LOCKS) !!!! 
0143      * If in future we do some more with childs, then needs to think here
0144      * how to avoid that.
0145      */
0146     ndrx_atfork_child();
0147 #endif
0148 #endif
0149     
0150     if (force)
0151     {
0152         /* get write locks... */
0153         svq = (ndrx_svq_map_t *) M_map_p2s.mem;
0154 
0155         /* remove any queues left open...! */
0156         for (i=0; i<ndrx_G_libnstd_cfg.queuesmax; i++)
0157         {
0158             el = NDRX_SVQ_INDEX(svq, i);
0159 
0160             if (el->flags & NDRX_SVQ_MAP_ISUSED)
0161             {
0162                 NDRX_LOG(log_error, "DOWN: Removing QID %d (%s) - should not be present!", 
0163                         el->qid, el->qstr);
0164                 userlog("DOWN: Removing QID %d (%s) - should not be present!", 
0165                         el->qid, el->qstr);
0166                 if (EXSUCCEED!=msgctl(el->qid, IPC_RMID, NULL))
0167                 {
0168                     int err = errno;
0169                     NDRX_LOG(log_error, "got error when removing %d: %s - ignore", 
0170                          el->qid, strerror(err));
0171                     userlog("got error when removing %d: %s - ignore", 
0172                          el->qid, strerror(err));
0173                 }
0174             }
0175         } /* for */
0176     } /* force */
0177      
0178     if (EXSUCCEED!=ndrx_shm_close(&M_map_p2s))
0179     {
0180         ret = EXFAIL;
0181     }
0182     
0183     if (EXSUCCEED!=ndrx_shm_remove(&M_map_p2s))
0184     {
0185         ret = EXFAIL;
0186     }
0187     
0188     if (EXSUCCEED!=ndrx_shm_close(&M_map_s2p))
0189     {
0190         ret = EXFAIL;
0191     }
0192     
0193     if (EXSUCCEED!=ndrx_shm_remove(&M_map_s2p))
0194     {
0195         ret = EXFAIL;
0196     }
0197     
0198     if (EXSUCCEED!=ndrx_sem_close(&M_map_sem))
0199     {
0200         ret = EXFAIL;
0201     }
0202     
0203     if (EXSUCCEED!=ndrx_sem_remove(&M_map_sem, EXTRUE))
0204     {
0205         ret = EXFAIL;
0206     }
0207     
0208     ndrx_G_svqshm_init = EXFALSE;
0209     
0210 out:
0211     return ret;
0212 }
0213 
0214 /**
0215  * Initialize the shared memory blocks.
0216  * This assumes that basic environment is loaded. Because we need access to
0217  * max queues setting.
0218  * @param[in] attach_only attach only, do not output resources and do not start
0219  *  AUX thread
0220  * @return EXSUCCEED/EXFAIL
0221  */
0222 expublic int ndrx_svqshm_init(int attach_only)
0223 {
0224     int ret = EXSUCCEED;
0225     
0226     /* pull-in LCF init */
0227     NDRX_LOG(log_debug, "SystemV queue init...");
0228  
0229     
0230     memset(&M_map_p2s, 0, sizeof(M_map_p2s));
0231     memset(&M_map_s2p, 0, sizeof(M_map_s2p));
0232     
0233     
0234     /* check mandatory config presence loaded by LCF! */
0235     
0236     if (NULL==ndrx_G_libnstd_cfg.qprefix || EXEOS==ndrx_G_libnstd_cfg.qprefix[0])
0237     {
0238         NDRX_LOG(log_error, "%s is not set!", CONF_NDRX_QPREFIX);
0239         EXFAIL_OUT(ret);
0240     }
0241     
0242     if (!ndrx_G_libnstd_cfg.ipckey)
0243     {
0244         NDRX_LOG(log_error, "%s is not set!", CONF_NDRX_IPCKEY);
0245         EXFAIL_OUT(ret);
0246     }
0247     
0248     /* fill in shared memory details, path + size */
0249     
0250     M_map_p2s.fd = EXFAIL;
0251     M_map_p2s.key = ndrx_G_libnstd_cfg.ipckey + NDRX_SHM_P2S_KEYOFSZ;
0252     
0253     snprintf(M_map_p2s.path, sizeof(M_map_p2s.path), NDRX_SHM_P2S, ndrx_G_libnstd_cfg.qprefix);
0254     M_map_p2s.size = sizeof(ndrx_svq_map_t)*ndrx_G_libnstd_cfg.queuesmax;
0255     
0256     M_map_s2p.fd = EXFAIL;
0257     M_map_s2p.key = ndrx_G_libnstd_cfg.ipckey + NDRX_SHM_S2P_KEYOFSZ;
0258     
0259     snprintf(M_map_s2p.path, sizeof(M_map_s2p.path), NDRX_SHM_S2P, ndrx_G_libnstd_cfg.qprefix);
0260     M_map_s2p.size = sizeof(ndrx_svq_map_t)*ndrx_G_libnstd_cfg.queuesmax;
0261     
0262     
0263     if (attach_only)
0264     {
0265         if (EXSUCCEED!=ndrx_shm_attach(&M_map_p2s))
0266         {
0267             NDRX_LOG(log_error, "Failed to oattach shm [%s] - System V Queues cannot work",
0268                         M_map_p2s.path);
0269             EXFAIL_OUT(ret);
0270         }
0271     }
0272     else if (EXSUCCEED!=ndrx_shm_open(&M_map_p2s, EXTRUE))
0273     {
0274         NDRX_LOG(log_error, "Failed to open shm [%s] - System V Queues cannot work",
0275                     M_map_p2s.path);
0276         EXFAIL_OUT(ret);
0277     }
0278     
0279     if (attach_only)
0280     {
0281         if (EXSUCCEED!=ndrx_shm_attach(&M_map_s2p))
0282         {
0283             NDRX_LOG(log_error, "Failed to attach shm [%s] - System V Queues cannot work",
0284                         M_map_s2p.path);
0285             EXFAIL_OUT(ret);
0286         }
0287     }
0288     else if (EXSUCCEED!=ndrx_shm_open(&M_map_s2p, EXTRUE))
0289     {
0290         NDRX_LOG(log_error, "Failed to open shm [%s] - System V Queues cannot work",
0291                     M_map_s2p.path);
0292         EXFAIL_OUT(ret);
0293     }
0294     
0295     NDRX_LOG(log_debug, "M_map_p2s.mem=%p M_map_s2p=%p", 
0296             M_map_p2s.mem, M_map_s2p.mem);
0297     
0298     memset(&M_map_sem, 0, sizeof(M_map_sem));
0299     
0300     /* Service queue ops */
0301     M_map_sem.key = ndrx_G_libnstd_cfg.ipckey + NDRX_SEM_SV5LOCKS;
0302     
0303     /*
0304      * Currently using single semaphore.
0305      * But from balancing when searching entries, we could use multiple sems.. 
0306      * to protect both shared mems...
0307      * If we use different two sems one for p2s and another for s2p we could
0308      * easily run into deadlock. Semop allows atomically work with multiple
0309      * semaphores, so maybe this can be used to increase performance.
0310      */
0311     M_map_sem.nrsems = 1;
0312     M_map_sem.maxreaders = ndrx_G_libnstd_cfg.svqreadersmax;
0313     
0314     NDRX_LOG(log_debug, "Using service semaphore key: %d max readers: %d", 
0315             M_map_sem.key, ndrx_G_libnstd_cfg.svqreadersmax);
0316     
0317     /* OK, either create or attach... */
0318     if (attach_only)
0319     {
0320         if (EXSUCCEED!=ndrx_sem_attach(&M_map_sem))
0321         {
0322             NDRX_LOG(log_error, "Failed to attach semaphore for System V queue "
0323                     "map shared mem");
0324             EXFAIL_OUT(ret);
0325         }
0326     }
0327     else if (EXSUCCEED!=ndrx_sem_open(&M_map_sem, EXTRUE))
0328     {
0329         NDRX_LOG(log_error, "Failed to open semaphore for System V queue "
0330                 "map shared mem");
0331         userlog("Failed to open semaphore for System V queue "
0332                 "map shared mem");
0333         EXFAIL_OUT(ret);
0334     }
0335     
0336 #ifdef EX_USE_SYSVQ
0337     /* init the support thread */
0338     if (!attach_only && EXSUCCEED!=ndrx_svq_event_init())
0339     {
0340         NDRX_LOG(log_error, "Failed to init System V queue monitoring thread!");
0341         userlog("Failed to init System V queue monitoring thread!");
0342         EXFAIL_OUT(ret);
0343     }
0344 #endif
0345             
0346     ndrx_G_svqshm_init = EXTRUE;
0347 out:
0348     return ret;
0349 }
0350 
0351 /**
0352  * This attempts to attach to System V resources, does not spawn AUX threads.
0353  * If you plan to run some other queue ops, then needs to close the shm
0354  * and let standard init to step in
0355  * @return EXFAIL - failed, EXSUCCEED - attached to already existing resources
0356  *  EXTRUE - attached only.
0357  */
0358 expublic int ndrx_svqshm_attach(void)
0359 {
0360     int ret = EXSUCCEED;
0361     
0362     if (ndrx_G_svqshm_init)
0363     {
0364         NDRX_LOG(log_debug, "Already System V resources open");
0365         goto out;
0366     }
0367     
0368     if (EXSUCCEED!=ndrx_svqshm_init(EXTRUE))
0369     {
0370         NDRX_LOG(log_error, "Failed to attach to System V resources");
0371         EXFAIL_OUT(ret);
0372     }
0373     
0374     NDRX_LOG(log_debug, "Attached to shm/sem OK");
0375     
0376     ret = EXTRUE;
0377     
0378 out:
0379     return ret;
0380 }
0381 
0382 /**
0383  * Detach (with attach only was done) from System V resources
0384  */
0385 expublic void ndrx_svqshm_detach(void)
0386 {
0387     ndrx_shm_close(&M_map_p2s);
0388     ndrx_shm_close(&M_map_s2p);
0389     ndrx_sem_close(&M_map_sem);
0390     
0391     ndrx_G_svqshm_init = EXFALSE;
0392 }
0393 
0394 /**
0395  * Get System V shared resources
0396  * @param map_p2s Posix to System V mapping table
0397  * @param map_s2p System V to Posix mapping table
0398  * @param map_sem Semaphore locks
0399  * @return EXFALSE - not attached to SHM, EXTRUE - attached to shm
0400  */
0401 expublic int ndrx_svqshm_shmres_get(ndrx_shm_t **map_p2s, ndrx_shm_t **map_s2p, 
0402         ndrx_sem_t **map_sem, int *queuesmax)
0403 {
0404 
0405     *map_p2s = &M_map_p2s;
0406     *map_s2p = &M_map_s2p;
0407     *map_sem = &M_map_sem;
0408     *queuesmax = ndrx_G_libnstd_cfg.queuesmax;
0409     
0410     return ndrx_G_svqshm_init;
0411 }
0412 
0413 /**
0414  * Hash the string key
0415  * @param conf hash config
0416  * @param key_get key value
0417  * @param key_len not used
0418  * @return slot index
0419  */
0420 exprivate int qstr_key_hash(ndrx_lh_config_t *conf, void *key_get, size_t key_len)
0421 {
0422     return ndrx_hash_fn(key_get) % conf->elmmax;
0423 }
0424 
0425 /**
0426  * Generate debug for key output
0427  * @param conf hash config
0428  * @param key_get key value
0429  * @param key_len not used
0430  * @param dbg_out debug output buffer
0431  * @param dbg_len debug output buffer size
0432  */
0433 exprivate void qstr_key_debug(ndrx_lh_config_t *conf, void *key_get, size_t key_len, 
0434     char *dbg_out, size_t dbg_len)
0435 {
0436     NDRX_STRCPY_SAFE_DST(dbg_out, key_get, dbg_len);
0437 }
0438 
0439 /**
0440  * Get value debug string
0441  * @param conf hash config
0442  * @param idx array index
0443  * @param dbg_out debug output
0444  * @param dbg_len debug output len
0445  */
0446 exprivate void val_debug(ndrx_lh_config_t *conf, int idx, char *dbg_out, size_t dbg_len)
0447 {
0448     snprintf(dbg_out, dbg_len, "%s/%d", 
0449             NDRX_SVQ_INDEX((*conf->memptr), idx)->qstr,
0450             NDRX_SVQ_INDEX((*conf->memptr), idx)->qid);
0451     
0452 }
0453 
0454 /**
0455  * Compare the hash element key
0456  * @param conf hash config
0457  * @param key_get key value
0458  * @param key_len key len
0459  * @param idx index at which to compare
0460  * @return 0 if equals
0461  */
0462 exprivate int qstr_compare(ndrx_lh_config_t *conf, void *key_get, size_t key_len, int idx)
0463 {
0464     return strcmp(NDRX_SVQ_INDEX((*conf->memptr), idx)->qstr, key_get);
0465 }
0466 
0467 /**
0468  * Get position, hash the pathname and find free space if creating
0469  * or just empty position, if not found..
0470  * @param pathname Posix queue string 
0471  * @param[out] pos position found suitable to succeed request
0472  * @param[out] have_value valid value is found? EXTRUE/EXFALSE.
0473  * @return EXTRUE -> found position/ EXFALSE - no position found
0474  */
0475 exprivate int position_get_qstr(char *pathname, int oflag, int *pos, 
0476         int *have_value)
0477 {
0478     static ndrx_lh_config_t conf;
0479     static int first = EXTRUE;
0480     
0481     if (first)
0482     {
0483         conf.elmmax = ndrx_G_libnstd_cfg.queuesmax;
0484         conf.elmsz = sizeof(ndrx_svq_map_t);
0485         conf.flags_offset = EXOFFSET(ndrx_svq_map_t, flags);
0486         conf.memptr = (void **)&(M_map_p2s.mem);
0487         conf.p_key_hash=&qstr_key_hash;
0488         conf.p_key_debug=&qstr_key_debug;
0489         conf.p_val_debug=&val_debug;
0490         conf.p_compare=&qstr_compare;
0491         first = EXFALSE;
0492     }
0493     
0494     return ndrx_lh_position_get(&conf, pathname, 0, oflag, pos, have_value, "qstr");
0495 }
0496 
0497 /**
0498  * Linear hashing hash function
0499  * @param conf hash config
0500  * @param key_get key data
0501  * @param key_len key len
0502  * @return hashed value (some number) within the memory range
0503  */
0504 exprivate int qid_key_hash(ndrx_lh_config_t *conf, void *key_get, size_t key_len)
0505 {
0506     return *((int *)key_get) % conf->elmmax;
0507 }
0508 
0509 /**
0510  * Key debug output generator
0511  * @param conf hash config
0512  * @param key_get key value
0513  * @param key_len key len/not used
0514  * @param dbg_out debug output buffer
0515  * @param dbg_len debug buffer len
0516  */
0517 exprivate void qid_key_debug(ndrx_lh_config_t *conf, void *key_get, size_t key_len, 
0518     char *dbg_out, size_t dbg_len)
0519 {
0520     snprintf(dbg_out, dbg_len, "%d", *((int *)key_get));
0521 }
0522 
0523 /**
0524  * Hash key compare
0525  * @param conf hash config
0526  * @param key_get key value
0527  * @param key_len key len, not used
0528  * @param idx index at which to compare
0529  * @return 0 - equals, other not equals
0530  */
0531 exprivate int qid_compare(ndrx_lh_config_t *conf, void *key_get, size_t key_len, int idx)
0532 {
0533     /* return strcmp(NDRX_SVQ_INDEX((*conf->memptr), idx)->qstr, key_get); */
0534     if (NDRX_SVQ_INDEX((*conf->memptr), idx)->qid == *((int *)key_get))
0535     {
0536         return EXSUCCEED;
0537     }
0538     else
0539     {
0540         return EXFAIL;
0541     }
0542 }
0543 
0544 /**
0545  * Get position queue id record
0546  * @param qid queue id
0547  * @param[out] pos position found suitable to succeed request
0548  * @param[out] have_value valid value is found? EXTRUE/EXFALSE.
0549  * @return EXTRUE -> found position/ EXFALSE - no position found
0550  */
0551 exprivate int position_get_qid(int qid, int oflag, int *pos, 
0552         int *have_value)
0553 {
0554     static ndrx_lh_config_t conf;
0555     static int first = EXTRUE;
0556     
0557     if (first)
0558     {
0559         conf.elmmax = ndrx_G_libnstd_cfg.queuesmax;
0560         conf.elmsz = sizeof(ndrx_svq_map_t);
0561         conf.flags_offset = EXOFFSET(ndrx_svq_map_t, flags);
0562         conf.memptr = (void **)&(M_map_s2p.mem);
0563         conf.p_key_hash=&qid_key_hash;
0564         conf.p_key_debug=&qid_key_debug;
0565         conf.p_val_debug=&val_debug;
0566         conf.p_compare=&qid_compare;
0567         first = EXFALSE;
0568     }
0569     
0570     return ndrx_lh_position_get(&conf, &qid, 0, oflag, pos, have_value, "qid");
0571 
0572 }
0573 
0574 
0575 /**
0576  * Get position queue id record
0577  * @param status status memory block
0578  * @param qid queue id
0579  * @param[out] pos position found suitable to succeed request
0580  * @param[out] have_value valid value is found? EXTRUE/EXFALSE.
0581  * @return EXTRUE -> found position/ EXFALSE - no position found
0582  */
0583 expublic int ndrx_svqshm_get_status(ndrx_svq_status_t *status, 
0584         int qid, int *pos, int *have_value)
0585 {
0586     int ret=SHM_ENT_NONE;
0587     int try = qid % ndrx_G_libnstd_cfg.queuesmax;
0588     int start = try;
0589     int overflow = EXFALSE;
0590     int iterations = 0;
0591     ndrx_svq_status_t *el;
0592 
0593     *pos=EXFAIL;
0594     *have_value = EXFALSE;
0595     
0596     /*
0597      * So we loop over filled entries until we found empty one or
0598      * one which have been initialised by this service.
0599      *
0600      * So if there was overflow, then loop until the start item.
0601      */
0602     while ((NDRX_SVQ_STATIDX(status, try)->flags & NDRX_SVQ_MAP_WASUSED)
0603             && (!overflow || (overflow && try < start)))
0604     {
0605         
0606         el = NDRX_SVQ_STATIDX(status, try);
0607                 
0608         if (el->qid == qid)
0609         {
0610             *pos=try;
0611             
0612             if (el->flags & NDRX_SVQ_MAP_ISUSED)
0613             {
0614                 ret=SHM_ENT_MATCH;
0615             }
0616             else
0617             {
0618                 ret=SHM_ENT_OLD;
0619             }
0620             
0621             /* Bug #784  
0622             ret=EXTRUE;*/
0623             break;  /* <<< Break! */
0624         }
0625         
0626         try++;
0627         
0628         /* we loop over... 
0629          * Feature #139 mvitolin, 09/05/2017
0630          * Fix potential overflow issues at the border... of SHM...
0631          */
0632         if (try>=ndrx_G_libnstd_cfg.queuesmax)
0633         {
0634             try = 0;
0635             overflow=EXTRUE;
0636             NDRX_LOG(log_debug, "Overflow reached for search of [%d]", qid);
0637         }
0638         iterations++;
0639         
0640         NDRX_LOG(log_dump, "Trying %d for [%d]", try, qid);
0641     }
0642     switch (ret)
0643     {
0644         case SHM_ENT_OLD:
0645             *have_value = EXFALSE;
0646             ret = EXTRUE;   /* have position */
0647             break;
0648         case SHM_ENT_NONE:
0649             
0650             if (overflow)
0651             {
0652                 *have_value = EXFALSE;
0653                 ret = EXFALSE;   /* no position */
0654             }
0655             else
0656             {
0657                 *have_value = EXFALSE;
0658                 ret = EXTRUE;   /* have position */
0659             }
0660             
0661             break;
0662         case SHM_ENT_MATCH:
0663             *have_value = EXTRUE;
0664             ret = EXTRUE;   /* have position */
0665             break;
0666         default:
0667             
0668             NDRX_LOG(log_error, "!!! should not get here...");
0669             break;
0670     }
0671     
0672     *pos=try;
0673     NDRX_LOG(log_debug, "[%d] - result: %d, "
0674                         "iterations: %d, pos: %d, have_value: %d",
0675                          qid, ret, iterations, *pos, *have_value);
0676     return ret;
0677 }
0678 
0679 /**
0680  * Lookup qid
0681  * @param in_qid System V queue id
0682  * @param out_qstr output string
0683  * @param out_qstr_len output string length 
0684  * @return EXSUCCEED/EXFAIL
0685  */
0686 expublic int ndrx_svqshm_get_qid(int in_qid, char *out_qstr, int out_qstr_len)
0687 {
0688     int ret = EXSUCCEED;
0689     int found_2;
0690     int have_value_2;
0691     int pos_2;
0692     ndrx_svq_map_t *svq2;
0693     ndrx_svq_map_t *sm;      /* System V map        */
0694 
0695     INIT_ENTRY(ret);
0696     
0697     svq2 = (ndrx_svq_map_t *) M_map_s2p.mem;
0698 
0699     /* ###################### CRITICAL SECTION ############################### */
0700     if (EXSUCCEED!=ndrx_sem_rwlock(&M_map_sem, 0, NDRX_SEM_TYP_READ))
0701     {
0702         goto out;
0703     }
0704 
0705     found_2 = position_get_qid(in_qid, 0, &pos_2, &have_value_2);
0706     
0707     if (have_value_2)
0708     {
0709         sm = NDRX_SVQ_INDEX(svq2, pos_2);
0710         NDRX_STRCPY_SAFE_DST(out_qstr, sm->qstr, out_qstr_len);
0711     }
0712     else
0713     {
0714         ret=EXFAIL;
0715     }
0716     
0717     ndrx_sem_rwunlock(&M_map_sem, 0, NDRX_SEM_TYP_READ);
0718     /* ###################### CRITICAL SECTION, END ########################## */
0719     
0720 out:
0721             
0722     return ret;
0723 }
0724 
0725 /**
0726  * Get queue from shared memory.
0727  * In case of O_CREAT + O_EXCL return EEXIST error!
0728  * @param qstr queue path
0729  * @param mode access mode/permissions
0730  * @param oflag are we creating a queue?
0731  * @param remove should we actually remove the queue?
0732  * @return resolve queue id
0733  */
0734 expublic int ndrx_svqshm_get(char *qstr, mode_t mode, int oflag)
0735 {
0736     int ret = EXSUCCEED;
0737     int qid;
0738     int found;
0739     int have_value;
0740     int pos;
0741     
0742     int found_2;
0743     int have_value_2;
0744     int pos_2;
0745     
0746     int msgflag;
0747     int err = 0;
0748     
0749     ndrx_svq_map_t *svq;
0750     ndrx_svq_map_t *svq2;
0751     
0752     ndrx_svq_map_t *pm;      /* Posix map           */
0753     ndrx_svq_map_t *sm;      /* System V map        */
0754     
0755     INIT_ENTRY(ret);
0756     
0757     svq = (ndrx_svq_map_t *) M_map_p2s.mem;
0758     svq2 = (ndrx_svq_map_t *) M_map_s2p.mem;
0759     
0760     /* Calculate first position of queue lookup */
0761     
0762     /* Read lock, only if attempting to open existing queue
0763      * For creating queues, we need to count the actual number
0764      * of creates called. So that we could sync with ndrxd the zapping of Queues.
0765      */
0766     
0767     if (!(oflag & O_CREAT))
0768     {
0769         /* ###################### CRITICAL SECTION ############################### */
0770         if (EXSUCCEED!=ndrx_sem_rwlock(&M_map_sem, 0, NDRX_SEM_TYP_READ))
0771         {
0772             goto out;
0773         }
0774 
0775         found = position_get_qstr(qstr, oflag, &pos, &have_value);
0776         
0777         if (have_value)
0778         {
0779             pm = NDRX_SVQ_INDEX(svq, pos);
0780             qid = pm->qid;
0781         }
0782 
0783         ndrx_sem_rwunlock(&M_map_sem, 0, NDRX_SEM_TYP_READ);
0784         /* ###################### CRITICAL SECTION, END ########################## */
0785     
0786         if (have_value)
0787         {
0788             if (oflag & (O_CREAT | O_EXCL))
0789             {
0790                 NDRX_LOG(log_error, "Queue [%s] was requested with O_CREAT | O_EXCL, but "
0791                         "it already exists at position with qid %d", qstr, ret);
0792                 err = EEXIST;
0793                 EXFAIL_OUT(ret);
0794             }
0795             else
0796             {
0797                 NDRX_LOG(log_debug, "Queue [%s] mapped to qid %d", qstr, qid);
0798             }
0799 
0800             /* finish with it */
0801             goto out;
0802         }
0803         else
0804         {
0805             /* queue not found and we are in read mode... */
0806             NDRX_LOG(log_debug, "Queue not found for: [%s]", qstr);
0807             err = ENOENT;
0808             EXFAIL_OUT(ret);
0809         }
0810     }
0811     
0812     /* queue missing, release read lock, get write lock */
0813     NDRX_LOG(log_info, "[%s] queue not registered or write requested %d - Writ try...", 
0814             qstr, oflag);
0815     
0816     /* ###################### CRITICAL SECTION ############################### */
0817     if (EXSUCCEED!=ndrx_sem_rwlock(&M_map_sem, 0, NDRX_SEM_TYP_WRITE))
0818     {
0819         goto out;
0820     }
0821     
0822     found = position_get_qstr(qstr, oflag, &pos, &have_value);
0823     
0824     /* check that we have found! */
0825     
0826     if (!found)
0827     {
0828         NDRX_LOG(log_error, "Location not found for [%s] - memory full?", qstr);
0829         userlog("Location not found for [%s] - memory full?", qstr);
0830         err = ENOMEM;
0831         EXFAIL_OUT(ret);
0832     }
0833     
0834     pm = NDRX_SVQ_INDEX(svq, pos);
0835     
0836     /* while we were locked, somebody may added such queue already...! */
0837 
0838     if (have_value)
0839     {
0840         qid = pm->qid;
0841         
0842         if ( (oflag & O_CREAT)  &&  (oflag & O_EXCL))
0843         {
0844             /* ###################### CRITICAL SECTION, END ########################## */
0845             ndrx_sem_rwunlock(&M_map_sem, 0, NDRX_SEM_TYP_WRITE);
0846             NDRX_LOG(log_error, "Queue [%s] was requested with O_CREAT | O_EXCL, but "
0847                     "it already exists at position with qid %d", qstr, qid);
0848             err = EEXIST;
0849             EXFAIL_OUT(ret);
0850         }
0851         else
0852         {
0853             /* update the usage statistics */
0854             
0855             /* Lookup the second table... so that we can update
0856              * usage statistics
0857              */
0858             qid = pm->qid;
0859             
0860             found_2 = position_get_qid(qid, oflag, &pos_2, &have_value_2);
0861             sm = NDRX_SVQ_INDEX(svq2, pos_2);
0862             
0863             ndrx_stopwatch_reset(&(pm->ctime));
0864             sm->ctime = pm->ctime;
0865             
0866             ndrx_sem_rwunlock(&M_map_sem, 0, NDRX_SEM_TYP_WRITE);
0867             /* ###################### CRITICAL SECTION, END ########################## */
0868             
0869             NDRX_LOG(log_info, "Queue [%s] mapped to qid %d", 
0870                     qstr, qid);
0871         }
0872         /* finish with it */
0873         goto out;
0874     }
0875     
0876     /* open queue, install mappings in both tables */
0877     
0878     msgflag=0;
0879 
0880     if (oflag & O_CREAT)
0881     {
0882         msgflag|=IPC_CREAT;
0883     }
0884 
0885     if (oflag & O_EXCL)
0886     {
0887         msgflag|=IPC_EXCL;
0888     }
0889     
0890     /* extract only known flags.. */
0891     if (EXFAIL==(qid = msgget(IPC_PRIVATE, msgflag|mode)))
0892     {
0893         int err = errno;
0894         ndrx_sem_rwunlock(&M_map_sem, 0, NDRX_SEM_TYP_WRITE);
0895         /* ###################### CRITICAL SECTION, END ########################## */
0896         
0897         NDRX_LOG(log_error, "Failed msgget: %s for [%s]", strerror(err), qstr);
0898         userlog("Failed msgget: %s for [%s]", strerror(err), qstr);
0899         EXFAIL_OUT(ret);
0900     }
0901     
0902     /* write handlers off */
0903     pm->qid = qid;
0904     NDRX_STRCPY_SAFE(pm->qstr, qstr);
0905     pm->flags = (NDRX_SVQ_MAP_ISUSED | NDRX_SVQ_MAP_WASUSED);
0906     
0907     /* now locate the pid to string mapping... 
0908      * lookup the second table by new qid/ret
0909      */
0910     /* Lookup the second table... so that we can update
0911      * usage statistics
0912      */
0913     found_2 = position_get_qid(qid, oflag, &pos_2, &have_value_2);
0914     
0915     if (!found_2)
0916     {
0917         NDRX_LOG(log_error, "Location not found for qid [%d] - memory full?", qid);
0918         userlog("Location not found for qid [%d] - memory full?", qid);
0919         err = ENOMEM;
0920         EXFAIL_OUT(ret);
0921     }
0922     
0923     sm = NDRX_SVQ_INDEX(svq2, pos_2);
0924     
0925     sm->qid = qid;
0926     NDRX_STRCPY_SAFE( sm->qstr, qstr);
0927     sm->flags = (NDRX_SVQ_MAP_ISUSED | NDRX_SVQ_MAP_WASUSED);
0928     
0929     /* reset last create time... */
0930     ndrx_stopwatch_reset(&(pm->ctime));
0931     sm->ctime = pm->ctime;
0932     
0933     ndrx_sem_rwunlock(&M_map_sem, 0, NDRX_SEM_TYP_WRITE);
0934     /* ###################### CRITICAL SECTION, END ########################## */
0935     
0936     NDRX_LOG(log_debug, "Open queue: [%s] to system v: [%d]", qstr, qid);
0937     
0938 out:
0939     
0940     if (EXSUCCEED!=ret)
0941     {
0942         errno = err;
0943         return EXFAIL;
0944     }
0945     else
0946     {
0947         return qid;
0948     }
0949 }
0950 
0951 /**
0952  * Perform control operations over the shared memory / queue maps.
0953  * !!! THIS SHALL BE USED BY NDRXD ONLY FOR SERVICE QUEUES !!!
0954  * @param qstr conditional queue string, depending from which side we do
0955  *  lookup. Either \p qstr or \p qid must be present
0956  * @param qid queue ID, conditional, if not set then EXFAIL.
0957  * @param cmd currently only IPC_RMID operation is supported. It is expected
0958  *  that IPC_RMID will be called only by NDRXD when it expects that queue
0959  *  needs to be unlinked. But at which point? 
0960  *  Probably this way:
0961  *      1) ndrxd locks the svqshm, svc shm, br shm
0962  *      a)   Note that during this time some server is booting and might 
0963  *      b)   have grabbed the queued id, thus also increased the counter
0964  *      2) ndrxd scans the service lists. Find that particular qid is not used
0965  *      3) then it shall be removed from svqshm, but there is problem,
0966  *          as the a) have created the Q, and not yet reported to NDRXD,
0967  *          the operation shall be delayed. Thus we need a last creation
0968  *          timestamp. So that ndrxd removes un-used queues only after some
0969  *          time of not "created".
0970  *          for this scenario we will pass "arg1" which will give number of
0971  *          seconds to be exceeded for unlink.
0972  *          if unlink is needed immediately, then use -1.
0973  * @param[in] p_deletecb callback to delete event function. I.e. if not null
0974  *  it is called when attempt to delete the queue is performed, so that
0975  *  it would be possible to flush some messages.
0976  * @return EXFAIL, or Op related value. For example for IPC_RMID it will
0977  *  return number of instances left for the queue servers.
0978  */
0979 expublic int ndrx_svqshm_ctl(char *qstr, int qid, int cmd, int arg1,
0980         int (*p_deletecb)(int qid, char *qstr))
0981 {
0982     int ret = EXSUCCEED;
0983     
0984     int delta;
0985     
0986     int found;
0987     int have_value;
0988     int pos;
0989     
0990     int found_2;
0991     int have_value_2;
0992     int pos_2;
0993     
0994     int err = 0;
0995     int is_locked = EXFALSE;
0996     
0997     ndrx_svq_map_t *svq;
0998     ndrx_svq_map_t *svq2;
0999   
1000     
1001     ndrx_svq_map_t *pm;      /* Posix map           */
1002     ndrx_svq_map_t *sm;      /* System V map        */
1003     
1004     INIT_ENTRY(ret);
1005   
1006     svq = (ndrx_svq_map_t *) M_map_p2s.mem;
1007     svq2 = (ndrx_svq_map_t *) M_map_s2p.mem;
1008     
1009     /* ###################### CRITICAL SECTION ############################### */
1010     if (EXSUCCEED!=ndrx_sem_rwlock(&M_map_sem, 0, NDRX_SEM_TYP_WRITE))
1011     {
1012         goto out;
1013     }
1014     
1015     is_locked = EXTRUE;
1016     
1017     if (qstr)
1018     {
1019         found = position_get_qstr(qstr, 0, &pos, &have_value);
1020         
1021         if (have_value)
1022         {
1023             pm = NDRX_SVQ_INDEX(svq, pos);
1024             
1025             found_2 = position_get_qid(pm->qid, 0, 
1026                     &pos_2, &have_value_2);
1027             
1028             if (!have_value_2)
1029             {
1030                 NDRX_LOG(log_error, "qstr [%s] map not in sync with qid %d "
1031                         "map (have_value_2 is false)",
1032                         qstr, pm->qid);
1033                 userlog("qstr [%s] map not in sync with qid %d "
1034                         "map (have_value_2 is false)",
1035                         qstr, pm->qid);
1036                 err = EBADF;
1037                 EXFAIL_OUT(ret);
1038             }
1039             
1040             sm = NDRX_SVQ_INDEX(svq2, pos_2);
1041         }
1042         
1043     }
1044     else if (EXFAIL!=qid)
1045     {
1046         found_2 = position_get_qid(qid, 0, 
1047                     &pos_2, &have_value_2);
1048         
1049         if (have_value_2)
1050         {
1051             sm = NDRX_SVQ_INDEX(svq2, pos_2);
1052             
1053             found = position_get_qstr(sm->qstr, 0, &pos, &have_value);
1054             
1055             if (!have_value)
1056             {
1057                 NDRX_LOG(log_error, "qstr [%s] map not in sync with qid %d "
1058                         "map (have_value is false)",
1059                         sm->qstr, qstr, qid);
1060                 
1061                 userlog("qstr [%s] map not in sync with qid %d "
1062                         "map (have_value is false)",
1063                         sm->qstr, qstr, qid);
1064                 err = EBADF;
1065                 EXFAIL_OUT(ret);
1066             }
1067             
1068             pm = NDRX_SVQ_INDEX(svq, pos);
1069             
1070         }
1071     }
1072     else
1073     {
1074         NDRX_LOG(log_error, "qstr and qid are invalid => FAIL");
1075         err = EINVAL;
1076         EXFAIL_OUT(ret);
1077     }
1078         
1079     switch (cmd)
1080     {
1081         /* set the flags for the queue */
1082         case IPC_SET:
1083             
1084             NDRX_LOG(log_debug, "Adding flags %hd to qstr [%s]/qid %d",
1085                     (short)arg1, pm->qstr, pm->qid);
1086             pm->flags |= (short)arg1;
1087             sm->flags |= (short)arg1;
1088             
1089             if (NDRX_SVQ_MAP_RQADDR & arg1)
1090             {
1091                 /* set change time, as for housekeeping we need to check
1092                  * all running servers to match the cnt service shm,
1093                  * and do this in case if TTL is expired (i.e. all server infos
1094                  * are processed and system is stable)
1095                  */
1096                 ndrx_stopwatch_reset(&(pm->ctime));
1097                 sm->ctime = pm->ctime;
1098             }
1099             
1100             break;
1101         case IPC_RMID:
1102             
1103             if (!have_value)
1104             {
1105                 NDRX_LOG(log_info, "Queue not found [%s]/%d", 
1106                         qstr?qstr:"NULL", qid);
1107                 err = ENOENT;
1108                 EXFAIL_OUT(ret);
1109             }
1110             
1111             if (EXFAIL!=arg1)
1112             {
1113                 delta = ndrx_stopwatch_get_delta_sec( &(pm->ctime));
1114             }
1115             
1116             if ( EXFAIL==arg1 || delta > arg1)
1117             {
1118                 NDRX_LOG(log_info, "Unlinking queue: [%s]/%d (delta: %d, limit: %d)",
1119                         pm->qstr, pm->qid, delta, arg1);
1120                 
1121                 /* unlink the q and remove entries from shm */
1122                 
1123                 if (NULL!=p_deletecb && EXSUCCEED!=p_deletecb(pm->qid, pm->qstr))
1124                 {
1125                     NDRX_LOG(log_error, "Delete callback failed for [%s]/%d",
1126                             pm->qstr, pm->qid);
1127                     EXFAIL_OUT(ret);
1128                 }
1129                 
1130                 if (EXSUCCEED!=msgctl(pm->qid, IPC_RMID, NULL))
1131                 {
1132                     err = errno;
1133                     
1134                     if (EIDRM!=err && EINVAL!=err)
1135                     {
1136                         NDRX_LOG(log_error, "got error when removing %d: %s", 
1137                             pm->qid, strerror(err));
1138                         userlog("got error when removing %d: %s", 
1139                             pm->qid, strerror(err));
1140                         EXFAIL_OUT(ret);
1141                     }
1142                     else
1143                     {
1144                         NDRX_LOG(log_warn, "got error when removing %d: %s - ignore", 
1145                             pm->qid, strerror(err));
1146                     }
1147                 }
1148                     
1149                 NDRX_LOG(log_debug, "Removing ISUSED flag for P2S/S2P mem");
1150 
1151                 pm->flags &= ~(NDRX_SVQ_MAP_ISUSED);
1152                 sm->flags &= ~(NDRX_SVQ_MAP_ISUSED);
1153             }
1154             
1155             break;
1156         
1157         default:
1158             
1159             NDRX_LOG(log_error, "Unsupported command: %d", cmd);
1160             err=EINVAL;
1161             EXFAIL_OUT(ret);
1162             break;
1163     }
1164     
1165     
1166 out:
1167     
1168     if (is_locked)
1169     {
1170         ndrx_sem_rwunlock(&M_map_sem, 0, NDRX_SEM_TYP_WRITE);
1171         /* ###################### CRITICAL SECTION, END ########################## */
1172     }
1173     
1174     errno = err;
1175     return ret;
1176 }
1177 
1178 
1179 /**
1180  * List queue open in the System V IPC.
1181  * So what we do here basically is get read lock SystemV SHM tables
1182  * and return the results.
1183  * @param qpath queue path, not used actually
1184  * @param[out] return_status EXSUCCEED/EXFAIL
1185  * @return NULL no queues found (or error) or ptr to queues
1186  */
1187 expublic string_list_t* ndrx_sys_mqueue_list_make_svq(char *qpath, int *return_status)
1188 {
1189     int retstat = EXSUCCEED;
1190     string_list_t* ret = NULL;
1191     int have_lock = EXFALSE;
1192     int i=0;
1193     ndrx_svq_map_t *svq;
1194     ndrx_svq_map_t *pm;      /* Posix map           */
1195     
1196     INIT_ENTRY(retstat);
1197     
1198     svq = (ndrx_svq_map_t *) M_map_p2s.mem;
1199     
1200     *return_status = EXSUCCEED;
1201             
1202     /* ###################### CRITICAL SECTION ############################### */
1203     if (EXSUCCEED!=ndrx_sem_rwlock(&M_map_sem, 0, NDRX_SEM_TYP_READ))
1204     {
1205         goto out;
1206     }
1207     
1208     have_lock = EXTRUE;
1209     
1210     for (i=0;i<ndrx_G_libnstd_cfg.queuesmax;i++)
1211     {
1212         pm = NDRX_SVQ_INDEX(svq, i);
1213         
1214         if (pm->flags & NDRX_SVQ_MAP_ISUSED)
1215         {
1216             if (EXSUCCEED!=ndrx_string_list_add(&ret, pm->qstr))
1217             {
1218                 NDRX_LOG(log_error, "failed to add string to list [%s]", 
1219                         pm->qstr);
1220                 *return_status = EXFAIL;
1221                 goto out;
1222             }
1223         }
1224     }
1225     
1226 out:
1227     if (have_lock)
1228     {
1229         ndrx_sem_rwunlock(&M_map_sem, 0, NDRX_SEM_TYP_READ);
1230         /* ###################### CRITICAL SECTION, END ########################## */
1231     }
1232     
1233     return ret;
1234 }
1235 
1236 /**
1237  * Remove queue status in newly allocated memory block
1238  * @param[out] len number elements in allocated block
1239  * @param[in] number of seconds to leave record in map until removal
1240  * @return ptr to alloc block or NULL in case of error
1241  */
1242 expublic ndrx_svq_status_t* ndrx_svqshm_statusget(int *len, int ttl)
1243 {
1244     int ret = EXSUCCEED;
1245     ndrx_svq_status_t* block = NULL;
1246     int sz = sizeof(ndrx_svq_status_t) * ndrx_G_libnstd_cfg.queuesmax;
1247     int err;
1248     int have_lock = EXFALSE;
1249     int i=0;
1250     ndrx_svq_map_t *svq = (ndrx_svq_map_t *) M_map_s2p.mem;
1251     ndrx_svq_map_t *pm;      /* Posix map           */
1252     
1253     block = NDRX_MALLOC(sz);
1254     
1255     if (NULL==block)
1256     {
1257         err = errno;
1258         NDRX_LOG(log_error, "Failed to malloc %d bytes: %s", sz, strerror(err));
1259         userlog("Failed to malloc %d bytes: %s", sz, strerror(err));
1260         EXFAIL_OUT(ret);
1261     }
1262     
1263     *len = ndrx_G_libnstd_cfg.queuesmax;
1264     
1265     /* ###################### CRITICAL SECTION ############################### */
1266     if (EXSUCCEED!=ndrx_sem_rwlock(&M_map_sem, 0, NDRX_SEM_TYP_READ))
1267     {
1268         EXFAIL_OUT(ret);
1269     }
1270     
1271     have_lock = EXTRUE;
1272     
1273     for (i=0;i<ndrx_G_libnstd_cfg.queuesmax;i++)
1274     {
1275         pm = NDRX_SVQ_INDEX(svq, i);
1276         block[i].flags = pm->flags;
1277         block[i].qid = pm->qid;
1278      
1279         if (block[i].flags & NDRX_SVQ_MAP_ISUSED &&
1280                 ndrx_stopwatch_get_delta_sec( &(pm->ctime)) > ttl)
1281         {
1282             block[i].flags |= NDRX_SVQ_MAP_SCHEDRM;
1283         }
1284         
1285         NDRX_STRCPY_SAFE(block[i].qstr, pm->qstr);
1286     }
1287     
1288 out:
1289             
1290     if (have_lock)
1291     {
1292         ndrx_sem_rwunlock(&M_map_sem, 0, NDRX_SEM_TYP_READ);
1293         /* ###################### CRITICAL SECTION, END ########################## */
1294     }
1295 
1296     return block;
1297 }
1298 
1299 /* vim: set ts=4 sw=4 et smartindent: */