Back to home page

Enduro/X

 
 

    


0001 /**
0002  * @brief Enduro/X System V message queue support
0003  *   Also in case of system exit, we will use "atexit(3)" call to register
0004  *   termination handlers.
0005  *
0006  * @file sys_svq.h
0007  */
0008 /* -----------------------------------------------------------------------------
0009  * Enduro/X Middleware Platform for Distributed Transaction Processing
0010  * Copyright (C) 2009-2016, ATR Baltic, Ltd. All Rights Reserved.
0011  * Copyright (C) 2017-2023, Mavimax, Ltd. All Rights Reserved.
0012  * This software is released under one of the following licenses:
0013  * AGPL (with Java and Go exceptions) or Mavimax's license for commercial use.
0014  * See LICENSE file for full text.
0015  * -----------------------------------------------------------------------------
0016  * AGPL license:
0017  *
0018  * This program is free software; you can redistribute it and/or modify it under
0019  * the terms of the GNU Affero General Public License, version 3 as published
0020  * by the Free Software Foundation;
0021  *
0022  * This program is distributed in the hope that it will be useful, but WITHOUT ANY
0023  * WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR A
0024  * PARTICULAR PURPOSE. See the GNU Affero General Public License, version 3
0025  * for more details.
0026  *
0027  * You should have received a copy of the GNU Affero General Public License along 
0028  * with this program; if not, write to the Free Software Foundation, Inc.,
0029  * 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
0030  *
0031  * -----------------------------------------------------------------------------
0032  * A commercial use license is available from Mavimax, Ltd
0033  * contact@mavimax.com
0034  * -----------------------------------------------------------------------------
0035  */
0036 
0037 #ifndef SYS_SYSVQ_H__
0038 #define SYS_SYSVQ_H__
0039 
0040 /*------------------------------Includes--------------------------------------*/
0041 #include <pthread.h>
0042 #include <unistd.h>
0043 #include <sys/ipc.h>
0044 #include <sys/signal.h>
0045 #include <time.h>
0046 #include <sys_primitives.h>
0047 #include <atmi.h>
0048 #include <nstopwatch.h>
0049 #include <nstd_shm.h>
0050 
0051 /*------------------------------Externs---------------------------------------*/
0052 /*------------------------------Macros----------------------------------------*/
0053 
0054 #define NDRX_SVQ_EV_NONE    0   /**< No event received, just normal msg */
0055 #define NDRX_SVQ_EV_TOUT    1   /**< Timeout event                      */
0056 #define NDRX_SVQ_EV_DATA    2   /**< Main thread got some data          */
0057 #define NDRX_SVQ_EV_FD      3   /**< File descriptor got something      */
0058 
0059 #define NDRX_SVQ_SIG SIGUSR2    /**< Signal used for timeout wakeups    */
0060 
0061 /** Queue is used              */
0062 #define NDRX_SVQ_MAP_ISUSED       NDRX_LH_FLAG_ISUSED
0063 /** Queue was used (or is used)*/
0064 #define NDRX_SVQ_MAP_WASUSED      NDRX_LH_FLAG_WASUSED
0065 #define NDRX_SVQ_MAP_EXPIRED      0x0004  /**< Queue expired by ctime     */
0066 #define NDRX_SVQ_MAP_SCHEDRM      0x0008  /**< Schedule queue removal     */
0067 #define NDRX_SVQ_MAP_RQADDR       0x0010  /**< This is request address q  */
0068 #define NDRX_SVQ_MAP_HAVESVC      0x0020  /**< Have service in shm        */
0069 
0070 
0071 #define NDRX_SVQ_MONF_SYNCFD      0x00000001  /**< Perform monitoring on FDs  */
0072 
0073 /** For quick access to  */
0074 #define NDRX_SVQ_INDEX(MEM, IDX) ((ndrx_svq_map_t*)(((char*)MEM)+(int)(sizeof(ndrx_svq_map_t)*IDX)))
0075 #define NDRX_SVQ_STATIDX(MEM, IDX) ((ndrx_svq_status_t*)(((char*)MEM)+(int)(sizeof(ndrx_svq_status_t)*IDX)))
0076 
0077 /** Match timeout event */
0078 #define NDRX_SVQ_TOUT_MATCH(X, Y) (X->stamp_seq == Y->stamp_seq && \
0079                         0==memcmp( &(X->stamp_time), &(Y->stamp_time), \
0080                         sizeof(X->stamp_time)))
0081 
0082 #define NDRX_SVQ_MON_TOUT         1 /**< Request for timeout                  */
0083 #define NDRX_SVQ_MON_ADDFD        2 /**< Add file descriptor for ev monitoring*/
0084 #define NDRX_SVQ_MON_RMFD         3 /**< Remove file descriptor for ev mon    */
0085 #define NDRX_SVQ_MON_TERM         4 /**< Termination handler calls us         */
0086 
0087 
0088 /**#define NDRX_SVQ_MON_CLOSE        5 < Queue unlink request, not used anymore */
0089 
0090 #define NDRX_SVQ_INLEN(X)       (X-sizeof(long))    /**< System V input len   */
0091 #define NDRX_SVQ_OUTLEN(X)       (X+sizeof(long))   /**< System V output len  */
0092 
0093 /*------------------------------Enums-----------------------------------------*/
0094 /*------------------------------Typedefs--------------------------------------*/
0095 
0096 /**
0097  * Shared memory entry for service
0098  */
0099 typedef struct ndrx_svq_map ndrx_svq_map_t;
0100 struct ndrx_svq_map
0101 {
0102     char qstr[NDRX_MAX_Q_SIZE+1];       /**< Posix queue name string    */
0103     int qid;                            /**< System V Queue id          */
0104     short flags;                        /**< See NDRX_SVQ_MAP_STAT_*    */
0105     /* put stopwatch for last create time */
0106     ndrx_stopwatch_t ctime;             /**< change time                */
0107 #ifdef EX_ALIGNMENT_FORCE
0108     long padding1;                      /**< ensure that struct is padded*/
0109 #endif
0110 };
0111 
0112 /**
0113  * List of expired queues (it doesn't mean they will be removed,
0114  * that is later confirmed by ndrxd when scanning services)
0115  */
0116 typedef struct
0117 {
0118     int qid;                            /**< System V Queue id          */
0119     short flags;                        /**< See NDRX_SVQ_MAP_STAT_*    */
0120     char qstr[NDRX_MAX_Q_SIZE+1];       /**< Posix queue name string    */
0121     ndrx_stopwatch_t ctime;             /**< change time                */
0122 } ndrx_svq_status_t;
0123 
0124 /**
0125  * Message queue attributes
0126  */
0127 struct mq_attr 
0128 {
0129     long mq_flags;
0130     long mq_maxmsg;
0131     long mq_msgsize;
0132     long mq_curmsgs;
0133 };
0134 
0135 typedef struct ndrx_svq_ev ndrx_svq_ev_t;
0136 /**
0137  * Event queue, either timeout, data or waken up by poller
0138  */
0139 struct ndrx_svq_ev
0140 {
0141     int ev;                 /**< Event code received                        */
0142     
0143     ndrx_stopwatch_t stamp_time; /**< timestamp for timeout waiting         */
0144     unsigned long stamp_seq;/**< stamp sequence                             */
0145     
0146     int fd;                 /**< Linked file descriptor generating FD event */
0147     uint32_t revents;       /**< events occurred                            */
0148     
0149     size_t datalen;         /**< data event len, by admin thread            */
0150     char *data;             /**< event data                                 */
0151     
0152     ndrx_svq_ev_t *next, *prev;/**< Linked list of event enqueued           */
0153 };
0154 
0155 /**
0156  * Queue entry
0157  */
0158 struct ndrx_svq_info
0159 {
0160     char qstr[NDRX_MAX_Q_SIZE+1];/**< Posix queue name string               */
0161     int qid;                    /**< System V Queue ID                      */
0162     
0163     mode_t mode;                /**< permissions on new queue               */
0164     struct mq_attr attr;        /**< attributes for the queue, if creating  */
0165     
0166 #ifdef EX_USE_SYSVQ
0167     
0168     /* Locks for synchronous or other event wakeup */
0169     
0170     /* Using spinlocks for better performance  */
0171     NDRX_SPIN_LOCKDECL (rcvlock);    /**< Data receive lock, msgrcv              */
0172     NDRX_SPIN_LOCKDECL (rcvlockb4);  /**< Data receive lock, before going msgrcv */
0173     ndrx_svq_ev_t *eventq;      /**< Events queued for this ipc q              */
0174     pthread_mutex_t barrier;     /**< Border lock after msgrcv woken up        */
0175     pthread_mutex_t qlock;      /**< Queue lock (event queue)                  */
0176 
0177     /* Timeout matching.
0178      * All the timeout events are enqueued to thread and thread is waken up
0179      * if needed. If not then event will be discarded because of stamps
0180      * does not match.
0181      */
0182     NDRX_SPIN_LOCKDECL (stamplock);/**< Stamp change lock                     */
0183     ndrx_stopwatch_t stamp_time;/**< timestamp for timeout waiting            */
0184     volatile unsigned long stamp_seq;    /**< stamp sequence                  */
0185     
0186     /**
0187      * thread operating with queue... 
0188      * Also note that one thread might operate with multiple queues.
0189      * but only one queue will be blocked at the same time.
0190      * WARNING !!! This needs to be set every time we enter in wait mode...
0191      * cannot be set initially because threads might be switched
0192      * in high level, Object API modes.
0193      */
0194     pthread_t thread;
0195 #endif
0196     
0197     char *self;                /**< ptr to self for hash                    */
0198     EX_hash_handle hh;         /**< delete hash                             */
0199 
0200 };
0201 typedef struct ndrx_svq_info * mqd_t;
0202 
0203 
0204 /**
0205  * Command block for monitoring thread
0206  */
0207 typedef struct
0208 {
0209     int cmd;                    /**< See NDRX_SVQ_MON_* commands            */
0210     struct timespec abs_timeout;/**< timeout value when the wait shell tout */
0211     int flags;                  /**< See NDRX_SVQ_MONF*                     */
0212     
0213     /* Data for timeout request: */
0214     ndrx_stopwatch_t stamp_time;/**< timestamp for timeout waiting          */
0215     unsigned long stamp_seq;    /**< stamp sequence                         */
0216     
0217     int fd;                     /** file descriptor for related cmds        */
0218     uint32_t events;            /** events requested for fd monitor         */
0219     mqd_t mqd;                  /** message queue requesting an event       */
0220     
0221     /* in case of mqd delete, we shall sync off with the back thread */
0222     pthread_mutex_t *del_lock;   /** delete lock                             */
0223     pthread_cond_t *del_cond;     /** conditional variable for delete         */
0224     
0225 } ndrx_svq_mon_cmd_t;
0226 
0227 /*------------------------------Globals---------------------------------------*/
0228 /*------------------------------Statics---------------------------------------*/
0229 /*------------------------------Prototypes------------------------------------*/
0230 
0231 extern NDRX_API int ndrx_svq_close(mqd_t mqd);
0232 extern NDRX_API int ndrx_svq_getattr(mqd_t, struct mq_attr * attr);
0233 extern NDRX_API int ndrx_svq_notify(mqd_t, const struct sigevent *);
0234 extern NDRX_API mqd_t ndrx_svq_open(const char *pathname, int oflag, mode_t mode, 
0235                 struct mq_attr *attr);
0236 extern NDRX_API ssize_t ndrx_svq_receive(mqd_t, char *, size_t, unsigned int *);
0237 extern NDRX_API int ndrx_svq_send(mqd_t, const char *, size_t, unsigned int);
0238 extern NDRX_API int ndrx_svq_setattr(mqd_t, const struct mq_attr *attr, struct mq_attr *oattr);
0239 extern NDRX_API int ndrx_svq_unlink(const char *name);
0240 
0241 extern NDRX_API  int ndrx_svq_timedsend(mqd_t mqd, const char *ptr, size_t len, unsigned int prio,
0242         const struct timespec *__abs_timeout); 
0243 
0244 extern NDRX_API ssize_t ndrx_svq_timedreceive(mqd_t mqd, char *ptr, size_t maxlen, unsigned int *priop,
0245         const struct timespec * __abs_timeout);
0246 
0247 extern NDRX_API void ndrx_svq_set_lock_timeout(int secs);
0248 extern NDRX_API int ndrx_svq_mqd_put_event(mqd_t mqd, ndrx_svq_ev_t **ev);
0249 extern NDRX_API void ndrx_svq_delref_add(mqd_t qd);
0250 extern NDRX_API int ndrx_svq_event_sndrcv(mqd_t mqd, char *ptr, ssize_t *maxlen, 
0251         struct timespec *abs_timeout, ndrx_svq_ev_t **ev, int is_send, int syncfd);
0252 extern NDRX_API void ndrx_svq_event_exit(int detatch);
0253 
0254 extern NDRX_API void ndrx_svq_fork_prepare(void);
0255 extern NDRX_API void ndrx_svqadmin_fork_prepare(void);
0256 
0257 
0258 /* Direct API for setting timeout values... so that we register time-out 
0259  by worker thread. And the event thread will pick up any existing value there
0260  if timed out
0261  */
0262 extern NDRX_API int ndrx_svq_fd_nrof(void);
0263 extern NDRX_API void ndrx_svq_mqd_hash_del(mqd_t mqd);
0264 extern NDRX_API int ndrx_svq_mqd_close(mqd_t mqd);
0265 extern NDRX_API int ndrx_svq_mqd_hash_add(mqd_t mqd, ndrx_stopwatch_t *stamp_time, 
0266         unsigned long stamp_seq, struct timespec *abs_timeout);
0267 
0268 /* end of Direct API */
0269 
0270 extern NDRX_API int ndrx_svq_moncmd_term(void);
0271 /* extern NDRX_API int ndrx_svq_moncmd_close(mqd_t mqd); */
0272 extern NDRX_API int ndrx_svq_moncmd_addfd(mqd_t mqd, int fdm, uint32_t events);
0273 extern NDRX_API int ndrx_svq_moncmd_rmfd(int fd);
0274 extern NDRX_API mqd_t ndrx_svq_mainq_get(void);
0275 
0276 extern NDRX_API int ndrx_svq_event_init(void);
0277 extern NDRX_API int ndrx_svq_scanunit_set(int ms);
0278 
0279 /* internals... */
0280 extern NDRX_API int ndrx_svqshm_init(int attach_only);
0281 extern NDRX_API int ndrx_svqshm_attach(void);
0282 extern NDRX_API int ndrx_svqshm_down(int force);
0283 extern NDRX_API void ndrx_svqshm_detach(void);
0284 extern NDRX_API int ndrx_svqshm_shmres_get(ndrx_shm_t **map_p2s, ndrx_shm_t **map_s2p, 
0285         ndrx_sem_t **map_sem, int *queuesmax);
0286 extern NDRX_API int ndrx_svqshm_get(char *qstr, mode_t mode, int oflag);
0287 extern NDRX_API int ndrx_svqshm_get_qid(int in_qid, char *out_qstr, int out_qstr_len);
0288 extern NDRX_API int ndrx_svqshm_ctl(char *qstr, int qid, int cmd, int arg1,
0289         int (*p_deletecb)(int qid, char *qstr));
0290 
0291 extern NDRX_API ndrx_svq_status_t* ndrx_svqshm_statusget(int *len, int ttl);
0292 
0293 extern NDRX_API string_list_t* ndrx_sys_mqueue_list_make_svq(char *qpath, int *return_status);
0294 
0295 extern NDRX_API int ndrx_svqshm_get_status(ndrx_svq_status_t *status, 
0296         int qid, int *pos, int *have_value);
0297 
0298 extern NDRX_API int ndrx_svqadmin_init(mqd_t adminq);
0299 extern NDRX_API int ndrx_svqadmin_deinit(void);
0300 
0301 #endif
0302 
0303 /* vim: set ts=4 sw=4 et smartindent: */