Back to home page

Enduro/X

 
 

    


0001 /**
0002  * @brief Tmqueue server - transaction monitor
0003  *
0004  * @file tmqapi.c
0005  */
0006 /* -----------------------------------------------------------------------------
0007  * Enduro/X Middleware Platform for Distributed Transaction Processing
0008  * Copyright (C) 2009-2016, ATR Baltic, Ltd. All Rights Reserved.
0009  * Copyright (C) 2017-2023, Mavimax, Ltd. All Rights Reserved.
0010  * This software is released under one of the following licenses:
0011  * AGPL (with Java and Go exceptions) or Mavimax's license for commercial use.
0012  * See LICENSE file for full text.
0013  * -----------------------------------------------------------------------------
0014  * AGPL license:
0015  *
0016  * This program is free software; you can redistribute it and/or modify it under
0017  * the terms of the GNU Affero General Public License, version 3 as published
0018  * by the Free Software Foundation;
0019  *
0020  * This program is distributed in the hope that it will be useful, but WITHOUT ANY
0021  * WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR A
0022  * PARTICULAR PURPOSE. See the GNU Affero General Public License, version 3
0023  * for more details.
0024  *
0025  * You should have received a copy of the GNU Affero General Public License along 
0026  * with this program; if not, write to the Free Software Foundation, Inc.,
0027  * 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
0028  *
0029  * -----------------------------------------------------------------------------
0030  * A commercial use license is available from Mavimax, Ltd
0031  * contact@mavimax.com
0032  * -----------------------------------------------------------------------------
0033  */
0034 #include <stdio.h>
0035 #include <stdlib.h>
0036 #include <string.h>
0037 #include <errno.h>
0038 #include <regex.h>
0039 #include <utlist.h>
0040 
0041 #include <ndebug.h>
0042 #include <atmi.h>
0043 #include <atmi_int.h>
0044 #include <typed_buf.h>
0045 #include <ndrstandard.h>
0046 #include <ubf.h>
0047 #include <Exfields.h>
0048 
0049 #include <ndrxdcmn.h>
0050 
0051 #include "tmqueue.h"
0052 #include "../libatmisrv/srv_int.h"
0053 #include "tperror.h"
0054 #include "nstdutil.h"
0055 #include <qcommon.h>
0056 #include "tmqd.h"
0057 #include "userlog.h"
0058 #include "cconfig.h"
0059 #include <ubfutil.h>
0060 /*---------------------------Externs------------------------------------*/
0061 /*---------------------------Macros-------------------------------------*/
0062 
0063 /** Check diagnostic code, if or QMEINVAL or (QMENOMSG) -> No abort */
0064 #define SET_NO_ABORT do {\
0065             if (EXSUCCEED!=ret)\
0066             {\
0067                 switch(qctl_out.diagnostic)\
0068                 {\
0069                     case QMEINVAL:\
0070                     case QMENOMSG:\
0071                         *p_sysflags|=NDRX_SYS_NOABORT;\
0072                         break;\
0073                 }\
0074             }\
0075         } while (0)
0076 
0077 /*---------------------------Enums--------------------------------------*/
0078 /*---------------------------Typedefs-----------------------------------*/
0079 /*---------------------------Globals------------------------------------*/
0080 /*---------------------------Statics------------------------------------*/
0081 exprivate __thread int M_is_xa_open = EXFALSE;
0082 exprivate MUTEX_LOCKDECL(M_tstamp_lock);
0083 /*---------------------------Prototypes---------------------------------*/
0084 
0085 /******************************************************************************/
0086 /*                               Q API COMMANDS                               */
0087 /******************************************************************************/
0088 
0089 /**
0090  * Do the internal commit of transaction (request sent from other TM)
0091  * @param p_ub
0092  * @param int_diag internal diagnostics, flags
0093  * @return 
0094  */
0095 expublic int tmq_enqueue(UBFH *p_ub, int *int_diag)
0096 {
0097     int ret = EXSUCCEED;
0098     tmq_msg_t *p_msg = NULL;
0099     char *data = NULL;
0100     BFLDLEN len = 0;
0101     TPQCTL qctl_out;
0102     static volatile int first = EXTRUE;
0103     int local_tx = EXFALSE;
0104     
0105     /* To guarentee unique order in same Q space...: */
0106     static volatile long t_sec = 0;
0107     static volatile long t_usec = 0;
0108     /*
0109      * TODO: Could add some start randomisation, in case of clock skew (turn back),
0110      * the random could ensure the uniqueness of the message id.
0111      */
0112     static volatile int t_cntr=0;
0113     
0114     /* Add message to Q */
0115     NDRX_LOG(log_debug, "Into tmq_enqueue()");
0116     
0117     memset(&qctl_out, 0, sizeof(qctl_out));
0118     
0119     ndrx_debug_dump_UBF(log_info, "tmq_enqueue called with", p_ub);
0120     
0121     if (!M_is_xa_open)
0122     {
0123         if (EXSUCCEED!=tpopen()) /* init the lib anyway... */
0124         {
0125             NDRX_LOG(log_error, "Failed to tpopen() by worker thread: %s", 
0126                     tpstrerror(tperrno));
0127             userlog("Failed to tpopen() by worker thread: %s", tpstrerror(tperrno));
0128         }
0129         else
0130         {
0131             M_is_xa_open = EXTRUE;
0132         }
0133     }
0134             
0135     if (!tpgetlev())
0136     {
0137         NDRX_LOG(log_debug, "Not in global transaction, starting local...");
0138         if (EXSUCCEED!=tpbegin(G_tmqueue_cfg.dflt_timeout, 0))
0139         {
0140             NDRX_LOG(log_error, "Failed to start global tx!");
0141             userlog("Failed to start global tx!");
0142         }
0143         else
0144         {
0145             NDRX_LOG(log_debug, "Transaction started ok...");
0146             local_tx = EXTRUE;
0147         }
0148     }
0149     
0150     if (NULL==(data = Bgetalloc(p_ub, EX_DATA, 0, &len)))
0151     {
0152         NDRX_LOG(log_error, "Missing EX_DATA!");
0153         userlog("Missing EX_DATA!");
0154         
0155         NDRX_STRCPY_SAFE(qctl_out.diagmsg, "Missing EX_DATA!");
0156         qctl_out.diagnostic = QMEINVAL;
0157         
0158         EXFAIL_OUT(ret);
0159     }
0160     
0161     /*
0162      * Get the message size in EX_DATA
0163      */
0164     p_msg = NDRX_MALLOC(sizeof(tmq_msg_t)+len);
0165     
0166     if (NULL==p_msg)
0167     {
0168         NDRX_LOG(log_error, "Failed to malloc tmq_msg_t!");
0169         userlog("Failed to malloc tmq_msg_t!");
0170         
0171         NDRX_STRCPY_SAFE(qctl_out.diagmsg, "Failed to malloc tmq_msg_t!");
0172         qctl_out.diagnostic = QMEOS;
0173         
0174         EXFAIL_OUT(ret);
0175     }
0176     
0177     memset(p_msg, 0, sizeof(tmq_msg_t));
0178     
0179     memcpy(p_msg->msg, data, len);
0180     p_msg->len = len;    
0181     
0182     NDRX_DUMP(log_debug, "Got message for Q: ", p_msg->msg, p_msg->len);
0183     
0184     if (EXSUCCEED!=Bget(p_ub, EX_QNAME, 0, p_msg->hdr.qname, 0))
0185     {
0186         NDRX_LOG(log_error, "tmq_enqueue: failed to get EX_QNAME");
0187         
0188         NDRX_STRCPY_SAFE(qctl_out.diagmsg, "tmq_enqueue: failed to get EX_QNAME!");
0189         qctl_out.diagnostic = QMEINVAL;
0190         
0191         EXFAIL_OUT(ret);
0192     }
0193     
0194     /* Restore back the C structure */
0195     if (EXSUCCEED!=tmq_tpqctl_from_ubf_enqreq(p_ub, &p_msg->qctl))
0196     {
0197         NDRX_LOG(log_error, "tmq_enqueue: failed convert ctl "
0198                 "to internal UBF buf!");
0199         userlog("tmq_enqueue: failed convert ctl "
0200                 "to internal UBF buf!");
0201         
0202         NDRX_STRCPY_SAFE(qctl_out.diagmsg, "tmq_enqueue: failed convert ctl "
0203                 "to internal UBF buf!");
0204         qctl_out.diagnostic = QMESYSTEM;
0205         
0206         EXFAIL_OUT(ret);
0207     }
0208     
0209     /* Build up the message. */
0210     tmq_setup_cmdheader_newmsg(&p_msg->hdr, p_msg->hdr.qname, 
0211             G_tmqueue_cfg.vnodeid, G_server_conf.srv_id, ndrx_G_qspace, p_msg->qctl.flags);
0212     
0213     /* Return the message id. */
0214     memcpy(qctl_out.msgid, p_msg->hdr.msgid, TMMSGIDLEN);
0215     memcpy(p_msg->qctl.msgid, p_msg->hdr.msgid, TMMSGIDLEN);
0216     
0217     p_msg->lockthreadid = ndrx_gettid(); /* Mark as locked by thread */
0218 
0219     ndrx_utc_tstamp2(&p_msg->msgtstamp, &p_msg->msgtstamp_usec);
0220     
0221     MUTEX_LOCK_V(M_tstamp_lock);
0222 
0223     if (first)
0224     {
0225         /* in case if clock is moved backwards,
0226          * this will give lower chance of duplicate message
0227          * ids... Alos we are using tstamp_usec, which is also
0228          * low probablity to match for message even of clock moved
0229          * backwards.
0230          */
0231         t_cntr = ndrx_rand()%2500000;
0232         first=EXFALSE;
0233     }
0234     
0235     if (p_msg->msgtstamp == t_sec && p_msg->msgtstamp_usec == t_usec)
0236     {
0237         t_cntr++;
0238     }
0239     else
0240     {
0241         t_sec = p_msg->msgtstamp;
0242         t_usec = p_msg->msgtstamp_usec;
0243         t_cntr = ndrx_rand()%2500000;
0244     }
0245     p_msg->msgtstamp_cntr = t_cntr;
0246     MUTEX_UNLOCK_V(M_tstamp_lock);
0247     
0248     p_msg->status = TMQ_STATUS_ACTIVE;
0249     
0250     NDRX_LOG(log_info, "Messag prepared ok, about to enqueue to [%s] Q...",
0251             p_msg->hdr.qname);
0252     
0253     if (EXSUCCEED!=tmq_msg_add(&p_msg, EXFALSE, &qctl_out, int_diag))
0254     {
0255         NDRX_LOG(log_error, "tmq_enqueue: failed to enqueue!");
0256         userlog("tmq_enqueue: failed to enqueue!");
0257         
0258         /* set generic error, if not already provided */
0259         if (EXSUCCEED==qctl_out.diagnostic)
0260         {
0261             NDRX_STRCPY_SAFE(qctl_out.diagmsg, "tmq_enqueue: failed to enqueue!");
0262             qctl_out.diagnostic = QMESYSTEM;
0263         }
0264         
0265         EXFAIL_OUT(ret);
0266     }
0267 
0268     /* if all ok, for performance reasons remove the data block from reply */
0269     Bdel(p_ub, EX_DATA, 0);
0270     
0271 out:
0272     /* free up the temp memory */
0273     if (NULL!=data)
0274     {
0275         NDRX_FREE(data);
0276     }
0277 
0278     if (EXSUCCEED!=ret && NULL!=p_msg)
0279     {
0280         NDRX_LOG(log_warn, "About to free p_msg!");
0281         NDRX_FREE(p_msg);
0282     }
0283 
0284     if (local_tx)
0285     {
0286         if (EXSUCCEED!=ret)
0287         {
0288             NDRX_LOG(log_error, "Aborting transaction");
0289             tpabort(0);
0290         } 
0291         else
0292         {
0293             NDRX_LOG(log_info, "Committing transaction!");
0294             
0295             if (EXSUCCEED!=tpcommit(0))
0296             {
0297                 NDRX_LOG(log_error, "Commit failed!");
0298                 userlog("Commit failed!");
0299                 NDRX_STRCPY_SAFE(qctl_out.diagmsg, "tmq_enqueue: commit failed!");
0300                 qctl_out.diagnostic = QMESYSTEM;
0301                 ret=EXFAIL;
0302             }
0303         }
0304     }
0305 
0306     /* Setup response fields
0307      * Not sure about existing ones (seems like they will stay in buffer)
0308      * i.e. request fields
0309      */
0310     if (EXSUCCEED!=tmq_tpqctl_to_ubf_enqrsp(p_ub, &qctl_out))
0311     {
0312         NDRX_LOG(log_error, "tmq_enqueue: failed to generate response buffer!");
0313         userlog("tmq_enqueue: failed to generate response buffer!");
0314         ret=EXFAIL;
0315         /* set error! */
0316     }
0317 
0318     return ret;
0319 }
0320 
0321 /**
0322  * Dequeue message
0323  * @param p_ub
0324  * @param int_diag internal diagnostics, flags
0325  * @return EXSUCCEED/EXFAIL
0326  */
0327 expublic int tmq_dequeue(UBFH **pp_ub, int *int_diag)
0328 {
0329     /* Search for not locked message, lock it, issue command to disk for
0330      * delete & return the message to buffer (also needs to resize the buffer correctly)
0331      */
0332     int ret = EXSUCCEED;
0333     tmq_msg_t *p_msg = NULL;
0334     TPQCTL qctl_out, qctl_in;
0335     int local_tx = EXFALSE;
0336     char qname[TMQNAMELEN+1];
0337     long buf_realoc_size;
0338     char corrid_str[TMCORRIDLEN_STR+1];
0339     char *p_corrid_str = NULL;
0340     
0341     /* Add message to Q */
0342     NDRX_LOG(log_debug, "Into tmq_dequeue()");
0343     
0344     memset(&qctl_in, 0, sizeof(qctl_in));
0345     memset(&qctl_out, 0, sizeof(qctl_out));
0346     
0347     ndrx_debug_dump_UBF(log_info, "tmq_dequeue called with", *pp_ub);
0348     
0349     if (!M_is_xa_open)
0350     {
0351         if (EXSUCCEED!=tpopen()) /* init the lib anyway... */
0352         {
0353             NDRX_LOG(log_error, "Failed to tpopen() by worker thread: %s", 
0354                     tpstrerror(tperrno));
0355             userlog("Failed to tpopen() by worker thread: %s", tpstrerror(tperrno));
0356         }
0357         else
0358         {
0359             M_is_xa_open = EXTRUE;
0360         }
0361     }
0362     
0363     if (EXSUCCEED!=tmq_tpqctl_from_ubf_deqreq(*pp_ub, &qctl_in))
0364     {
0365         NDRX_LOG(log_error, "tmq_dequeue: failed to read request qctl!");
0366         userlog("tmq_dequeue: failed to read request qctl!");
0367         EXFAIL_OUT(ret);
0368     }
0369     
0370     if (!tpgetlev())
0371     {
0372         NDRX_LOG(log_debug, "Not in global transaction, starting local...");
0373         if (EXSUCCEED!=tpbegin(G_tmqueue_cfg.dflt_timeout, 0))
0374         {
0375             NDRX_LOG(log_error, "Failed to start global tx!");
0376             userlog("Failed to start global tx!");
0377         }
0378         else
0379         {
0380             NDRX_LOG(log_debug, "Transaction started ok...");
0381             local_tx = EXTRUE;
0382         }
0383     }
0384     
0385     if (EXSUCCEED!=Bget(*pp_ub, EX_QNAME, 0, qname, 0))
0386     {
0387         NDRX_LOG(log_error, "tmq_dequeue: failed to get EX_QNAME");
0388         NDRX_STRCPY_SAFE(qctl_out.diagmsg, "tmq_dequeue: failed to get EX_QNAME!");
0389         qctl_out.diagnostic = QMEINVAL;
0390         
0391         EXFAIL_OUT(ret);
0392     }
0393     
0394     /* Get FB size (current) */
0395     NDRX_LOG(log_info, "qctl_req flags: %ld", qctl_in.flags);
0396     
0397     if (qctl_in.flags & TPQGETBYMSGID)
0398     {
0399         if (NULL==(p_msg = tmq_msg_dequeue_by_msgid(qctl_in.msgid, qctl_in.flags, 
0400                 &qctl_out.diagnostic, qctl_out.diagmsg, sizeof(qctl_out.diagmsg), int_diag)))
0401         {
0402             char msgid_str[TMMSGIDLEN_STR+1];
0403             int lev = log_info;
0404             tmq_msgid_serialize(qctl_in.msgid, msgid_str);
0405             
0406             if (qctl_out.diagnostic!=QMENOMSG)
0407             {
0408                 lev=log_error;
0409             }
0410             
0411             NDRX_LOG(lev, "tmq_dequeue: no message found for given msgid [%s] %ld: %s", 
0412                     msgid_str, qctl_out.diagnostic, qctl_out.diagmsg);
0413             
0414             EXFAIL_OUT(ret);
0415         }
0416     }
0417     else 
0418     {
0419         /* setcorrid to not null*/
0420         if (qctl_in.flags & TPQGETBYCORRID)
0421         {
0422             tmq_msgid_serialize(qctl_in.corrid, corrid_str);
0423             p_corrid_str = corrid_str;
0424         }
0425 
0426         if (NULL==(p_msg = tmq_msg_dequeue(qname, qctl_in.flags, EXFALSE, 
0427             &qctl_out.diagnostic, qctl_out.diagmsg, sizeof(qctl_out.diagmsg), 
0428                 p_corrid_str, int_diag)))
0429         {
0430             int lev = log_info;
0431             
0432             if (qctl_out.diagnostic!=QMENOMSG)
0433             {
0434                 lev=log_error;
0435             }
0436         
0437             NDRX_LOG(lev, "tmq_dequeue: no message in Q [%s] corrid_str [%s] %ld: %s", qname,
0438                 NULL!=p_corrid_str?corrid_str:"N/A", qctl_out.diagnostic, qctl_out.diagmsg);
0439         
0440             EXFAIL_OUT(ret);
0441         }
0442     }
0443     
0444     /* Use the original metadata */
0445     memcpy(&qctl_out, &p_msg->qctl, sizeof(qctl_out));
0446     
0447     buf_realoc_size = Bused (*pp_ub) + p_msg->len + 1024;
0448     
0449     if (NULL==(*pp_ub = (UBFH *)tprealloc ((char *)*pp_ub, buf_realoc_size)))
0450     {
0451         NDRX_LOG(log_error, "Failed to allocate buffer to size: %ld", buf_realoc_size);
0452         userlog("Failed to allocate buffer to size: %ld", buf_realoc_size);
0453         EXFAIL_OUT(ret);
0454     }
0455     
0456     if (EXSUCCEED!=Bchg(*pp_ub, EX_DATA, 0, p_msg->msg, p_msg->len))
0457     {
0458         NDRX_LOG(log_error, "failed to set EX_DATA!");
0459         userlog("failed to set EX_DATA!");
0460         
0461         NDRX_STRCPY_SAFE(qctl_out.diagmsg, "failed to set EX_DATA!");
0462         qctl_out.diagnostic = QMESYSTEM;
0463         
0464         /* Unlock msg if it was peek */
0465         if (TPQPEEK & qctl_in.flags)
0466         {
0467             tmq_unlock_msg_by_msgid(p_msg->qctl.msgid, 0);
0468         }
0469         
0470         EXFAIL_OUT(ret);
0471     }
0472     
0473     /* Unlock msg if it was peek */
0474     if (TPQPEEK & qctl_in.flags && 
0475             EXSUCCEED!=tmq_unlock_msg_by_msgid(p_msg->qctl.msgid, EXTRUE))
0476     {
0477         NDRX_LOG(log_error, "Failed to unlock msg!");
0478         EXFAIL_OUT(ret);
0479     }
0480     
0481 out:
0482 
0483     if (local_tx)
0484     {
0485         if (EXSUCCEED!=ret)
0486         {
0487             NDRX_LOG(log_error, "Aborting transaction");
0488             tpabort(0);
0489         } 
0490         else
0491         {
0492             NDRX_LOG(log_info, "Committing transaction!");
0493             if (EXSUCCEED!=tpcommit(0))
0494             {
0495                 NDRX_LOG(log_error, "Commit failed!");
0496                 userlog("Commit failed!");
0497                 NDRX_STRCPY_SAFE(qctl_out.diagmsg, "tmq_dequeue: commit failed!");
0498                 qctl_out.diagnostic = QMESYSTEM;
0499                 ret=EXFAIL;
0500             }
0501         }
0502     }
0503 
0504     /* Setup response fields
0505      * Not sure about existing ones (seems like they will stay in buffer)
0506      * i.e. request fields
0507      */
0508     if (EXSUCCEED!=tmq_tpqctl_to_ubf_deqrsp(*pp_ub, &qctl_out))
0509     {
0510         NDRX_LOG(log_error, "tmq_dequeue: failed to generate response buffer!");
0511         userlog("tmq_dequeue: failed to generate response buffer!");
0512         ret=EXFAIL;
0513     }
0514 
0515     return ret;
0516 }
0517 
0518 /******************************************************************************/
0519 /*                         COMMAND LINE API                                   */
0520 /******************************************************************************/
0521 /**
0522  * Return list of queues
0523  * @param p_ub
0524  * @param cd - call descriptor
0525  * @return 
0526  */
0527 expublic int tmq_mqlq(UBFH *p_ub, int cd)
0528 {
0529     int ret = EXSUCCEED;
0530     long revent;
0531     fwd_qlist_t *el, *tmp, *list;
0532     short nodeid = G_tmqueue_cfg.vnodeid;
0533     short srvid = tpgetsrvid();
0534     char *fn = "tmq_printqueue";
0535     /* Get list of queues */
0536     
0537     if (NULL==(list = tmq_get_qlist(EXFALSE, EXTRUE)))
0538     {
0539         NDRX_LOG(log_info, "%s: No queues found", fn);
0540     }
0541     else
0542     {
0543         NDRX_LOG(log_info, "%s: Queues found", fn);
0544     }
0545     
0546     DL_FOREACH_SAFE(list,el,tmp)
0547     {
0548         long msgs = 0;
0549         long locked = 0;
0550     
0551         tmq_get_q_stats(el->qname, &msgs, &locked);
0552                 
0553         NDRX_LOG(log_debug, "returning %s/%s", ndrx_G_qspace, el->qname);
0554         
0555         if (EXSUCCEED!=Bchg(p_ub, EX_QSPACE, 0, ndrx_G_qspace, 0L) ||
0556             EXSUCCEED!=Bchg(p_ub, EX_QNAME, 0, el->qname, 0L) ||
0557             EXSUCCEED!=Bchg(p_ub, TMNODEID, 0, (char *)&nodeid, 0L) ||
0558             EXSUCCEED!=Bchg(p_ub, TMSRVID, 0, (char *)&srvid, 0L) ||
0559             EXSUCCEED!=Bchg(p_ub, EX_QNUMMSG, 0, (char *)&msgs, 0L) ||
0560             EXSUCCEED!=Bchg(p_ub, EX_QNUMLOCKED, 0, (char *)&locked, 0L) ||
0561             EXSUCCEED!=Bchg(p_ub, EX_QNUMSUCCEED, 0, (char *)&el->succ, 0L) ||
0562             EXSUCCEED!=Bchg(p_ub, EX_QNUMFAIL, 0, (char *)&el->fail, 0L) ||
0563             EXSUCCEED!=Bchg(p_ub, EX_QNUMENQ, 0, (char *)&el->numenq, 0L) ||
0564             EXSUCCEED!=Bchg(p_ub, EX_QNUMDEQ, 0, (char *)&el->numdeq, 0L) 
0565                 )
0566         {
0567             NDRX_LOG(log_error, "failed to setup FB: %s", Bstrerror(Berror));
0568             EXFAIL_OUT(ret);
0569         }
0570         if (EXFAIL == tpsend(cd,
0571                             (char *)p_ub,
0572                             0L,
0573                             0,
0574                             &revent))
0575         {
0576             NDRX_LOG(log_error, "Send data failed [%s] %ld",
0577                                 tpstrerror(tperrno), revent);
0578             EXFAIL_OUT(ret);
0579         }
0580         else
0581         {
0582             NDRX_LOG(log_debug,"sent ok");
0583         }
0584         
0585         DL_DELETE(list, el);
0586         NDRX_FREE((char *)el);
0587     }
0588     
0589 out:
0590 
0591     return ret;
0592 }
0593 
0594 /**
0595  * List queue definitions/config (return the defaulted flag if so)
0596  * @param p_ub
0597  * @param cd - call descriptor
0598  * @return 
0599  */
0600 expublic int tmq_mqlc(UBFH *p_ub, int cd)
0601 {
0602     int ret = EXSUCCEED;
0603     long revent;
0604     fwd_qlist_t *el, *tmp, *list;
0605     short nodeid = G_tmqueue_cfg.vnodeid;
0606     short srvid = tpgetsrvid();
0607     char *fn = "tmq_mqlc";
0608     char qdef[TMQ_QDEF_MAX];
0609     char flags[128];
0610     int is_default = EXFALSE;
0611     /* Get list of queues */
0612     
0613     if (NULL==(list = tmq_get_qlist(EXFALSE, EXTRUE)))
0614     {
0615         NDRX_LOG(log_info, "%s: No queues found", fn);
0616     }
0617     else
0618     {
0619         NDRX_LOG(log_info, "%s: Queues found", fn);
0620     }
0621     
0622     DL_FOREACH_SAFE(list,el,tmp)
0623     {
0624         is_default = EXFALSE;
0625 
0626         if (EXSUCCEED==tmq_build_q_def(el->qname, &is_default, qdef, sizeof(qdef)))
0627         {
0628             NDRX_LOG(log_debug, "returning %s/%s", ndrx_G_qspace, el->qname);
0629             
0630             flags[0] = EXEOS;
0631             
0632             if (is_default)
0633             {
0634                 NDRX_STRCAT_S(flags, sizeof(flags), "D");
0635             }
0636             
0637 
0638             if (EXSUCCEED!=Bchg(p_ub, EX_QSPACE, 0, ndrx_G_qspace, 0L) ||
0639                 EXSUCCEED!=Bchg(p_ub, EX_QNAME, 0, el->qname, 0L) ||
0640                 EXSUCCEED!=Bchg(p_ub, TMNODEID, 0, (char *)&nodeid, 0L) ||
0641                 EXSUCCEED!=Bchg(p_ub, TMSRVID, 0, (char *)&srvid, 0L) ||
0642                 EXSUCCEED!=CBchg(p_ub, EX_DATA, 0, qdef, 0L, BFLD_STRING) ||
0643                 EXSUCCEED!=Bchg(p_ub, EX_QSTRFLAGS, 0, flags, 0L)
0644                 )
0645             {
0646                 NDRX_LOG(log_error, "failed to setup FB: %s", Bstrerror(Berror));
0647                 EXFAIL_OUT(ret);
0648             }
0649             if (EXFAIL == tpsend(cd,
0650                                 (char *)p_ub,
0651                                 0L,
0652                                 0,
0653                                 &revent))
0654             {
0655                 NDRX_LOG(log_error, "Send data failed [%s] %ld",
0656                                     tpstrerror(tperrno), revent);
0657                 EXFAIL_OUT(ret);
0658             }
0659             else
0660             {
0661                 NDRX_LOG(log_debug,"sent ok");
0662             }
0663         }
0664         
0665         DL_DELETE(list, el);
0666         NDRX_FREE((char *)el);
0667     }
0668     
0669 out:
0670 
0671     return ret;
0672 }
0673 
0674 /**
0675  * List messages in queue
0676  * @param p_ub
0677  * @param cd
0678  * @return 
0679  */
0680 expublic int tmq_mqlm(UBFH *p_ub, int cd)
0681 {
0682     int ret = EXSUCCEED;
0683     long revent;
0684     tmq_memmsg_t *el = NULL, *tmp = NULL, *list = NULL;
0685     short nodeid = G_tmqueue_cfg.vnodeid;
0686     short srvid = tpgetsrvid();
0687     char *fn = "tmq_mqlm";
0688     char qname[TMQNAMELEN+1];
0689     short is_locked;
0690     char msgid_str[TMMSGIDLEN_STR+1];
0691 
0692     /* Get list of queues */
0693     
0694     if (EXSUCCEED!=Bget(p_ub, EX_QNAME, 0, qname, 0L))
0695     {
0696         NDRX_LOG(log_error, "Failed to get qname");
0697         EXFAIL_OUT(ret);
0698     }
0699     
0700     if (NULL==(list = tmq_get_msglist(qname)))
0701     {
0702         NDRX_LOG(log_info, "%s: no messages in q", fn);
0703     }
0704     else
0705     {
0706         NDRX_LOG(log_info, "%s: messages found", fn);
0707     }
0708     
0709     DL_FOREACH_SAFE(list,el,tmp)
0710     {
0711         if (el->msg->lockthreadid)
0712         {
0713             is_locked = EXTRUE;
0714         }
0715         else
0716         {
0717             is_locked = EXFALSE;
0718         }
0719         
0720         tmq_msgid_serialize(el->msg->hdr.msgid, msgid_str);
0721         
0722         if (EXSUCCEED!=Bchg(p_ub, TMNODEID, 0, (char *)&nodeid, 0L) ||
0723             EXSUCCEED!=Bchg(p_ub, TMSRVID, 0, (char *)&srvid, 0L) ||
0724             EXSUCCEED!=Bchg(p_ub, EX_QMSGIDSTR, 0, msgid_str, 0L)  ||
0725             EXSUCCEED!=Bchg(p_ub, EX_TSTAMP1_STR, 0, 
0726                 ndrx_get_strtstamp2(0, el->msg->msgtstamp, el->msg->msgtstamp_usec), 0L) ||
0727             EXSUCCEED!=Bchg(p_ub, EX_TSTAMP2_STR, 0, 
0728                 ndrx_get_strtstamp2(1, el->msg->trytstamp, el->msg->trytstamp_usec), 0L) ||
0729             EXSUCCEED!=Bchg(p_ub, EX_QMSGTRIES, 0, (char *)&el->msg->trycounter, 0L) ||
0730             EXSUCCEED!=Bchg(p_ub, EX_QMSGLOCKED, 0, (char *)&is_locked, 0L)
0731                 )
0732         {
0733             NDRX_LOG(log_error, "failed to setup FB: %s", Bstrerror(Berror));
0734             EXFAIL_OUT(ret);
0735         }
0736         
0737         if (EXFAIL == tpsend(cd,
0738                             (char *)p_ub,
0739                             0L,
0740                             0,
0741                             &revent))
0742         {
0743             NDRX_LOG(log_error, "Send data failed [%s] %ld",
0744                                 tpstrerror(tperrno), revent);
0745             EXFAIL_OUT(ret);
0746         }
0747         else
0748         {
0749             NDRX_LOG(log_debug,"sent ok");
0750         }
0751         
0752         
0753     }
0754     
0755 out:
0756     /* delete the list if any */
0757     DL_FOREACH_SAFE(list,el,tmp)
0758     {
0759         DL_DELETE(list, el);
0760         NDRX_FREE((char *)el->msg);
0761         NDRX_FREE((char *)el);
0762     }
0763     return ret;
0764 }
0765 
0766 /**
0767  * Reload config
0768  * @param p_ub
0769  * @return 
0770  */
0771 expublic int tmq_mqrc(UBFH *p_ub)
0772 {
0773     int ret = EXSUCCEED;
0774     
0775     /* if have CC */
0776     if (ndrx_get_G_cconfig())
0777     {
0778         ndrx_cconfig_reload();
0779     }
0780     
0781     ret = tmq_reload_conf(G_tmqueue_cfg.qconfig);
0782     
0783 out:
0784     return ret;
0785 }
0786 
0787 /**
0788  * Change config
0789  * @param p_ub
0790  * @return 
0791  */
0792 expublic int tmq_mqch(UBFH *p_ub)
0793 {
0794     int ret = EXSUCCEED;
0795     char conf[512];
0796     BFLDLEN len = sizeof(conf);
0797     
0798     if (EXSUCCEED!=CBget(p_ub, EX_DATA, 0, conf, &len, BFLD_STRING))
0799     {
0800         NDRX_LOG(log_error, "Failed to get EX_DATA!");
0801         EXFAIL_OUT(ret);
0802     }
0803     
0804     ret = tmq_qconf_addupd(conf, NULL);
0805     
0806 out:
0807     return ret;
0808 }
0809 /* vim: set ts=4 sw=4 et smartindent: */