Back to home page

Enduro/X

 
 

    


0001 /**
0002  * @brief Utility functions for ATMI (generic send, etc...)
0003  *   We might want to monitor with `stat' the disk inode of the queue
0004  *   and we could cache the opened queues, so that we do not open them
0005  *   every time.
0006  *
0007  * @file atmiutils.c
0008  */
0009 /* -----------------------------------------------------------------------------
0010  * Enduro/X Middleware Platform for Distributed Transaction Processing
0011  * Copyright (C) 2009-2016, ATR Baltic, Ltd. All Rights Reserved.
0012  * Copyright (C) 2017-2023, Mavimax, Ltd. All Rights Reserved.
0013  * This software is released under one of the following licenses:
0014  * AGPL (with Java and Go exceptions) or Mavimax's license for commercial use.
0015  * See LICENSE file for full text.
0016  * -----------------------------------------------------------------------------
0017  * AGPL license:
0018  *
0019  * This program is free software; you can redistribute it and/or modify it under
0020  * the terms of the GNU Affero General Public License, version 3 as published
0021  * by the Free Software Foundation;
0022  *
0023  * This program is distributed in the hope that it will be useful, but WITHOUT ANY
0024  * WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR A
0025  * PARTICULAR PURPOSE. See the GNU Affero General Public License, version 3
0026  * for more details.
0027  *
0028  * You should have received a copy of the GNU Affero General Public License along 
0029  * with this program; if not, write to the Free Software Foundation, Inc.,
0030  * 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
0031  *
0032  * -----------------------------------------------------------------------------
0033  * A commercial use license is available from Mavimax, Ltd
0034  * contact@mavimax.com
0035  * -----------------------------------------------------------------------------
0036  */
0037 #include <ndrx_config.h>
0038 #include <string.h>
0039 #include <sys/time.h>
0040 #include <stdio.h>
0041 #include <stdlib.h>
0042 #include <memory.h>
0043 #include <sys_mqueue.h>
0044 #include <errno.h>
0045 #include <sys/param.h>
0046 #include <unistd.h>
0047 #include <fcntl.h>
0048 
0049 #include <sys_unix.h>
0050 #include <atmi.h>
0051 #include <ndrstandard.h>
0052 #include <ndebug.h>
0053 #include <atmi_int.h>
0054 #include <ndrxdcmn.h>
0055 #include <utlist.h>
0056 #include <atmi_shm.h>
0057 #include <tperror.h>
0058 
0059 #include "gencall.h"
0060 #include "userlog.h"
0061 #include "atmi_tls.h"
0062 /*---------------------------Externs------------------------------------*/
0063 /*---------------------------Macros-------------------------------------*/
/* Pause passed to usleep() when the send batch limit is hit: 170000 us = 170 ms */
#define SLEEP_ON_FULL_Q             170000
0065 
0066 
/*
 * Select the timeout (seconds) to use and store it in `tout_act'.
 * Expects `tout', `tout_restart' and `tout_act' to be in scope at the
 * expansion site. Sources, first positive value wins:
 *   1. explicit `tout'
 *   2. `tout_restart' (value picked by a previous expansion)
 *   3. G_atmi_tls->tout_next_eff (only if G_atmi_tls is set)
 *   4. G_atmi_tls->tout          (only if G_atmi_tls is set)
 *   5. otherwise G_atmi_env.time_out, whatever its value
 * The chosen value is remembered in `tout_restart'.
 */
#define TOUT_SOURCE         do\
        {\
            if (tout>0)\
            {\
                tout_act=tout;\
            }\
            else if (tout_restart >0)\
            {\
                tout_act=tout_restart;\
            }\
            else\
            {\
                /* global default, unless thread settings override it */\
                tout_act=G_atmi_env.time_out;\
                if (G_atmi_tls)\
                {\
                    if (G_atmi_tls->tout_next_eff > 0)\
                    {\
                        tout_act=G_atmi_tls->tout_next_eff;\
                    }\
                    else if (G_atmi_tls->tout > 0)\
                    {\
                        tout_act=G_atmi_tls->tout;\
                    }\
                }\
            }\
            tout_restart = tout_act;\
        } while (0)
0092 
/*
 * Compute the absolute deadline for timed queue calls.
 * Does nothing unless `use_tout' is set. Otherwise it takes the current
 * wall-clock time, resolves the timeout through TOUT_SOURCE and stores
 * now + tout_act seconds in `abs_timeout' (microseconds are converted to
 * nanoseconds). Expects `use_tout', `tout_act' and `abs_timeout' in scope.
 */
#define SET_TOUT_VALUE     do\
        {\
            if (use_tout)\
            {\
                struct timeval  now_tv;\
                use_tout=1;\
                gettimeofday (&now_tv, NULL);\
                TOUT_SOURCE;\
                abs_timeout.tv_sec = now_tv.tv_sec+tout_act;\
                abs_timeout.tv_nsec = now_tv.tv_usec*1000;\
            }\
        } while (0)
0106 
/*
 * Decide whether the enclosing function uses timed queue calls.
 * `use_tout' becomes 0 when the global timeout is zero or when the caller
 * passed TPNOTIME or TPNOBLOCK in `flags'; otherwise it becomes 1.
 * Expects `flags' and `use_tout' in scope.
 */
#define SET_TOUT_CONF     do\
        {\
            use_tout = (G_atmi_env.time_out==0 || flags & TPNOTIME || flags & TPNOBLOCK)\
                    ? 0 : 1;\
        } while (0)
0119 
/*
 * Log (at error level) the attributes of queue descriptor X.
 * The attribute buffer is zeroed first, so zeros are printed for any field
 * that ndrx_mq_getattr() does not fill in (its result is not checked).
 * NOTE: the calls made here may overwrite errno - callers that still need
 * the errno of a failed queue operation must save it before expanding this.
 */
#define PRINT_Q_INFO(X)             { struct mq_attr q_attr_tmp_;\
            memset(&q_attr_tmp_, 0, sizeof(q_attr_tmp_));\
            ndrx_mq_getattr((X), &q_attr_tmp_);\
            NDRX_LOG(log_error, "mq_flags=%ld mq_maxmsg=%ld mq_msgsize=%ld mq_curmsgs=%ld",\
                    (long)q_attr_tmp_.mq_flags, (long)q_attr_tmp_.mq_maxmsg,\
                    (long)q_attr_tmp_.mq_msgsize, (long)q_attr_tmp_.mq_curmsgs);}
