0001
0002
0003
0004
0005
0006
0007
0008
0009
0010
0011
0012
0013
0014
0015
0016
0017
0018
0019
0020
0021
0022
0023
0024
0025
0026
0027
0028
0029
0030
0031
0032
0033
0034
0035
0036
0037
0038 #include <stdio.h>
0039 #include <stdarg.h>
0040 #include <memory.h>
0041 #include <stdlib.h>
0042 #include <errno.h>
0043 #include <sys_mqueue.h>
0044 #include <fcntl.h>
0045
0046 #include <atmi.h>
0047 #include <userlog.h>
0048 #include <ndebug.h>
0049 #include <tperror.h>
0050 #include <typed_buf.h>
0051 #include <atmi_int.h>
0052
0053 #include "../libatmisrv/srv_int.h"
0054
0055 #include "utlist.h"
0056 #include <thlock.h>
0057 #include <xa_cmn.h>
0058 #include <atmi_shm.h>
0059 #include <atmi_tls.h>
0060 #include <atmi_cache.h>
0061 #include <ndrx_ddr.h>
0062
0063
0064 #define NOENT_ERR_SHM 1
0065 #define NOENT_ERR_QUEUE 2
0066
0067
0068
0069
0070 exprivate NDRX_SPIN_LOCKDECL(M_callseq_lock);
0071
0072
0073 exprivate void unlock_call_descriptor(int cd, short status);
0074
0075
0076
0077
0078 expublic int ndrx_tpcall_init_once(void)
0079 {
0080 NDRX_SPIN_INIT_V(M_callseq_lock);
0081 return EXSUCCEED;
0082 }
0083
0084
0085
0086
0087
0088
0089 expublic void ndrx_dump_call_struct(int lev, tp_command_call_t *call)
0090 {
0091 ndrx_debug_t * dbg = debug_get_ndrx_ptr();
0092 if (dbg->level>=lev)
0093 {
0094 NDRX_LOG(lev, "=== Start of tp_command_call_t call dump, ptr=%p ===", call);
0095 NDRX_LOG(lev, "command_id=[%hd]", call->command_id);
0096 NDRX_LOG(lev, "proto_ver=[%c%c%c%c]", call->proto_ver[0], call->proto_ver[1],
0097 call->proto_ver[2], call->proto_ver[3]);
0098 NDRX_LOG(lev, "proto_magic=[%d]", call->proto_magic);
0099 NDRX_LOG(lev, "name=[%s]", call->name);
0100 NDRX_LOG(lev, "reply_to=[%s]", call->reply_to);
0101 NDRX_LOG(lev, "callstack=[%s]", call->callstack);
0102 NDRX_LOG(lev, "my_id=[%s]", call->my_id);
0103 NDRX_LOG(lev, "sysflags=[%p]", call->sysflags);
0104 NDRX_LOG(lev, "cd=[%d]", call->cd);
0105 NDRX_LOG(lev, "rval=[%d]", call->rval);
0106 NDRX_LOG(lev, "rcode=[%ld]", call->rcode);
0107 NDRX_LOG(lev, "extradata=[%s]", call->extradata);
0108 NDRX_LOG(lev, "flags=[%p]", call->flags);
0109 NDRX_LOG(lev, "timestamp=[%lu]", call->timestamp);
0110 NDRX_LOG(lev, "callseq=[%u]", call->callseq);
0111 NDRX_LOG(lev, "timer.tv_nsec=[%lu]", call->timer.t.tv_nsec);
0112 NDRX_LOG(lev, "timer.tv_sec=[%lu]", call->timer.t.tv_sec);
0113 NDRX_LOG(lev, "tmtxflags=[0x%x]", call->tmtxflags);
0114 NDRX_LOG(lev, "tmxid=[%s]", call->tmxid);
0115 NDRX_LOG(lev, "tmrmid=[%hd]", call->tmrmid);
0116 NDRX_LOG(lev, "tmnodeid=[%hd]", call->tmnodeid);
0117 NDRX_LOG(lev, "tmsrvid=[%hd]", call->tmsrvid);
0118 NDRX_LOG(lev, "tmknownrms=[%s]", call->tmknownrms);
0119 NDRX_LOG(lev, "data_len=[%ld]", call->data_len);
0120 NDRX_LOG(lev, "===== End of tp_command_call_t call dump, ptr=%p ===", call);
0121 }
0122 }
0123
0124
0125
0126
0127
0128
0129 exprivate int call_check_tout(int cd)
0130 {
0131 int ret=EXSUCCEED;
0132 time_t t;
0133 int t_diff;
0134
0135
0136 if (CALL_WAITING_FOR_ANS==G_atmi_tls->G_call_state[cd].status &&
0137 !(G_atmi_tls->G_call_state[cd].flags & TPNOTIME) &&
0138 (t_diff = ((t = time(NULL)) - G_atmi_tls->G_call_state[cd].timestamp)) > G_atmi_tls->G_call_state[cd].tout_eff
0139 )
0140 {
0141
0142 NDRX_LOG(log_warn, "cd %d (callseq %u) timeout condition - generating error "
0143 "(locked at: %ld current tstamp: %ld, diff: %d, timeout_value: %d)",
0144 cd, G_atmi_tls->G_call_state[cd].callseq,
0145 G_atmi_tls->G_call_state[cd].timestamp, t, t_diff, G_atmi_tls->G_call_state[cd].tout_eff);
0146
0147 ndrx_TPset_error_fmt(TPETIME, "cd %d (callseq %u) timeout condition - generating error "
0148 "(locked at: %ld current tstamp: %ld, diff: %d, timeout_value: %d)",
0149 cd, G_atmi_tls->G_call_state[cd].callseq,
0150 G_atmi_tls->G_call_state[cd].timestamp, t, t_diff, G_atmi_tls->G_call_state[cd].tout_eff);
0151
0152
0153 unlock_call_descriptor(cd, CALL_CANCELED);
0154
0155 ret=EXFAIL;
0156 goto out;
0157 }
0158 out:
0159 return ret;
0160 }
0161
0162
0163
0164
0165 exprivate void call_dump_descriptors(void)
0166 {
0167 int i;
0168 time_t t = time(NULL);
0169 int t_diff;
0170 int cnt=0;
0171 ATMI_TLS_ENTRY;
0172
0173 NDRX_LOG(log_debug, "***List of call descriptors waiting for answer***");
0174 NDRX_LOG(log_debug, "timeout(system wide): %d curr_tstamp (sys-wide): %ld",
0175 G_atmi_env.time_out, t);
0176 NDRX_LOG(log_debug, "cd\tcallseq\tlocked_at\tdiff\tout_eff");
0177
0178 for (i=1; i<MAX_ASYNC_CALLS; i++)
0179 {
0180 if (CALL_WAITING_FOR_ANS==G_atmi_tls->G_call_state[i].status)
0181 {
0182 t_diff = t - G_atmi_tls->G_call_state[i].timestamp;
0183 NDRX_LOG(log_debug, "%d\t%u\t%ld\t%d\t%d",
0184 i, G_atmi_tls->G_call_state[i].callseq,
0185 G_atmi_tls->G_call_state[i].timestamp, t_diff,
0186 G_atmi_tls->G_call_state[i].tout_eff);
0187 cnt++;
0188 }
0189 }
0190
0191 NDRX_LOG(log_warn, "cds waiting for answer: %d", cnt);
0192 NDRX_LOG(log_debug, "*************************************************");
0193 }
0194
0195 #define CALL_TOUT_DEBUG
0196
0197
0198
0199
0200
0201
0202
0203
0204
0205
0206 exprivate int call_scan_tout(int cd, int *cd_out)
0207 {
0208
0209
0210
0211
0212 int ret = EXSUCCEED;
0213 int i;
0214 long delta = 0;
0215
0216
0217
0218 #ifdef CALL_TOUT_DEBUG
0219 call_dump_descriptors();
0220 #endif
0221
0222
0223 if (G_atmi_tls->tpcall_first ||
0224 (delta=ndrx_stopwatch_get_delta(&G_atmi_tls->tpcall_start)) >=1000 ||
0225
0226 delta < 0)
0227 {
0228
0229 if (0 < cd)
0230 {
0231 if (EXSUCCEED!=call_check_tout(cd))
0232 {
0233 *cd_out = cd;
0234 ret=EXFAIL;
0235 goto out;
0236 }
0237 }
0238 else
0239 {
0240 for (i=1; i<MAX_ASYNC_CALLS; i++)
0241 {
0242 if (EXSUCCEED!=call_check_tout(i))
0243 {
0244 *cd_out = i;
0245 ret=EXFAIL;
0246 goto out;
0247 }
0248 }
0249 }
0250
0251 ndrx_stopwatch_reset(&G_atmi_tls->tpcall_start);
0252 G_atmi_tls->tpcall_first = EXFALSE;
0253 }
0254
0255 out:
0256
0257 return ret;
0258 }
0259
0260
0261
0262
0263
0264
0265
0266 expublic unsigned ndrx_get_next_callseq_shared(void)
0267 {
0268 static volatile unsigned shared_callseq=0;
0269 unsigned ret;
0270
0271 NDRX_SPIN_LOCK_V(M_callseq_lock);
0272 shared_callseq++;
0273 ret = shared_callseq;
0274 NDRX_SPIN_UNLOCK_V(M_callseq_lock);
0275
0276 return ret;
0277 }
0278
0279
0280
0281
0282
0283
0284 exprivate int get_call_descriptor_and_lock(unsigned *p_callseq,
0285 time_t timestamp, long flags, int tout_eff)
0286 {
0287 int start_cd = G_atmi_tls->tpcall_get_cd;
0288 int ret = EXFAIL;
0289 unsigned callseq=0;
0290
0291
0292
0293
0294
0295
0296 while (CALL_WAITING_FOR_ANS==G_atmi_tls->G_call_state[G_atmi_tls->tpcall_get_cd].status)
0297 {
0298 G_atmi_tls->tpcall_get_cd++;
0299
0300 if (G_atmi_tls->tpcall_get_cd > MAX_ASYNC_CALLS-1)
0301 {
0302 G_atmi_tls->tpcall_get_cd=1;
0303 }
0304
0305 if (start_cd==G_atmi_tls->tpcall_get_cd)
0306 {
0307 NDRX_LOG(log_debug, "All call descriptors overflow restart!");
0308 break;
0309 }
0310 }
0311
0312 if (CALL_WAITING_FOR_ANS==G_atmi_tls->G_call_state[G_atmi_tls->tpcall_get_cd].status)
0313 {
0314 NDRX_LOG(log_debug, "All call descriptors have been taken - FAIL!");
0315
0316 EXFAIL_OUT(ret);
0317 }
0318 else
0319 {
0320 callseq = ndrx_get_next_callseq_shared();
0321
0322 ret = G_atmi_tls->tpcall_get_cd;
0323 *p_callseq=callseq;
0324 NDRX_LOG(log_debug, "Got free call descriptor %d, callseq: %u",
0325 ret, callseq);
0326
0327 NDRX_LOG(log_debug, "cd %d locked to %d timestamp (id: %d%d) callseq: %u tout_eff: %d",
0328 ret, timestamp, ret,timestamp, callseq, tout_eff);
0329 G_atmi_tls->G_call_state[ret].status = CALL_WAITING_FOR_ANS;
0330 G_atmi_tls->G_call_state[ret].timestamp = timestamp;
0331 G_atmi_tls->G_call_state[ret].callseq = callseq;
0332 G_atmi_tls->G_call_state[ret].flags = flags;
0333 G_atmi_tls->G_call_state[ret].tout_eff = tout_eff;
0334
0335
0336
0337
0338 if (!(flags & TPNOTRAN) && G_atmi_tls->G_atmi_xa_curtx.txinfo)
0339 {
0340 NDRX_LOG(log_debug, "Registering cd=%d under global "
0341 "transaction!", ret);
0342 if (EXSUCCEED!=atmi_xa_cd_reg(&(G_atmi_tls->G_atmi_xa_curtx.txinfo->call_cds), ret))
0343 {
0344 EXFAIL_OUT(ret);
0345 }
0346 }
0347 }
0348 out:
0349
0350 return ret;
0351
0352 }
0353
0354
0355
0356
0357
0358
0359 exprivate void unlock_call_descriptor(int cd, short status)
0360 {
0361
0362
0363 if (!(G_atmi_tls->G_call_state[cd].flags & TPNOTRAN) &&
0364 G_atmi_tls->G_atmi_xa_curtx.txinfo)
0365 {
0366 NDRX_LOG(log_debug, "Un-registering cd=%d from global "
0367 "transaction!", cd);
0368
0369 atmi_xa_cd_unreg(&(G_atmi_tls->G_atmi_xa_curtx.txinfo->call_cds), cd);
0370 }
0371
0372 G_atmi_tls->G_call_state[cd].status = status;
0373 }
0374
0375
0376
0377
0378
0379 expublic void cancel_if_expected(tp_command_call_t *call)
0380 {
0381 ATMI_TLS_ENTRY;
0382
0383 call_descriptor_state_t *callst = &G_atmi_tls->G_call_state[call->cd];
0384 if (CALL_WAITING_FOR_ANS==callst->status &&
0385 call->timestamp==callst->timestamp &&
0386 call->callseq==callst->callseq)
0387 {
0388 NDRX_LOG(log_debug, "Reply was expected, but probably "
0389 "timeouted - cancelling!");
0390 unlock_call_descriptor(call->cd, CALL_CANCELED);
0391 }
0392 else
0393 {
0394 NDRX_LOG(log_debug, "Reply was NOT expected, ignoring!");
0395 }
0396 }
0397
0398
0399
0400
0401
0402
0403
0404
0405
0406
0407
0408
0409
0410
0411
0412
0413
0414 expublic int ndrx_tpacall (char *svc, char *data,
0415 long len, long flags, char *extradata,
0416 int dest_node, int ex_flags, TPTRANID *p_tranid,
0417 int user1, long user2, int user3, long user4,
0418 ndrx_tpcall_cache_ctl_t *p_cachectl)
0419 {
0420 int ret=EXSUCCEED;
0421 char *buf=NULL;
0422 size_t buf_len;
0423 tp_command_call_t *call;
0424 long data_len = MAX_CALL_DATA_SIZE;
0425 char send_q[NDRX_MAX_Q_SIZE+1];
0426 time_t timestamp;
0427 int is_bridge;
0428 int tpcall_cd;
0429 int have_shm = EXFALSE;
0430 int noenterr = EXFALSE;
0431 char svcddr[XATMI_SERVICE_NAME_LENGTH+1];
0432 int prio = NDRX_MSGPRIO_DEFAULT;
0433 int tout_eff;
0434 ATMI_TLS_ENTRY;
0435
0436 NDRX_LOG(log_debug, "%s enter data=%p", __func__, data);
0437
0438 tout_eff = ndrx_tptoutget_eff();
0439
0440
0441
0442
0443
0444 NDRX_STRCPY_SAFE(svcddr, svc);
0445
0446 if (EXFAIL==ndrx_ddr_grp_get(svcddr, sizeof(svcddr), data, len,
0447 &prio))
0448 {
0449
0450 EXFAIL_OUT(ret);
0451 }
0452
0453 NDRX_SYSBUF_MALLOC_WERR_OUT(buf, buf_len, ret);
0454
0455 call=(tp_command_call_t *)buf;
0456
0457 if (G_atmi_tls->G_atmi_xa_curtx.txinfo)
0458 {
0459 atmi_xa_print_knownrms(log_info, "Known RMs before call: ",
0460 G_atmi_tls->G_atmi_xa_curtx.txinfo->tmknownrms);
0461 }
0462
0463
0464
0465
0466 if (ex_flags & TPCALL_BRCALL)
0467 {
0468
0469 #if defined(EX_USE_POLL) || defined(EX_USE_SYSVQ)
0470 {
0471 int tmp_is_bridge;
0472 char tmpsvc[MAXTIDENT+1];
0473
0474 snprintf(tmpsvc, sizeof(tmpsvc), NDRX_SVC_BRIDGE, dest_node);
0475
0476 if (EXSUCCEED!=ndrx_shm_get_svc(tmpsvc, send_q, &tmp_is_bridge, NULL))
0477 {
0478 NDRX_LOG(log_error, "Failed to get bridge svc: [%s]",
0479 tmpsvc);
0480 EXFAIL_OUT(ret);
0481 }
0482 }
0483 #else
0484 snprintf(send_q, sizeof(send_q), NDRX_SVC_QBRDIGE,
0485 G_atmi_tls->G_atmi_conf.q_prefix, dest_node);
0486 #endif
0487 is_bridge=EXTRUE;
0488 }
0489 else if (EXSUCCEED!=ndrx_shm_get_svc(svcddr, send_q, &is_bridge, &have_shm))
0490 {
0491 NDRX_LOG(log_info, "Service is not available %s by shm", svcddr);
0492 noenterr = NOENT_ERR_SHM;
0493
0494 }
0495
0496
0497 if (!have_shm)
0498 {
0499
0500 if (!ndrx_q_exists(send_q))
0501 {
0502 noenterr = NOENT_ERR_QUEUE;
0503
0504 }
0505 }
0506
0507
0508
0509 if (!(flags & TPNOCACHELOOK) && NULL!=p_cachectl)
0510 {
0511
0512 if (EXSUCCEED!=(ret=ndrx_cache_lookup(svc, data, len,
0513 p_cachectl->odata, p_cachectl->olen, flags,
0514 &p_cachectl->should_cache,
0515 &p_cachectl->saved_tperrno,
0516 &p_cachectl->saved_tpurcode, EXFALSE, noenterr)))
0517 {
0518
0519 if (EXFAIL==ret)
0520 {
0521 EXFAIL_OUT(ret);
0522 }
0523 else
0524 {
0525
0526
0527 if (noenterr)
0528 {
0529 p_cachectl->should_cache=EXFALSE;
0530 }
0531
0532 NDRX_LOG(log_debug, "Cache lookup failed ... continue with svc call");
0533 }
0534 }
0535 else
0536 {
0537 p_cachectl->cached_rsp = EXTRUE;
0538
0539 goto out;
0540 }
0541 }
0542
0543
0544 if (noenterr)
0545 {
0546
0547
0548
0549
0550
0551
0552 if (NULL!=G_atmi_tls->pf_tpacall_noservice_hook && (flags & TPNOREPLY ))
0553 {
0554 ret=G_atmi_tls->pf_tpacall_noservice_hook(svc, data, len, flags);
0555 goto out;
0556 }
0557
0558 ndrx_TPset_error_fmt(TPENOENT, "%s: Service is not available %s by %s",
0559 __func__, svcddr, NOENT_ERR_SHM==noenterr?"shm":"queue");
0560
0561 EXFAIL_OUT(ret);
0562 }
0563
0564
0565 memset(call, 0, sizeof(tp_command_call_t));
0566
0567
0568 if (EXSUCCEED!=ndrx_mbuf_prepare_outgoing (data, len, call->data,
0569 &data_len, flags, 0))
0570 {
0571
0572 EXFAIL_OUT(ret);
0573 }
0574
0575
0576
0577 call->data_len = data_len;
0578
0579 data_len+=sizeof(tp_command_call_t);
0580 call->clttout = tout_eff;
0581
0582 NDRX_STRCPY_SAFE(call->reply_to, G_atmi_tls->G_atmi_conf.reply_q_str);
0583
0584 call->command_id = ATMI_COMMAND_TPCALL;
0585
0586
0587 NDRX_STRCPY_SAFE(call->name, svcddr);
0588 call->flags = flags;
0589
0590 if (NULL!=extradata)
0591 {
0592 NDRX_STRCPY_SAFE(call->extradata, extradata);
0593 }
0594
0595 timestamp = time(NULL);
0596
0597
0598 if (!(call->flags & TPNOTRAN) && G_atmi_tls->G_atmi_xa_curtx.txinfo)
0599 {
0600 NDRX_LOG(log_info, "Current process in global transaction (%s) - "
0601 "prepare call", G_atmi_tls->G_atmi_xa_curtx.txinfo->tmxid);
0602
0603 atmi_xa_cpy_xai_to_call(call, G_atmi_tls->G_atmi_xa_curtx.txinfo);
0604
0605 if (call->flags & TPTRANSUSPEND && NULL!=p_tranid &&
0606 EXSUCCEED!=ndrx_tpsuspend(p_tranid, TPTXTMSUSPEND, EXFALSE))
0607 {
0608
0609 ndrx_TPoverride_code(TPESYSTEM);
0610 EXFAIL_OUT(ret);
0611 }
0612 }
0613
0614
0615 if (!(flags & TPNOREPLY))
0616 {
0617
0618 if (EXFAIL==(tpcall_cd = get_call_descriptor_and_lock(&call->callseq,
0619 timestamp, flags, tout_eff)))
0620 {
0621 NDRX_LOG(log_error, "Do not have resources for "
0622 "track this call!");
0623 ndrx_TPset_error_fmt(TPELIMIT, "%s:All call descriptor entries have been used "
0624 "(check why they do not free up? Maybe need to "
0625 "use tpcancel()?)", __func__);
0626 EXFAIL_OUT(ret);
0627 }
0628 }
0629 else
0630 {
0631 NDRX_LOG(log_info, "TPNOREPLY => cd=0");
0632 tpcall_cd = 0;
0633 }
0634
0635 call->cd = tpcall_cd;
0636 call->timestamp = timestamp;
0637
0638 call->rval = user1;
0639 call->rcode = user2;
0640
0641 call->user3 = user3;
0642 call->user4 = user4;
0643
0644
0645 ndrx_stopwatch_reset(&call->timer);
0646
0647 NDRX_STRCPY_SAFE(call->my_id, G_atmi_tls->G_atmi_conf.my_id);
0648 NDRX_LOG(log_debug, "Sending request to: [%s] my_id=[%s] reply_to=[%s] cd=%d "
0649 "callseq=%u (user1=%d, user2=%ld, user3=%d, user4=%ld)",
0650 send_q, call->my_id, call->reply_to, tpcall_cd, call->callseq,
0651 call->rval, call->rcode, call->user3, call->user4);
0652
0653 NDRX_DUMP(log_dump, "Sending away...", (char *)call, data_len);
0654
0655 if (EXSUCCEED!=(ret=ndrx_generic_q_send(send_q, (char *)call, data_len, flags, prio)))
0656 {
0657 int err;
0658
0659 if (ENOENT==ret)
0660 {
0661 err=TPENOENT;
0662 }
0663 else
0664 {
0665 CONV_ERROR_CODE(ret, err);
0666 }
0667 ndrx_TPset_error_fmt(err, "%s: Failed to send, os err: %s", __func__, strerror(ret));
0668 ret=EXFAIL;
0669
0670
0671 unlock_call_descriptor(tpcall_cd, CALL_NOT_ISSUED);
0672
0673 goto out;
0674
0675 }
0676
0677 ret=tpcall_cd;
0678
0679 out:
0680
0681 if (NULL!=buf)
0682 {
0683 NDRX_SYSBUF_FREE(buf);
0684 }
0685
0686 NDRX_LOG(log_debug, "%s return %d", __func__, ret);
0687 return ret;
0688 }
0689
0690
0691
0692
0693
0694
0695
0696
0697 expublic int ndrx_add_to_memq(char **pbuf, size_t pbuf_len, ssize_t rply_len)
0698 {
0699 int ret = EXSUCCEED;
0700 tpmemq_t *tmp;
0701
0702 if (NULL==(tmp = NDRX_FPMALLOC(sizeof(tpmemq_t), 0)))
0703 {
0704 int err = errno;
0705 NDRX_LOG(log_error, "Failed to alloc: %s", strerror(err));
0706 userlog("Failed to alloc: %s", strerror(err));
0707 EXFAIL_OUT(ret);
0708 }
0709
0710 tmp->buf = *pbuf;
0711 *pbuf = NULL;
0712 tmp->len = pbuf_len;
0713 tmp->data_len = rply_len;
0714 tmp->prev = NULL;
0715 tmp->next = NULL;
0716
0717
0718 DL_APPEND(G_atmi_tls->memq, tmp);
0719
0720 out:
0721 return ret;
0722 }
0723
0724
0725
0726
0727
0728
0729
0730
0731
0732
0733
0734
0735 exprivate int ndrx_rm_frm_memq(int cd, long flags, char **pbuf, size_t *pbuf_len, ssize_t *rply_len)
0736 {
0737 int ret=EXSUCCEED;
0738 tpmemq_t *el, *elt;
0739 NDRX_LOG(log_info, "Got message from memq...");
0740
0741
0742
0743
0744 if (flags & TPGETANY)
0745 {
0746
0747
0748
0749 NDRX_SYSBUF_FREE(*pbuf);
0750 *pbuf = G_atmi_tls->memq->buf;
0751 *pbuf_len = G_atmi_tls->memq->len;
0752 *rply_len = G_atmi_tls->memq->data_len;
0753
0754
0755 el = G_atmi_tls->memq;
0756 ret=EXTRUE;
0757 }
0758 else
0759 {
0760
0761 DL_FOREACH_SAFE(G_atmi_tls->memq, el, elt)
0762 {
0763 tp_command_call_t *rply = (tp_command_call_t *)el->buf;
0764
0765 if (rply->cd==cd)
0766 {
0767 NDRX_SYSBUF_FREE(*pbuf);
0768 *pbuf = el->buf;
0769 *pbuf_len = el->len;
0770 *rply_len = el->data_len;
0771 ret=EXTRUE;
0772 break;
0773 }
0774 }
0775 }
0776
0777
0778 if (EXTRUE==ret)
0779 {
0780 DL_DELETE(G_atmi_tls->memq, el);
0781 NDRX_FPFREE(el);
0782 }
0783 out:
0784 return ret;
0785 }
0786
0787
0788
0789
0790
0791
0792
0793
0794
0795 expublic int ndrx_tpgetrply (int *cd,
0796 int cd_exp,
0797 char **data ,
0798 long *len, long flags,
0799 TPTRANID *p_tranid)
0800 {
0801 int ret=EXSUCCEED;
0802 char *pbuf = NULL;
0803 ssize_t rply_len;
0804 unsigned prio;
0805 size_t pbuf_len;
0806 tp_command_call_t *rply=NULL;
0807 int answ_ok = EXFALSE;
0808 int is_abort_only = EXFALSE;
0809 ATMI_TLS_ENTRY;
0810
0811
0812 NDRX_LOG(log_debug, "%s enter, flags %ld cd_exp %d data=%p *data=%p len=%p",
0813 __func__, flags, cd_exp, data, *data, len);
0814
0815
0816
0817
0818 if (!(flags & TPGETANY) &&
0819 CALL_WAITING_FOR_ANS!=G_atmi_tls->G_call_state[cd_exp].status)
0820 {
0821 ndrx_TPset_error_fmt(TPEBADDESC, "Call descriptor %d is %s",
0822 cd_exp,
0823 CALL_NOT_ISSUED==G_atmi_tls->G_call_state[*cd].status?"not issued":"canceled");
0824 EXFAIL_OUT(ret);
0825 }
0826
0827
0828
0829
0830 while (!answ_ok)
0831 {
0832
0833 if (NULL==pbuf)
0834 {
0835 NDRX_SYSBUF_MALLOC_WERR_OUT(pbuf, pbuf_len, ret);
0836 rply = (tp_command_call_t *)pbuf;
0837 }
0838
0839
0840
0841
0842 if (NULL!=G_atmi_tls->memq &&
0843
0844 EXTRUE==ndrx_rm_frm_memq(*cd, flags, &pbuf, &pbuf_len, &rply_len))
0845 {
0846 rply = (tp_command_call_t *)pbuf;
0847 NDRX_LOG(log_debug, "from memq: pbuf=%p", pbuf);
0848 }
0849 else
0850 {
0851 NDRX_LOG(log_info, "Waiting on OS Q mqd_t=%d...",
0852 G_atmi_tls->G_atmi_conf.reply_q);
0853
0854
0855 rply_len = ndrx_generic_q_receive(G_atmi_tls->G_atmi_conf.reply_q,
0856 G_atmi_tls->G_atmi_conf.reply_q_str,
0857 &(G_atmi_tls->G_atmi_conf.reply_q_attr),
0858 (char *)rply, pbuf_len, &prio, flags);
0859 }
0860
0861
0862
0863
0864
0865 if ((flags & TPNOBLOCK && GEN_QUEUE_ERR_NO_DATA==rply_len) ||
0866 (EXFAIL==rply_len && TPETIME==tperrno))
0867 {
0868 if (flags & TPGETANY)
0869 {
0870 if (EXSUCCEED!=(ret = call_scan_tout(EXFAIL, cd)))
0871 {
0872 goto out;
0873 }
0874 }
0875 else
0876 {
0877 if (EXSUCCEED!=(ret = call_scan_tout(cd_exp, cd)))
0878 {
0879 goto out;
0880 }
0881 }
0882 }
0883
0884 if (GEN_QUEUE_ERR_NO_DATA==rply_len)
0885 {
0886
0887
0888
0889
0890
0891 ndrx_TPset_error_msg(TPEBLOCK, "TPENOBLOCK was specified in flags and "
0892 "no message is in queue");
0893 ret=EXFAIL;
0894 goto out;
0895 }
0896 else if (EXFAIL==rply_len)
0897 {
0898
0899 NDRX_LOG(log_debug, "%s failed to receive answer", __func__);
0900 ret=EXFAIL;
0901 goto out;
0902 }
0903 else
0904 {
0905 if (ATMI_COMMAND_TPNOTIFY==rply->command_id ||
0906 ATMI_COMMAND_BROADCAST==rply->command_id)
0907 {
0908 NDRX_LOG(log_debug, "%s message received -> _tpnotify",
0909 (ATMI_COMMAND_TPNOTIFY==rply->command_id?"Notification":"Broadcast"));
0910
0911 ndrx_process_notif(pbuf, rply_len);
0912
0913
0914 continue;
0915 }
0916
0917 NDRX_LOG(log_debug, "accept any: %s, cd=%d (name: [%s], my_id: [%s]) "
0918 "atmi_tls=%p cmd=%hd rplybuf=%p rply_len=%zd",
0919 (flags & TPGETANY)?"yes":"no", rply->cd,
0920 rply->my_id, rply->name, G_atmi_tls, rply->command_id, pbuf,
0921 rply_len);
0922
0923
0924 if (CALL_WAITING_FOR_ANS==G_atmi_tls->G_call_state[rply->cd].status &&
0925 G_atmi_tls->G_call_state[rply->cd].timestamp == rply->timestamp &&
0926 G_atmi_tls->G_call_state[rply->cd].callseq == rply->callseq)
0927 {
0928
0929
0930 if ( !(flags & TPGETANY) && rply->cd!=cd_exp)
0931 {
0932
0933 NDRX_LOG(log_warn, "Out of bound msg (for different cd): "
0934 "cd: %d, expected cd: %d timestamp: %d callseq: %u, "
0935 "reply from %s, cd status %hd - add to buffer",
0936 rply->cd, cd_exp, rply->timestamp, rply->callseq, rply->reply_to,
0937 G_atmi_tls->G_call_state[rply->cd].status);
0938
0939
0940 if (EXSUCCEED!=ndrx_add_to_memq(&pbuf, pbuf_len, rply_len))
0941 {
0942 EXFAIL_OUT(ret);
0943 }
0944
0945 continue;
0946 }
0947
0948 NDRX_LOG(log_info, "Reply cd: %d, timestamp :%d callseq: %u from "
0949 "%s - expected OK!",
0950 rply->cd, rply->timestamp, rply->callseq, rply->reply_to);
0951 answ_ok=EXTRUE;
0952
0953 unlock_call_descriptor(rply->cd, CALL_NOT_ISSUED);
0954 }
0955 else
0956 {
0957 NDRX_LOG(log_warn, "Dropping incoming message (not expected): "
0958 "cd: %d, timestamp :%d, callseq: %u reply from %s cd status %hd",
0959 rply->cd, rply->timestamp, rply->callseq, rply->reply_to,
0960 G_atmi_tls->G_call_state[rply->cd].status);
0961
0962 continue;
0963 }
0964
0965
0966 if (TMTXFLAGS_IS_ABORT_ONLY & rply->tmtxflags)
0967 {
0968 NDRX_LOG(log_warn, "Reply contains SYS_XA_ABORT_ONLY!");
0969 is_abort_only = EXTRUE;
0970 }
0971
0972
0973 *cd = rply->cd;
0974 if (rply->sysflags & SYS_FLAG_REPLY_ERROR)
0975 {
0976 ndrx_TPset_error_msg(rply->rcode, "Server failed to generate reply");
0977 ret=EXFAIL;
0978 goto out;
0979 }
0980 else
0981 {
0982
0983 ret = ndrx_mbuf_prepare_incoming (rply->data, rply->data_len,
0984 data, len, flags, 0);
0985
0986
0987 G_atmi_tls->M_svc_return_code = rply->rcode;
0988
0989
0990 if (ret==EXFAIL)
0991 {
0992 goto out;
0993 }
0994
0995
0996
0997 if (TPSUCCESS!=rply->rval)
0998 {
0999 ndrx_TPset_error_fmt(TPESVCFAIL, "Service returned %d",
1000 rply->rval);
1001 ret=EXFAIL;
1002 goto out;
1003 }
1004 }
1005 }
1006 }
1007 out:
1008
1009
1010 if (flags & TPTRANSUSPEND && p_tranid && p_tranid->tmxid[0])
1011 {
1012
1013
1014
1015 atmi_error_t err;
1016 int err_saved = EXFALSE;
1017
1018 if (0!=tperrno)
1019 {
1020 ndrx_TPsave_error(&err);
1021 err_saved = EXTRUE;
1022 }
1023
1024
1025 if (EXSUCCEED!=ndrx_tpresume(p_tranid, TPTXTMSUSPEND) && EXSUCCEED==ret)
1026 {
1027 ret=EXFAIL;
1028 }
1029
1030
1031
1032 if (err_saved)
1033 {
1034 ndrx_TPrestore_error(&err);
1035 }
1036 else if (EXFAIL==ret)
1037 {
1038
1039 ndrx_TPoverride_code(TPESYSTEM);
1040 }
1041
1042 }
1043
1044 if (G_atmi_tls->G_atmi_xa_curtx.txinfo && NULL!=rply
1045 && 0==strcmp(G_atmi_tls->G_atmi_xa_curtx.txinfo->tmxid, rply->tmxid)
1046 && EXSUCCEED!=atmi_xa_update_known_rms(G_atmi_tls->G_atmi_xa_curtx.txinfo->tmknownrms,
1047 rply->tmknownrms))
1048 {
1049
1050 NDRX_LOG(log_error, "Failed to atmi_xa_update_known_rms()");
1051 ndrx_TPoverride_code(TPESYSTEM);
1052 ret = EXFAIL;
1053 }
1054
1055
1056 NDRX_ABORT_START(is_abort_only)
1057 NDRX_ABORT_END(is_abort_only)
1058
1059
1060 if (NULL!=pbuf)
1061 {
1062 NDRX_SYSBUF_FREE(pbuf);
1063 }
1064
1065 NDRX_LOG(log_debug, "%s return %d tpurcode=%ld tperror=%d data=%p *data=%p len=%p *len=%ld",
1066 __func__, ret, G_atmi_tls->M_svc_return_code, G_atmi_tls->M_atmi_error,
1067 data, *data, len, *len);
1068
1069
1070
1071
1072
1073
1074
1075
1076
1077
1078
1079
1080
1081
1082
1083 return ret;
1084 }
1085
1086
1087
1088
1089
1090
1091
1092
1093
1094
1095
1096
1097
1098
1099
1100
1101
1102 expublic int ndrx_tpcall (char *svc, char *idata, long ilen,
1103 char **odata, long *olen, long flags,
1104 char *extradata, int dest_node, int ex_flags,
1105 int user1, long user2, int user3, long user4)
1106 {
1107 int ret=EXSUCCEED;
1108 int cd_req = 0;
1109 int cd_rply = 0;
1110 ndrx_tpcall_cache_ctl_t cachectl;
1111 int cache_used = EXFALSE;
1112 TPTRANID tranid, *p_tranid;
1113
1114 NDRX_LOG(log_debug, "%s: enter flags=%ld tx=%p xa_flags_sys=%ld "
1115 "idata=%p ilen=%ld odata=%p *odata=%p olen=%p",
1116 __func__, flags, G_atmi_tls->G_atmi_xa_curtx.txinfo,
1117 G_atmi_env.xa_flags_sys, idata, ilen, odata, *odata, olen);
1118
1119 cachectl.should_cache = EXFALSE;
1120 cachectl.cached_rsp = EXFALSE;
1121
1122
1123 if ( !(flags & TPNOTRAN) && G_atmi_tls->G_atmi_xa_curtx.txinfo &&
1124 (
1125
1126 (flags & TPTRANSUSPEND) ||
1127
1128 (
1129 !(G_atmi_env.xa_flags_sys & NDRX_XA_FLAG_SYS_NOJOIN)
1130 &&
1131 !(G_atmi_env.xa_flags_sys & NDRX_XA_FLAG_SYS_NOSUSPEND)
1132 )
1133 )
1134 )
1135 {
1136
1137 flags|=TPTRANSUSPEND;
1138 memset(&tranid, 0, sizeof(tranid));
1139 p_tranid = &tranid;
1140 }
1141 else
1142 {
1143 p_tranid = NULL;
1144 }
1145
1146 if (ndrx_cache_used())
1147 {
1148 cache_used = EXTRUE;
1149 memset(&cachectl, 0, sizeof(cachectl));
1150
1151 cachectl.odata = odata;
1152 cachectl.olen = olen;
1153 }
1154
1155
1156 if (EXFAIL==(cd_req=ndrx_tpacall (svc, idata, ilen, flags, extradata,
1157 dest_node, ex_flags, p_tranid, user1, user2, user3, user4,
1158 (cache_used?&cachectl:NULL) )))
1159 {
1160 NDRX_LOG(log_error, "_tpacall to %s failed", svc);
1161 ret=EXFAIL;
1162 goto out;
1163 }
1164
1165 if (cachectl.cached_rsp)
1166 {
1167 NDRX_LOG(log_debug, "Reply from cache");
1168
1169 NDRX_LOG(log_info, "Response read form cache!");
1170 G_atmi_tls->M_svc_return_code = cachectl.saved_tpurcode;
1171
1172 if (0!=cachectl.saved_tperrno)
1173 {
1174 ndrx_TPset_error_msg(cachectl.saved_tperrno, "Cached error response");
1175 ret=EXFAIL;
1176 }
1177
1178
1179 goto out;
1180 }
1181
1182
1183
1184
1185 flags&=~TPNOBLOCK;
1186
1187
1188
1189 if (!(flags & TPNOREPLY))
1190 {
1191 if (EXSUCCEED!=(ret=ndrx_tpgetrply(&cd_rply, cd_req, odata, olen, flags,
1192 p_tranid)))
1193 {
1194 NDRX_LOG(log_error, "_tpgetrply to %s failed", svc);
1195 goto out;
1196 }
1197
1198
1199
1200
1201 if (cd_req!=cd_rply)
1202 {
1203 ret=EXFAIL;
1204 ndrx_TPset_error_fmt(TPEPROTO, "%s: Got invalid reply! cd_req: %d, cd_rply: %d",
1205 __func__, cd_req, cd_rply);
1206 goto out;
1207 }
1208 }
1209
1210 out:
1211
1212
1213 if (EXSUCCEED!=ret && TPETIME==tperrno)
1214 {
1215 ndrx_tpcancel(cd_req);
1216 }
1217
1218
1219 if (!(flags & TPNOCACHEADD) && cachectl.should_cache && !cachectl.cached_rsp)
1220 {
1221 int ret2;
1222
1223
1224 if (EXSUCCEED!=(ret2=ndrx_cache_save (svc, *odata,
1225 *olen, tperrno, G_atmi_tls->M_svc_return_code,
1226 G_atmi_env.our_nodeid, flags, EXFAIL, EXFAIL, EXFALSE)))
1227 {
1228
1229 if (NDRX_TPCACHE_ENOCACHE!=ret2)
1230 {
1231 userlog("Failed to save service [%s] cache results: %s", svc,
1232 tpstrerror(tperrno));
1233 }
1234 }
1235 }
1236
1237 NDRX_LOG(log_debug, "%s: return %d cd %d odata=%p *odata=%p olen=%p *olen=%ld",
1238 __func__, ret, cd_rply, odata, *odata, olen, *olen);
1239 return ret;
1240 }
1241
1242
1243
1244
1245
1246
1247 expublic int ndrx_tpcancel (int cd)
1248 {
1249 int ret=EXSUCCEED;
1250 tpmemq_t *el, *elt;
1251 char *data = NULL;
1252 long len;
1253 ATMI_TLS_ENTRY;
1254
1255 NDRX_LOG(log_debug, "tpcancel issued for %d", cd);
1256
1257 if (cd<1||cd>=MAX_ASYNC_CALLS)
1258 {
1259 ndrx_TPset_error_fmt(TPEBADDESC, "%s: Invalid call descriptor %d, should be 0<cd<%d",
1260 __func__, cd, MAX_ASYNC_CALLS);
1261 ret=EXFAIL;
1262 goto out;
1263 }
1264
1265
1266
1267
1268
1269
1270
1271 ndrx_tpgetrply(&cd, cd, &data, &len, TPNOBLOCK|TPNOABORT, NULL);
1272 if (NULL!=data)
1273 {
1274 tpfree(data);
1275 }
1276
1277
1278 DL_FOREACH_SAFE(G_atmi_tls->memq, el, elt)
1279 {
1280 tp_command_call_t *rply = (tp_command_call_t *)el->buf;
1281 if (rply->cd==cd)
1282 {
1283 NDRX_SYSBUF_FREE(el->buf);
1284 NDRX_FPFREE(el);
1285 }
1286 }
1287
1288
1289 G_atmi_tls->G_call_state[cd].status = CALL_CANCELED;
1290
1291 out:
1292 return ret;
1293 }
1294
1295
1296
1297
1298
1299
1300 expublic long * _exget_tpurcode_addr (void)
1301 {
1302 ATMI_TLS_ENTRY;
1303 return &G_atmi_tls->M_svc_return_code;
1304 }
1305
1306
1307