0001
0002
0003
0004
0005
0006
0007
0008
0009
0010
0011
0012
0013
0014
0015
0016
0017
0018
0019
0020
0021
0022
0023
0024
0025
0026
0027
0028
0029
0030
0031
0032
0033
0034 #include <string.h>
0035 #include <stdio.h>
0036 #include <stdint.h>
0037 #include <stdlib.h>
0038 #include <memory.h>
0039 #include <errno.h>
0040 #include <dlfcn.h>
0041
0042 #include <ndrstandard.h>
0043 #include <ndebug.h>
0044 #include <ndrxdcmn.h>
0045 #include <userlog.h>
0046 #include <ubf.h>
0047 #include <ubfutil.h>
0048 #include <Exfields.h>
0049 #include <typed_buf.h>
0050 #include <qcommon.h>
0051 #include <exbase64.h>
0052 #include <atmi_tls.h>
0053
0054 #include "tperror.h"
0055
0056
0057 #define OFSZ(s,e) EXOFFSET(s,e), EXELEM_SIZE(s,e)
0058
0059
0060
0061
0062
0063
0064
0065
0066 static ubf_c_map_t M_tpqctl_map[] =
0067 {
0068 {EX_QFLAGS, 0, OFSZ(TPQCTL, flags), BFLD_LONG},
0069 {EX_QDEQ_TIME, 0, OFSZ(TPQCTL, deq_time), BFLD_LONG},
0070 {EX_QPRIORITY, 0, OFSZ(TPQCTL, priority), BFLD_LONG},
0071 {EX_QDIAGNOSTIC, 0, OFSZ(TPQCTL, diagnostic), BFLD_LONG},
0072 {EX_QMSGID, 0, OFSZ(TPQCTL, msgid), BFLD_CARRAY},
0073 {EX_QCORRID, 0, OFSZ(TPQCTL, corrid), BFLD_CARRAY},
0074 {EX_QREPLYQUEUE, 0, OFSZ(TPQCTL, replyqueue), BFLD_STRING},
0075 {EX_QFAILUREQUEUE, 0, OFSZ(TPQCTL, failurequeue), BFLD_STRING},
0076 {EX_CLTID, 0, OFSZ(TPQCTL, cltid), BFLD_STRING},
0077 {EX_QURCODE, 0, OFSZ(TPQCTL, urcode), BFLD_LONG},
0078 {EX_QAPPKEY, 0, OFSZ(TPQCTL, appkey), BFLD_LONG},
0079 {EX_QDELIVERY_QOS, 0, OFSZ(TPQCTL, delivery_qos), BFLD_LONG},
0080 {EX_QREPLY_QOS, 0, OFSZ(TPQCTL, reply_qos), BFLD_LONG},
0081 {EX_QEXP_TIME, 0, OFSZ(TPQCTL, exp_time), BFLD_LONG},
0082 {EX_QDIAGMSG, 0, OFSZ(TPQCTL, diagmsg), BFLD_STRING},
0083 {BBADFLDID}
0084 };
0085
0086
0087
0088
0089 static long M_tpqctl_enqreq[] =
0090 {
0091 UBFUTIL_EXPORT,
0092 UBFUTIL_EXPORT,
0093 UBFUTIL_EXPORT,
0094 0,
0095 UBFUTIL_EXPORT,
0096 UBFUTIL_EXPORT,
0097 UBFUTIL_EXPORT,
0098 UBFUTIL_EXPORT,
0099 UBFUTIL_EXPORT,
0100 UBFUTIL_EXPORT,
0101 UBFUTIL_EXPORT,
0102 UBFUTIL_EXPORT,
0103 UBFUTIL_EXPORT,
0104 UBFUTIL_EXPORT,
0105 0
0106 };
0107
0108
0109
0110
0111 static long M_tpqctl_enqrsp[] =
0112 {
0113 UBFUTIL_EXPORT,
0114 0,
0115 0,
0116 UBFUTIL_EXPORT,
0117 UBFUTIL_EXPORT,
0118 0,
0119 0,
0120 0,
0121 0,
0122 0,
0123 0,
0124 0,
0125 0,
0126 0,
0127 UBFUTIL_EXPORT
0128 };
0129
0130
0131
0132
0133
0134 static long M_tpqctl_deqreq[] =
0135 {
0136 UBFUTIL_EXPORT,
0137 0,
0138 0,
0139 0,
0140 UBFUTIL_EXPORT,
0141 UBFUTIL_EXPORT,
0142 0,
0143 0,
0144 0,
0145 0,
0146 0,
0147 0,
0148 0,
0149 0,
0150 0
0151 };
0152
0153
0154
0155
0156 static long M_tpqctl_deqrsp[] =
0157 {
0158 UBFUTIL_EXPORT,
0159 0,
0160 UBFUTIL_EXPORT,
0161 UBFUTIL_EXPORT,
0162 UBFUTIL_EXPORT,
0163 UBFUTIL_EXPORT,
0164 UBFUTIL_EXPORT,
0165 UBFUTIL_EXPORT,
0166 UBFUTIL_EXPORT,
0167 UBFUTIL_EXPORT,
0168 UBFUTIL_EXPORT,
0169 UBFUTIL_EXPORT,
0170 UBFUTIL_EXPORT,
0171 0,
0172 UBFUTIL_EXPORT
0173 };
0174
0175
0176
0177
0178
0179
0180
0181
0182 expublic int tmq_tpqctl_to_ubf_enqreq(UBFH *p_ub, TPQCTL *ctl)
0183 {
0184 int ret = EXSUCCEED;
0185
0186 ret=atmi_cvt_c_to_ubf(M_tpqctl_map, ctl, p_ub, M_tpqctl_enqreq);
0187
0188 return ret;
0189 }
0190
0191
0192
0193
0194
0195
0196
0197 expublic int tmq_tpqctl_from_ubf_enqreq(UBFH *p_ub, TPQCTL *ctl)
0198 {
0199 int ret = EXSUCCEED;
0200
0201 ret=atmi_cvt_ubf_to_c(M_tpqctl_map, p_ub, ctl, M_tpqctl_enqreq);
0202
0203 return ret;
0204 }
0205
0206
0207
0208
0209
0210
0211
0212
0213 expublic int tmq_tpqctl_to_ubf_enqrsp(UBFH *p_ub, TPQCTL *ctl)
0214 {
0215 int ret = EXSUCCEED;
0216
0217 ret=atmi_cvt_c_to_ubf(M_tpqctl_map, ctl, p_ub, M_tpqctl_enqrsp);
0218
0219 return ret;
0220 }
0221
0222
0223
0224
0225
0226
0227
0228 expublic int tmq_tpqctl_from_ubf_enqrsp(UBFH *p_ub, TPQCTL *ctl)
0229 {
0230 int ret = EXSUCCEED;
0231
0232 ret=atmi_cvt_ubf_to_c(M_tpqctl_map, p_ub, ctl, M_tpqctl_enqrsp);
0233
0234 return ret;
0235 }
0236
0237
0238
0239
0240
0241
0242
0243 expublic int tmq_tpqctl_to_ubf_deqreq(UBFH *p_ub, TPQCTL *ctl)
0244 {
0245 int ret = EXSUCCEED;
0246
0247 ret=atmi_cvt_c_to_ubf(M_tpqctl_map, ctl, p_ub, M_tpqctl_deqreq);
0248
0249 return ret;
0250 }
0251
0252
0253
0254
0255
0256
0257
0258 expublic int tmq_tpqctl_from_ubf_deqreq(UBFH *p_ub, TPQCTL *ctl)
0259 {
0260 int ret = EXSUCCEED;
0261
0262 ret=atmi_cvt_ubf_to_c(M_tpqctl_map, p_ub, ctl, M_tpqctl_deqreq);
0263
0264 return ret;
0265 }
0266
0267
0268
0269
0270
0271
0272
0273
0274 expublic int tmq_tpqctl_to_ubf_deqrsp(UBFH *p_ub, TPQCTL *ctl)
0275 {
0276 int ret = EXSUCCEED;
0277
0278 ret=atmi_cvt_c_to_ubf(M_tpqctl_map, ctl, p_ub, M_tpqctl_deqrsp);
0279
0280 return ret;
0281 }
0282
0283
0284
0285
0286
0287
0288
0289 expublic int tmq_tpqctl_from_ubf_deqrsp(UBFH *p_ub, TPQCTL *ctl)
0290 {
0291 int ret = EXSUCCEED;
0292
0293 ret=atmi_cvt_ubf_to_c(M_tpqctl_map, p_ub, ctl, M_tpqctl_deqrsp);
0294
0295 return ret;
0296 }
0297
0298
0299
0300
0301
0302
0303
0304
0305 expublic char * tmq_msgid_serialize(char *msgid_in, char *msgid_str_out)
0306 {
0307 size_t out_len = 0;
0308
0309 NDRX_DUMP(log_debug, "Original MSGID", msgid_in, TMMSGIDLEN);
0310
0311 ndrx_xa_base64_encode((unsigned char *)msgid_in, TMMSGIDLEN, &out_len,
0312 msgid_str_out);
0313
0314
0315
0316 NDRX_LOG(log_debug, "MSGID after serialize: [%s]", msgid_str_out);
0317
0318 return msgid_str_out;
0319 }
0320
0321
0322
0323
0324
0325
0326
0327 expublic char * tmq_msgid_deserialize(char *msgid_str_in, char *msgid_out)
0328 {
0329 size_t tot_len = 0;
0330
0331 NDRX_LOG(log_debug, "Serialized MSGID: [%s]", msgid_str_in);
0332
0333 memset(msgid_out, 0, TMMSGIDLEN);
0334
0335 ndrx_xa_base64_decode((unsigned char *)msgid_str_in, strlen(msgid_str_in),
0336 &tot_len, msgid_out);
0337
0338 NDRX_DUMP(log_debug, "Deserialized MSGID", msgid_out, TMMSGIDLEN);
0339
0340 return msgid_out;
0341 }
0342
0343
0344
0345
0346
0347
0348
0349
0350
0351
0352
0353
0354
0355
0356
0357
0358 expublic int ndrx_tpenqueue (char *qspace, short nodeid, short srvid, char *qname, TPQCTL *ctl,
0359 char *data, long len, long flags)
0360 {
0361 int ret = EXSUCCEED;
0362 long rsplen;
0363 char cmd = TMQ_CMD_ENQUEUE;
0364 char *tmp=NULL;
0365 long tmp_len;
0366 UBFH *p_ub = NULL;
0367 atmi_error_t errbuf;
0368 char qspacesvc[XATMI_SERVICE_NAME_LENGTH+1];
0369 long dec_time_org = ctl->deq_time;
0370
0371 NDRX_SYSBUF_MALLOC_WERR_OUT(tmp, tmp_len, ret);
0372
0373
0374
0375
0376
0377
0378
0379
0380
0381
0382 if (NULL==qspace || (EXEOS==*qspace && !nodeid && !srvid))
0383 {
0384 ndrx_TPset_error_fmt(TPEINVAL, "%s: empty or NULL qspace!", __func__);
0385 EXFAIL_OUT(ret);
0386 }
0387
0388 if (NULL==qname || EXEOS==*qname)
0389 {
0390 ndrx_TPset_error_fmt(TPEINVAL, "%s: empty or NULL qname!", __func__);
0391 EXFAIL_OUT(ret);
0392 }
0393
0394 if (NULL==ctl)
0395 {
0396 ndrx_TPset_error_fmt(TPEINVAL, "%s: NULL ctl!", __func__);
0397 EXFAIL_OUT(ret);
0398 }
0399
0400 if (ctl->flags & TPQTIME_ABS && ctl->flags & TPQTIME_REL)
0401 {
0402 ndrx_TPset_error_fmt(TPEINVAL,
0403 "%s: TPQTIME_ABS and TPQTIME_REL are mutually exclusive!", __func__);
0404 EXFAIL_OUT(ret);
0405 }
0406
0407
0408 if (ctl->flags&TPQTIME_REL)
0409 {
0410 ctl->deq_time = time(NULL) + ctl->deq_time;
0411 ctl->flags&=~TPQTIME_REL;
0412 ctl->flags|=TPQTIME_ABS;
0413 }
0414
0415 ctl->diagnostic=0;
0416
0417 if (EXFAIL==tptypes(data, NULL, NULL))
0418 {
0419 ndrx_TPset_error_fmt(TPEINVAL, "%s: data buffer not allocated by "
0420 "tpalloc()", __func__);
0421 EXFAIL_OUT(ret);
0422 }
0423
0424
0425 if (EXSUCCEED!=ndrx_mbuf_prepare_outgoing(data, len, tmp, &tmp_len, 0,
0426 NDRX_MBUF_FLAG_NOCALLINFO))
0427 {
0428
0429 EXFAIL_OUT(ret);
0430 }
0431
0432 NDRX_DUMP(log_debug, "Buffer for sending data out", tmp, tmp_len);
0433
0434
0435
0436 if (NULL == (p_ub = (UBFH *)tpalloc("UBF", "", TMQ_DEFAULT_BUFSZ+tmp_len)))
0437 {
0438 ndrx_TPset_error_fmt(TPESYSTEM, "%s: Failed to allocate req buffer: %s",
0439 __func__, Bstrerror(Berror));
0440 EXFAIL_OUT(ret);
0441 }
0442
0443
0444 if (EXSUCCEED!=tmq_tpqctl_to_ubf_enqreq(p_ub, ctl))
0445 {
0446
0447 ndrx_TPset_error_fmt(TPEINVAL, "%s: failed convert ctl "
0448 "to internal UBF buf!", __func__);
0449 EXFAIL_OUT(ret);
0450 }
0451
0452 if (EXSUCCEED!=Bchg(p_ub, EX_DATA, 0, tmp, tmp_len))
0453 {
0454 ndrx_TPset_error_fmt(TPESYSTEM, "%s: Failed to set data field: %s",
0455 Bstrerror(Berror), __func__);
0456 EXFAIL_OUT(ret);
0457 }
0458
0459
0460 if (EXSUCCEED!=Bchg(p_ub, EX_QCMD, 0, &cmd, 0L))
0461 {
0462 ndrx_TPset_error_fmt(TPESYSTEM, "%s: Failed to set cmd field: %s",
0463 __func__, Bstrerror(Berror));
0464 EXFAIL_OUT(ret);
0465 }
0466
0467 if (EXSUCCEED!=Bchg(p_ub, EX_QNAME, 0, qname, 0L))
0468 {
0469 ndrx_TPset_error_fmt(TPESYSTEM, "%s: Failed to set qname field: %s",
0470 __func__, Bstrerror(Berror));
0471 EXFAIL_OUT(ret);
0472 }
0473
0474 ndrx_debug_dump_UBF(log_debug, "QSPACE enqueue request buffer", p_ub);
0475
0476
0477 if (EXEOS!=*qspace)
0478 {
0479 snprintf(qspacesvc, sizeof(qspacesvc), NDRX_SVC_QSPACE, qspace);
0480 }
0481 else
0482 {
0483 snprintf(qspacesvc, sizeof(qspacesvc), NDRX_SVC_TMQ, (long)nodeid, (int)srvid);
0484 }
0485
0486 if (EXFAIL == tpcall(qspacesvc, (char *)p_ub, 0L, (char **)&p_ub, &rsplen, flags|TPNOABORT))
0487 {
0488 int tpe = tperrno;
0489
0490 NDRX_LOG(log_error, "%s failed: %s", qspace, tpstrerror(tpe));
0491 if (TPESVCFAIL!=tpe)
0492 {
0493 EXFAIL_OUT(ret);
0494 }
0495 else
0496 {
0497 ret=EXFAIL;
0498 }
0499 }
0500
0501 ndrx_debug_dump_UBF(log_debug, "QSPACE enqueue response buffer", p_ub);
0502
0503
0504 if (EXSUCCEED!=tmq_tpqctl_from_ubf_enqrsp(p_ub, ctl))
0505 {
0506 NDRX_LOG(log_error, "Failed convert ctl to internal UBF buf!");
0507 ndrx_TPoverride_code(TPESYSTEM);
0508 EXFAIL_OUT(ret);
0509 }
0510
0511 out:
0512
0513 if (NULL!=p_ub)
0514 {
0515 atmi_error_t err;
0516
0517 ndrx_TPsave_error(&err);
0518 tpfree((char *)p_ub);
0519 ndrx_TPrestore_error(&err);
0520 }
0521
0522
0523 if (0!=tperrno)
0524 {
0525 atmi_error_t err;
0526 ndrx_TPsave_error(&err);
0527
0528 if (ctl->diagnostic)
0529 {
0530 err.atmi_error = TPEDIAGNOSTIC;
0531 NDRX_STRCPY_SAFE(err.atmi_error_msg_buf,
0532 "error details in TPQCTL diag fields");
0533 }
0534 ndrx_TPrestore_error(&err);
0535
0536
0537 NDRX_ABORT_START(EXFALSE)
0538
0539 if (TPEDIAGNOSTIC==tperrno &&
0540 ( QMEINVAL==ctl->diagnostic
0541 || QMEBADQUEUE==ctl->diagnostic
0542 )
0543 )
0544 {
0545 abort_needed=EXFALSE;
0546 }
0547
0548 NDRX_ABORT_END(EXFALSE)
0549
0550 }
0551 else
0552 {
0553 ctl->diagnostic = EXFALSE;
0554 }
0555
0556 if (NULL!=tmp)
0557 {
0558 NDRX_SYSBUF_FREE(tmp);
0559 }
0560
0561
0562 NDRX_LOG(log_info, "%s: return %d", __func__, ret);
0563
0564 return ret;
0565 }
0566
0567
0568
0569
0570
0571
0572
0573
0574
0575
0576
0577
0578
0579 expublic int ndrx_tpdequeue (char *qspace, short nodeid, short srvid, char *qname, TPQCTL *ctl,
0580 char **data, long *len, long flags)
0581 {
0582 int ret = EXSUCCEED;
0583 long rsplen;
0584 char cmd = TMQ_CMD_DEQUEUE;
0585 atmi_error_t errbuf;
0586 UBFH *p_ub = (UBFH *)tpalloc("UBF", "", TMQ_DEFAULT_BUFSZ);
0587 char qspacesvc[XATMI_SERVICE_NAME_LENGTH+1];
0588
0589 if (NULL==qspace || (EXEOS==*qspace && !nodeid && !srvid))
0590 {
0591 ndrx_TPset_error_fmt(TPEINVAL, "%s: empty or NULL qspace!", __func__);
0592 EXFAIL_OUT(ret);
0593 }
0594
0595 if (NULL==qname || EXEOS==*qname)
0596 {
0597 ndrx_TPset_error_fmt(TPEINVAL, "%s: empty or NULL qname!", __func__);
0598 EXFAIL_OUT(ret);
0599 }
0600
0601 if (NULL==ctl)
0602 {
0603 ndrx_TPset_error_fmt(TPEINVAL, "%s: NULL ctl!", __func__);
0604 EXFAIL_OUT(ret);
0605 }
0606
0607 ctl->diagnostic=0;
0608
0609 if (NULL==data)
0610 {
0611 ndrx_TPset_error_fmt(TPEINVAL, "%s: data is null!", __func__);
0612 EXFAIL_OUT(ret);
0613 }
0614
0615 if (NULL==len)
0616 {
0617 ndrx_TPset_error_fmt(TPEINVAL, "%s: len is null!", __func__);
0618 EXFAIL_OUT(ret);
0619 }
0620
0621 if (EXFAIL==tptypes(*data, NULL, NULL))
0622 {
0623 ndrx_TPset_error_fmt(TPEINVAL, "%s: data buffer not allocated by "
0624 "tpalloc()", __func__);
0625 EXFAIL_OUT(ret);
0626 }
0627
0628
0629 if (NULL == p_ub)
0630 {
0631 ndrx_TPset_error_fmt(TPESYSTEM, "%s: Failed to allocate req buffer: %s",
0632 __func__, Bstrerror(Berror));
0633 EXFAIL_OUT(ret);
0634 }
0635
0636
0637 if (EXSUCCEED!=tmq_tpqctl_to_ubf_deqreq(p_ub, ctl))
0638 {
0639
0640 ndrx_TPset_error_fmt(TPEINVAL, "%s: failed convert ctl "
0641 "to internal UBF buf!", __func__);
0642 EXFAIL_OUT(ret);
0643 }
0644
0645
0646
0647 if (EXSUCCEED!=Bchg(p_ub, EX_QNAME, 0, qname, 0L))
0648 {
0649 ndrx_TPset_error_fmt(TPESYSTEM, "%s: Failed to set qname field: %s",
0650 __func__, Bstrerror(Berror));
0651 EXFAIL_OUT(ret);
0652 }
0653
0654
0655 if (EXSUCCEED!=Bchg(p_ub, EX_QCMD, 0, &cmd, 0L))
0656 {
0657 ndrx_TPset_error_fmt(TPESYSTEM, "%s: Failed to set cmd field: %s",
0658 __func__, Bstrerror(Berror));
0659 EXFAIL_OUT(ret);
0660 }
0661
0662
0663 ndrx_debug_dump_UBF(log_debug, "QSPACE dequeue request buffer", p_ub);
0664
0665 if (EXEOS!=*qspace)
0666 {
0667 snprintf(qspacesvc, sizeof(qspacesvc), NDRX_SVC_QSPACE, qspace);
0668 }
0669 else
0670 {
0671 snprintf(qspacesvc, sizeof(qspacesvc), NDRX_SVC_TMQ, (long)nodeid, (int)srvid);
0672 }
0673
0674 if (EXFAIL == tpcall(qspacesvc, (char *)p_ub, 0L, (char **)&p_ub, &rsplen,
0675 flags | TPNOABORT))
0676 {
0677 int tpe = tperrno;
0678 NDRX_LOG(log_error, "%s failed: %s", qspace, tpstrerror(tpe));
0679 if (TPESVCFAIL!=tpe)
0680 {
0681 EXFAIL_OUT(ret);
0682 }
0683 else
0684 {
0685 ret=EXFAIL;
0686 }
0687
0688 ndrx_debug_dump_UBF(log_debug, "QSPACE dequeue response buffer", p_ub);
0689
0690 }
0691 else
0692 {
0693 BFLDLEN len_extra=0;
0694 char *data_extra = NULL;
0695
0696 ndrx_debug_dump_UBF(log_debug, "QSPACE dequeue response buffer", p_ub);
0697
0698 if (NULL==(data_extra=Bgetalloc(p_ub, EX_DATA, 0, &len_extra)))
0699 {
0700 ndrx_TPset_error_fmt(TPESYSTEM, "%s: Failed to get EX_DATA: %s",
0701 __func__, Bstrerror(Berror));
0702 EXFAIL_OUT(ret);
0703 }
0704
0705 ret=ndrx_mbuf_prepare_incoming(data_extra,
0706 len_extra,
0707 data,
0708 len,
0709 flags, 0);
0710 if (EXSUCCEED!=ret)
0711 {
0712 ndrx_TPset_error_fmt(TPESYSTEM, "%s: Failed to prepare incoming buffer: %s",
0713 __func__, Bstrerror(Berror));
0714
0715 NDRX_FREE(data_extra);
0716 EXFAIL_OUT(ret);
0717 }
0718 NDRX_FREE(data_extra);
0719 }
0720
0721
0722 if (EXSUCCEED!=tmq_tpqctl_from_ubf_deqrsp(p_ub, ctl))
0723 {
0724 NDRX_LOG(log_error, "Failed convert ctl to internal UBF buf!");
0725 ndrx_TPoverride_code(TPESYSTEM);
0726 EXFAIL_OUT(ret);
0727 }
0728
0729 out:
0730
0731 if (NULL!=p_ub)
0732 {
0733 atmi_error_t err;
0734
0735 ndrx_TPsave_error(&err);
0736 tpfree((char *)p_ub);
0737 ndrx_TPrestore_error(&err);
0738 }
0739
0740
0741 if (0!=tperrno)
0742 {
0743 atmi_error_t err;
0744 ndrx_TPsave_error(&err);
0745
0746 if (ctl->diagnostic)
0747 {
0748 err.atmi_error = TPEDIAGNOSTIC;
0749 NDRX_STRCPY_SAFE(err.atmi_error_msg_buf,
0750 "error details in TPQCTL diag fields");
0751 }
0752 ndrx_TPrestore_error(&err);
0753
0754
0755 NDRX_ABORT_START(EXFALSE)
0756
0757 if (TPEDIAGNOSTIC==tperrno &&
0758 ( QMEINVAL==ctl->diagnostic
0759 || QMENOMSG==ctl->diagnostic
0760 || QMEBADQUEUE==ctl->diagnostic))
0761 {
0762 abort_needed=EXFALSE;
0763 }
0764
0765 NDRX_ABORT_END(EXFALSE)
0766 }
0767
0768
0769 NDRX_LOG(log_info, "%s: return %d", __func__, ret);
0770
0771 return ret;
0772 }
0773