0126 
/**
 * Map an Enduro/X message priority onto the range offered by the queue
 * sub-system. Scaling is compiled in only when MQ_PRIO_MAX is defined and
 * MQ_PRIO_MAX-1 is below NDRX_MSGPRIO_MAX (e.g. FreeBSD, <32 priorities);
 * in every other configuration the macro expands to nothing.
 * PRIO must be a modifiable lvalue; it is overwritten with the scaled value.
 */
#if defined(MQ_PRIO_MAX) && (MQ_PRIO_MAX-1 < NDRX_MSGPRIO_MAX)

    /* proportional downscale, done in float and truncated on assignment */
    #define NDRX_PRIO_DOWNSCALE(PRIO)\
    {\
        PRIO=( ((float)PRIO) * (float)(MQ_PRIO_MAX-1) / (float)NDRX_MSGPRIO_MAX );\
    }

#else

/* either no priority limit to respect, or no MQ_PRIO_MAX known: no scaling */
#define NDRX_PRIO_DOWNSCALE(PRIO)

#endif
0151 /*---------------------------Enums--------------------------------------*/
0152 /*---------------------------Typedefs-----------------------------------*/
0153 /*---------------------------Globals------------------------------------*/
0154 /*---------------------------Statics------------------------------------*/
0155 /*---------------------------Prototypes---------------------------------*/
0156 
0157 
0158 /**
0159  * Configure/override `NDRX_TOUT'
0160  * The env must be loaded to use this function...
0161  * @param[in] tout number of seconds for timeout call/wait timeout
0162  */
0163 expublic void ndrx_tptoutset(int tout)
0164 {
0165     NDRX_LOG(log_info, "%s: NDRX_TOUT override from %d to %d seconds", 
0166             __func__, G_atmi_env.time_out, tout);
0167     G_atmi_env.time_out = tout;
0168 }
0169 
0170 /**
0171  * Get the ATMI timeout configuration
0172  * @return number of seconds for timeout
0173  */
0174 expublic int ndrx_tptoutget(void)
0175 {
0176     return G_atmi_env.time_out;
0177 }
0178 
0179 /**
0180  * Get current timeout setting, based on TLS data
0181  * @return actual timeout setting currently effective
0182  */
0183 expublic int ndrx_tptoutget_eff(void)
0184 {
0185     int use_tout=1, tout_act=0, tout_restart=EXFAIL, tout=0;
0186     
0187     TOUT_SOURCE;
0188     
0189     return tout_act;
0190 }
0191  
0192 /**
0193  * When tons of messages are sent to xadmin, then we might gets some sleep,
0194  * so that console is ready to display complete stuff..!
0195  * 
0196  * NOTE: THIS IS NOT VERY CLEARN SOLUTION! But what to do... xadmin is not very critical part anywya!
0197  * 
0198  * Or... might switch to blocked concept... with some time of 2 secs, after which
0199  * if expired, assume that xadmin is dead.
0200  * 
0201  * Might think in future increase msg_max size...!
0202  * @return 
0203  */
0204 expublic void ndrx_mq_fix_mass_send(int *cntr)
0205 {
0206     *cntr = *cntr + 1;
0207     /* Have some backup */
0208     if (*cntr >= G_atmi_env.msg_max - Q_SEND_GRACE)
0209     {
0210         NDRX_LOG(log_error, "About to sleep! %d %d", *cntr, G_atmi_env.msg_max);
0211         usleep(SLEEP_ON_FULL_Q);
0212         *cntr = 0;
0213     }
0214 }
0215 
0216 /**
0217  * Change attributes of the queue
0218  * @param q_descr
0219  * @return 
0220  */
0221 expublic int ndrx_q_setblock(mqd_t q_descr, int blocked)
0222 {
0223     int ret=EXSUCCEED;
0224     struct mq_attr new;
0225     struct mq_attr old;
0226     int change = EXFALSE;
0227     
0228     if (EXSUCCEED!= ndrx_mq_getattr(q_descr, &old))
0229     {
0230         NDRX_LOG(log_warn, "Failed to get attribs of Q: %d, err: %s", 
0231                 q_descr, strerror(errno));
0232         ret=EXFAIL;
0233         goto out;
0234     }
0235     
0236     /*non blocked requested & but currently Q is blocked => change attribs*/
0237     if (!blocked && !(old.mq_flags & O_NONBLOCK))
0238     {
0239         memcpy(&new, &old, sizeof(new));
0240         NDRX_LOG(log_warn, "Switching qd %d to non-blocked", q_descr);
0241         new.mq_flags |= O_NONBLOCK;
0242         change = EXTRUE;
0243     }
0244     /*blocked requested & but currently Q is not blocked => change attribs*/
0245     else if (blocked && old.mq_flags & O_NONBLOCK)
0246     {
0247         memcpy(&new, &old, sizeof(new));
0248         NDRX_LOG(log_warn, "Switching qd %d to blocked", q_descr);
0249         new.mq_flags &= ~O_NONBLOCK; /* remove non block flag */
0250         change = EXTRUE;
0251     }
0252     
0253     if (change)
0254     {
0255         if (EXFAIL==ndrx_mq_setattr(q_descr, &new,
0256                             &old))
0257         {
0258             NDRX_LOG(log_error, "Failed to set attribs for qd %d: %s", 
0259                     q_descr, strerror(errno));
0260             ret=EXFAIL;
0261             goto out;
0262         }
0263     }
0264     
0265 out:
0266     return ret;
0267 }
0268 
0269 /**
0270  * Open queue with attributes
0271  * @param name
0272  * @param oflag
0273  * @param attr
0274  * @return
0275  */
0276 expublic mqd_t ndrx_mq_open_at(char *name, int oflag, mode_t mode, 
0277         struct mq_attr *attr)
0278 {
0279     struct mq_attr attr_int;
0280     struct mq_attr * p_at;
0281     mqd_t ret;
0282     int errno_save;
0283 
0284     if (NULL==attr)
0285     {
0286         memset(&attr_int, 0, sizeof(attr_int));
0287         p_at = &attr_int;
0288     }
0289     else
0290     {
0291         p_at = attr;
0292     }
0293 
0294     if (!p_at->mq_maxmsg)
0295         p_at->mq_maxmsg = G_atmi_env.msg_max;
0296 
0297     if (!p_at->mq_msgsize)
0298         p_at->mq_msgsize = G_atmi_env.msgsize_max;
0299 
0300     ret=ndrx_mq_open(name, oflag, mode, p_at);
0301     errno_save=errno;
0302     
0303     NDRX_LOG(6, "ndrx_mq_open_at(name=%s) returns 0x%lx (mq_maxmsg: %d mq_msgsize: %d)",
0304     name, (long int) ret, p_at->mq_maxmsg, p_at->mq_msgsize);
0305     errno=errno_save;
0306     return ret;
0307 }
0308 
0309 /**
0310  * Generic queue open
0311  * @param name
0312  * @param oflag
0313  * @return
0314  */
0315 expublic mqd_t ndrx_mq_open_at_wrp(char *name, int oflag)
0316 {
0317     
0318     return ndrx_mq_open_at(name, oflag, 0, NULL);
0319 }
0320 
0321 /**
0322  * Will format the queue and do the call. Also does reports back errors (with
0323  * details).
0324  * This will be blocked send.
0325  * 
0326  * TODO: Added support for TPNOBLOCK!!!
0327  * @param svc
0328  * @param data
0329  * @param len
0330  * @return Unix error code
0331  */
0332 expublic int ndrx_generic_qfd_send(mqd_t q_descr, char *data, long len, long flags)
0333 {
0334     int ret=EXSUCCEED;
0335     int use_tout, tout_act=0, tout_restart=EXFAIL, tout=0;
0336     struct timespec abs_timeout;
0337     int snd_prio;
0338     SET_TOUT_CONF;
0339     
0340     snd_prio = NDRX_MSGPRIO_DEFAULT;
0341     
0342     NDRX_PRIO_DOWNSCALE(snd_prio);
0343     
0344 restart:
0345 
0346     SET_TOUT_VALUE;
0347 
0348     /* Internal message including tpsend() will use default priority. */
0349     if ((!use_tout && EXFAIL==ndrx_mq_send(q_descr, data, len, snd_prio)) ||
0350          (use_tout && EXFAIL==ndrx_mq_timedsend(q_descr, data, len, snd_prio, &abs_timeout)))
0351     {
0352         if (EINTR==errno && flags & TPSIGRSTRT)
0353         {
0354             NDRX_LOG(log_warn, "Got signal interrupt, restarting ndrx_mq_send");
0355             goto restart;
0356         }
0357         else if (EAGAIN==errno)
0358         {
0359             PRINT_Q_INFO(q_descr);
0360         }
0361         ret=errno;
0362         NDRX_LOG(log_error, "Failed to send data to fd [%d] with error: %s",
0363                                         q_descr, strerror(ret));
0364     }
0365 
0366 out:
0367     return ret;
0368 }
0369 
0370 /**
0371  * Send data to Queue
0372  * @param queue
0373  * @param data
0374  * @param len
0375  * @param flags
0376  * @return 
0377  */
0378 expublic int ndrx_generic_q_send(char *queue, char *data, long len, long flags, int msg_prio)
0379 {
0380     return ndrx_generic_q_send_2(queue, data, len, flags, EXFAIL, msg_prio);
0381 }
0382 
0383 /**
0384  * Will format the queue and do the call. Also does reports back errors (with
0385  * details).
0386  * This will be blocked send.
0387  * @param svc
0388  * @param data
0389  * @param len
0390  * @param tout - time-out in seconds.
0391  * @param msg_prio - message priority see, NDRX_MSGPRIO_*
0392  * @return SUCCEED/FAIL
0393  */
0394 expublic int ndrx_generic_q_send_2(char *queue, char *data, long len, long flags, 
0395         long tout, int msg_prio)
0396 {
0397     int ret=EXSUCCEED;
0398     mqd_t q_descr=(mqd_t)EXFAIL;
0399     int use_tout, tout_restart=EXFAIL, tout_act=0;
0400     struct timespec abs_timeout;
0401     long add_flags = 0;
0402     int snd_prio;
0403     SET_TOUT_CONF;
0404 
0405     /* Set nonblock flag to system, if provided to EnduroX */
0406     if (flags & TPNOBLOCK)
0407     {
0408         NDRX_LOG(log_debug, "Enabling NONBLOCK send");
0409         add_flags|=O_NONBLOCK;
0410     }
0411 
0412     /* open the queue */
0413     /* Restart until we do not get the signal */
0414 restart_open:
0415     q_descr = ndrx_mq_open_at_wrp (queue, O_WRONLY | add_flags);
0416 
0417     if ((mqd_t)EXFAIL==q_descr && EINTR==errno && flags & TPSIGRSTRT)
0418     {
0419         NDRX_LOG(log_warn, "Got signal interrupt, restarting ndrx_mq_open");
0420         goto restart_open;
0421     }
0422     
0423     if ((mqd_t)EXFAIL==q_descr)
0424     {
0425         NDRX_LOG(log_error, "Failed to open queue [%s] with error: %s",
0426                                         queue, strerror(errno));
0427         ret=errno;
0428         goto out;
0429     }
0430 
0431     /* now try to send */
0432 restart_send:
0433 
0434     SET_TOUT_VALUE;
0435 
0436     if (0==msg_prio)
0437     {
0438         /* set default prio */
0439         msg_prio = NDRX_MSGPRIO_DEFAULT;
0440     }
0441 
0442     /** override the message priority */
0443     if (NULL!=G_atmi_tls && G_atmi_tls->prio)
0444     {
0445         /* override priority from tpsprio() */
0446         
0447         if (G_atmi_tls->prio_flags & TPABSOLUTE)
0448         {
0449             msg_prio = G_atmi_tls->prio;
0450         }
0451         else
0452         {
0453             /* relative change */
0454             msg_prio += G_atmi_tls->prio;
0455         }
0456     }
0457 
0458     if (msg_prio<NDRX_MSGPRIO_MIN)
0459     {
0460         /* set default prio */
0461         msg_prio = NDRX_MSGPRIO_MIN;
0462     }
0463     else if (msg_prio>NDRX_MSGPRIO_MAX)
0464     {
0465         /* set default prio */
0466         msg_prio = NDRX_MSGPRIO_MAX;
0467     }
0468 
0469     snd_prio=msg_prio;
0470     /* freebsd needs downscale on some version limited to 32 priorities */
0471     NDRX_PRIO_DOWNSCALE(snd_prio);
0472             
0473     NDRX_LOG(log_debug, "len: %d use timeout: %d config: %d prio: %d snd_prio: %d", 
0474                 len, use_tout, tout_act, msg_prio, snd_prio);
0475     if ((!use_tout && EXFAIL==ndrx_mq_send(q_descr, data, len, snd_prio)) ||
0476          (use_tout && EXFAIL==ndrx_mq_timedsend(q_descr, data, len, snd_prio, &abs_timeout)))
0477     {
0478         ret=errno;
0479         if (EINTR==errno && flags & TPSIGRSTRT)
0480         {
0481             NDRX_LOG(log_warn, "Got signal interrupt, restarting ndrx_mq_send");
0482             goto restart_send;
0483         }
0484         else if (EAGAIN==errno)
0485         {
0486             PRINT_Q_INFO(q_descr);
0487         }
0488         NDRX_LOG(log_error, "Failed to send data to queue [%s] with error: %d:%s",
0489                                         queue, ret, strerror(ret));
0490     }
0491 
0492 out:
0493     
0494 restart_close:
0495     /* Generally we ignore close */
0496     if ((mqd_t)EXFAIL!=q_descr && EXFAIL==ndrx_mq_close(q_descr))
0497     {
0498         if (EINTR==errno && flags & TPSIGRSTRT)
0499         {
0500             NDRX_LOG(log_warn, "Got signal interrupt, restarting ndrx_mq_close");
0501             goto restart_close;
0502         }
0503     }
0504 
0505     /** OK, have priority of the last call */
0506     if (NULL!=G_atmi_tls)
0507     {
0508         G_atmi_tls->prio = 0; /* reset the priority setting... */
0509         G_atmi_tls->prio_flags = 0; /* reset the priority setting... */
0510         G_atmi_tls->prio_last = msg_prio;
0511     }
0512 
0513     return ret;
0514 }
0515 
0516 /**
0517  * Generic queue receiver
0518  * @param q_descr - queue descriptor
0519  * @param q_str - string queue, can be NULL, then attribs not set
0520  * @param reply_q_attr - current queue attributes, can be null, then attribs not set
0521  * @param buf - where to put received data
0522  * @param buf_max - buffer size
0523  * @param prio - priority of message
0524  * @return GEN_QUEUE_ERR_NO_DATA/FAIL/data len
0525  */
0526 expublic ssize_t ndrx_generic_q_receive(mqd_t q_descr, char *q_str, 
0527         struct mq_attr *reply_q_attr,
0528         char *buf, long buf_max, 
0529         unsigned *prio, long flags)
0530 {
0531     ssize_t ret=EXSUCCEED;
0532     int use_tout, tout_restart=EXFAIL, tout_act=0, tout=0;
0533     struct timespec abs_timeout;   
0534     
0535     SET_TOUT_CONF;
0536     
0537     if (NULL!=q_str && NULL!=reply_q_attr)
0538     {
0539         if (EXSUCCEED!=ndrx_setup_queue_attrs(reply_q_attr, q_descr, q_str, flags))
0540         {
0541             NDRX_LOG(log_error, "%s: Failed to setup queue attribs, flags=%ld", flags);
0542             EXFAIL_OUT(ret);
0543         }
0544     }
0545     
0546 restart:
0547     SET_TOUT_VALUE;
0548     NDRX_LOG(6, "use timeout: %d config: %d qdescr: %lx", use_tout,
0549         tout_act, (long int)q_descr);
0550     if ((!use_tout && EXFAIL==(ret=ndrx_mq_receive (q_descr, (char *)buf, buf_max, prio))) ||
0551          (use_tout && EXFAIL==(ret=ndrx_mq_timedreceive (q_descr, (char *)buf, buf_max, prio, &abs_timeout))))
0552     {
0553         if (EINTR==errno && flags & TPSIGRSTRT)
0554         {
0555             NDRX_LOG(log_warn, "Got signal interrupt, restarting ndrx_mq_receive");
0556             goto restart;
0557         }
0558         /* save the errno, so that no one changes */
0559         ret=errno;
0560         if (EAGAIN==ret)
0561         {
0562             NDRX_LOG(log_debug, "No messages in queue");
0563             ret= GEN_QUEUE_ERR_NO_DATA;
0564         }
0565         else
0566         {
0567             int err;
0568 
0569             CONV_ERROR_CODE(ret, err);
0570 
0571             ndrx_TPset_error_fmt(err, "ndrx_mq_receive failed for %lx (%zd): %s",
0572             (long int)q_descr, ret, strerror(ret));
0573             ret=EXFAIL;
0574         }
0575     }
0576     
0577 out:
0578     NDRX_LOG(log_debug, "ndrx_generic_q_receive: %zd", ret);
0579     return ret;
0580 }
0581 
0582 /**
0583  * Initialize generic structure (used for network send.)
0584  * @param call
0585  */
0586 expublic void cmd_generic_init(int ndrxd_cmd, int msg_src, int msg_type,
0587                             command_call_t *call, char *reply_q)
0588 {
0589     call->proto_ver[0]=0;
0590     call->proto_ver[1]=0;
0591     call->proto_ver[2]=0;
0592     call->proto_ver[3]=0;
0593 
0594     call->command_id = ndrxd_cmd;
0595     call->command = ndrxd_cmd;
0596     call->magic = NDRX_MAGIC;
0597     call->msg_src = msg_src;
0598     call->msg_type = msg_type;
0599     NDRX_STRCPY_SAFE(call->reply_queue, reply_q);
0600     call->caller_nodeid = G_atmi_env.our_nodeid;
0601 }
0602 
0603 /**
0604  * Generic server command call & response processing.
0605  * @param p_cmd_map
0606  * @param argc
0607  * @param argv
0608  * @param p_have_next
0609  * @return Unix error code, e.g. ENOENT if not found, etc..
0610  */
0611 expublic int cmd_generic_call_2(int ndrxd_cmd, int msg_src, int msg_type,
0612                             command_call_t *call, size_t call_size,
0613                             char *reply_q,
0614                             mqd_t reply_queue, /* listen for answer on this! */
0615                             mqd_t admin_queue, /* this might be FAIL! */
0616                             char *admin_q_str, /* should be set! */
0617                             int argc, char **argv,
0618                             int *p_have_next,
0619                             int (*p_rsp_process)(command_reply_t *reply, size_t reply_len),
0620                             void (*p_put_output)(char *text),
0621                             int need_reply,
0622                             int reply_only,
0623                             char *rply_buf_out,    /* might be a null  */
0624                             int *rply_buf_out_len, /* if above is set, then must not be null */
0625                             int flags,
0626         /* TODO Might want to add checks before calling rply_request func...
0627          * for magic and command code. */
0628                             int (*p_rply_request)(char **buf, long len)
0629         )
0630 {
0631     int ret=EXSUCCEED;
0632     command_reply_t *reply;
0633     unsigned prio = 0;
0634     char    *msg_buffer_max= NULL;
0635     ssize_t  reply_len;
0636     int attempts = 1;
0637     
0638 restart:
0639     NDRX_LOG(log_debug, "gencall command: %d, reply_only=%d, need_reply=%d "
0640             "call flags=0x%x, getcall flags=%d",
0641             ndrxd_cmd, reply_only, need_reply, (NULL!=call?call->flags:0), flags);
0642     
0643     if (NULL!=rply_buf_out && NULL==rply_buf_out_len)
0644     {
0645         NDRX_LOG(log_error, "User buffer address specified in params, "
0646                 "but rply_buf_out_len is NULL!");
0647         EXFAIL_OUT(ret);
0648     }
0649     
0650     if (!reply_only)
0651     {
0652         call->command = ndrxd_cmd;
0653         call->command_id = ndrxd_cmd;
0654         call->magic = NDRX_MAGIC;
0655         call->msg_src = msg_src;
0656         call->msg_type = msg_type;
0657         NDRX_STRCPY_SAFE(call->reply_queue, reply_q);
0658         call->caller_nodeid = G_atmi_env.our_nodeid;
0659 
0660         if ((mqd_t)EXFAIL!=admin_queue)
0661         {
0662             NDRX_LOG(log_error, "Sending data to [%s], fd=%d, call flags=0x%x", 
0663                                 admin_q_str, admin_queue, call->flags);
0664             if (EXSUCCEED!=(ret=ndrx_generic_qfd_send(admin_queue, 
0665                     (char *)call, call_size, flags)))
0666             {
0667                 NDRX_LOG(log_error, "Failed to send msg to ndrxd!");
0668 
0669                 if (NULL!=p_put_output)
0670                 {
0671                     if (EINVAL==ret)
0672                     {
0673                         p_put_output(NDRX_QERR_MSG_EINVAL);
0674                     }
0675                     else
0676                     {
0677                         p_put_output(NDRX_XADMIN_ERR_FMT_PFX "Failed to send msg to ndrxd!");
0678                     }
0679                 }
0680 
0681                 goto out;
0682             }
0683         }
0684         else
0685         {
0686             NDRX_LOG(log_info, "Sending data to [%s] call flags=0x%x", 
0687                                     admin_q_str, call->flags);
0688             if (EXSUCCEED!=(ret=ndrx_generic_q_send(admin_q_str, 
0689                     (char *)call, call_size, flags, 0)))
0690             {
0691                 if (NULL!=p_put_output)
0692                 {
0693                     if (EINVAL==ret)
0694                     {
0695                         p_put_output(NDRX_QERR_MSG_EINVAL);
0696                     }
0697                     else
0698                     {
0699                         p_put_output(NDRX_XADMIN_ERR_FMT_PFX "Failed to send msg to ndrxd!");
0700                     }
0701                 }
0702 
0703                 ret=EXFAIL;
0704                 goto out;
0705             }
0706         }
0707     }
0708     else
0709     {
0710         NDRX_LOG(log_debug, "Reply mode only");
0711     }
0712 
0713     if (need_reply)
0714     {
0715         NDRX_LOG(log_debug, "Waiting for response from ndrxd on [%s]", reply_q);
0716     }
0717     else
0718     {
0719         NDRX_LOG(log_debug, "Reply not needed!");
0720         goto out;
0721     }
0722 
0723     do
0724     {
0725         command_call_t *test_call;
0726         
0727         if (NULL==msg_buffer_max)
0728         {
0729             NDRX_SYSBUF_MALLOC_WERR_OUT(msg_buffer_max, reply_len, ret);
0730         }
0731         else
0732         {
0733             reply_len = NDRX_MSGSIZEMAX;
0734         }
0735         
0736         test_call = (command_call_t *)msg_buffer_max;
0737         
0738         /* TODO: have an selector for ptr functions for CAT operations! */
0739         
0740         /* Error could be also -2..! */
0741         if (0>(reply_len=ndrx_generic_q_receive(reply_queue, NULL, NULL,
0742                 msg_buffer_max, reply_len, &prio, flags)))
0743         {
0744             NDRX_LOG(log_error, "Failed to receive reply from ndrxd!");
0745 
0746             if (NULL!=p_put_output)
0747             {
0748                 p_put_output("\nERROR ! Failed to receive reply from ndrxd\n(if timeout - check "
0749                         "NDRX_XADMINTOUT settings)!");
0750             }
0751 
0752             ret=EXFAIL;
0753             goto out;
0754         } /* even's are requests... */
0755         else if (test_call->command % 2 == 0 && NULL!=p_rply_request)
0756         {
0757             if (EXSUCCEED!=p_rply_request(&msg_buffer_max, (size_t)reply_len))
0758             {
0759                 NDRX_LOG(log_error, "Failed to process request!");
0760                 ret=EXFAIL;
0761                 goto out;
0762             }
0763             else
0764             {
0765                 NDRX_LOG(log_warn, "Waiting for next rply msg...");
0766                 continue;
0767             }
0768         }
0769         else if (reply_len < sizeof(command_reply_t))
0770         {
0771             NDRX_LOG(log_error, "Reply size %zd, expected atleast %zu!",
0772                                 reply_len, sizeof(command_reply_t));
0773 
0774             if (NULL!=p_put_output)
0775                 p_put_output("Invalid reply size from ndrxd!");
0776 
0777             ret=EXFAIL;
0778             goto out;
0779         }
0780         /* Check the reply msg */
0781         reply = (command_reply_t *)msg_buffer_max;
0782 
0783         if (NDRX_MAGIC!=reply->magic)
0784         {
0785             NDRX_LOG(log_error, "Got invalid response from ndrxd: invalid magic "
0786                     "(expected: %ld, got: %ld)!",
0787                     NDRX_MAGIC, reply->magic);
0788 
0789             if (NULL!=p_put_output)
0790                 p_put_output("Invalid response - invalid header!");
0791 
0792             ret=EXFAIL;
0793             goto out;
0794         }
0795 
0796         /* Check command code */
0797         if (ndrxd_cmd+1 != reply->command)
0798         {
0799             NDRX_LOG(log_error, "Got invalid response from ndrxd: expected: %d, got %d",
0800                                     call->command+1, reply->command);
0801 
0802             if (NULL!=p_put_output)
0803                 p_put_output("Invalid response - invalid response command code!");
0804 
0805             ret=EXFAIL;
0806             goto out;
0807         }
0808 
0809         /* check reply status: */
0810         if (EXSUCCEED==reply->status)
0811         {
0812             if (NULL!=p_rsp_process)
0813             {
0814                 ret=p_rsp_process(reply, reply_len);
0815             }
0816             else
0817             {   /* just print OK */
0818                 if (NULL!=p_put_output)
0819                     p_put_output("OK");
0820             }
0821             
0822             if (NULL!=rply_buf_out && NULL!=rply_buf_out_len)
0823             {
0824                 if (reply_len>*rply_buf_out_len)
0825                 {
0826                     NDRX_LOG(log_warn, "Received packet size %zd longer "
0827                             "than user buffer in rply_buf_len %d", 
0828                             reply_len, *rply_buf_out_len);
0829                     EXFAIL_OUT(ret);
0830                 }
0831                 else
0832                 {
0833                     memcpy(rply_buf_out, reply, reply_len);
0834                     *rply_buf_out_len = reply_len;
0835                 }
0836             }
0837         }
0838         else
0839         {
0840             char buf[2048];
0841             
0842             attempts++;
0843             
0844             if (NDRXD_ENORMAL==reply->error_code && attempts < G_atmi_env.max_normwait)
0845             {
0846                 snprintf(buf, sizeof(buf), "%s. Attempt %d/%d", 
0847                         reply->error_msg, attempts, G_atmi_env.max_normwait);
0848                 
0849                 NDRX_LOG(log_warn, "%s", buf);
0850 
0851                 if (NULL!=p_put_output)
0852                     p_put_output(buf);
0853                 
0854                 sleep(1);
0855                 
0856                 goto restart;
0857             }
0858             else
0859             {
0860                 snprintf(buf, sizeof(buf), "fail, code: %d: %s", 
0861                         reply->error_code, reply->error_msg);
0862             }
0863             
0864             NDRX_LOG(log_warn, "%s", buf);
0865 
0866             if (NULL!=p_put_output)
0867                     p_put_output(buf);
0868             
0869             ret = reply->status;
0870             goto out;
0871         }
0872         /* do above while we are waiting for stuff back... */
0873     } while((reply->flags & NDRXD_CALL_FLAGS_RSPHAVE_MORE));
0874 
0875 out:
0876 
0877     /* might be take by some double ptr internals */
0878     if (NULL!=msg_buffer_max)
0879     {
0880         NDRX_SYSBUF_FREE(msg_buffer_max);
0881     }
0882 
0883     return ret;
0884 }
0885 
0886 /**
0887  * This is wrapper for cmd_generic_call_2,
0888  * Full request.
0889  * @return Unix error code, e.g. ENOENT if not found, etc..
0890  */
0891 expublic int cmd_generic_call(int ndrxd_cmd, int msg_src, int msg_type,
0892                             command_call_t *call, size_t call_size,
0893                             char *reply_q,
0894                             mqd_t reply_queue, /* listen for answer on this! */
0895                             mqd_t admin_queue, /* this might be FAIL! */
0896                             char *admin_q_str, /* should be set! */
0897                             int argc, char **argv,
0898                             int *p_have_next,
0899                             int (*p_rsp_process)(command_reply_t *reply, size_t reply_len),
0900                             void (*p_put_output)(char *text),
0901                             int need_reply)
0902 {
0903     return cmd_generic_call_2(ndrxd_cmd, msg_src, msg_type,
0904                             call, call_size,
0905                             reply_q,
0906                             reply_queue,
0907                             admin_queue,
0908                             admin_q_str,
0909                             argc, argv,
0910                             p_have_next,
0911                             p_rsp_process,
0912                             p_put_output,
0913                             need_reply,
0914                             EXFALSE,
0915                             NULL, NULL, TPNOTIME, NULL);
0916 }
0917 
0918 /**
0919  * This is wrapper for cmd_generic_call_2,
0920  * This exposes flags to user.
0921  */
0922 expublic int cmd_generic_callfl(int ndrxd_cmd, int msg_src, int msg_type,
0923                             command_call_t *call, size_t call_size,
0924                             char *reply_q,
0925                             mqd_t reply_queue, /* listen for answer on this! */
0926                             mqd_t admin_queue, /* this might be FAIL! */
0927                             char *admin_q_str, /* should be set! */
0928                             int argc, char **argv,
0929                             int *p_have_next,
0930                             int (*p_rsp_process)(command_reply_t *reply, size_t reply_len),
0931                             void (*p_put_output)(char *text),
0932                             int need_reply,
0933                             int flags)
0934 {
0935     return cmd_generic_call_2(ndrxd_cmd, msg_src, msg_type,
0936                             call, call_size,
0937                             reply_q,
0938                             reply_queue,
0939                             admin_queue,
0940                             admin_q_str,
0941                             argc, argv,
0942                             p_have_next,
0943                             p_rsp_process,
0944                             p_put_output,
0945                             need_reply,
0946                             EXFALSE,
0947                             NULL, NULL, flags, NULL);
0948 }
0949 
0950 
0951 /**
0952  * Full request.
0953  * Including output buffers.
0954  */
0955 expublic int cmd_generic_bufcall(int ndrxd_cmd, int msg_src, int msg_type,
0956                             command_call_t *call, size_t call_size,
0957                             char *reply_q,
0958                             mqd_t reply_queue, /* listen for answer on this! */
0959                             mqd_t admin_queue, /* this might be FAIL! */
0960                             char *admin_q_str, /* should be set! */
0961                             int argc, char **argv,
0962                             int *p_have_next,
0963                             int (*p_rsp_process)(command_reply_t *reply, size_t reply_len),
0964                             void (*p_put_output)(char *text),
0965                             int need_reply,
0966                             int reply_only,
0967                             char *rply_buf_out,             /* might be a null  */
0968                             int *rply_buf_out_len,          /* if above is set, then must not be null */
0969                             int flags,
0970                             int (*p_rply_request)(char **buf, long len)
0971         )
0972 {
0973     return cmd_generic_call_2(ndrxd_cmd, msg_src, msg_type,
0974                             call, call_size,
0975                             reply_q,
0976                             reply_queue,
0977                             admin_queue,
0978                             admin_q_str,
0979                             argc, argv,
0980                             p_have_next,
0981                             p_rsp_process,
0982                             p_put_output,
0983                             need_reply,
0984                             reply_only,
0985                             rply_buf_out, 
0986                             rply_buf_out_len, 
0987                             flags,
0988                             p_rply_request);
0989 }
0990 
0991 
0992 /**
0993  * Call from list.
0994  * @param ndrxd_cmd
0995  * @param msg_src
0996  * @param msg_type
0997  * @param call
0998  * @param call_size
0999  * @param reply_q
1000  * @param reply_queue
1001  * @param admin_queue
1002  * @param admin_q_str
1003  * @param argc
1004  * @param argv
1005  * @param p_have_next
1006  * @param arglist
1007  * @param reply_only
1008  * @param flags additional flags for queue call
1009  * @return
1010  */
1011 extern int cmd_generic_listcall(int ndrxd_cmd, int msg_src, int msg_type,
1012                             command_call_t *call, size_t call_size,
1013                             char *reply_q,
1014                             mqd_t reply_queue, /* listen for answer on this! */
1015                             mqd_t admin_queue, /* this might be FAIL! */
1016                             char *admin_q_str, /* should be set! */
1017                             int argc, char **argv,
1018                             int *p_have_next,
1019                             gencall_args_t *arglist/* this is list of process */,
1020                             int reply_only,
1021                             long flags)
1022 {
1023         return cmd_generic_call_2(ndrxd_cmd, msg_src, msg_type,
1024                             call, call_size,
1025                             reply_q,
1026                             reply_queue,
1027                             admin_queue,
1028                             admin_q_str,
1029                             argc, argv,
1030                             p_have_next,
1031                             arglist[ndrxd_cmd].p_rsp_process,
1032                             arglist[ndrxd_cmd].p_put_output,
1033                             arglist[ndrxd_cmd].need_reply,
1034                             reply_only,
1035                 /* xadmin on solaris fails to receive answers when ndrxd exits.. */
1036                             NULL, NULL, TPSIGRSTRT | flags, NULL);
1037 }
1038 
1039 /**
1040  * Send error reply back to originator.
1041  * Fix this function to send error message over the bridge.
1042  */
1043 expublic int reply_with_failure(long flags, tp_command_call_t *last_call,
1044             char *buf, int *len, long rcode)
1045 {
1046     int ret=EXSUCCEED;
1047     char fn[] = "reply_with_failure";
1048     tp_command_call_t call_b;
1049     tp_command_call_t *call;
1050     char reply_to[NDRX_MAX_Q_SIZE+1] = {EXEOS};
1051     
1052     /* Bug #570 */
1053     if (last_call->flags & TPNOREPLY)
1054     {
1055         NDRX_LOG(log_warn, "No reply expected ignore error delivery");
1056         goto out;
1057     }
1058     
1059     if (NULL==buf)
1060     {
1061         call = &call_b;
1062     }
1063     else
1064     {
1065         call = (tp_command_call_t *)buf;
1066     }
1067     
1068     memset(call, 0, sizeof(*call));
1069     call->command_id = ATMI_COMMAND_TPREPLY;
1070     call->cd = last_call->cd;
1071     call->timestamp = last_call->timestamp;
1072     call->callseq = last_call->callseq;
1073     /* Give some info which server replied - for bridge we need target put here! */
1074     NDRX_STRCPY_SAFE(call->reply_to, last_call->reply_to);
1075     call->sysflags |=SYS_FLAG_REPLY_ERROR;
1076     call->rcode = rcode;
1077     NDRX_STRCPY_SAFE(call->callstack, last_call->callstack);
1078     
1079     NDRX_LOG(log_debug, "error reply cd %d callseq %u timestamp %ld queue [%s] error %ld", 
1080             call->cd, call->callseq, call->timestamp, call->reply_to, call->rcode);
1081     if (EXSUCCEED!=fill_reply_queue(call->callstack, last_call->reply_to, reply_to))
1082     {
1083         NDRX_LOG(log_error, "ATTENTION!! Failed to get reply queue");
1084         userlog("ATTENTION!! Failed to get reply queue");
1085         goto out;
1086     }
1087 
1088     if (NULL==buf)
1089     {
1090         if (EXSUCCEED!=(ret=ndrx_generic_q_send(reply_to, (char *)call, 
1091                 sizeof(*call), flags, 0)))
1092         {
1093             NDRX_LOG(log_error, "%s: Failed to send error reply back, os err: %s", 
1094                     fn, strerror(ret));
1095             goto out;
1096         }
1097     }
1098     else
1099     {
1100         NDRX_LOG(log_debug, "Buffer specified not sending anywhere");
1101     }
1102     
1103 
1104 out:
1105     return ret;
1106 }
1107 
1108 /**
1109  * Get q attributes
1110  * @param q - queue name
1111  * @param p_att - attributes 
1112  * @return SUCCEED/FAIL
1113  */
1114 expublic int ndrx_get_q_attr(char *q, struct mq_attr *p_att)
1115 {
1116     int ret=EXSUCCEED;
1117     mqd_t q_descr=(mqd_t)EXFAIL;
1118     
1119     /* read the stats of the queue */
1120     if ((mqd_t)EXFAIL==(q_descr = ndrx_mq_open_at_wrp(q, 0)))
1121     {
1122         NDRX_LOG(log_warn, "Failed to get attribs of Q: [%s], err: %s", 
1123                 q, strerror(errno));
1124         EXFAIL_OUT(ret);
1125     }
1126 
1127     /* read the attributes of the Q */
1128     if (EXSUCCEED!= ndrx_mq_getattr(q_descr, p_att))
1129     {
1130         NDRX_LOG(log_warn, "Failed to get attribs of Q: %d, err: %s", 
1131                 q_descr, strerror(errno));
1132         EXFAIL_OUT(ret);
1133     }
1134     
1135 out:
1136     
1137     if ((mqd_t)EXFAIL!=q_descr)
1138     {
1139         ndrx_mq_close(q_descr);
1140     }
1141 
1142     return ret;
1143 }
1144 
1145 /**
1146  * Return list of services, starting with name
1147  * @param start_with - service name start.
1148  * @return 
1149  */
1150 expublic atmi_svc_list_t* ndrx_get_svc_list(int (*p_filter)(char *svcnm))
1151 {
1152     atmi_svc_list_t *ret = NULL;
1153     atmi_svc_list_t *tmp;
1154     shm_svcinfo_t *svcinfo = (shm_svcinfo_t *) G_svcinfo.mem;
1155     int i;
1156     
1157     if (NULL==svcinfo)
1158     {
1159         NDRX_LOG(log_error, "shm_svcinfo memory is NULL!");
1160         return NULL; /* Nothing todo. */
1161     }
1162     
1163     /* We assume shm is OK! */
1164     for (i=0; i<G_max_svcs; i++)
1165     {
1166         /* we might have in old service in SHM (with no actual servers) thus 
1167          * skip such.
1168          */
1169         if (EXEOS!=SHM_SVCINFO_INDEX(svcinfo, i)->service[0] && 
1170                 (SHM_SVCINFO_INDEX(svcinfo, i)->srvs || SHM_SVCINFO_INDEX(svcinfo, i)->csrvs) )
1171         {
1172             /* Check filter, if ok - add to list! */
1173             if (p_filter(SHM_SVCINFO_INDEX(svcinfo, i)->service))
1174             {
1175                 if (NULL==(tmp = NDRX_CALLOC(1, sizeof(atmi_svc_list_t))))
1176                 {
1177                     NDRX_LOG(log_error, "Failed to malloc %d: %s", 
1178                         sizeof(atmi_svc_list_t), strerror(errno));
1179                     
1180                     userlog("Failed to malloc %d: %s", 
1181                         sizeof(atmi_svc_list_t), strerror(errno));
1182                     
1183                     goto out;
1184                 }
1185                 NDRX_STRCPY_SAFE(tmp->svcnm, SHM_SVCINFO_INDEX(svcinfo, i)->service);
1186                 LL_APPEND(ret, tmp);
1187             }   
1188         }
1189     } /* for */
1190     
1191 out:
1192     return ret;
1193 }
1194 
1195 /**
1196  * Fix queue attributes to match the requested mode.
1197  * @param conv
1198  * @param flags
1199  * @return SUCCEED/FAIL
1200  */
1201 expublic int ndrx_setup_queue_attrs(struct mq_attr *p_q_attr,
1202                                 mqd_t listen_q,
1203                                 char *listen_q_str, 
1204                                 long flags)
1205 {
1206     int ret=EXSUCCEED;
1207     int change_flags = EXFALSE;
1208     struct mq_attr new;
1209     char fn[] = "ndrx_setup_queue_attrs";
1210 
1211     /* NDRX_LOG(log_debug, "ATTRS BEFORE: %d", p_q_attr->mq_flags); */
1212 
1213     if (flags & TPNOBLOCK && !(p_q_attr->mq_flags & O_NONBLOCK))
1214     {
1215         /* change attributes non block mode*/
1216         new = *p_q_attr;
1217         new.mq_flags |= O_NONBLOCK;
1218         change_flags = EXTRUE;
1219         NDRX_LOG(log_debug, "Changing queue [%s] to non blocked",
1220                                             listen_q_str);
1221     }
1222     else if (!(flags & TPNOBLOCK) && (p_q_attr->mq_flags & O_NONBLOCK))
1223     {
1224         /* change attributes to block mode */
1225         new = *p_q_attr;
1226         new.mq_flags &= ~O_NONBLOCK; /* remove non block flag */
1227         change_flags = EXTRUE;
1228         NDRX_LOG(log_debug, "Changing queue [%s] to blocked",
1229                                             listen_q_str);
1230     }
1231 
1232     if (change_flags)
1233     {
1234         if (EXFAIL==ndrx_mq_setattr(listen_q, &new,
1235                             NULL))
1236         {
1237             ndrx_TPset_error_fmt(TPEOS, "%s: Failed to change attributes for queue [%s] fd %d: %s",
1238                                 fn, listen_q_str, listen_q, strerror(errno));
1239             ret=EXFAIL;
1240             goto out;
1241         }
1242         else
1243         {
1244             /* Save new attrs */
1245             *p_q_attr = new;
1246         }
1247     }
1248 
1249     /* NDRX_LOG(log_debug, "ATTRS AFTER: %d", p_q_attr->mq_flags); */
1250     
1251 out:
1252     return ret;
1253 }
1254 
1255 /* vim: set ts=4 sw=4 et smartindent: */