Back to home page

Enduro/X

 
 

    


0001 /**
0002  * @brief Persistent queue commons (used between tmqsrv and "userspace" api)
0003  *
0004  * @file qcommon.c
0005  */
0006 /* -----------------------------------------------------------------------------
0007  * Enduro/X Middleware Platform for Distributed Transaction Processing
0008  * Copyright (C) 2009-2016, ATR Baltic, Ltd. All Rights Reserved.
0009  * Copyright (C) 2017-2023, Mavimax, Ltd. All Rights Reserved.
0010  * This software is released under one of the following licenses:
0011  * AGPL (with Java and Go exceptions) or Mavimax's license for commercial use.
0012  * See LICENSE file for full text.
0013  * -----------------------------------------------------------------------------
0014  * AGPL license:
0015  *
0016  * This program is free software; you can redistribute it and/or modify it under
0017  * the terms of the GNU Affero General Public License, version 3 as published
0018  * by the Free Software Foundation;
0019  *
0020  * This program is distributed in the hope that it will be useful, but WITHOUT ANY
0021  * WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR A
0022  * PARTICULAR PURPOSE. See the GNU Affero General Public License, version 3
0023  * for more details.
0024  *
0025  * You should have received a copy of the GNU Affero General Public License along 
0026  * with this program; if not, write to the Free Software Foundation, Inc.,
0027  * 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
0028  *
0029  * -----------------------------------------------------------------------------
0030  * A commercial use license is available from Mavimax, Ltd
0031  * contact@mavimax.com
0032  * -----------------------------------------------------------------------------
0033  */
0034 #include <string.h>
0035 #include <stdio.h>
0036 #include <stdint.h>
0037 #include <stdlib.h>
0038 #include <memory.h>
0039 #include <errno.h>
0040 #include <dlfcn.h>
0041 
0042 #include <ndrstandard.h>
0043 #include <ndebug.h>
0044 #include <ndrxdcmn.h>
0045 #include <userlog.h>
0046 #include <ubf.h>
0047 #include <ubfutil.h>
0048 #include <Exfields.h>
0049 #include <typed_buf.h>
0050 #include <qcommon.h>
0051 #include <exbase64.h>
0052 #include <atmi_tls.h>
0053 
0054 #include "tperror.h"
0055 /*---------------------------Externs------------------------------------*/
0056 /*---------------------------Macros-------------------------------------*/
0057 #define OFSZ(s,e)   EXOFFSET(s,e), EXELEM_SIZE(s,e)
0058 /*---------------------------Enums--------------------------------------*/
0059 /*---------------------------Typedefs-----------------------------------*/
0060 /*---------------------------Globals------------------------------------*/
0061 /*---------------------------Statics------------------------------------*/
0062 
0063 /**
0064  * TPQCTL structure mappings.
0065  */
0066 static ubf_c_map_t M_tpqctl_map[] = 
0067 {
0068     {EX_QFLAGS,         0, OFSZ(TPQCTL, flags),         BFLD_LONG}, /* 0 */
0069     {EX_QDEQ_TIME,      0, OFSZ(TPQCTL, deq_time),      BFLD_LONG}, /* 1 */
0070     {EX_QPRIORITY,      0, OFSZ(TPQCTL, priority),      BFLD_LONG}, /* 2 */
0071     {EX_QDIAGNOSTIC,    0, OFSZ(TPQCTL, diagnostic),    BFLD_LONG}, /* 3 */
0072     {EX_QMSGID,         0, OFSZ(TPQCTL, msgid),         BFLD_CARRAY}, /* 4 */
0073     {EX_QCORRID,        0, OFSZ(TPQCTL, corrid),        BFLD_CARRAY}, /* 5 */
0074     {EX_QREPLYQUEUE,    0, OFSZ(TPQCTL, replyqueue),    BFLD_STRING}, /* 6 */
0075     {EX_QFAILUREQUEUE,  0, OFSZ(TPQCTL, failurequeue),  BFLD_STRING}, /* 7 */
0076     {EX_CLTID,          0, OFSZ(TPQCTL, cltid),         BFLD_STRING}, /* 8 */
0077     {EX_QURCODE,        0, OFSZ(TPQCTL, urcode),        BFLD_LONG}, /* 9 */
0078     {EX_QAPPKEY,        0, OFSZ(TPQCTL, appkey),        BFLD_LONG}, /* 10 */
0079     {EX_QDELIVERY_QOS,  0, OFSZ(TPQCTL, delivery_qos),  BFLD_LONG}, /* 11 */
0080     {EX_QREPLY_QOS,     0, OFSZ(TPQCTL, reply_qos),     BFLD_LONG}, /* 12 */
0081     {EX_QEXP_TIME,      0, OFSZ(TPQCTL, exp_time),      BFLD_LONG}, /* 13 */
0082     {EX_QDIAGMSG,       0, OFSZ(TPQCTL, diagmsg),       BFLD_STRING}, /* 14 */
0083     {BBADFLDID}
0084 };
0085 
0086 /**
0087  * Enqueue request structure
0088  */
0089 static long M_tpqctl_enqreq[] = 
0090 {
0091     UBFUTIL_EXPORT,/* 0 - EX_QFLAGS*/
0092     UBFUTIL_EXPORT,/* 1 - EX_QDEQ_TIME*/
0093     UBFUTIL_EXPORT,/* 2 - EX_QPRIORITY*/
0094     0,             /* 3 - EX_QDIAGNOSTIC*/
0095     UBFUTIL_EXPORT,/* 4 - EX_QMSGID*/
0096     UBFUTIL_EXPORT,/* 5 - EX_QCORRID*/
0097     UBFUTIL_EXPORT,/* 6 - EX_QREPLYQUEUE*/
0098     UBFUTIL_EXPORT,/* 7 - EX_QFAILUREQUEUE*/
0099     UBFUTIL_EXPORT,/* 8 - EX_CLTID*/
0100     UBFUTIL_EXPORT,/* 9 - EX_QURCODE*/
0101     UBFUTIL_EXPORT,/* 10 - EX_QAPPKEY*/
0102     UBFUTIL_EXPORT,/* 11 - EX_QDELIVERY_QOS*/
0103     UBFUTIL_EXPORT,/* 12 - EX_QREPLY_QOS*/
0104     UBFUTIL_EXPORT,/* 13 - EX_QEXP_TIME*/
0105     0              /* 14 - EX_QDIAGMSG*/
0106 };
0107 
0108 /**
0109  * Enqueue response structure
0110  */
0111 static long M_tpqctl_enqrsp[] = 
0112 {
0113     UBFUTIL_EXPORT,/* 0 - EX_QFLAGS*/
0114     0,/* 1 - EX_QDEQ_TIME*/
0115     0,/* 2 - EX_QPRIORITY*/
0116     UBFUTIL_EXPORT,/* 3 - EX_QDIAGNOSTIC*/
0117     UBFUTIL_EXPORT,/* 4 - EX_QMSGID*/
0118     0,/* 5 - EX_QCORRID*/
0119     0,/* 6 - EX_QREPLYQUEUE*/
0120     0,/* 7 - EX_QFAILUREQUEUE*/
0121     0,/* 8 - EX_CLTID*/
0122     0,/* 9 - EX_QURCODE*/
0123     0,/* 10 - EX_QAPPKEY*/
0124     0,/* 11 - EX_QDELIVERY_QOS*/
0125     0,/* 12 - EX_QREPLY_QOS*/
0126     0,/* 13 - EX_QEXP_TIME*/
0127     UBFUTIL_EXPORT              /* 14 - EX_QDIAGMSG*/
0128 };
0129 
0130 
0131 /**
0132  * dequeue request structure
0133  */
0134 static long M_tpqctl_deqreq[] = 
0135 {
0136     UBFUTIL_EXPORT,/* 0 - EX_QFLAGS*/
0137     0,/* 1 - EX_QDEQ_TIME*/
0138     0,/* 2 - EX_QPRIORITY*/
0139     0,             /* 3 - EX_QDIAGNOSTIC*/
0140     UBFUTIL_EXPORT,/* 4 - EX_QMSGID*/
0141     UBFUTIL_EXPORT,/* 5 - EX_QCORRID*/
0142     0,/* 6 - EX_QREPLYQUEUE*/
0143     0,/* 7 - EX_QFAILUREQUEUE*/
0144     0,/* 8 - EX_CLTID*/
0145     0,/* 9 - EX_QURCODE*/
0146     0,/* 10 - EX_QAPPKEY*/
0147     0,/* 11 - EX_QDELIVERY_QOS*/
0148     0,/* 12 - EX_QREPLY_QOS*/
0149     0,/* 13 - EX_QEXP_TIME*/
0150     0 /* 14 - EX_QDIAGMSG*/
0151 };
0152 
0153 /**
0154  * Dequeue response structure
0155  */
0156 static long M_tpqctl_deqrsp[] = 
0157 {
0158     UBFUTIL_EXPORT,/* 0 - EX_QFLAGS*/
0159     0,/* 1 - EX_QDEQ_TIME*/
0160     UBFUTIL_EXPORT,/* 2 - EX_QPRIORITY*/
0161     UBFUTIL_EXPORT,/* 3 - EX_QDIAGNOSTIC*/
0162     UBFUTIL_EXPORT,/* 4 - EX_QMSGID*/
0163     UBFUTIL_EXPORT,/* 5 - EX_QCORRID*/
0164     UBFUTIL_EXPORT,/* 6 - EX_QREPLYQUEUE*/
0165     UBFUTIL_EXPORT,/* 7 - EX_QFAILUREQUEUE*/
0166     UBFUTIL_EXPORT,/* 8 - EX_CLTID*/
0167     UBFUTIL_EXPORT,/* 9 - EX_QURCODE*/
0168     UBFUTIL_EXPORT,/* 10 - EX_QAPPKEY*/
0169     UBFUTIL_EXPORT,/* 11 - EX_QDELIVERY_QOS*/
0170     UBFUTIL_EXPORT,/* 12 - EX_QREPLY_QOS*/
0171     0,/* 13 - EX_QEXP_TIME*/
0172     UBFUTIL_EXPORT              /* 14 - EX_QDIAGMSG*/
0173 };
0174 
0175 
0176 /**
0177  * Copy the TPQCTL data to buffer, request data
0178  * @param p_ub destination buffer
0179  * @param ctl source struct
0180  * @return SUCCEED/FAIL
0181  */
0182 expublic int tmq_tpqctl_to_ubf_enqreq(UBFH *p_ub, TPQCTL *ctl)
0183 {
0184     int ret = EXSUCCEED;
0185     
0186     ret=atmi_cvt_c_to_ubf(M_tpqctl_map, ctl, p_ub, M_tpqctl_enqreq);
0187     
0188     return ret;
0189 }
0190 
0191 /**
0192  * Build the TPQCTL from UBF buffer, request data
0193  * @param p_ub source buffer
0194  * @param ctl destination strcture
0195  * @return SUCCEED/FAIL
0196  */
0197 expublic int tmq_tpqctl_from_ubf_enqreq(UBFH *p_ub, TPQCTL *ctl)
0198 {
0199     int ret = EXSUCCEED;
0200     
0201     ret=atmi_cvt_ubf_to_c(M_tpqctl_map, p_ub, ctl, M_tpqctl_enqreq);
0202     
0203     return ret;
0204 }
0205 
0206 
0207 /**
0208  * Copy the TPQCTL data to buffer, request data
0209  * @param p_ub destination buffer
0210  * @param ctl source struct
0211  * @return SUCCEED/FAIL
0212  */
0213 expublic int tmq_tpqctl_to_ubf_enqrsp(UBFH *p_ub, TPQCTL *ctl)
0214 {
0215     int ret = EXSUCCEED;
0216     
0217     ret=atmi_cvt_c_to_ubf(M_tpqctl_map, ctl, p_ub, M_tpqctl_enqrsp);
0218     
0219     return ret;
0220 }
0221 
0222 /**
0223  * Build the TPQCTL from UBF buffer, request data
0224  * @param p_ub source buffer
0225  * @param ctl destination strcture
0226  * @return SUCCEED/FAIL
0227  */
0228 expublic int tmq_tpqctl_from_ubf_enqrsp(UBFH *p_ub, TPQCTL *ctl)
0229 {
0230     int ret = EXSUCCEED;
0231     
0232     ret=atmi_cvt_ubf_to_c(M_tpqctl_map, p_ub, ctl, M_tpqctl_enqrsp);
0233     
0234     return ret;
0235 }
0236 
0237 /**
0238  * Copy the TPQCTL data to buffer, request data
0239  * @param p_ub destination buffer
0240  * @param ctl source struct
0241  * @return SUCCEED/FAIL
0242  */
0243 expublic int tmq_tpqctl_to_ubf_deqreq(UBFH *p_ub, TPQCTL *ctl)
0244 {
0245     int ret = EXSUCCEED;
0246     
0247     ret=atmi_cvt_c_to_ubf(M_tpqctl_map, ctl, p_ub, M_tpqctl_deqreq);
0248     
0249     return ret;
0250 }
0251 
0252 /**
0253  * Build the TPQCTL from UBF buffer, request data
0254  * @param p_ub source buffer
0255  * @param ctl destination strcture
0256  * @return SUCCEED/FAIL
0257  */
0258 expublic int tmq_tpqctl_from_ubf_deqreq(UBFH *p_ub, TPQCTL *ctl)
0259 {
0260     int ret = EXSUCCEED;
0261     
0262     ret=atmi_cvt_ubf_to_c(M_tpqctl_map, p_ub, ctl, M_tpqctl_deqreq);
0263     
0264     return ret;
0265 }
0266 
0267 
0268 /**
0269  * Copy the TPQCTL data to buffer, request data
0270  * @param p_ub destination buffer
0271  * @param ctl source struct
0272  * @return SUCCEED/FAIL
0273  */
0274 expublic int tmq_tpqctl_to_ubf_deqrsp(UBFH *p_ub, TPQCTL *ctl)
0275 {
0276     int ret = EXSUCCEED;
0277     
0278     ret=atmi_cvt_c_to_ubf(M_tpqctl_map, ctl, p_ub, M_tpqctl_deqrsp);
0279     
0280     return ret;
0281 }
0282 
0283 /**
0284  * Build the TPQCTL from UBF buffer, request data
0285  * @param p_ub source buffer
0286  * @param ctl destination strcture
0287  * @return SUCCEED/FAIL
0288  */
0289 expublic int tmq_tpqctl_from_ubf_deqrsp(UBFH *p_ub, TPQCTL *ctl)
0290 {
0291     int ret = EXSUCCEED;
0292     
0293     ret=atmi_cvt_ubf_to_c(M_tpqctl_map, p_ub, ctl, M_tpqctl_deqrsp);
0294     
0295     return ret;
0296 }
0297 
0298 
0299 /**
0300  * Generate serialized version of the string
0301  * @param msgid_in, length defined by constant TMMSGIDLEN
0302  * @param msgidstr_out
0303  * @return msgidstr_out
0304  */
0305 expublic char * tmq_msgid_serialize(char *msgid_in, char *msgid_str_out)
0306 {
0307     size_t out_len = 0;
0308     
0309     NDRX_DUMP(log_debug, "Original MSGID", msgid_in, TMMSGIDLEN);
0310     
0311     ndrx_xa_base64_encode((unsigned char *)msgid_in, TMMSGIDLEN, &out_len, 
0312             msgid_str_out);
0313 
0314     /* msgid_str_out[out_len] = EXEOS; */
0315     
0316     NDRX_LOG(log_debug, "MSGID after serialize: [%s]", msgid_str_out);
0317     
0318     return msgid_str_out;
0319 }
0320 
0321 /**
0322  * Get binary message id
0323  * @param msgid_str_in, length defined by constant TMMSGIDLEN
0324  * @param msgid_out
0325  * @return msgid_out 
0326  */
0327 expublic char * tmq_msgid_deserialize(char *msgid_str_in, char *msgid_out)
0328 {
0329     size_t tot_len = 0;
0330     
0331     NDRX_LOG(log_debug, "Serialized MSGID: [%s]", msgid_str_in);
0332     
0333     memset(msgid_out, 0, TMMSGIDLEN);
0334         
0335     ndrx_xa_base64_decode((unsigned char *)msgid_str_in, strlen(msgid_str_in), 
0336             &tot_len, msgid_out);
0337     
0338     NDRX_DUMP(log_debug, "Deserialized MSGID", msgid_out, TMMSGIDLEN);
0339     
0340     return msgid_out;
0341 }
0342 
0343 /**************************** API SECTION *************************************/
0344 
0345 /**
0346  * Internal version of message enqueue.
0347  * TODO: forward ATMI error!
0348  * In case if time TPQTIME_REL is used then time is converted to absolute unix epoch time
0349  * 
0350  * @param qspace service name
0351  * @param qname queue name
0352  * @param ctl control data
0353  * @param data data to enqueue
0354  * @param len data len
0355  * @param flags flags (for tpcall). TPQTIME_ABS and TPQTIME_REL are mutually exclusive
0356  * @return SUCCEED/FAIL
0357  */
0358 expublic int ndrx_tpenqueue (char *qspace, short nodeid, short srvid, char *qname, TPQCTL *ctl, 
0359         char *data, long len, long flags)
0360 {
0361     int ret = EXSUCCEED;
0362     long rsplen;
0363     char cmd = TMQ_CMD_ENQUEUE;
0364     char *tmp=NULL;
0365     long tmp_len;
0366     UBFH *p_ub = NULL;
0367     atmi_error_t errbuf;
0368     char qspacesvc[XATMI_SERVICE_NAME_LENGTH+1]; /* real service name */
0369     long dec_time_org = ctl->deq_time;
0370     
0371     NDRX_SYSBUF_MALLOC_WERR_OUT(tmp, tmp_len, ret);
0372 
0373     /*
0374      * Support #403
0375     if (NULL==data)
0376     {
0377         ndrx_TPset_error_fmt(TPEINVAL,  "%s: data is null!", __func__);
0378         EXFAIL_OUT(ret);
0379     }
0380     */
0381     
0382     if (NULL==qspace || (EXEOS==*qspace && !nodeid && !srvid))
0383     {
0384         ndrx_TPset_error_fmt(TPEINVAL,  "%s: empty or NULL qspace!", __func__);
0385         EXFAIL_OUT(ret);
0386     }
0387     
0388     if (NULL==qname || EXEOS==*qname)
0389     {
0390         ndrx_TPset_error_fmt(TPEINVAL,  "%s: empty or NULL qname!", __func__);
0391         EXFAIL_OUT(ret);
0392     }
0393     
0394     if (NULL==ctl)
0395     {
0396         ndrx_TPset_error_fmt(TPEINVAL,  "%s: NULL ctl!", __func__);
0397         EXFAIL_OUT(ret);
0398     }
0399 
0400     if (ctl->flags & TPQTIME_ABS && ctl->flags & TPQTIME_REL)
0401     {
0402         ndrx_TPset_error_fmt(TPEINVAL,  
0403             "%s: TPQTIME_ABS and TPQTIME_REL are mutually exclusive!", __func__);
0404         EXFAIL_OUT(ret);
0405     }
0406     
0407     /* convert time */
0408     if (ctl->flags&TPQTIME_REL)
0409     {
0410         ctl->deq_time = time(NULL) + ctl->deq_time;
0411         ctl->flags&=~TPQTIME_REL;
0412         ctl->flags|=TPQTIME_ABS;
0413     }
0414 
0415     ctl->diagnostic=0;
0416     
0417     if (EXFAIL==tptypes(data, NULL, NULL))
0418     {
0419         ndrx_TPset_error_fmt(TPEINVAL,  "%s: data buffer not allocated by "
0420                 "tpalloc()", __func__);
0421         EXFAIL_OUT(ret);
0422     }
0423     
0424     /* prepare buffer for call */
0425     if (EXSUCCEED!=ndrx_mbuf_prepare_outgoing(data, len, tmp, &tmp_len, 0, 
0426             NDRX_MBUF_FLAG_NOCALLINFO))
0427     {
0428         /* not good - error should be already set */
0429         EXFAIL_OUT(ret);
0430     }
0431     
0432     NDRX_DUMP(log_debug, "Buffer for sending data out", tmp, tmp_len);
0433     
0434     /* Alloc the FB */
0435     
0436     if (NULL == (p_ub = (UBFH *)tpalloc("UBF", "", TMQ_DEFAULT_BUFSZ+tmp_len)))
0437     {
0438         ndrx_TPset_error_fmt(TPESYSTEM,  "%s: Failed to allocate req buffer: %s", 
0439                 __func__, Bstrerror(Berror));
0440         EXFAIL_OUT(ret);
0441     }
0442     
0443     /* Convert the structure to UBF */
0444     if (EXSUCCEED!=tmq_tpqctl_to_ubf_enqreq(p_ub, ctl))
0445     {
0446         
0447         ndrx_TPset_error_fmt(TPEINVAL,  "%s: failed convert ctl "
0448                 "to internal UBF buf!", __func__);
0449         EXFAIL_OUT(ret);
0450     }
0451     
0452     if (EXSUCCEED!=Bchg(p_ub, EX_DATA, 0, tmp, tmp_len))
0453     {
0454         ndrx_TPset_error_fmt(TPESYSTEM,  "%s: Failed to set data field: %s", 
0455                 Bstrerror(Berror), __func__);
0456         EXFAIL_OUT(ret);
0457     }
0458     
0459     /* Setup the command in EX_QCMD */
0460     if (EXSUCCEED!=Bchg(p_ub, EX_QCMD, 0, &cmd, 0L))
0461     {
0462         ndrx_TPset_error_fmt(TPESYSTEM,  "%s: Failed to set cmd field: %s", 
0463                 __func__, Bstrerror(Berror));
0464         EXFAIL_OUT(ret);
0465     }
0466     
0467     if (EXSUCCEED!=Bchg(p_ub, EX_QNAME, 0, qname, 0L))
0468     {
0469         ndrx_TPset_error_fmt(TPESYSTEM,  "%s: Failed to set qname field: %s", 
0470                 __func__, Bstrerror(Berror));
0471         EXFAIL_OUT(ret);
0472     }
0473     
0474     ndrx_debug_dump_UBF(log_debug, "QSPACE enqueue request buffer", p_ub);
0475     
0476     /* do the call to queue system */
0477     if (EXEOS!=*qspace)
0478     {
0479         snprintf(qspacesvc, sizeof(qspacesvc), NDRX_SVC_QSPACE, qspace);
0480     }
0481     else
0482     {
0483         snprintf(qspacesvc, sizeof(qspacesvc), NDRX_SVC_TMQ, (long)nodeid, (int)srvid);
0484     }
0485     
0486     if (EXFAIL == tpcall(qspacesvc, (char *)p_ub, 0L, (char **)&p_ub, &rsplen, flags|TPNOABORT))
0487     {
0488         int tpe = tperrno;
0489         
0490         NDRX_LOG(log_error, "%s failed: %s", qspace, tpstrerror(tpe));
0491         if (TPESVCFAIL!=tpe)
0492         {
0493             EXFAIL_OUT(ret);
0494         }
0495         else
0496         {
0497             ret=EXFAIL;
0498         }
0499     }
0500     
0501     ndrx_debug_dump_UBF(log_debug, "QSPACE enqueue response buffer", p_ub);
0502     
0503     /* the call is ok (or app failed), convert back. */
0504     if (EXSUCCEED!=tmq_tpqctl_from_ubf_enqrsp(p_ub, ctl))
0505     {
0506         NDRX_LOG(log_error, "Failed convert ctl to internal UBF buf!");
0507         ndrx_TPoverride_code(TPESYSTEM);   
0508         EXFAIL_OUT(ret);
0509     }
0510     
0511 out:
0512 
0513     if (NULL!=p_ub)
0514     {
0515         atmi_error_t err;
0516         /* save any error here... */
0517         ndrx_TPsave_error(&err);
0518         tpfree((char *)p_ub);
0519         ndrx_TPrestore_error(&err);
0520     }
0521 
0522     /* restore the error if have */
0523     if (0!=tperrno)
0524     {
0525         atmi_error_t err;
0526         ndrx_TPsave_error(&err);
0527         
0528         if (ctl->diagnostic)
0529         {
0530             err.atmi_error = TPEDIAGNOSTIC;
0531             NDRX_STRCPY_SAFE(err.atmi_error_msg_buf, 
0532                     "error details in TPQCTL diag fields");
0533         }
0534         ndrx_TPrestore_error(&err);
0535         
0536         /* Special abort section */
0537         NDRX_ABORT_START(EXFALSE)
0538                    
0539         if (TPEDIAGNOSTIC==tperrno &&
0540                 (   QMEINVAL==ctl->diagnostic 
0541                  || QMEBADQUEUE==ctl->diagnostic
0542                 )
0543             )
0544         {
0545             abort_needed=EXFALSE;
0546         }
0547 
0548         NDRX_ABORT_END(EXFALSE)
0549                 
0550     }
0551     else
0552     {
0553         ctl->diagnostic = EXFALSE;
0554     }
0555 
0556     if (NULL!=tmp)
0557     {
0558         NDRX_SYSBUF_FREE(tmp);
0559     }
0560 
0561 
0562     NDRX_LOG(log_info, "%s: return %d", __func__, ret);
0563 
0564     return ret;    
0565 }
0566 
0567 /**
0568  * Internal version of message dequeue.
0569  * TODO: forward ATMI error!
0570  * 
0571  * @param qspace service name
0572  * @param qname queue name
0573  * @param ctl control data
0574  * @param data data to enqueue
0575  * @param len data len
0576  * @param flags flags (for tpcall)
0577  * @return SUCCEED/FAIL
0578  */
0579 expublic int ndrx_tpdequeue (char *qspace, short nodeid, short srvid, char *qname, TPQCTL *ctl, 
0580         char **data, long *len, long flags)
0581 {
0582     int ret = EXSUCCEED;
0583     long rsplen;
0584     char cmd = TMQ_CMD_DEQUEUE;
0585     atmi_error_t errbuf;
0586     UBFH *p_ub = (UBFH *)tpalloc("UBF", "", TMQ_DEFAULT_BUFSZ);
0587     char qspacesvc[XATMI_SERVICE_NAME_LENGTH+1]; /* real service name */
0588     
0589     if (NULL==qspace || (EXEOS==*qspace && !nodeid && !srvid))
0590     {
0591         ndrx_TPset_error_fmt(TPEINVAL,  "%s: empty or NULL qspace!", __func__);
0592         EXFAIL_OUT(ret);
0593     }
0594     
0595     if (NULL==qname || EXEOS==*qname)
0596     {
0597         ndrx_TPset_error_fmt(TPEINVAL,  "%s: empty or NULL qname!", __func__);
0598         EXFAIL_OUT(ret);
0599     }
0600     
0601     if (NULL==ctl)
0602     {
0603         ndrx_TPset_error_fmt(TPEINVAL,  "%s: NULL ctl!", __func__);
0604         EXFAIL_OUT(ret);
0605     }
0606     
0607     ctl->diagnostic=0;
0608     
0609     if (NULL==data)
0610     {
0611         ndrx_TPset_error_fmt(TPEINVAL,  "%s: data is null!", __func__);
0612         EXFAIL_OUT(ret);
0613     }
0614     
0615     if (NULL==len)
0616     {
0617         ndrx_TPset_error_fmt(TPEINVAL,  "%s: len is null!", __func__);
0618         EXFAIL_OUT(ret);
0619     }
0620     
0621     if (EXFAIL==tptypes(*data, NULL, NULL))
0622     {
0623         ndrx_TPset_error_fmt(TPEINVAL,  "%s: data buffer not allocated by "
0624                 "tpalloc()", __func__);
0625         EXFAIL_OUT(ret);
0626     }
0627     
0628     /* Alloc the request buffer */
0629     if (NULL == p_ub)
0630     {
0631         ndrx_TPset_error_fmt(TPESYSTEM,  "%s: Failed to allocate req buffer: %s", 
0632                 __func__, Bstrerror(Berror));
0633         EXFAIL_OUT(ret);
0634     }
0635     
0636     /* Convert the structure to UBF */
0637     if (EXSUCCEED!=tmq_tpqctl_to_ubf_deqreq(p_ub, ctl))
0638     {
0639         
0640         ndrx_TPset_error_fmt(TPEINVAL,  "%s: failed convert ctl "
0641                 "to internal UBF buf!", __func__);
0642         EXFAIL_OUT(ret);
0643     }
0644     
0645     /* set the data field */
0646     
0647     if (EXSUCCEED!=Bchg(p_ub, EX_QNAME, 0, qname, 0L))
0648     {
0649         ndrx_TPset_error_fmt(TPESYSTEM,  "%s: Failed to set qname field: %s", 
0650                 __func__, Bstrerror(Berror));
0651         EXFAIL_OUT(ret);
0652     }
0653     
0654     /* Setup the command in EX_QCMD */
0655     if (EXSUCCEED!=Bchg(p_ub, EX_QCMD, 0, &cmd, 0L))
0656     {
0657         ndrx_TPset_error_fmt(TPESYSTEM,  "%s: Failed to set cmd field: %s", 
0658                 __func__, Bstrerror(Berror));
0659         EXFAIL_OUT(ret);
0660     }
0661     
0662     /* do the call to queue system */
0663     ndrx_debug_dump_UBF(log_debug, "QSPACE dequeue request buffer", p_ub);
0664     
0665     if (EXEOS!=*qspace)
0666     {
0667         snprintf(qspacesvc, sizeof(qspacesvc), NDRX_SVC_QSPACE, qspace);
0668     }
0669     else
0670     {
0671         snprintf(qspacesvc, sizeof(qspacesvc), NDRX_SVC_TMQ, (long)nodeid, (int)srvid);
0672     }
0673     
0674     if (EXFAIL == tpcall(qspacesvc, (char *)p_ub, 0L, (char **)&p_ub, &rsplen, 
0675             flags | TPNOABORT))
0676     {
0677         int tpe = tperrno;                
0678         NDRX_LOG(log_error, "%s failed: %s", qspace, tpstrerror(tpe));
0679         if (TPESVCFAIL!=tpe)
0680         {
0681             EXFAIL_OUT(ret);
0682         }
0683         else
0684         {
0685             ret=EXFAIL;
0686         }
0687         
0688         ndrx_debug_dump_UBF(log_debug, "QSPACE dequeue response buffer", p_ub);
0689         
0690     }
0691     else
0692     {
0693         BFLDLEN len_extra=0;
0694         char *data_extra = NULL;
0695         
0696         ndrx_debug_dump_UBF(log_debug, "QSPACE dequeue response buffer", p_ub);
0697         
0698         if (NULL==(data_extra=Bgetalloc(p_ub, EX_DATA, 0, &len_extra)))
0699         {
0700             ndrx_TPset_error_fmt(TPESYSTEM,  "%s: Failed to get EX_DATA: %s", 
0701                     __func__, Bstrerror(Berror));
0702             EXFAIL_OUT(ret);
0703         }
0704 
0705         ret=ndrx_mbuf_prepare_incoming(data_extra,
0706                         len_extra,
0707                         data,
0708                         len,
0709                         flags, 0);
0710         if (EXSUCCEED!=ret)
0711         {
0712             ndrx_TPset_error_fmt(TPESYSTEM,  "%s: Failed to prepare incoming buffer: %s", 
0713                     __func__, Bstrerror(Berror));
0714             
0715             NDRX_FREE(data_extra);
0716             EXFAIL_OUT(ret);
0717         }
0718         NDRX_FREE(data_extra);
0719     }
0720     
0721     /* the call is ok (or app failed), convert back. */
0722     if (EXSUCCEED!=tmq_tpqctl_from_ubf_deqrsp(p_ub, ctl))
0723     {
0724         NDRX_LOG(log_error, "Failed convert ctl to internal UBF buf!");
0725         ndrx_TPoverride_code(TPESYSTEM);
0726         EXFAIL_OUT(ret);
0727     }
0728     
0729 out:
0730 
0731     if (NULL!=p_ub)
0732     {
0733         atmi_error_t err;
0734         /* save any error here... */
0735         ndrx_TPsave_error(&err);
0736         tpfree((char *)p_ub);
0737         ndrx_TPrestore_error(&err);
0738     }
0739 
0740     /* restore the error if have */
0741     if (0!=tperrno)
0742     {
0743         atmi_error_t err;
0744         ndrx_TPsave_error(&err);
0745         
0746         if (ctl->diagnostic)
0747         {
0748             err.atmi_error = TPEDIAGNOSTIC;
0749             NDRX_STRCPY_SAFE(err.atmi_error_msg_buf, 
0750                     "error details in TPQCTL diag fields");
0751         }
0752         ndrx_TPrestore_error(&err);
0753         
0754         /* Special abort section */
0755         NDRX_ABORT_START(EXFALSE)
0756                    
0757         if (TPEDIAGNOSTIC==tperrno &&
0758                 (      QMEINVAL==ctl->diagnostic 
0759                     || QMENOMSG==ctl->diagnostic
0760                     || QMEBADQUEUE==ctl->diagnostic))
0761         {
0762             abort_needed=EXFALSE;
0763         }
0764 
0765         NDRX_ABORT_END(EXFALSE)
0766     }
0767 
0768 
0769     NDRX_LOG(log_info, "%s: return %d", __func__, ret);
0770 
0771     return ret;    
0772 }
0773 /* vim: set ts=4 sw=4 et smartindent: */