Back to home page

Enduro/X

 
 

    


0001 /**
0002  * @brief Memory based structures for Q.
0003  *  Due to fact that qdisk_xa in tmqmode may use callbacks from qspace
0004  *  Qspace is linked int qdisk_xa. Thought proper way would be to add callbacks
0005  *  
0006  *
0007  * @file qspace.c
0008  */
0009 /* -----------------------------------------------------------------------------
0010  * Enduro/X Middleware Platform for Distributed Transaction Processing
0011  * Copyright (C) 2009-2016, ATR Baltic, Ltd. All Rights Reserved.
0012  * Copyright (C) 2017-2023, Mavimax, Ltd. All Rights Reserved.
0013  * This software is released under one of the following licenses:
0014  * AGPL (with Java and Go exceptions) or Mavimax's license for commercial use.
0015  * See LICENSE file for full text.
0016  * -----------------------------------------------------------------------------
0017  * AGPL license:
0018  *
0019  * This program is free software; you can redistribute it and/or modify it under
0020  * the terms of the GNU Affero General Public License, version 3 as published
0021  * by the Free Software Foundation;
0022  *
0023  * This program is distributed in the hope that it will be useful, but WITHOUT ANY
0024  * WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR A
0025  * PARTICULAR PURPOSE. See the GNU Affero General Public License, version 3
0026  * for more details.
0027  *
0028  * You should have received a copy of the GNU Affero General Public License along 
0029  * with this program; if not, write to the Free Software Foundation, Inc.,
0030  * 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
0031  *
0032  * -----------------------------------------------------------------------------
0033  * A commercial use license is available from Mavimax, Ltd
0034  * contact@mavimax.com
0035  * -----------------------------------------------------------------------------
0036  */
0037 
0038 #include <ndrx_config.h>
0039 
0040 #ifdef EX_OS_FREEBSD
0041 #define _WITH_GETLINE
0042 #endif
0043 
0044 #include <stdio.h>
0045 #include <stdlib.h>
0046 #include <string.h>
0047 #include <ctype.h>
0048 #include <errno.h>
0049 #include <regex.h>
0050 #include <utlist.h>
0051 
0052 #include <ndebug.h>
0053 #include <atmi.h>
0054 #include <atmi_int.h>
0055 #include <typed_buf.h>
0056 #include <ndrstandard.h>
0057 #include <ubf.h>
0058 #include <Exfields.h>
0059 
0060 #include <exnet.h>
0061 #include <ndrxdcmn.h>
0062 
0063 #include "tmqd.h"
0064 #include "../libatmisrv/srv_int.h"
0065 #include "tperror.h"
0066 #include "userlog.h"
0067 #include <xa_cmn.h>
0068 #include <exthpool.h>
0069 #include "nstdutil.h"
0070 #include "tmqueue.h"
0071 #include "cconfig.h"
0072 #include <rbtree.h>
0073 #include "qtran.h"
0074 /*---------------------------Externs------------------------------------*/
0075 /*---------------------------Macros-------------------------------------*/
0076 #define MAX_TOKEN_SIZE          64 /**< max key=value buffer size of qdef element */
0077 #define WORKERS_DEFAULT         1   /**< by default we have 1 workers for autoqs */
0078 
0079 #define TMQ_QC_NAME             "name"
0080 #define TMQ_QC_SVCNM            "svcnm"
0081 #define TMQ_QC_TRIES            "tries"
0082 #define TMQ_QC_AUTOQ            "autoq"
0083 #define TMQ_QC_WAITINIT         "waitinit"
0084 #define TMQ_QC_WAITRETRY        "waitretry"
0085 #define TMQ_QC_WAITRETRYINC     "waitretryinc"
0086 #define TMQ_QC_WAITRETRYMAX     "waitretrymax"
0087 #define TMQ_QC_MEMONLY          "memonly"
0088 #define TMQ_QC_MODE             "mode"
0089 #define TMQ_QC_TXTOUT           "txtout"    /**< transaction timeout override */
0090 #define TMQ_QC_ERRORQ           "errorq"    /**< Name of the error queue, opt */
0091 #define TMQ_QC_WORKERS          "workers"   /**< max number of workders       */
0092 #define TMQ_QC_SYNC             "sync"      /**< sync forward q               */
0093 
0094 #define EXHASH_FIND_STR_H2(head,findstr,out)                                     \
0095     EXHASH_FIND(h2,head,findstr,strlen(findstr),out)
0096 
0097 #define EXHASH_ADD_STR_H2(head,strfield,add)                                     \
0098     EXHASH_ADD(h2,head,strfield,strlen(add->strfield),add)
0099 
0100 #define EXHASH_DEL_H2(head,delptr)                                               \
0101     EXHASH_DELETE(h2,head,delptr)
0102 
0103 /**
0104  * Extract tmq_memmsg_t from the correltion tree node 
0105  */
0106 #define TMQ_COR_GETMSG(ptr) ((tmq_memmsg_t *)((char *)ptr - EXOFFSET(tmq_memmsg_t, cor)))
0107 
0108 
0109 /*---------------------------Enums--------------------------------------*/
0110 /*---------------------------Typedefs-----------------------------------*/
0111 /*---------------------------Globals------------------------------------*/
0112 
0113 /* Handler for MSG Hash. */
0114 expublic tmq_memmsg_t *G_msgid_hash = NULL;
0115 
0116 /* Handler for Q hash */
0117 expublic tmq_qhash_t *G_qhash = NULL;
0118 
0119 /*
0120  * Any public operations must be locked
0121  */
0122 exprivate MUTEX_LOCKDECL(M_q_lock);
0123 
0124 /* Configuration section */
0125 expublic tmq_qconfig_t *G_qconf = NULL; 
0126 /*---------------------------Statics------------------------------------*/
0127 /*---------------------------Prototypes---------------------------------*/
0128 exprivate tmq_memmsg_t* tmq_get_msg_by_msgid_str(char *msgid_str);
0129 
0130 /**
0131  * Check that message ID exists in memory store
0132  * @param msgid_str message id string
0133  * @return  EXTRUE/EXFALSE
0134  */
0135 expublic int tmq_msgid_exists(char *msgid_str)
0136 {
0137     tmq_memmsg_t *ret;
0138 
0139     MUTEX_LOCK_V(M_q_lock);   
0140     EXHASH_FIND_STR( G_msgid_hash, msgid_str, ret);
0141     MUTEX_UNLOCK_V(M_q_lock);
0142     
0143     return (NULL!=ret)?EXTRUE:EXFALSE;
0144 }
0145 
0146 
0147 /**
0148  * Process message blocks on disk read (after cold startup)
0149  * @param tmxid serialized trnasaction id
0150  * @param p_block
0151  * @param state TMQ_TXSTATE_ according to 
0152  * @param seqno command sequence number within the transaction
0153  * @return EXSUCCEED/EXFAIL
0154  */
0155 exprivate int process_block(char *tmxid, union tmq_block **p_block, int state, int seqno)
0156 {
0157     int ret = EXSUCCEED;
0158     
0159     
0160     /* in case if state is prepared or active -> add transaction with given
0161      * sequence
0162      */
0163     if (TMQ_TXSTATE_ACTIVE==state || TMQ_TXSTATE_PREPARED==state)
0164     {
0165         char entry_status;
0166         
0167         if (TMQ_TXSTATE_PREPARED==state)
0168         {
0169             entry_status = XA_RM_STATUS_PREP;
0170         }
0171         else
0172         {
0173             entry_status = XA_RM_STATUS_ACTIVE;
0174         }
0175         
0176         if (EXSUCCEED!=tmq_log_addcmd(tmxid, seqno, (char *)*p_block, entry_status))
0177         {
0178             NDRX_LOG(log_error, "Failed to add tmxid [%s] seqno %d",
0179                     tmxid, seqno);
0180             EXFAIL_OUT(ret);
0181         }
0182     }
0183     
0184     switch((*p_block)->hdr.command_code)
0185     {
0186         case TMQ_STORCMD_NEWMSG:
0187             
0188             if (EXSUCCEED!=tmq_msg_add((tmq_msg_t **)p_block, EXTRUE, NULL, NULL))
0189             {
0190                 NDRX_LOG(log_error, "Failed to enqueue!");
0191                 EXFAIL_OUT(ret);
0192             }
0193             
0194             break;
0195         case TMQ_STORCMD_DUM:
0196             
0197             break;
0198         default:
0199             
0200             if (EXSUCCEED!=tmq_lock_msg((*p_block)->hdr.msgid))
0201             {
0202                 if (TMQ_TXSTATE_PREPARED==state 
0203                         && TMQ_STORCMD_DEL==(*p_block)->hdr.command_code)
0204                 {
0205                     NDRX_LOG(log_info, "Delete command and message not found - assume deleted");
0206                 }
0207                 else
0208                 {
0209                     NDRX_LOG(log_error, "Failed to lock message!");
0210                     EXFAIL_OUT(ret);
0211                 }
0212             }
0213             
0214             break;
0215     }
0216     
0217 out:
0218     /* free the mem if needed: */
0219     if (NULL!=*p_block)
0220     {
0221         NDRX_FREE((char *)*p_block);
0222         *p_block = NULL;
0223     }
0224     return ret;
0225 }
0226 
0227 
0228 /**
0229  * Load the messages from QSPACE (after startup)...
0230  * This does not need a lock, because it uses other globals
0231  * @return EXSUCCEED/EXFAIL
0232  */
0233 expublic int tmq_load_msgs(void)
0234 {
0235     int ret = EXSUCCEED;
0236     
0237     NDRX_LOG(log_info, "Reading messages from disk...");
0238     /* populate all queues - from XA source */
0239     if (EXSUCCEED!=tmq_storage_get_blocks(process_block,  (short)G_tmqueue_cfg.vnodeid, 
0240             (short)tpgetsrvid()))
0241     {
0242         EXFAIL_OUT(ret);
0243     }
0244     
0245 out:
0246     NDRX_LOG(log_info, "tmq_load_msgs return %d", ret);
0247     return ret;
0248 }
0249 
0250 /**
0251  * Add dummy marker for given transaction, increment sequence number
0252  * @param tmxid transaction id (must be in the transaction regsitry)
0253  * @return EXSUCCEED/EXFAIL
0254  */
0255 expublic int tmq_dum_add(char *tmxid)
0256 {
0257     tmq_msg_dum_t dum;
0258     int ret = EXSUCCEED;
0259         
0260     /* Build up the message. 
0261      * Note that we might not be in transaction mode, in case if
0262      * doing prepare and we find that there is nothing to prepare.
0263      */
0264     tmq_setup_cmdheader_dum(&dum.hdr, NULL, G_tmqueue_cfg.vnodeid,
0265         tpgetsrvid(), ndrx_G_qspace, 0);
0266     dum.hdr.command_code = TMQ_STORCMD_DUM;
0267     
0268     /* this adds transaction to log: */
0269     if (EXSUCCEED!=tmq_storage_write_cmd_block((char *)&dum, 
0270                 "Dummy transaction marker", tmxid, NULL))
0271     {
0272         NDRX_LOG(log_error, "Failed to write dummy command block to disk [%s]", tmxid);
0273         EXFAIL_OUT(ret);
0274     }
0275 
0276 out:
0277     return ret;
0278 }
0279 
0280 /**
0281  * Setup dummy header
0282  * @param hdr header to setup
0283  * @param qname queue name
0284  */
0285 expublic int tmq_setup_cmdheader_dum(tmq_cmdheader_t *hdr, char *qname, 
0286         short nodeid, short srvid, char *qspace, long flags)
0287 {
0288     int ret = EXSUCCEED;
0289     
0290     NDRX_STRCPY_SAFE(hdr->qspace, qspace);
0291    /* strcpy(hdr->qname, qname); same object, causes core dumps on osx */
0292     hdr->command_code = TMQ_STORCMD_NEWMSG;
0293     NDRX_STRNCPY(hdr->magic, TMQ_MAGIC, TMQ_MAGIC_LEN);
0294     
0295     /* trailing magic */
0296     hdr->nodeid = nodeid;
0297     hdr->srvid = srvid;
0298     hdr->flags = flags;
0299     memset(hdr->reserved, 0, sizeof(hdr->reserved));
0300     memset(hdr->msgid, 0, sizeof(hdr->msgid));
0301     NDRX_STRNCPY(hdr->magic2, TMQ_MAGIC2, TMQ_MAGIC_LEN);
0302     
0303 out:
0304     return ret;
0305 }
0306 
0307 /**
0308  * Setup queue header
0309  * @param hdr header to setup
0310  * @param qname queue name
0311  */
0312 expublic int tmq_setup_cmdheader_newmsg(tmq_cmdheader_t *hdr, char *qname, 
0313         short nodeid, short srvid, char *qspace, long flags)
0314 {
0315     int ret = EXSUCCEED;
0316     
0317     NDRX_STRCPY_SAFE(hdr->qspace, qspace);
0318    /* strcpy(hdr->qname, qname); same object, causes core dumps on osx */
0319     hdr->command_code = TMQ_STORCMD_NEWMSG;
0320     NDRX_STRNCPY(hdr->magic, TMQ_MAGIC, TMQ_MAGIC_LEN);
0321     
0322     /* trailing magic */
0323     hdr->nodeid = nodeid;
0324     hdr->srvid = srvid;
0325     hdr->flags = flags;
0326     memset(hdr->reserved, 0, sizeof(hdr->reserved));
0327     NDRX_STRNCPY(hdr->magic2, TMQ_MAGIC2, TMQ_MAGIC_LEN);
0328     
0329     tmq_msgid_gen(hdr->msgid);
0330     
0331 out:
0332     return ret;
0333 }
0334 
0335 
0336 /**
0337  * Generate new transaction id, native form (byte array)
0338  * Note this initializes the msgid.
0339  * @param msgid value to return
0340  */
0341 expublic void tmq_msgid_gen(char *msgid)
0342 {
0343     exuuid_t uuid_val;
0344     short node_id = (short) G_tmqueue_cfg.vnodeid;
0345     short srv_id = (short) G_srv_id;
0346    
0347     memset(msgid, 0, TMMSGIDLEN);
0348     
0349     /* Do the locking, so that we get unique xids... */
0350     ndrx_cid_generate((unsigned char)tpgetnodeid(), uuid_val);
0351     
0352     memcpy(msgid, uuid_val, sizeof(exuuid_t));
0353     /* Have an additional infos for transaction id... */
0354     memcpy(msgid  
0355             +sizeof(exuuid_t)  
0356             ,(char *)&(node_id), sizeof(short));
0357     memcpy(msgid  
0358             +sizeof(exuuid_t) 
0359             +sizeof(short)
0360             ,(char *)&(srv_id), sizeof(short));    
0361     
0362     NDRX_LOG(log_debug, "MSGID: struct size: %d", sizeof(exuuid_t)+sizeof(short)+ sizeof(short));
0363 }
0364 
0365 
0366 /**
0367  * Load the key config parameter
0368  * @param qconf
0369  * @param key
0370  * @param value
0371  */
0372 exprivate int load_param(tmq_qconfig_t * qconf, char *key, char *value)
0373 {
0374     int ret = EXSUCCEED; 
0375     
0376     NDRX_LOG(log_info, "loading q param: [%s] = [%s]", key, value);
0377     
0378     /* - Not used.
0379     if (0==strcmp(key, TMQ_QC_NAME))
0380     {
0381         NDRX_STRNCPY(qconf->name, value, sizeof(qconf->name)-1);
0382         qconf->name[sizeof(qconf->name)-1] = EOS;
0383     }
0384     else  */
0385     if (0==strcmp(key, TMQ_QC_SVCNM))
0386     {
0387         NDRX_STRCPY_SAFE(qconf->svcnm, value);
0388     }
0389     else if (0==strcmp(key, TMQ_QC_ERRORQ))
0390     {
0391         NDRX_STRCPY_SAFE(qconf->errorq, value);
0392     }
0393     else if (0==strcmp(key, TMQ_QC_TRIES))
0394     {
0395         int ival = atoi(value);
0396         if (!ndrx_isint(value) || ival < 0)
0397         {
0398             NDRX_LOG(log_error, "Invalid value [%s] for key [%s] (must be int>=0)", 
0399                     value, key);
0400             EXFAIL_OUT(ret);
0401         }
0402         
0403         qconf->tries = ival;
0404     }
0405     else if (0==strcmp(key, TMQ_QC_AUTOQ))
0406     {
0407         char val = toupper(value[0]);
0408 
0409         if (NULL==strchr(TMQ_AUTOQ_ALLFLAGS, val))
0410         {
0411             NDRX_LOG(log_error, "Invalid value [%c] for key [%s] (allowed: [%s])", 
0412                     val, key, TMQ_AUTOQ_ALLFLAGS);
0413             EXFAIL_OUT(ret);
0414         }
0415         
0416         qconf->autoq = val;
0417         
0418     }
0419     else if (0==strcmp(key, TMQ_QC_WAITINIT))
0420     {
0421         int ival = atoi(value);
0422         if (!ndrx_isint(value) || ival < 0)
0423         {
0424             NDRX_LOG(log_error, "Invalid value [%s] for key [%s] (must be int>=0)", 
0425                     value, key);
0426             EXFAIL_OUT(ret);
0427         }
0428         
0429         qconf->waitinit = ival;
0430     }
0431     else if (0==strcmp(key, TMQ_QC_WAITRETRY))
0432     {
0433         int ival = atoi(value);
0434         if (!ndrx_isint(value) || ival < 0)
0435         {
0436             NDRX_LOG(log_error, "Invalid value [%s] for key [%s] (must be int>=0)", 
0437                     value, key);
0438             EXFAIL_OUT(ret);
0439         }
0440         
0441         qconf->waitretry = ival;
0442     }
0443     else if (0==strcmp(key, TMQ_QC_WAITRETRYINC))
0444     {
0445         NDRX_LOG(log_warn, "Ignoring [%s]", TMQ_QC_WAITRETRYINC);
0446     }
0447     else if (0==strcmp(key, TMQ_QC_WAITRETRYMAX))
0448     {
0449         int ival = atoi(value);
0450         if (!ndrx_isint(value) || ival < 0)
0451         {
0452             NDRX_LOG(log_error, "Invalid value [%s] for key [%s] (must be int>=0)", 
0453                     value, key);
0454             EXFAIL_OUT(ret);
0455         }
0456         
0457         qconf->waitretrymax = ival;
0458     }
0459     else if (0==strcmp(key, TMQ_QC_WORKERS))
0460     {
0461         int ival = atoi(value);
0462         if (!ndrx_isint(value) || ival < 1)
0463         {
0464             NDRX_LOG(log_error, "Invalid value [%s] for key [%s] (must be int>=1)", 
0465                     value, key);
0466             EXFAIL_OUT(ret);
0467         }
0468         
0469         qconf->workers = ival;
0470     }
0471     else if (0==strcmp(key, TMQ_QC_SYNC))
0472     {
0473         int ival = ndrx_args_confirm(value);
0474         
0475         if (EXFAIL==ival)
0476         {
0477             if (1==strlen(value) && NULL!=strstr(TMQ_ARGS_COMMIT, value))
0478             {
0479                 ival = TMQ_SYNC_TPCOMMIT;
0480             }
0481             else
0482             {
0483                 NDRX_LOG(log_error, "Invalid value [%s] for %s", value, TMQ_QC_SYNC);
0484                 EXFAIL_OUT(ret);   
0485             }
0486         }
0487         else if (ival)
0488         {
0489             ival = TMQ_SYNC_TPACALL;
0490         }
0491         else
0492         {
0493             ival=TMQ_SYNC_NONE;
0494         }
0495         
0496         qconf->sync = ival;
0497         
0498     }
0499     else if (0==strcmp(key, TMQ_QC_MEMONLY))
0500     {
0501         /* CURRENTLY NOT SUPPORTED.
0502         qconf->memonly = FALSE;
0503         if (value[0]=='y' || value[0]=='Y')
0504         {
0505             qconf->memonly = TRUE;
0506         }
0507         */
0508     }
0509     else if (0==strcmp(key, TMQ_QC_TXTOUT))
0510     {   
0511         /* transaction timeout */
0512         qconf->txtout = atoi(value);
0513     }
0514     else if (0==strcmp(key, TMQ_QC_MODE))
0515     {
0516         if (0==strcmp(value, "fifo") || 0==strcmp(value, "FIFO"))
0517         {
0518             qconf->mode = TMQ_MODE_FIFO;
0519         }
0520         else if (0==strcmp(value, "lifo") || 0==strcmp(value, "LIFO") )
0521         {
0522             qconf->mode = TMQ_MODE_LIFO;
0523         }
0524         else
0525         {
0526             NDRX_LOG(log_error, "Not supported Q mode (must be fifo or lifo)", 
0527                     value, key);
0528             EXFAIL_OUT(ret);
0529         }
0530     }
0531     else
0532     {
0533         NDRX_LOG(log_error, "Unknown Q config setting = [%s]", key);
0534         EXFAIL_OUT(ret);
0535     }
0536     
0537 out:
0538 
0539     return ret;
0540 }
0541 
0542 /**
0543  * Get Q config by name
0544  * @param name queue name
0545  * @return NULL or ptr to q config.
0546  */
0547 exprivate tmq_qconfig_t * tmq_qconf_get(char *qname)
0548 {
0549     tmq_qconfig_t *ret = NULL;
0550     
0551     EXHASH_FIND_STR( G_qconf, qname, ret);
0552     
0553     return ret;
0554 }
0555 
0556 /**
0557  * Return Q config with default if not found
0558  * TODO: Think about copy off the contents of qconf for substituing service 
0559  * to Q name.
0560  * @param qname qname
0561  * @param p_is_defaulted returns 1 if queue uses defaults Q
0562  * @return  NULL or ptr to config
0563  */
0564 expublic tmq_qconfig_t * tmq_qconf_get_with_default(char *qname, int *p_is_defaulted)
0565 {
0566     
0567     tmq_qconfig_t * ret = tmq_qconf_get(qname);
0568     
0569     if  (NULL==ret)
0570     {
0571         NDRX_LOG(log_info, "Q config [%s] not found, trying to default to [%s]", 
0572                 qname, TMQ_DEFAULT_Q);
0573         if (NULL==(ret = tmq_qconf_get(TMQ_DEFAULT_Q)))
0574         {
0575             NDRX_LOG(log_error, "Q [%s] is not defined and default config [%s] not found!",
0576                 qname, TMQ_DEFAULT_Q);
0577             userlog("Q [%s] is not defined and default config [%s] not found!",
0578                 qname, TMQ_DEFAULT_Q);
0579         }
0580         else if (NULL!=p_is_defaulted)
0581         {
0582             *p_is_defaulted = EXTRUE;
0583         }
0584     }
0585             
0586     return ret;
0587 }
0588 
0589 
0590 /**
0591  * Return string version of Q config
0592  * @param qname
0593  * @param p_is_defaulted
0594  * @param out_buf space where to print queue config
0595  * @param out_bufsz output buffer size
0596  * @return EXSUCCEED/EXFAIL
0597  */
0598 expublic int tmq_build_q_def(char *qname, int *p_is_defaulted, char *out_buf, size_t out_bufsz)
0599 {
0600     tmq_qconfig_t * qdef = NULL;
0601     int ret = EXSUCCEED;
0602     
0603     MUTEX_LOCK_V(M_q_lock);
0604 
0605     if (NULL==(qdef=tmq_qconf_get_with_default(qname, p_is_defaulted)))
0606     {
0607         EXFAIL_OUT(ret);
0608     }
0609     
0610     snprintf(out_buf, out_bufsz, "%s,svcnm=%s,autoq=%c,tries=%d,waitinit=%d,waitretry=%d,"
0611                         "waitretrymax=%d,mode=%s,txtout=%d,workers=%d",
0612             qdef->qname, 
0613             qdef->svcnm, 
0614             qdef->autoq,
0615             qdef->tries,
0616             qdef->waitinit,
0617             qdef->waitretry,
0618             qdef->waitretrymax,
0619             qdef->mode == TMQ_MODE_LIFO?"lifo":"fifo",
0620             qdef->txtout,
0621             qdef->workers);
0622 
0623     if (EXEOS!=qdef->errorq[0])
0624     {
0625         int len = strlen(out_buf);
0626         snprintf(out_buf+len, out_bufsz-len, ",errorq=%s", qdef->errorq);
0627     }
0628     
0629     if (qdef->sync)
0630     {
0631         int len = strlen(out_buf);
0632         char setting;
0633         
0634         if (TMQ_SYNC_TPACALL==qdef->sync)
0635         {
0636             setting='y';
0637         }
0638         else
0639         {
0640             setting='c';
0641         }
0642         
0643         snprintf(out_buf+len, out_bufsz-len, ",%s=%c", TMQ_QC_SYNC, setting);
0644     }
0645 
0646 out:
0647     MUTEX_UNLOCK_V(M_q_lock);
0648 
0649     return ret;
0650 }
0651 
0652 
0653 /**
0654  * Get the static copy of Q data for extract (non-locked) use.
0655  * @param qname
0656  * @param qconf_out where to store/copy the q def data
0657  * @return SUCCEED/FAIL
0658  */
0659 expublic int tmq_qconf_get_with_default_static(char *qname, tmq_qconfig_t *qconf_out)
0660 {
0661     int ret = EXSUCCEED;
0662     tmq_qconfig_t * tmp = NULL;
0663     
0664     MUTEX_LOCK_V(M_q_lock);
0665     
0666     tmp = tmq_qconf_get(qname);
0667 
0668     if  (NULL==tmp)
0669     {
0670         NDRX_LOG(log_warn, "Q config [%s] not found, trying to default to [%s]", 
0671                 qname, TMQ_DEFAULT_Q);
0672         if (NULL==(tmp = tmq_qconf_get(TMQ_DEFAULT_Q)))
0673         {
0674             NDRX_LOG(log_error, "Q [%s] is not defined and default config [%s] not found!",
0675                 qname, TMQ_DEFAULT_Q);
0676             userlog("Q [%s] is not defined and default config [%s] not found!",
0677                 qname, TMQ_DEFAULT_Q);
0678             EXFAIL_OUT(ret);
0679         }
0680         
0681     }
0682     
0683     memcpy(qconf_out, tmp, sizeof(*qconf_out));
0684         
0685 out:    
0686     MUTEX_UNLOCK_V(M_q_lock);
0687     
0688     return ret;
0689 }
0690 
0691 /**
0692  * Remove queue probably then existing messages will fall back to default Q
0693  * @param name
0694  * @return 
0695  */
0696 exprivate int tmq_qconf_delete(char *qname)
0697 {
0698     int ret = EXSUCCEED;
0699     tmq_qconfig_t *qconf;
0700     
0701     if (NULL!=(qconf=tmq_qconf_get(qname)))
0702     {
0703         EXHASH_DEL( G_qconf, qconf);
0704         NDRX_FREE(qconf);
0705     }
0706     else
0707     {
0708         NDRX_LOG(log_warn, "[%s] - queue not found", qname);
0709     }
0710     
0711 out:
0712     return ret;
0713 }
0714 
0715 /**
0716  * Reload the config of queues
0717  * @param cf
0718  * @return 
0719  */
0720 expublic int tmq_reload_conf(char *cf)
0721 {
0722     FILE *f = NULL;
0723 #ifdef HAVE_GETLINE
0724     char *line = NULL;
0725 #else
0726     char line[PATH_MAX];
0727 #endif
0728     size_t len = 0;
0729     int ret = EXSUCCEED;
0730     ssize_t read;
0731     ndrx_inicfg_section_keyval_t * csection = NULL, *val = NULL, *val_tmp = NULL;
0732     
0733     if (NULL!=ndrx_get_G_cconfig() && 
0734             EXSUCCEED==ndrx_cconfig_get(NDRX_CONF_SECTION_QUEUE, &csection))
0735     {
0736         EXHASH_ITER(hh, csection, val, val_tmp)
0737         {
0738             if (EXSUCCEED!=tmq_qconf_addupd(val->val, val->key))
0739             {
0740                 EXFAIL_OUT(ret);
0741             }
0742         }
0743     }
0744     else /* fallback to old config */
0745     {
0746         if (NULL==(f=NDRX_FOPEN(cf, "r")))
0747         {
0748             NDRX_LOG(log_error, "Failed to open [%s]:%s", cf, strerror(errno));
0749             EXFAIL_OUT(ret);
0750         }
0751 
0752 #ifdef HAVE_GETLINE
0753     /* does the line stays with longest buffer? if read longer and then shorter data line */
0754         while (EXFAIL!=(read = getline(&line, &len, f))) 
0755 #else
0756         len = sizeof(line);
0757         while (NULL!=fgets(line, len, f))
0758 #endif
0759         {
0760             ndrx_str_strip(line, " \n\r\t");
0761 
0762             /* Ignore comments & newlines */
0763             if ('#'==*line || EXEOS==*line)
0764             {
0765                 continue;
0766             }
0767 
0768             if (EXSUCCEED!=tmq_qconf_addupd(line, NULL))
0769             {
0770                 EXFAIL_OUT(ret);
0771             }
0772         }
0773     }
0774     
0775     
0776 out:
0777 
0778     /* 25/04/2018 moved down here, to avoid any memory leak if loop fails */
0779 #ifdef HAVE_GETLINE
0780     if (NULL!=line)
0781     {
0782         NDRX_FREE(line);
0783     }
0784 #endif
0785                 
0786     if (NULL!=csection)
0787     {
0788         ndrx_keyval_hash_free(csection);
0789     }
0790 
0791     if (NULL!=f)
0792     {
0793         NDRX_FCLOSE(f);
0794     }
0795 
0796     return ret;
0797 }
0798 
0799 /**
0800  * Add queue definition. Support also update
0801  * We shall support Q update too...
0802  * Syntax: -q VISA,svcnm=VISAIF,autoq=y|n,waitinit=30,waitretry=10,waitretrymax=40,memonly=y|n
0803  * @param qdefstr queue definition
0804  * @param name optional name (already parsed)
0805  * @return  SUCCEED/FAIL
0806  */
0807 expublic int tmq_qconf_addupd(char *qconfstr, char *name)
0808 {
0809     tmq_qconfig_t * qconf=NULL;
0810     tmq_qconfig_t * dflt=NULL;
0811     char * p;
0812     char * p2;
0813     int got_default = EXFALSE;
0814     int qupdate = EXFALSE;
0815     char buf[MAX_TOKEN_SIZE];
0816     int ret = EXSUCCEED;
0817     
0818     NDRX_LOG(log_info, "Add new Q: [%s]", qconfstr);
0819     
0820     MUTEX_LOCK_V(M_q_lock);
0821     
0822     if (NULL==name)
0823     {
0824         p = strtok (qconfstr,",");
0825     }
0826     else
0827     {
0828         p = name;
0829     }
0830     
0831     if (NULL!=p)
0832     {
0833         NDRX_LOG(log_info, "Got token: [%s]", p);
0834         NDRX_STRCPY_SAFE(buf, p);
0835         
0836         NDRX_LOG(log_debug, "Q name: [%s]", buf);
0837         
0838         if (NULL== (qconf = tmq_qconf_get(buf)))
0839         {
0840             NDRX_LOG(log_info, "Q not found, adding: [%s]", buf);
0841             qconf= NDRX_CALLOC(1, sizeof(tmq_qconfig_t));
0842                     
0843             /* Try to load initial config from @ (TMQ_DEFAULT_Q) Q */
0844             qconf->mode = TMQ_MODE_FIFO; /* default to FIFO... */
0845             qconf->txtout = EXFAIL;     /* use default */
0846             qconf->workers = WORKERS_DEFAULT;   /* set default workers to 2 */
0847             if (NULL!=(dflt=tmq_qconf_get(TMQ_DEFAULT_Q)))
0848             {
0849                 memcpy(qconf, dflt, sizeof(*dflt));
0850                 got_default = EXTRUE;
0851             }
0852             
0853             NDRX_STRCPY_SAFE(qconf->qname, buf);
0854         }
0855         else
0856         {
0857             NDRX_LOG(log_info, "Q found, updating: [%s]", buf);
0858             qupdate = EXTRUE;
0859         }
0860     }
0861     else
0862     {
0863         NDRX_LOG(log_error, "Missing Q name");
0864         EXFAIL_OUT(ret);
0865     }
0866     
0867     if (NULL==name)
0868     {
0869         p = strtok (NULL, ","); /* continue... */
0870     }
0871     else
0872     {
0873         p = strtok (qconfstr, ","); /* continue... */
0874     }
0875     
0876     while (p != NULL)
0877     {
0878         NDRX_LOG(log_info, "Got pair [%s]", p);
0879         
0880         NDRX_STRCPY_SAFE(buf, p);
0881         
0882         p2 = strchr(buf, '=');
0883         
0884         if (NULL == p2)
0885         {
0886             NDRX_LOG(log_error, "Invalid key=value token [%s] expected '='", buf);
0887             
0888             userlog("Error defining queue (%s) expected in '=' in token (%s)", 
0889                     qconfstr, buf);
0890             EXFAIL_OUT(ret);
0891         }
0892         *p2 = EXEOS;
0893         p2++;
0894         
0895         if (EXEOS==*p2)
0896         {
0897             NDRX_LOG(log_error, "Empty value for token [%s]", buf);
0898             userlog("Error defining queue (%s) invalid value for token (%s)", 
0899                     qconfstr, buf);
0900             EXFAIL_OUT(ret);
0901         }
0902         
0903         /*
0904          * Load the value into structure
0905          */
0906         if (EXSUCCEED!=load_param(qconf, buf, p2))
0907         {
0908             NDRX_LOG(log_error, "Failed to load token [%s]/[%s]", buf, p2);
0909             userlog("Error defining queue (%s) failed to load token [%s]/[%s]", 
0910                     buf, p2);
0911             EXFAIL_OUT(ret);
0912         }
0913         
0914         p = strtok (NULL, ",");
0915     }
0916     
0917     /* Validate the config... */
0918     
0919     if (0==strcmp(qconf->qname, TMQ_DEFAULT_Q) && got_default)
0920     {
0921         NDRX_LOG(log_error, "Missing [%s] param", TMQ_QC_NAME);
0922         /* TODO: Return some diagnostics... => EX_QDIAGNOSTIC invalid qname */
0923         EXFAIL_OUT(ret);
0924     }
0925     /* If autoq, then service must be set. */
0926 
0927     if (!qupdate)
0928     {
0929         EXHASH_ADD_STR( G_qconf, qname, qconf );
0930     }
0931     
0932 out:
0933 
0934     /* kill the record if invalid. */
0935     if (EXSUCCEED!=ret && NULL!=qconf && !qupdate)
0936     {
0937         NDRX_LOG(log_warn, "qconf -> free");
0938         NDRX_FREE(qconf);
0939     }
0940 
0941     MUTEX_UNLOCK_V(M_q_lock);
0942     return ret;
0943 
0944 }
0945 
0946 /**
0947  * Get QHASH record for particular q
0948  * @param qname
0949  * @return 
0950  */
0951 exprivate tmq_qhash_t * tmq_qhash_get(char *qname)
0952 {
0953     tmq_qhash_t * ret = NULL;
0954    
0955     EXHASH_FIND_STR( G_qhash, qname, ret);    
0956     
0957     return ret;
0958 }
0959 
0960 /**
0961  * Get new qhash entry + add it to hash.
0962  * @param qname
0963  * @return 
0964  */
0965 exprivate tmq_qhash_t * tmq_qhash_new(char *qname)
0966 {
0967     tmq_qhash_t * ret = NDRX_CALLOC(1, sizeof(tmq_qhash_t));
0968     
0969     if (NULL==ret)
0970     {
0971         NDRX_LOG(log_error, "Failed to alloc tmq_qhash_t: %s", strerror(errno));
0972         userlog("Failed to alloc tmq_qhash_t: %s", strerror(errno));
0973         goto out;
0974     }
0975     
0976     NDRX_STRCPY_SAFE(ret->qname, qname);
0977     
0978     EXHASH_ADD_STR( G_qhash, qname, ret );
0979 
0980     /* setup red-black trees */
0981     ndrx_rbt_init(&ret->q, tmq_rbt_cmp_cur, tmq_rbt_combine_cur, NULL, ret);
0982     ndrx_rbt_init(&ret->q_fut, tmq_rbt_cmp_fut, tmq_rbt_combine_fut, NULL, ret);
0983 
0984 out:
0985     return ret;
0986 }
0987 
0988 /**
0989  * Add message to queue
0990  * Think about TPQLOCKED so that other thread does not get message in progress..
0991  * In two phase commit mode, we need to unlock message only when it is enqueued on disk.
0992  * 
0993  * @param msg double ptr to message
0994  * @param is_recovery is recovery mode (no need to write to disk)
0995  * @param diag qctl for diag purposes.
0996  * @param int_diag internal diagnostics, flags
0997  * @return 
0998  */
0999 expublic int tmq_msg_add(tmq_msg_t **msg, int is_recovery, TPQCTL *diag, int *int_diag)
1000 {
1001     int ret = EXSUCCEED;
1002     int is_locked = EXFALSE;
1003     tmq_qhash_t *qhash;
1004     tmq_memmsg_t *mmsg = NDRX_CALLOC(1, sizeof(tmq_memmsg_t));
1005     tmq_qconfig_t * qconf;
1006     char msgid_str[TMMSGIDLEN_STR+1];
1007     char corrid_str[TMCORRIDLEN_STR+1];
1008     
1009     MUTEX_LOCK_V(M_q_lock);
1010     is_locked = EXTRUE;
1011     
1012     qhash = tmq_qhash_get((*msg)->hdr.qname);
1013     qconf = tmq_qconf_get_with_default((*msg)->hdr.qname, NULL);
1014     
1015     if (NULL==mmsg)
1016     {
1017         NDRX_LOG(log_error, "Failed to alloc tmq_memmsg_t: %s", strerror(errno));
1018         userlog("Failed to alloc tmq_memmsg_t: %s", strerror(errno));
1019         EXFAIL_OUT(ret);
1020     }
1021     
1022     if (NULL==qconf)
1023     {
1024         NDRX_LOG(log_error, "queue config not found! Cannot enqueue!");
1025         userlog("queue config not found! Cannot enqueue!");
1026         
1027         if (NULL!=diag)
1028         {
1029             diag->diagnostic = QMEBADQUEUE;
1030             snprintf(diag->diagmsg, sizeof(diag->diagmsg), "Queue [%s] not defined",
1031                     (*msg)->hdr.qname);
1032         }
1033         EXFAIL_OUT(ret);
1034     }
1035     
1036     mmsg->msg = *msg;
1037     
1038     /* Add the hash of IDs / check that msg isn't duplicate */
1039     tmq_msgid_serialize(mmsg->msg->hdr.msgid, msgid_str); 
1040     NDRX_LOG(log_info, "Adding to G_msgid_hash [%s]", msgid_str);
1041            
1042     if (NULL!=tmq_get_msg_by_msgid_str(msgid_str))
1043     {
1044         NDRX_LOG(log_error, "Message with msgid [%s] already exists!", msgid_str);
1045         userlog("Message with msgid [%s] already exists!", msgid_str);
1046         EXFAIL_OUT(ret);
1047     }
1048     
1049     /* Get the entry for hash of queues: */
1050     if (NULL==qhash && NULL==(qhash=tmq_qhash_new((*msg)->hdr.qname)))
1051     {
1052         NDRX_LOG(log_error, "Failed to get/create qhash entry for Q [%s]", 
1053                 (*msg)->hdr.qname);
1054         EXFAIL_OUT(ret);
1055     }
1056 
1057     NDRX_STRCPY_SAFE(mmsg->msgid_str, msgid_str);
1058 
1059     if (mmsg->msg->qctl.flags & TPQCORRID)
1060     {
1061         tmq_msgid_serialize((*msg)->qctl.corrid, corrid_str);
1062         NDRX_STRCPY_SAFE(mmsg->corrid_str, corrid_str);
1063         NDRX_LOG(log_debug, "Adding to corrid_hash [%s] of queue [%s]",
1064             corrid_str, (*msg)->hdr.qname);
1065     }
1066 
1067     /* add message to correspoding Q */
1068     if (EXSUCCEED!=(ret=ndrx_infl_addmsg(qconf, qhash, mmsg)))
1069     {
1070         NDRX_LOG(log_error, "ndrx_infl_addmsg failed with %d", ret);
1071         goto out;
1072     }
1073     /* have to unlock here, because tmq_storage_write_cmd_newmsg() migth callback to
1074      * us and that might cause stall.
1075      */
1076     MUTEX_UNLOCK_V(M_q_lock);
1077     is_locked = EXFALSE;
1078     
1079     /* Decide do we need to add the msg to disk?! 
1080      * Needs to send a command to XA sub-system to prepare msg/command to disk,
1081      * if it is not memory only.
1082      * So next step todo is to write xa command handler & dumping commands to disk.
1083      */
1084     if (!qconf->memonly)
1085     {
1086         /* for recovery no need to put command as we read from command file */
1087         if (!is_recovery)
1088         {
1089             if (EXSUCCEED!=tmq_storage_write_cmd_newmsg(mmsg->msg, int_diag))
1090             {
1091                 NDRX_LOG(log_error, "Failed to add message to persistent store!");
1092                 
1093                 /* set the OS error */
1094                 if (NULL!=diag)
1095                 {
1096                     diag->diagnostic = QMEOS;
1097                     snprintf(diag->diagmsg, sizeof(diag->diagmsg), "Failed to persist msg");
1098                 }
1099                 
1100                 EXFAIL_OUT(ret);
1101             }
1102         }
1103     }
1104     else
1105     {
1106         NDRX_LOG(log_info, "Mem only Q, not persisting.");   
1107     }
1108     
1109     /* Add only if all OK 
1110      * Note locked here...
1111      */
1112     MUTEX_LOCK_V(M_q_lock);
1113     qhash->numenq++;
1114     MUTEX_UNLOCK_V(M_q_lock);
1115     
1116     NDRX_LOG(log_info, "Message with id [%s] successfully enqueued to [%s] "
1117             "queue (DEBUG: locked %ld)",
1118             tmq_msgid_serialize((*msg)->hdr.msgid, msgid_str), (*msg)->hdr.qname, 
1119             mmsg->msg->lockthreadid);
1120     
1121     /* message is in use */
1122     *msg = NULL;
1123     
1124 out:
1125 
1126     if (is_locked)
1127     {
1128         MUTEX_UNLOCK_V(M_q_lock);
1129         is_locked=EXFALSE;   
1130     }
1131 
1132     /* NOT SURE IS THIS GOOD as this might cause segmentation fault.
1133      * as added to mem, but failed to write to disk.
1134      * Message will be kept locked, thus we can free it.
1135      * Also as disk is failed, we will never get unlock by tmsrv as file
1136      * is not available. Thus it is safe here.
1137      */
1138     if (EXSUCCEED!=ret && mmsg!=NULL)
1139     {
1140         /* remove messages hashes, due to failure */
1141         MUTEX_LOCK_V(M_q_lock);
1142 
1143         /* remove from infliht structures */
1144         ndrx_infl_delmsg(mmsg);
1145 
1146         MUTEX_UNLOCK_V(M_q_lock);
1147 
1148         NDRX_FREE(mmsg);
1149         mmsg=NULL;
1150     }
1151 
1152     /* free up message, if it is not used anymore. */
1153     if (NULL==mmsg)
1154     {
1155         NDRX_FREE((*msg));
1156         *msg = NULL;
1157     }
1158 
1159     return ret;
1160 }
1161 
1162 /**
1163  * Get the fifo message from Q
1164  * @param qname queue to lookup.
1165  * @param diagnostic specific queue error code
1166  * @param corrid_str dequeue by correlator (if not NULL)
1167  * @param int_diag internal diagnostics, flags
1168  * @return NULL (no msg), or ptr to msg
1169  */
1170 expublic tmq_msg_t * tmq_msg_dequeue(char *qname, long flags, int is_auto, long *diagnostic, 
1171         char *diagmsg, size_t diagmsgsz, char *corrid_str, int *int_diag)
1172 {
1173     tmq_qhash_t *qhash;
1174     tmq_corhash_t *corhash;
1175     tmq_memmsg_t *node = NULL;
1176     tmq_msg_t * ret = NULL;
1177     tmq_msg_del_t block;
1178     char msgid_str[TMMSGIDLEN_STR+1];
1179     tmq_qconfig_t *qconf;
1180     int is_locked=EXFALSE;
1181     
1182     *diagnostic=EXSUCCEED;
1183     
1184     NDRX_LOG(log_debug, "FIFO/LIFO dequeue for [%s]", qname);
1185     MUTEX_LOCK_V(M_q_lock);
1186     is_locked=EXTRUE;
1187     
1188     /* Find the non locked message in memory */
1189     
1190     /* Lock the message for current thread. 
1191      * The thread will later issue xA command either:
1192      * - Increase counter
1193      * - Remove the message.
1194      */
1195     
1196     if (NULL==(qconf=tmq_qconf_get_with_default(qname, NULL)))
1197     {
1198         
1199         NDRX_LOG(log_error, "Failed to get q config [%s]", 
1200                 qname);
1201         
1202         *diagnostic=QMEBADQUEUE;
1203         snprintf(diagmsg, diagmsgsz, "Queue [%s] not defined", qname);
1204         
1205         goto out;
1206     }
1207     
1208     /* if not such queue, the no msg */
1209     if (NULL==(qhash = tmq_qhash_get(qname)))
1210     {
1211         NDRX_LOG(log_warn, "Q [%s] is NULL/empty", qname);
1212         *diagnostic=QMENOMSG;
1213         snprintf(diagmsg, diagmsgsz, "Q [%s] is NULL/empty", qname);
1214         goto out;
1215     }
1216 
1217     NDRX_LOG(log_debug, "mode corrid_str[%s]: %s", corrid_str?corrid_str:"N/A",
1218                TMQ_MODE_LIFO == qconf->mode?"LIFO":"FIFO");
1219     
1220     /* if no hash available -> assume no msg*/
1221     if (NULL!=corrid_str && NULL==(corhash = tmq_cor_find(qhash, corrid_str)))
1222     {
1223         NDRX_LOG(log_info, "Q [%s] corrid_str [%s] not defined", 
1224             qname, corrid_str);
1225         snprintf(diagmsg, diagmsgsz, "Q [%s] corrid_str [%s] not defined", 
1226             qname, corrid_str);
1227         *diagnostic=QMENOMSG;
1228         goto out;
1229     }
1230 
1231     /* check q_fut for deq_time and move from future 2 cur|cor */
1232     ndrx_infl_fut2cur(qhash);
1233 
1234     if (TMQ_MODE_LIFO == qconf->mode)
1235     {
1236         /* LIFO mode */
1237         if (NULL!=corrid_str)
1238         {
1239             /* get latest corhash msg from RBT */
1240             node = TMQ_COR_GETMSG(ndrx_rbt_rightmost(&corhash->corq));
1241         }
1242         else
1243         {
1244            node = (tmq_memmsg_t*)ndrx_rbt_rightmost(&qhash->q);
1245         }
1246     }
1247     else
1248     {
1249         /* FIFO */
1250         if (NULL!=corrid_str)
1251         {
1252             /* get first corhash msg from RBT */
1253             node = TMQ_COR_GETMSG(ndrx_rbt_leftmost(&corhash->corq));
1254         }
1255         else
1256         {
1257            node = (tmq_memmsg_t*)ndrx_rbt_leftmost(&qhash->q);
1258         }
1259     }
1260 
1261     if (NULL!=node)
1262     {
1263         ret=node->msg;
1264     }
1265 
1266     if (NULL==ret)
1267     {
1268         NDRX_LOG(log_debug, "Q [%s] is empty or all msgs locked", qname);
1269         goto out;
1270     }
1271     
1272     /* Write some stuff to log */
1273     
1274     tmq_msgid_serialize(ret->hdr.msgid, msgid_str);
1275     NDRX_LOG(log_info, "Dequeued message: [%s]", msgid_str);
1276     NDRX_DUMP(log_debug, "Dequeued message", ret->msg, ret->len);
1277     
1278     /* Lock the message */
1279     ret->lockthreadid = ndrx_gettid();
1280     if (EXSUCCEED!=ndrx_infl_mov2infl(node))
1281     {
1282         NDRX_LOG(log_error, "Failed to move msg to inflight!");
1283         ret = NULL;
1284         *diagnostic=QMEOS;
1285         NDRX_STRCPY_SAFE_DST(diagmsg, "tmq_dequeue: disk write error!", diagmsgsz);
1286         goto out;
1287     }
1288 
1289     MUTEX_UNLOCK_V(M_q_lock);
1290     is_locked=EXFALSE;
1291     
1292     /* Is it must not be a peek and must not be an autoq */
1293     if (!(flags & TPQPEEK) && !is_auto)
1294     {   
1295         /* Issue command for msg remove */   
1296         memcpy(&block.hdr, &ret->hdr, sizeof(ret->hdr));
1297         block.hdr.command_code = TMQ_STORCMD_DEL;
1298 
1299         if (EXSUCCEED!=tmq_storage_write_cmd_block((char *)&block, 
1300                 "Removing dequeued message", NULL, int_diag))
1301         {
1302             NDRX_LOG(log_error, "Failed to remove msg...");
1303             /* unlock msg... */
1304             MUTEX_LOCK_V(M_q_lock);
1305             
1306             ret->lockthreadid = 0;
1307             /* move to cur/cor/fut */
1308             ndrx_infl_mov2cur(node);
1309 
1310             MUTEX_UNLOCK_V(M_q_lock);
1311             
1312             ret = NULL;
1313             *diagnostic=QMEOS;
1314             NDRX_STRCPY_SAFE_DST(diagmsg, "tmq_dequeue: disk write error!", diagmsgsz);
1315             goto out;
1316         }
1317     }
1318     
1319 out:
1320         
1321     if (is_locked)
1322     {
1323         MUTEX_UNLOCK_V(M_q_lock);
1324     }
1325 
1326     /* set default error code */
1327     if (NULL==ret && EXSUCCEED==*diagnostic)
1328     {
1329         NDRX_STRCPY_SAFE_DST(diagmsg, "tmq_dequeue: no message in Q!", diagmsgsz);
1330         *diagnostic=QMENOMSG;
1331     }
1332 
1333     return ret;
1334 }
1335 
1336 /**
1337  * Dequeue message by msgid
1338  * @param msgid
1339  * @param diagnostic queue error code, if any
1340  * @param int_diag internal diagnostics, flags
1341  * @return 
1342  */
1343 expublic tmq_msg_t * tmq_msg_dequeue_by_msgid(char *msgid, long flags, long *diagnostic, 
1344         char *diagmsg, size_t diagmsgsz, int *int_diag)
1345 {
1346     tmq_msg_t * ret = NULL;
1347     tmq_msg_del_t del;
1348     char msgid_str[TMMSGIDLEN_STR+1];
1349     tmq_memmsg_t *mmsg;
1350     int is_locked=EXFALSE;
1351     
1352     *diagnostic=EXSUCCEED;
1353     
1354     MUTEX_LOCK_V(M_q_lock);
1355     is_locked=EXTRUE;
1356     
1357     /* Write some stuff to log */
1358     
1359     tmq_msgid_serialize(msgid, msgid_str);
1360     NDRX_LOG(log_info, "MSGID: Dequeuing message by [%s]", msgid_str);
1361     
1362     if (NULL==(mmsg = tmq_get_msg_by_msgid_str(msgid_str)))
1363     {
1364         NDRX_LOG(log_error, "Message not found by msgid_str [%s]", msgid_str);
1365         goto out;
1366     }
1367 
1368     ret = mmsg->msg;
1369 
1370     NDRX_DUMP(log_debug, "Dequeued message", ret->msg, ret->len);
1371 
1372     /* Message is locked, return that message is not found */
1373     if (0!=ret->lockthreadid)
1374     {
1375         NDRX_LOG(log_error, "Message is busy (locked by thread [%llu])", ret->lockthreadid);
1376         ret = NULL;
1377         goto out;
1378     }
1379 
1380     /* Lock the message */
1381     ret->lockthreadid = ndrx_gettid();
1382 
1383 /* todo required parameter is qhash */
1384     ndrx_infl_mov2infl(mmsg);
1385 
1386     /* release the lock.. */
1387     MUTEX_UNLOCK_V(M_q_lock);
1388     is_locked=EXFALSE;
1389     
1390     /* Issue command for msg remove */
1391     memcpy(&del.hdr, &ret->hdr, sizeof(ret->hdr));
1392     
1393     del.hdr.command_code = TMQ_STORCMD_DEL;
1394     
1395     if (!(flags & TPQPEEK))
1396     {
1397         if (EXSUCCEED!=tmq_storage_write_cmd_block((char *)&del, 
1398                 "Removing dequeued message", NULL, int_diag))
1399         {
1400             NDRX_LOG(log_error, "Failed to remove msg...");
1401             /* unlock msg... */
1402             MUTEX_LOCK_V(M_q_lock);
1403             ret->lockthreadid = 0;
1404         ndrx_infl_mov2cur(mmsg);
1405             MUTEX_UNLOCK_V(M_q_lock);
1406             ret = NULL;
1407             *diagnostic=QMEOS;
1408             NDRX_STRCPY_SAFE_DST(diagmsg, "tmq_dequeue: disk write error!", diagmsgsz);
1409             goto out;
1410         }
1411     }
1412     
1413 out:
1414     if (is_locked)
1415     {
1416         MUTEX_UNLOCK_V(M_q_lock);
1417     }
1418 
1419     /* set default error code */
1420     if (NULL==ret && EXSUCCEED==*diagnostic)
1421     {
1422         NDRX_STRCPY_SAFE_DST(diagmsg, "tmq_dequeue: no message in Q!", diagmsgsz);
1423         *diagnostic=QMENOMSG;
1424     }
1425 
1426     return ret;
1427 }
1428 
1429 /**
1430  * Get message by msgid
1431  * @param msgid
1432  * @return 
1433  */
1434 exprivate tmq_memmsg_t* tmq_get_msg_by_msgid_str(char *msgid_str)
1435 {
1436     tmq_memmsg_t *ret;
1437     
1438     EXHASH_FIND_STR( G_msgid_hash, msgid_str, ret);
1439     
1440     return ret;
1441 }
1442 
1443 /**
1444  * Remove mem message
1445  * 
1446  * @param msg
1447  */
1448 exprivate void tmq_remove_msg(tmq_memmsg_t *mmsg)
1449 {
1450     char msgid_str[TMMSGIDLEN_STR+1];   
1451     tmq_msgid_serialize(mmsg->msg->hdr.msgid, msgid_str);
1452     
1453     tmq_qhash_t *qhash = tmq_qhash_get(mmsg->msg->hdr.qname);
1454     
1455     NDRX_LOG(log_info, "Removing msgid [%s] from [%s] q", msgid_str, mmsg->msg->hdr.qname);
1456     
1457     if (NULL!=qhash)
1458     {
1459         qhash->numdeq++;
1460     }
1461 
1462     ndrx_infl_delmsg(mmsg);
1463     
1464     NDRX_FREE(mmsg->msg);
1465     NDRX_FREE(mmsg);
1466 }
1467 
1468 /**
1469  * Unlock message by updated block
1470  * We can:
1471  * - update content + unlock
1472  * - or remove the message
1473  * @param p_b
1474  * @return 
1475  */
1476 expublic int tmq_unlock_msg(union tmq_upd_block *b)
1477 {
1478     int ret = EXSUCCEED;
1479     char msgid_str[TMMSGIDLEN_STR+1];
1480     tmq_memmsg_t* mmsg;
1481     
1482     tmq_msgid_serialize(b->hdr.msgid, msgid_str);
1483     
1484     NDRX_LOG(log_info, "Unlocking/updating: %s", msgid_str);
1485     
1486     MUTEX_LOCK_V(M_q_lock);
1487     
1488     /* no msg to process for dummy */
1489     if (TMQ_STORCMD_DUM!=b->hdr.command_code)
1490     {
1491         mmsg = tmq_get_msg_by_msgid_str(msgid_str);
1492 
1493         if (NULL==mmsg)
1494         {   
1495             NDRX_LOG(log_error, "Message not found: [%s] - no update", msgid_str);
1496 
1497             /* might be a case when message file was deleted, but command block
1498              * not. Thus for command block might be false re-attempt.
1499              */
1500             goto out;
1501         }
1502     }
1503     
1504     switch (b->hdr.command_code)
1505     {
1506         case TMQ_STORCMD_DEL:
1507             NDRX_LOG(log_info, "Removing message...");
1508             tmq_remove_msg(mmsg);
1509             mmsg = NULL;
1510             break;
1511         case TMQ_STORCMD_UPD:
1512             UPD_MSG((mmsg->msg), (&b->upd));
1513         /* And still we want unblock: */
1514         case TMQ_STORCMD_NEWMSG:
1515         case TMQ_STORCMD_UNLOCK:
1516             NDRX_LOG(log_info, "Unlocking message...");
1517             mmsg->msg->lockthreadid = 0;
1518             /* todo requred parameter qconf, qhash */
1519             ndrx_infl_mov2cur(mmsg);
1520             /* wakeup the Q... runner */
1521             ndrx_forward_chkrun(mmsg);
1522             
1523             break;
1524         case TMQ_STORCMD_DUM:
1525             /* nothing todo; */
1526             break;
1527         default:
1528             NDRX_LOG(log_info, "Unknown command [%c]", b->hdr.command_code);
1529             EXFAIL_OUT(ret);
1530             break; 
1531     }
1532     
1533 out:
1534     MUTEX_UNLOCK_V(M_q_lock);
1535     return ret;
1536 }
1537 
1538 /**
1539  * Unlock memory message by msgid (used for PEEK)
1540  * TODO: add chkrun in case if doing unlock for PEEK.
1541  * @param msgid
1542  * @return 
1543  */
1544 expublic int tmq_unlock_msg_by_msgid(char *msgid, int chkrun)
1545 {
1546     int ret = EXSUCCEED;
1547     char msgid_str[TMMSGIDLEN_STR+1];
1548     tmq_memmsg_t* mmsg;
1549     
1550     tmq_msgid_serialize(msgid, msgid_str);
1551     
1552     NDRX_LOG(log_info, "Unlocking/updating: %s", msgid_str);
1553     
1554     MUTEX_LOCK_V(M_q_lock);
1555     
1556     mmsg = tmq_get_msg_by_msgid_str(msgid_str);
1557     
1558     if (NULL==mmsg)
1559     {   
1560         NDRX_LOG(log_error, "Message not found: [%s] - no update", msgid_str);
1561         EXFAIL_OUT(ret);
1562     }
1563     
1564     mmsg->msg->lockthreadid = 0;
1565     ndrx_infl_mov2cur(mmsg);
1566 
1567     if (chkrun)
1568     {
1569         ndrx_forward_chkrun(mmsg);
1570     }
1571     
1572 out:
1573     MUTEX_UNLOCK_V(M_q_lock);
1574     return ret;
1575 }
1576 
1577 
1578 /**
1579  * Load the messages
1580  * @param msgid
1581  * @return 
1582  */
1583 expublic int tmq_lock_msg(char *msgid)
1584 {
1585     int ret = EXSUCCEED;
1586     char msgid_str[TMMSGIDLEN_STR+1];
1587     tmq_memmsg_t* mmsg;
1588     
1589     tmq_msgid_serialize(msgid, msgid_str);
1590     
1591     NDRX_LOG(log_info, "Locking: %s", msgid_str);
1592     
1593     MUTEX_LOCK_V(M_q_lock);
1594     
1595     mmsg = tmq_get_msg_by_msgid_str(msgid_str);
1596     
1597     if (NULL==mmsg)
1598     {   
1599         NDRX_LOG(log_error, "Message not found: [%s] - no update", msgid_str);
1600         EXFAIL_OUT(ret);
1601     }
1602     
1603     /* Lock the message */
1604     mmsg->msg->lockthreadid = ndrx_gettid();
1605     ndrx_infl_mov2infl(mmsg);
1606 
1607 out:
1608     MUTEX_UNLOCK_V(M_q_lock);
1609     return ret;
1610 }
1611 
1612 #if 0
1613 /**
1614  * compare two Q entries, by time + counter
1615  * @param q1
1616  * @param q2
1617  * @return 
1618  */
1619 expublic int q_msg_sort(tmq_memmsg_t *q1, tmq_memmsg_t *q2)
1620 {
1621     
1622     return ndrx_compare3(q1->msg->msgtstamp, q1->msg->msgtstamp_usec, q1->msg->msgtstamp_cntr, 
1623             q2->msg->msgtstamp, q2->msg->msgtstamp_usec, q2->msg->msgtstamp_cntr);
1624     
1625 }
1626 #endif
1627 /**
1628  * Return list of auto queues
1629  * @return NULL or list
1630  */
1631 expublic fwd_qlist_t *tmq_get_qlist(int auto_only, int incl_def)
1632 {
1633     fwd_qlist_t * ret = NULL;
1634     fwd_qlist_t * tmp = NULL;
1635     
1636     tmq_qhash_t *q, *qtmp;
1637     tmq_qconfig_t *qconf;
1638     
1639     tmq_qconfig_t *qc, *qctmp;
1640     
1641     MUTEX_LOCK_V(M_q_lock);
1642     
1643     EXHASH_ITER(hh, G_qhash, q, qtmp)
1644     {
1645         if (NULL!=(qconf=tmq_qconf_get_with_default(q->qname, NULL)) && 
1646                 ((auto_only && TMQ_AUTOQ_ISAUTO(qconf->autoq)) || !auto_only))
1647         {
1648             if (NULL==(tmp = NDRX_CALLOC(1, sizeof(fwd_qlist_t))))
1649             {
1650                 int err = errno;
1651                 NDRX_LOG(log_error, "Failed to alloc: %s", strerror(err));
1652                 userlog("Failed to alloc: %s", strerror(err));
1653                 ret = NULL;
1654                 goto out;
1655             }
1656             /* have some stats */
1657             NDRX_STRCPY_SAFE(tmp->qname, q->qname);
1658             tmp->succ = q->succ;
1659             tmp->fail = q->fail;
1660             
1661             tmp->numenq = q->numenq;
1662             tmp->numdeq = q->numdeq;
1663             tmp->workers = qconf->workers;
1664             tmp->sync = qconf->sync;
1665             
1666             DL_APPEND(ret, tmp);
1667         }
1668         
1669     }
1670     
1671     /* If we need to include definitions
1672      * Then iterate over qdefs and change which are not in the G_qhash
1673      * Those add to return DL
1674      */
1675     if (incl_def)
1676     {
1677         EXHASH_ITER(hh, G_qconf, qc, qctmp)
1678         {
1679             if (NULL==tmq_qhash_get(qc->qname))
1680             {
1681                 if (NULL==(tmp = NDRX_CALLOC(1, sizeof(fwd_qlist_t))))
1682                 {
1683                     int err = errno;
1684                     NDRX_LOG(log_error, "Failed to alloc: %s", strerror(err));
1685                     userlog("Failed to alloc: %s", strerror(err));
1686                     ret = NULL;
1687                     goto out;
1688                 }
1689                 NDRX_LOG(log_debug, "tmq_get_qlist: %s", qc->qname);
1690                 NDRX_STRCPY_SAFE(tmp->qname, qc->qname);
1691                 DL_APPEND(ret, tmp);
1692             }
1693         }
1694     }
1695     
1696 out:
1697     MUTEX_UNLOCK_V(M_q_lock);
1698     return ret;
1699 }
1700 
1701 /**
1702  * Return list of messages in the q
1703  * @return NULL or list
1704  */
1705 expublic tmq_memmsg_t *tmq_get_msglist(char *qname)
1706 {
1707     tmq_qhash_t *qhash;
1708     tmq_memmsg_t *node;
1709     tmq_memmsg_t * ret = NULL;
1710     tmq_memmsg_t * tmp = NULL;
1711     tmq_msg_t * msg = NULL;
1712     ndrx_rbt_tree_iterator_t iter;
1713     ndrx_rbt_tree_t *rbt_trees[2];
1714     int i;
1715     
1716     NDRX_LOG(log_debug, "tmq_get_msglist listing for [%s]", qname);
1717     MUTEX_LOCK_V(M_q_lock);
1718     
1719     if (NULL==(qhash = tmq_qhash_get(qname)))
1720     {
1721         NDRX_LOG(log_warn, "Q [%s] is NULL/empty", qname);
1722         goto out;
1723     }
1724     
1725     /* Start from first one & loop over the list while 
1726      * - we get to the first non-locked message
1727      * - or we get to the end with no msg, then return FAIL.
1728      */
1729     node = qhash->q_infligh;
1730     
1731     do
1732     {
1733         if (NULL!=node)
1734         {
1735             if (NULL==(tmp = NDRX_CALLOC(1, sizeof(tmq_memmsg_t))))
1736             {
1737                 int err = errno;
1738                 NDRX_LOG(log_error, "Failed to alloc: %s", strerror(err));
1739                 userlog("Failed to alloc: %s", strerror(err));
1740                 ret = NULL;
1741                 goto out;
1742             }
1743             
1744             if (NULL==(msg = NDRX_MALLOC(sizeof(tmq_msg_t))))
1745             {
1746                 int err = errno;
1747                 NDRX_LOG(log_error, "Failed to alloc: %s", strerror(err));
1748                 userlog("Failed to alloc: %s", strerror(err));
1749                 ret = NULL;
1750                 goto out;
1751             }
1752             
1753             memcpy(msg, node->msg, sizeof(tmq_msg_t));
1754             tmp->msg = msg;
1755             
1756             DL_APPEND(ret, tmp);
1757             
1758             /* default to FIFO */
1759             node = node->next;
1760         }
1761     }
1762     while (NULL!=node && node!=qhash->q_infligh);
1763 
1764     /* List all messages from qhash->cur and qhash-> fut */
1765     rbt_trees[0] = &qhash->q;
1766     rbt_trees[1] = &qhash->q_fut;
1767     for (i=0; i<2; i++)
1768     {
1769         ndrx_rbt_begin_iterate(rbt_trees[i], LeftRightWalk, &iter);
1770         while (NULL!=(node=(tmq_memmsg_t *)ndrx_rbt_iterate(&iter)))
1771         {
1772             if (NULL==(tmp = NDRX_CALLOC(1, sizeof(tmq_memmsg_t))))
1773             {
1774                 int err = errno;
1775                 NDRX_LOG(log_error, "Failed to alloc: %s", strerror(err));
1776                 userlog("Failed to alloc: %s", strerror(err));
1777                 ret = NULL;
1778                 goto out;
1779             }
1780 
1781             if (NULL==(msg = NDRX_MALLOC(sizeof(tmq_msg_t))))
1782             {
1783                 int err = errno;
1784                 NDRX_LOG(log_error, "Failed to alloc: %s", strerror(err));
1785                 userlog("Failed to alloc: %s", strerror(err));
1786                 ret = NULL;
1787                 goto out;
1788             }
1789 
1790             memcpy(msg, node->msg, sizeof(tmq_msg_t));
1791             tmp->msg = msg;
1792 
1793             DL_APPEND(ret, tmp);
1794 
1795         }
1796     }
1797 
1798 out:
1799     MUTEX_UNLOCK_V(M_q_lock);
1800     return ret;
1801 }
1802 
1803 /**
1804  * Update queue statistics
1805  * @param qname
1806  * @param msgs_diff
1807  * @param succ_diff
1808  * @param fail_diff
1809  * @return 
1810  */
1811 expublic int tmq_update_q_stats(char *qname, long succ_diff, long fail_diff)
1812 {
1813     tmq_qhash_t  *q;
1814     MUTEX_LOCK_V(M_q_lock);
1815     
1816     if (NULL!=(q = tmq_qhash_get(qname)))
1817     {
1818         q->succ += succ_diff;
1819         q->fail += fail_diff;
1820     }
1821     
1822 out:
1823             
1824     MUTEX_UNLOCK_V(M_q_lock);
1825 
1826     return EXSUCCEED;
1827 }
1828 
1829 /**
1830  * Return infos about enqueued messages.
1831  * @param qname
1832  * @param p_msgs
1833  * @param p_locked
1834  * @return 
1835  */
1836 expublic void tmq_get_q_stats(char *qname, long *p_msgs, long *p_locked)
1837 {
1838     tmq_qhash_t  *qhash;        
1839     tmq_memmsg_t *node;
1840     ndrx_rbt_tree_iterator_t iter;
1841     ndrx_rbt_tree_t *rbt_trees[2];
1842 
1843     MUTEX_LOCK_V(M_q_lock);
1844 
1845     if (NULL!=(qhash = tmq_qhash_get(qname)))
1846     {
1847         node = qhash->q_infligh;
1848         /* Count all messages in qhash->q_infligh */
1849         do
1850         {
1851             if (NULL!=node)
1852             {
1853                 *p_msgs = *p_msgs +1 ;
1854                 if (node->msg->lockthreadid)
1855                 {
1856                     *p_locked = *p_locked +1 ;
1857                 }
1858                 /* default to FIFO */
1859                 node = node->next;
1860             }
1861 
1862         }
1863         while (NULL!=node && node!=qhash->q_infligh);
1864 
1865         /* Count all messages in qhash->cur and qhash-> fut */
1866         rbt_trees[0] = &qhash->q;
1867         rbt_trees[1] = &qhash->q_fut;
1868         for (int i=0; i<2; i++)
1869         {
1870             ndrx_rbt_begin_iterate(rbt_trees[i], LeftRightWalk, &iter);
1871             while (NULL!=(node=(tmq_memmsg_t *)ndrx_rbt_iterate(&iter)))
1872             {
1873                 *p_msgs = *p_msgs +1 ;
1874             }
1875         }
1876     }
1877 
1878     MUTEX_UNLOCK_V(M_q_lock);
1879 }
1880 
1881 /******************************************************************************/
1882 /*                         COMMAND LINE SUPPORT                               */
1883 /******************************************************************************/
1884 
1885 /* vim: set ts=4 sw=4 et smartindent: */