Back to home page

Enduro/X

 
 

    


0001 /**
0002  * @brief ATMI tpcall function implementation (Common version)
0003  *   This will have to make the call and wait back for answer.
0004  *   This should pass to function file descriptor for reply back queue.
0005  *
0006  * @file tpcall.c
0007  */
0008 /* -----------------------------------------------------------------------------
0009  * Enduro/X Middleware Platform for Distributed Transaction Processing
0010  * Copyright (C) 2009-2016, ATR Baltic, Ltd. All Rights Reserved.
0011  * Copyright (C) 2017-2023, Mavimax, Ltd. All Rights Reserved.
0012  * This software is released under one of the following licenses:
0013  * AGPL (with Java and Go exceptions) or Mavimax's license for commercial use.
0014  * See LICENSE file for full text.
0015  * -----------------------------------------------------------------------------
0016  * AGPL license:
0017  *
0018  * This program is free software; you can redistribute it and/or modify it under
0019  * the terms of the GNU Affero General Public License, version 3 as published
0020  * by the Free Software Foundation;
0021  *
0022  * This program is distributed in the hope that it will be useful, but WITHOUT ANY
0023  * WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR A
0024  * PARTICULAR PURPOSE. See the GNU Affero General Public License, version 3
0025  * for more details.
0026  *
0027  * You should have received a copy of the GNU Affero General Public License along 
0028  * with this program; if not, write to the Free Software Foundation, Inc.,
0029  * 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
0030  *
0031  * -----------------------------------------------------------------------------
0032  * A commercial use license is available from Mavimax, Ltd
0033  * contact@mavimax.com
0034  * -----------------------------------------------------------------------------
0035  */
0036 
0037 /*---------------------------Includes-----------------------------------*/
0038 #include <stdio.h>
0039 #include <stdarg.h>
0040 #include <memory.h>
0041 #include <stdlib.h>
0042 #include <errno.h>
0043 #include <sys_mqueue.h>
0044 #include <fcntl.h>
0045 
0046 #include <atmi.h>
0047 #include <userlog.h>
0048 #include <ndebug.h>
0049 #include <tperror.h>
0050 #include <typed_buf.h>
0051 #include <atmi_int.h>
0052 
0053 #include "../libatmisrv/srv_int.h"
0054 /* #include "ndrxd.h" why? */
0055 #include "utlist.h"
0056 #include <thlock.h>
0057 #include <xa_cmn.h>
0058 #include <atmi_shm.h>
0059 #include <atmi_tls.h>
0060 #include <atmi_cache.h>
0061 #include <ndrx_ddr.h>
0062 /*---------------------------Externs------------------------------------*/
0063 /*---------------------------Macros-------------------------------------*/
0064 #define NOENT_ERR_SHM       1   /**< Service is not available from SHM  */
0065 #define NOENT_ERR_QUEUE     2   /**< Service is not available from Q    */
0066 /*---------------------------Enums--------------------------------------*/
0067 /*---------------------------Typedefs-----------------------------------*/
0068 /*---------------------------Globals------------------------------------*/
0069 /*---------------------------Statics------------------------------------*/
0070 exprivate NDRX_SPIN_LOCKDECL(M_callseq_lock);
0071 
0072 /*---------------------------Prototypes---------------------------------*/
0073 exprivate void unlock_call_descriptor(int cd, short status);
0074 
0075 /**
0076  * Internal init function, once at app start
0077  */
0078 expublic int ndrx_tpcall_init_once(void)
0079 {
0080     NDRX_SPIN_INIT_V(M_callseq_lock);
0081     return EXSUCCEED;
0082 }
0083 
0084 /**
0085  * Dump to the log Command call buffer
0086  * @param lev Debug level
0087  * @param call call struct addr
0088  */
0089 expublic void ndrx_dump_call_struct(int lev, tp_command_call_t *call)
0090 {
0091     ndrx_debug_t * dbg = debug_get_ndrx_ptr();
0092     if (dbg->level>=lev)
0093     {
0094         NDRX_LOG(lev, "=== Start of tp_command_call_t call dump, ptr=%p ===", call);
0095         NDRX_LOG(lev, "command_id=[%hd]", call->command_id);
0096         NDRX_LOG(lev, "proto_ver=[%c%c%c%c]", call->proto_ver[0], call->proto_ver[1],
0097             call->proto_ver[2], call->proto_ver[3]);
0098         NDRX_LOG(lev, "proto_magic=[%d]", call->proto_magic);
0099         NDRX_LOG(lev, "name=[%s]", call->name);
0100         NDRX_LOG(lev, "reply_to=[%s]", call->reply_to);
0101         NDRX_LOG(lev, "callstack=[%s]", call->callstack);
0102         NDRX_LOG(lev, "my_id=[%s]", call->my_id);
0103         NDRX_LOG(lev, "sysflags=[%p]", call->sysflags);
0104         NDRX_LOG(lev, "cd=[%d]", call->cd);
0105         NDRX_LOG(lev, "rval=[%d]", call->rval);
0106         NDRX_LOG(lev, "rcode=[%ld]", call->rcode);
0107         NDRX_LOG(lev, "extradata=[%s]", call->extradata);
0108         NDRX_LOG(lev, "flags=[%p]", call->flags);
0109         NDRX_LOG(lev, "timestamp=[%lu]", call->timestamp);
0110         NDRX_LOG(lev, "callseq=[%u]", call->callseq);
0111         NDRX_LOG(lev, "timer.tv_nsec=[%lu]", call->timer.t.tv_nsec);
0112         NDRX_LOG(lev, "timer.tv_sec=[%lu]", call->timer.t.tv_sec);
0113         NDRX_LOG(lev, "tmtxflags=[0x%x]", call->tmtxflags);
0114         NDRX_LOG(lev, "tmxid=[%s]", call->tmxid);
0115         NDRX_LOG(lev, "tmrmid=[%hd]", call->tmrmid);
0116         NDRX_LOG(lev, "tmnodeid=[%hd]", call->tmnodeid);
0117         NDRX_LOG(lev, "tmsrvid=[%hd]", call->tmsrvid);
0118         NDRX_LOG(lev, "tmknownrms=[%s]", call->tmknownrms);
0119         NDRX_LOG(lev, "data_len=[%ld]", call->data_len);
0120         NDRX_LOG(lev, "===== End of tp_command_call_t call dump, ptr=%p ===", call);
0121     }
0122 }
0123 
0124 /**
0125  * Process individual call in progress.
0126  * @param cd
0127  * @return 
0128  */
0129 exprivate int call_check_tout(int cd)
0130 {
0131     int ret=EXSUCCEED;
0132     time_t t;
0133     int t_diff;
0134     /* ATMI_TLS_ENTRY; - already called by parent processes - already called by parent processes  */
0135     
0136     if (CALL_WAITING_FOR_ANS==G_atmi_tls->G_call_state[cd].status &&
0137             !(G_atmi_tls->G_call_state[cd].flags & TPNOTIME) &&
0138               (t_diff = ((t = time(NULL)) - G_atmi_tls->G_call_state[cd].timestamp)) > G_atmi_tls->G_call_state[cd].tout_eff
0139             )
0140     {
0141         /* added some more debug info, because we have strange timeouts... */
0142         NDRX_LOG(log_warn, "cd %d (callseq %u) timeout condition - generating error "
0143                 "(locked at: %ld current tstamp: %ld, diff: %d, timeout_value: %d)", 
0144                 cd, G_atmi_tls->G_call_state[cd].callseq, 
0145                 G_atmi_tls->G_call_state[cd].timestamp, t, t_diff, G_atmi_tls->G_call_state[cd].tout_eff);
0146         
0147         ndrx_TPset_error_fmt(TPETIME, "cd %d (callseq %u) timeout condition - generating error "
0148                 "(locked at: %ld current tstamp: %ld, diff: %d, timeout_value: %d)", 
0149                 cd, G_atmi_tls->G_call_state[cd].callseq, 
0150                 G_atmi_tls->G_call_state[cd].timestamp, t, t_diff, G_atmi_tls->G_call_state[cd].tout_eff);
0151         
0152         /* mark cd as free (will mark as cancelled) */
0153         unlock_call_descriptor(cd, CALL_CANCELED);
0154         
0155         ret=EXFAIL;
0156         goto out;
0157     }
0158 out:
0159     return ret;
0160 }
0161 
0162 /**
0163  * Function for extra debug.
0164  */
0165 exprivate void call_dump_descriptors(void)
0166 {
0167     int i;
0168     time_t t = time(NULL);
0169     int t_diff;
0170     int cnt=0;
0171     ATMI_TLS_ENTRY;
0172     
0173     NDRX_LOG(log_debug, "***List of call descriptors waiting for answer***");
0174     NDRX_LOG(log_debug, "timeout(system wide): %d curr_tstamp (sys-wide): %ld", 
0175                             G_atmi_env.time_out, t);
0176     NDRX_LOG(log_debug, "cd\tcallseq\tlocked_at\tdiff\tout_eff");
0177         
0178     for (i=1; i<MAX_ASYNC_CALLS; i++)
0179     {
0180         if (CALL_WAITING_FOR_ANS==G_atmi_tls->G_call_state[i].status)
0181         {
0182             t_diff = t - G_atmi_tls->G_call_state[i].timestamp;
0183             NDRX_LOG(log_debug, "%d\t%u\t%ld\t%d\t%d", 
0184                     i, G_atmi_tls->G_call_state[i].callseq, 
0185                     G_atmi_tls->G_call_state[i].timestamp, t_diff, 
0186                     G_atmi_tls->G_call_state[i].tout_eff);
0187             cnt++;
0188         }
0189     }
0190     
0191     NDRX_LOG(log_warn, "cds waiting for answer: %d", cnt);
0192     NDRX_LOG(log_debug, "*************************************************");
0193 }
0194 
0195 #define CALL_TOUT_DEBUG
0196 
0197 /**
0198  * We should check all call descriptors and imitate time-out condition,
0199  * if any of calls are timed-out
0200  * 
0201  * TODO: hash vs iterate...
0202  * @param cd - if > 0 the
0203  * @param cd_out - return failed cd...
0204  * @return 
0205  */
0206 exprivate int call_scan_tout(int cd, int *cd_out)
0207 {
0208     /* for performance reasons shouldn't we keep in HASH list 
0209      * the list of open call descriptors, iterate them and check for
0210      * time-out condition
0211      */
0212     int ret = EXSUCCEED;
0213     int i;
0214     long delta = 0;
0215     /* ATMI_TLS_ENTRY; - already called by parent*/
0216     /* NOTE: no need for mutexes, as we have call descriptors per TLS */
0217     
0218 #ifdef CALL_TOUT_DEBUG
0219     call_dump_descriptors();
0220 #endif
0221     
0222     /* Check that it is time for scan... */
0223     if (G_atmi_tls->tpcall_first || 
0224             (delta=ndrx_stopwatch_get_delta(&G_atmi_tls->tpcall_start)) >=1000 || 
0225                 /* incase of overflow: */
0226                 delta < 0)
0227     {
0228         /* we should scan the stuff. */
0229         if (0 < cd)
0230         {
0231             if (EXSUCCEED!=call_check_tout(cd))
0232             {
0233                 *cd_out = cd;
0234                 ret=EXFAIL;
0235                 goto out;
0236             }
0237         }
0238         else
0239         {
0240             for (i=1; i<MAX_ASYNC_CALLS; i++)
0241             {
0242                 if (EXSUCCEED!=call_check_tout(i))
0243                 {
0244                     *cd_out = i;
0245                     ret=EXFAIL;
0246                     goto out;
0247                 }
0248             }
0249         }
0250         /* if all ok, schedule after 1 sec. */
0251         ndrx_stopwatch_reset(&G_atmi_tls->tpcall_start);
0252         G_atmi_tls->tpcall_first = EXFALSE; /* only when all ok... */
0253     } /* if check time... */
0254     
0255 out:
0256 
0257     return ret;
0258 }
0259 
0260 /**
0261  * Return next call sequence number
0262  * The number is shared between conversational and standard call
0263  * @param p_callseq ptr to return next number into
0264  * @return 
0265  */
0266 expublic unsigned ndrx_get_next_callseq_shared(void)
0267 {
0268     static volatile unsigned shared_callseq=0;
0269     unsigned ret;
0270     /* Bug #756: */
0271     NDRX_SPIN_LOCK_V(M_callseq_lock);
0272     shared_callseq++;
0273     ret = shared_callseq;
0274     NDRX_SPIN_UNLOCK_V(M_callseq_lock);
0275     
0276     return ret;
0277 }
0278 
0279 /**
0280  * Returns free call descriptor
0281  * @param tout_eff effective timeout
0282  * @return >0 (ok), -1 = FAIL
0283  */
0284 exprivate int get_call_descriptor_and_lock(unsigned *p_callseq,
0285         time_t timestamp, long flags, int tout_eff)
0286 {
0287     int start_cd = G_atmi_tls->tpcall_get_cd; /* mark where we began */
0288     int ret = EXFAIL;
0289     unsigned callseq=0;
0290     
0291     /* ATMI_TLS_ENTRY; - already got from caller */
0292     /* Lock the call descriptor giver...! So that we have common CDs 
0293      * over the hreads
0294      */
0295     
0296     while (CALL_WAITING_FOR_ANS==G_atmi_tls->G_call_state[G_atmi_tls->tpcall_get_cd].status)
0297     {
0298         G_atmi_tls->tpcall_get_cd++;
0299 
0300         if (G_atmi_tls->tpcall_get_cd > MAX_ASYNC_CALLS-1)
0301         {
0302             G_atmi_tls->tpcall_get_cd=1; /* TODO: Maybe start with 0? */
0303         }
0304 
0305         if (start_cd==G_atmi_tls->tpcall_get_cd)
0306         {
0307             NDRX_LOG(log_debug, "All call descriptors overflow restart!");
0308             break;
0309         }
0310     }
0311 
0312     if (CALL_WAITING_FOR_ANS==G_atmi_tls->G_call_state[G_atmi_tls->tpcall_get_cd].status)
0313     {
0314         NDRX_LOG(log_debug, "All call descriptors have been taken - FAIL!");
0315         /* MUTEX_UNLOCK_V(M_cd_lock); */
0316         EXFAIL_OUT(ret);
0317     }
0318     else
0319     {
0320         callseq = ndrx_get_next_callseq_shared();
0321         
0322         ret = G_atmi_tls->tpcall_get_cd;
0323         *p_callseq=callseq;
0324         NDRX_LOG(log_debug, "Got free call descriptor %d, callseq: %u",
0325                                             ret, callseq);
0326 
0327         NDRX_LOG(log_debug, "cd %d locked to %d timestamp (id: %d%d) callseq: %u tout_eff: %d",
0328                                         ret, timestamp, ret,timestamp, callseq, tout_eff);
0329         G_atmi_tls->G_call_state[ret].status = CALL_WAITING_FOR_ANS;
0330         G_atmi_tls->G_call_state[ret].timestamp = timestamp;
0331         G_atmi_tls->G_call_state[ret].callseq = callseq;
0332         G_atmi_tls->G_call_state[ret].flags = flags;
0333         G_atmi_tls->G_call_state[ret].tout_eff = tout_eff;
0334 
0335         /* MUTEX_UNLOCK_V(M_cd_lock); */
0336             
0337         /* If we have global tx open, then register cd under it! */
0338         if (!(flags & TPNOTRAN) && G_atmi_tls->G_atmi_xa_curtx.txinfo)
0339         {
0340             NDRX_LOG(log_debug, "Registering cd=%d under global "
0341                     "transaction!", ret);
0342             if (EXSUCCEED!=atmi_xa_cd_reg(&(G_atmi_tls->G_atmi_xa_curtx.txinfo->call_cds), ret))
0343             {
0344                 EXFAIL_OUT(ret);
0345             }
0346         }
0347     }
0348 out:
0349     
0350     return ret;
0351 
0352 }
0353 
0354 /**
0355  * Unlock call descriptor
0356  * @param cd
0357  * @return
0358  */
0359 exprivate void unlock_call_descriptor(int cd, short status)
0360 {
0361     /* ATMI_TLS_ENTRY; - already done by caller */
0362     
0363     if (!(G_atmi_tls->G_call_state[cd].flags & TPNOTRAN) && 
0364             G_atmi_tls->G_atmi_xa_curtx.txinfo)
0365     {
0366         NDRX_LOG(log_debug, "Un-registering cd=%d from global "
0367                 "transaction!", cd);
0368         
0369         atmi_xa_cd_unreg(&(G_atmi_tls->G_atmi_xa_curtx.txinfo->call_cds), cd);
0370     }
0371     
0372     G_atmi_tls->G_call_state[cd].status = status;
0373 }
0374 
0375 /**
0376  * Cancel the call descriptor if was expected
0377  * @param call
0378  */
0379 expublic void cancel_if_expected(tp_command_call_t *call)
0380 {
0381     ATMI_TLS_ENTRY;
0382     
0383     call_descriptor_state_t *callst  = &G_atmi_tls->G_call_state[call->cd];
0384     if (CALL_WAITING_FOR_ANS==callst->status &&
0385             call->timestamp==callst->timestamp &&
0386             call->callseq==callst->callseq)
0387     {
0388         NDRX_LOG(log_debug, "Reply was expected, but probably "
0389                                         "timeouted - cancelling!");
0390         unlock_call_descriptor(call->cd, CALL_CANCELED);
0391     }
0392     else
0393     {
0394         NDRX_LOG(log_debug, "Reply was NOT expected, ignoring!");
0395     }
0396 }
0397 /**
0398  * Do asynchronous call.
0399  * If it is evpost (is_evpost) htne extradata must be loaded with `my_id' from then
0400  * server. We will parse it out get the target queue message must be put!
0401  * 
0402  * @param svc
0403  * @param data
0404  * @param len
0405  * @param flags - should be managed from parent function (is it real async call
0406  *                  or tpcall wrapper)
0407  * @param user1 - user data field 1
0408  * @param user2 - user data field 2
0409  * @param user3 - user data field 3
0410  * @param user4 - user data field 4
0411  * @param p_cachectl cache control - if cache used must be not NULL, else NULL
0412  * @return call descriptor
0413  */
0414 expublic int ndrx_tpacall (char *svc, char *data,
0415                 long len, long flags, char *extradata, 
0416                 int dest_node, int ex_flags, TPTRANID *p_tranid, 
0417                 int user1, long user2, int user3, long user4, 
0418                 ndrx_tpcall_cache_ctl_t *p_cachectl)
0419 {
0420     int ret=EXSUCCEED;
0421     char *buf=NULL;
0422     size_t buf_len;
0423     tp_command_call_t *call;
0424     long data_len = MAX_CALL_DATA_SIZE;
0425     char send_q[NDRX_MAX_Q_SIZE+1];
0426     time_t timestamp;
0427     int is_bridge;
0428     int tpcall_cd;
0429     int have_shm = EXFALSE;
0430     int noenterr = EXFALSE;
0431     char svcddr[XATMI_SERVICE_NAME_LENGTH+1]; /**< routed service name */
0432     int prio = NDRX_MSGPRIO_DEFAULT;
0433     int tout_eff;
0434     ATMI_TLS_ENTRY;
0435     
0436     NDRX_LOG(log_debug, "%s enter data=%p", __func__, data);
0437     
0438     tout_eff = ndrx_tptoutget_eff();
0439     
0440     /* if we have SHM then check the DDR options we have 
0441      * TODO: Think maybe duplicate shm attached check can be
0442      * removed.
0443      */
0444     NDRX_STRCPY_SAFE(svcddr, svc);
0445     /* try the DDR */
0446     if (EXFAIL==ndrx_ddr_grp_get(svcddr, sizeof(svcddr), data, len,
0447         &prio))
0448     {
0449         /* error shall be set */
0450         EXFAIL_OUT(ret);
0451     }
0452 
0453     NDRX_SYSBUF_MALLOC_WERR_OUT(buf, buf_len, ret);
0454     
0455     call=(tp_command_call_t *)buf;
0456     
0457     if (G_atmi_tls->G_atmi_xa_curtx.txinfo)
0458     {
0459         atmi_xa_print_knownrms(log_info, "Known RMs before call: ",
0460                     G_atmi_tls->G_atmi_xa_curtx.txinfo->tmknownrms);
0461     }
0462     
0463     /* Check service availability by SHM? 
0464      * TODO: We might want to check the flags, the service might be marked for shutdown!
0465      */
0466     if (ex_flags & TPCALL_BRCALL)
0467     {
0468         /* If this is a bridge call, then format the bridge Q */
0469 #if defined(EX_USE_POLL) || defined(EX_USE_SYSVQ)
0470         {
0471             int tmp_is_bridge;
0472             char tmpsvc[MAXTIDENT+1];
0473             
0474             snprintf(tmpsvc, sizeof(tmpsvc), NDRX_SVC_BRIDGE, dest_node);
0475 
0476             if (EXSUCCEED!=ndrx_shm_get_svc(tmpsvc, send_q, &tmp_is_bridge, NULL))
0477             {
0478                 NDRX_LOG(log_error, "Failed to get bridge svc: [%s]", 
0479                         tmpsvc);
0480                 EXFAIL_OUT(ret);
0481             }
0482         }
0483 #else
0484         snprintf(send_q, sizeof(send_q), NDRX_SVC_QBRDIGE, 
0485                 G_atmi_tls->G_atmi_conf.q_prefix, dest_node);
0486 #endif
0487         is_bridge=EXTRUE;
0488     }
0489     else if (EXSUCCEED!=ndrx_shm_get_svc(svcddr, send_q, &is_bridge, &have_shm))
0490     {
0491         NDRX_LOG(log_info, "Service is not available %s by shm", svcddr);
0492         noenterr = NOENT_ERR_SHM;
0493         /* goto out; */
0494     }
0495     
0496     /* In case of non shared memory mode, check that queue file exists! */
0497     if (!have_shm)
0498     {
0499         /* test queue */
0500         if (!ndrx_q_exists(send_q))
0501         {
0502             noenterr = NOENT_ERR_QUEUE;
0503             /*EXFAIL_OUT(ret); */
0504         }
0505     }
0506     
0507     /* ok service is found we can process cache here */
0508     
0509     if (!(flags & TPNOCACHELOOK) && NULL!=p_cachectl)
0510     {
0511         /* one cache for all groups */
0512         if (EXSUCCEED!=(ret=ndrx_cache_lookup(svc, data, len, 
0513                 p_cachectl->odata, p_cachectl->olen, flags, 
0514                 &p_cachectl->should_cache, 
0515                 &p_cachectl->saved_tperrno, 
0516                 &p_cachectl->saved_tpurcode, EXFALSE, noenterr)))
0517         {
0518             /* failed to get cache data */
0519             if (EXFAIL==ret)
0520             {
0521                 EXFAIL_OUT(ret);
0522             }
0523             else
0524             {
0525 
0526                 /* ignore the error (probably data not found) */
0527                 if (noenterr)
0528                 {
0529                     p_cachectl->should_cache=EXFALSE;
0530                 }
0531 
0532                 NDRX_LOG(log_debug, "Cache lookup failed ... continue with svc call");
0533             }
0534         }
0535         else
0536         {
0537             p_cachectl->cached_rsp = EXTRUE;
0538             /* data from cache, return... */
0539             goto out;
0540         }
0541     }
0542     
0543     /* generate eror */
0544     if (noenterr)
0545     {
0546         /* add tls hook for callback of non existent service, 
0547          * same as tpacall flags. Only for no reply..
0548          * Need for tpsvrinit is doing tpacalls when services are still
0549          * not advertised. Thus server can enqueue messages to send
0550          * when the service queues are open
0551          */
0552         if (NULL!=G_atmi_tls->pf_tpacall_noservice_hook && (flags & TPNOREPLY ))
0553         {
0554             ret=G_atmi_tls->pf_tpacall_noservice_hook(svc, data, len, flags);
0555             goto out;/*<<<< we are done.*/
0556         }
0557         
0558         ndrx_TPset_error_fmt(TPENOENT, "%s: Service is not available %s by %s", 
0559             __func__, svcddr, NOENT_ERR_SHM==noenterr?"shm":"queue");
0560         
0561         EXFAIL_OUT(ret);
0562     }
0563     
0564     /* Might want to remove in future... but it might be dangerous!*/
0565     memset(call, 0, sizeof(tp_command_call_t));
0566      
0567         
0568     if (EXSUCCEED!=ndrx_mbuf_prepare_outgoing (data, len, call->data, 
0569             &data_len, flags, 0))
0570     {
0571         /* not good - error should be already set */
0572         EXFAIL_OUT(ret);
0573     }
0574 
0575 
0576     /* OK, now fill up the details */
0577     call->data_len = data_len;
0578     
0579     data_len+=sizeof(tp_command_call_t);
0580     call->clttout = tout_eff;
0581 
0582     NDRX_STRCPY_SAFE(call->reply_to, G_atmi_tls->G_atmi_conf.reply_q_str);
0583     
0584     call->command_id = ATMI_COMMAND_TPCALL;
0585     
0586     
0587     NDRX_STRCPY_SAFE(call->name, svcddr);
0588     call->flags = flags;
0589     
0590     if (NULL!=extradata)
0591     {
0592         NDRX_STRCPY_SAFE(call->extradata, extradata);
0593     }
0594 
0595     timestamp = time(NULL);
0596     
0597     /* Add global transaction info to call (if needed & tx available) */
0598     if (!(call->flags & TPNOTRAN) && G_atmi_tls->G_atmi_xa_curtx.txinfo)
0599     {
0600         NDRX_LOG(log_info, "Current process in global transaction (%s) - "
0601                 "prepare call", G_atmi_tls->G_atmi_xa_curtx.txinfo->tmxid);
0602         
0603         atmi_xa_cpy_xai_to_call(call, G_atmi_tls->G_atmi_xa_curtx.txinfo);
0604         
0605         if (call->flags & TPTRANSUSPEND && NULL!=p_tranid &&
0606                 EXSUCCEED!=ndrx_tpsuspend(p_tranid, TPTXTMSUSPEND, EXFALSE))
0607         {
0608             /* Override the error to TPESYSTEM */
0609             ndrx_TPoverride_code(TPESYSTEM);
0610             EXFAIL_OUT(ret);
0611         }
0612     }
0613     
0614     /* lock call descriptor */
0615     if (!(flags & TPNOREPLY))
0616     {
0617         /* get the call descriptor */
0618         if (EXFAIL==(tpcall_cd = get_call_descriptor_and_lock(&call->callseq, 
0619                 timestamp, flags, tout_eff)))
0620         {
0621             NDRX_LOG(log_error, "Do not have resources for "
0622                                 "track this call!");
0623             ndrx_TPset_error_fmt(TPELIMIT, "%s:All call descriptor entries have been used "
0624                                 "(check why they do not free up? Maybe need to "
0625                                 "use tpcancel()?)", __func__);
0626             EXFAIL_OUT(ret);
0627         }
0628     }
0629     else
0630     {
0631         NDRX_LOG(log_info, "TPNOREPLY => cd=0");
0632         tpcall_cd = 0;
0633     }
0634     
0635     call->cd = tpcall_cd;
0636     call->timestamp = timestamp;
0637     
0638     call->rval = user1;
0639     call->rcode = user2;
0640     
0641     call->user3 = user3;
0642     call->user4 = user4;
0643     
0644     /* Reset call timer */
0645     ndrx_stopwatch_reset(&call->timer);
0646     
0647     NDRX_STRCPY_SAFE(call->my_id, G_atmi_tls->G_atmi_conf.my_id); /* Setup my_id */
0648     NDRX_LOG(log_debug, "Sending request to: [%s] my_id=[%s] reply_to=[%s] cd=%d "
0649             "callseq=%u (user1=%d, user2=%ld, user3=%d, user4=%ld)", 
0650             send_q, call->my_id, call->reply_to, tpcall_cd, call->callseq,
0651             call->rval, call->rcode, call->user3, call->user4);
0652     
0653     NDRX_DUMP(log_dump, "Sending away...", (char *)call, data_len);
0654 
0655     if (EXSUCCEED!=(ret=ndrx_generic_q_send(send_q, (char *)call, data_len, flags, prio)))
0656     {
0657         int err;
0658 
0659         if (ENOENT==ret)
0660         {
0661             err=TPENOENT;
0662         }
0663         else
0664         {
0665             CONV_ERROR_CODE(ret, err);
0666         }
0667         ndrx_TPset_error_fmt(err, "%s: Failed to send, os err: %s", __func__, strerror(ret));
0668         ret=EXFAIL;
0669 
0670         /* unlock call descriptor */
0671         unlock_call_descriptor(tpcall_cd, CALL_NOT_ISSUED);
0672         
0673         goto out;
0674 
0675     }
0676     /* return call descriptor */
0677     ret=tpcall_cd;
0678 
0679 out:
0680                 
0681     if (NULL!=buf)
0682     {
0683         NDRX_SYSBUF_FREE(buf);
0684     }
0685 
0686     NDRX_LOG(log_debug, "%s return %d", __func__, ret);
0687     return ret;
0688 }
0689 
0690 /**
0691  * Add message to buffer
0692  * @param pbuf double ptr to sysbuf
0693  * @param pbuf_len sysbuf len
0694  * @param rply_len len size got from q
0695  * @return EXSUCCEED/EXFAIL
0696  */
0697 expublic int ndrx_add_to_memq(char **pbuf, size_t pbuf_len, ssize_t rply_len)
0698 {
0699     int ret = EXSUCCEED;
0700     tpmemq_t *tmp;
0701     
0702     if (NULL==(tmp = NDRX_FPMALLOC(sizeof(tpmemq_t), 0)))
0703     {
0704         int err = errno;
0705         NDRX_LOG(log_error, "Failed to alloc: %s", strerror(err));
0706         userlog("Failed to alloc: %s", strerror(err));
0707         EXFAIL_OUT(ret);
0708     }
0709 
0710     tmp->buf = *pbuf;
0711     *pbuf = NULL; /* save the buffer... */
0712     tmp->len = pbuf_len;
0713     tmp->data_len = rply_len;
0714     tmp->prev = NULL;
0715     tmp->next = NULL;
0716 
0717     /* Add some lock ... (this just exchanges ptr, thus spin lock) */
0718     DL_APPEND(G_atmi_tls->memq, tmp); 
0719 
0720 out:
0721     return ret;    
0722 }
0723 
0724 /**
0725  * Dequeue message from memq
0726  * - if cd is given, then seek for the CD
0727  * - if cd is not give, and we have any flag, then return first
0728  * @param cd seek for this cd
0729  * @param flags get any?
0730  * @param pbuf buffer to swap
0731  * @param pbuf_len buffer len
0732  * @param rply_len message len
0733  * @return EXSUCCEED - not found, EXTRUE -> found something
0734  */
0735 exprivate int ndrx_rm_frm_memq(int cd, long flags, char **pbuf, size_t *pbuf_len, ssize_t *rply_len)
0736 {
0737     int ret=EXSUCCEED;
0738     tpmemq_t *el, *elt;
0739     NDRX_LOG(log_info, "Got message from memq...");
0740 
0741     /* grab the buffer of mem linked list - check the flags any
0742      * or what
0743      */
0744     if (flags & TPGETANY)
0745     {
0746         /* the buffer is allocated already by sysalloc, thus
0747          * continue to use this buffer and free up our working buf.
0748          */
0749         NDRX_SYSBUF_FREE(*pbuf);
0750         *pbuf = G_atmi_tls->memq->buf;
0751         *pbuf_len = G_atmi_tls->memq->len;
0752         *rply_len = G_atmi_tls->memq->data_len;
0753 
0754         /* delete first elem in the list */
0755         el = G_atmi_tls->memq;
0756         ret=EXTRUE;
0757     }
0758     else
0759     {
0760         /* search for matched cd */
0761         DL_FOREACH_SAFE(G_atmi_tls->memq, el, elt)
0762         {
0763             tp_command_call_t *rply = (tp_command_call_t *)el->buf;
0764             
0765             if (rply->cd==cd)
0766             {    
0767                 NDRX_SYSBUF_FREE(*pbuf);
0768                 *pbuf = el->buf;
0769                 *pbuf_len = el->len;
0770                 *rply_len = el->data_len;
0771                 ret=EXTRUE;
0772                 break;
0773             }
0774         }
0775     }
0776     
0777     /* remove any found rec */
0778     if (EXTRUE==ret)
0779     {
0780         DL_DELETE(G_atmi_tls->memq, el);
0781         NDRX_FPFREE(el);
0782     }
0783 out:
0784     return ret;
0785 }
0786 
0787 /**
0788  * Internal version of tpgetrply.
0789  * @param cd call descriptor
0790  * @param data data buffer into which return value
0791  * @param len data len
0792  * @param flags flags
0793  * @return EXSUCCEED/EXFAIL
0794  */
0795 expublic int ndrx_tpgetrply (int *cd,
0796                        int cd_exp,
0797                        char **data ,
0798                        long *len, long flags,
0799                        TPTRANID *p_tranid)
0800 {
0801     int ret=EXSUCCEED;
0802     char *pbuf = NULL;
0803     ssize_t rply_len;
0804     unsigned prio;
0805     size_t pbuf_len;
0806     tp_command_call_t *rply=NULL;
0807     int answ_ok = EXFALSE;
0808     int is_abort_only = EXFALSE; /* Should we abort global tx (if open) */
0809     ATMI_TLS_ENTRY;
0810     
0811     /* Allocate the buffer, dynamically... */
0812     NDRX_LOG(log_debug, "%s enter, flags %ld cd_exp %d data=%p *data=%p len=%p",
0813              __func__, flags, cd_exp, data, *data, len);
0814         
0815     /* TODO: If we keep linked list with call descriptors and if there is
0816      * none, then we should return something back - FAIL/proto, not? */
0817     
0818     if (!(flags & TPGETANY) && 
0819             CALL_WAITING_FOR_ANS!=G_atmi_tls->G_call_state[cd_exp].status)
0820     {
0821         ndrx_TPset_error_fmt(TPEBADDESC, "Call descriptor %d is %s", 
0822                 cd_exp, 
0823                 CALL_NOT_ISSUED==G_atmi_tls->G_call_state[*cd].status?"not issued":"canceled");
0824         EXFAIL_OUT(ret);
0825     }
0826     
0827     /**
0828      * We will drop any answers not registered for this call
0829      */
0830     while (!answ_ok)
0831     {
0832         
0833         if (NULL==pbuf)
0834         {
0835             NDRX_SYSBUF_MALLOC_WERR_OUT(pbuf, pbuf_len, ret);
0836             rply  = (tp_command_call_t *)pbuf;
0837         }
0838         
0839         /* We shall check that we do not have something in memq...
0840          * if so then switch the buffers and make current free
0841          */
0842         if (NULL!=G_atmi_tls->memq &&
0843                 /* read from queue if has something... */
0844                 EXTRUE==ndrx_rm_frm_memq(*cd, flags, &pbuf, &pbuf_len, &rply_len))
0845         {
0846             rply  = (tp_command_call_t *)pbuf;
0847             NDRX_LOG(log_debug, "from memq: pbuf=%p", pbuf);
0848         }
0849         else
0850         {
0851             NDRX_LOG(log_info, "Waiting on OS Q mqd_t=%d...",
0852             G_atmi_tls->G_atmi_conf.reply_q);
0853             
0854             /* receive the reply back, use original next tout setting (if any) */
0855             rply_len = ndrx_generic_q_receive(G_atmi_tls->G_atmi_conf.reply_q, 
0856                     G_atmi_tls->G_atmi_conf.reply_q_str,
0857                     &(G_atmi_tls->G_atmi_conf.reply_q_attr),
0858                     (char *)rply, pbuf_len, &prio, flags);
0859         }
0860         
0861         /* In case  if we did receive any response (in non blocked mode
0862          * or we did get fail in blocked mode with TPETIME, then we should
0863          * look up the table for which really we did get the time-out.
0864          */
0865         if ((flags & TPNOBLOCK && GEN_QUEUE_ERR_NO_DATA==rply_len) || 
0866                 (EXFAIL==rply_len && TPETIME==tperrno))
0867         {
0868             if (flags & TPGETANY)
0869             {
0870                 if (EXSUCCEED!=(ret = call_scan_tout(EXFAIL, cd)))
0871                 {
0872                     goto out;
0873                 }
0874             }
0875             else
0876             {
0877                 if (EXSUCCEED!=(ret = call_scan_tout(cd_exp, cd)))
0878                 {
0879                     goto out;
0880                 }
0881             }
0882         }
0883 
0884         if (GEN_QUEUE_ERR_NO_DATA==rply_len)
0885         {
0886             /* Bug #168
0887              * there is no data in reply, nothing to do & nothing to return 
0888              * Maybe we need to return TPEBLOCK?
0889              */
0890             /* *cd = 0; */
0891             ndrx_TPset_error_msg(TPEBLOCK, "TPENOBLOCK was specified in flags and "
0892                     "no message is in queue");
0893             ret=EXFAIL;
0894             goto out;
0895         }
0896         else if (EXFAIL==rply_len)
0897         {
0898             /* we have failed */
0899             NDRX_LOG(log_debug, "%s failed to receive answer", __func__);
0900             ret=EXFAIL;
0901             goto out;
0902         }
0903         else
0904         {
0905             if (ATMI_COMMAND_TPNOTIFY==rply->command_id ||
0906                     ATMI_COMMAND_BROADCAST==rply->command_id)
0907             {
0908                 NDRX_LOG(log_debug, "%s message received -> _tpnotify", 
0909                         (ATMI_COMMAND_TPNOTIFY==rply->command_id?"Notification":"Broadcast"));
0910                 /* process the notif... */
0911                 ndrx_process_notif(pbuf, rply_len);
0912                 
0913                 /* And continue... */
0914                 continue;
0915             }
0916 
0917             NDRX_LOG(log_debug, "accept any: %s, cd=%d (name: [%s], my_id: [%s]) "
0918                 "atmi_tls=%p cmd=%hd rplybuf=%p rply_len=%zd",
0919                 (flags & TPGETANY)?"yes":"no", rply->cd,
0920                 rply->my_id, rply->name, G_atmi_tls, rply->command_id, pbuf,
0921                 rply_len);
0922 
0923             /* if answer is not expected, then we receive again! */
0924             if (CALL_WAITING_FOR_ANS==G_atmi_tls->G_call_state[rply->cd].status &&
0925                     G_atmi_tls->G_call_state[rply->cd].timestamp == rply->timestamp &&
0926                     G_atmi_tls->G_call_state[rply->cd].callseq == rply->callseq)
0927             {
0928                 /* Drop if we do not expect it!!! */
0929                 /* 01/11/2012 - if we have TPGETANY we ignore the cd */
0930                 if (/*cd_exp!=FAIL*/ !(flags & TPGETANY) && rply->cd!=cd_exp)
0931                 {
0932                     
0933                     NDRX_LOG(log_warn, "Out of bound msg (for different cd): "
0934                         "cd: %d, expected cd: %d timestamp: %d callseq: %u, "
0935                             "reply from %s, cd status %hd - add to buffer",
0936                         rply->cd, cd_exp, rply->timestamp, rply->callseq, rply->reply_to,
0937                             G_atmi_tls->G_call_state[rply->cd].status);
0938                     
0939                     /* add msg to memqueue... */
0940                     if (EXSUCCEED!=ndrx_add_to_memq(&pbuf, pbuf_len, rply_len))
0941                     {
0942                         EXFAIL_OUT(ret);
0943                     }
0944                     
0945                     continue;
0946                 }
0947 
0948                 NDRX_LOG(log_info, "Reply cd: %d, timestamp :%d callseq: %u from "
0949                         "%s - expected OK!",
0950                         rply->cd, rply->timestamp, rply->callseq, rply->reply_to);
0951                 answ_ok=EXTRUE;
0952                 /* Free up call descriptor!! */
0953                 unlock_call_descriptor(rply->cd, CALL_NOT_ISSUED);
0954             }
0955             else
0956             {
0957                 NDRX_LOG(log_warn, "Dropping incoming message (not expected): "
0958                         "cd: %d, timestamp :%d, callseq: %u reply from %s cd status %hd",
0959                         rply->cd, rply->timestamp, rply->callseq, rply->reply_to,
0960                         G_atmi_tls->G_call_state[rply->cd].status);
0961                 
0962                 continue; /* Wait for next message! */
0963             }
0964 
0965             /* If we are in global tx & we have abort_only, then report */
0966             if (TMTXFLAGS_IS_ABORT_ONLY & rply->tmtxflags)
0967             {
0968                 NDRX_LOG(log_warn, "Reply contains SYS_XA_ABORT_ONLY!");
0969                 is_abort_only = EXTRUE;
0970             }
0971             /* TODO: check incoming type! */
0972             /* we have an answer - prepare buffer */
0973             *cd = rply->cd;
0974             if (rply->sysflags & SYS_FLAG_REPLY_ERROR)
0975             {
0976                 ndrx_TPset_error_msg(rply->rcode, "Server failed to generate reply");
0977                 ret=EXFAIL;
0978                 goto out;
0979             }
0980             else
0981             {
0982                 /* Convert all, including NULL buffers  */
0983                 ret = ndrx_mbuf_prepare_incoming (rply->data, rply->data_len, 
0984                         data, len, flags, 0);
0985                 
0986                 /* put rcode in global */
0987                 G_atmi_tls->M_svc_return_code = rply->rcode;
0988 
0989                 /* TODO: Check buffer acceptance or do it inside of prepare_incoming? */
0990                 if (ret==EXFAIL)
0991                 {
0992                     goto out;
0993                 }
0994                 /* if all OK, then finally check the reply answer
0995                  * Maybe want to use code returned in rval?
0996                  */
0997                 if (TPSUCCESS!=rply->rval)
0998                 {
0999                     ndrx_TPset_error_fmt(TPESVCFAIL, "Service returned %d", 
1000                             rply->rval);
1001                     ret=EXFAIL;
1002                     goto out;
1003                 }
1004             }
1005         } /* reply recieved ok */
1006     }
1007 out:
1008 
1009     /* Restore transaction if was suspended. */
1010     if (flags & TPTRANSUSPEND && p_tranid && p_tranid->tmxid[0])
1011     {
1012         /* Save error... (if any...) 
1013          * Bug #417
1014          */
1015         atmi_error_t err;
1016         int err_saved = EXFALSE;
1017         
1018         if (0!=tperrno)
1019         {
1020             ndrx_TPsave_error(&err);
1021             err_saved = EXTRUE;
1022         }
1023         
1024         /* resume the transaction */
1025         if (EXSUCCEED!=ndrx_tpresume(p_tranid, TPTXTMSUSPEND) && EXSUCCEED==ret)
1026         {
1027             ret=EXFAIL;
1028         }
1029         
1030         /* Restore error if was saved.. */
1031         
1032         if (err_saved)
1033         {
1034             ndrx_TPrestore_error(&err);
1035         }
1036         else if (EXFAIL==ret)
1037         {
1038             /* override the error to TPESYSTEM, as cannot start the transaction */
1039              ndrx_TPoverride_code(TPESYSTEM);
1040         }
1041         
1042     }
1043 
1044     if (G_atmi_tls->G_atmi_xa_curtx.txinfo && NULL!=rply
1045             && 0==strcmp(G_atmi_tls->G_atmi_xa_curtx.txinfo->tmxid, rply->tmxid)
1046             && EXSUCCEED!=atmi_xa_update_known_rms(G_atmi_tls->G_atmi_xa_curtx.txinfo->tmknownrms, 
1047                 rply->tmknownrms))
1048     {
1049         /* set system error */
1050         NDRX_LOG(log_error, "Failed to atmi_xa_update_known_rms()");
1051         ndrx_TPoverride_code(TPESYSTEM);
1052         ret = EXFAIL;
1053     }
1054 
1055     /* Abort only marking section */
1056     NDRX_ABORT_START(is_abort_only)
1057     NDRX_ABORT_END(is_abort_only)
1058             
1059     /* free up the system buffer */
1060     if (NULL!=pbuf)
1061     {
1062         NDRX_SYSBUF_FREE(pbuf);
1063     }
1064                 
1065     NDRX_LOG(log_debug, "%s return %d tpurcode=%ld tperror=%d data=%p *data=%p len=%p *len=%ld",
1066             __func__, ret, G_atmi_tls->M_svc_return_code, G_atmi_tls->M_atmi_error,
1067              data, *data, len, *len);
1068     /* mvitolin 12/12/2015 - according to spec we must return 
1069      * service returned return code
1070      * mvitolin, 18/02/2018 Really? Cannot find any references...
1071      */
1072     /*
1073     if (EXSUCCEED==ret)
1074     {
1075         return G_atmi_tls->M_svc_return_code;
1076     }
1077     else 
1078     {
1079         return ret;
1080     }
1081     */
1082     
1083     return ret;
1084 }
1085 
1086 /**
1087  * Internal version to tpcall.
1088  * Support for flags:
1089  * TPNOTRAN    TPNOBLOCK
1090  * TPNOTIME    TPSIGRSTRT
1091  *
1092  * @param svc
1093  * @param idata
1094  * @param ilen
1095  * @param odata
1096  * @param olen
1097  * @param flags
1098  * @param user1 user data field 1
1099  * @param user2 user data field 2
1100  * @return
1101  */
1102 expublic int ndrx_tpcall (char *svc, char *idata, long ilen,
1103                 char **odata, long *olen, long flags,
1104                 char *extradata, int dest_node, int ex_flags,
1105                 int user1, long user2, int user3, long user4)
1106 {
1107     int ret=EXSUCCEED;
1108     int cd_req = 0;
1109     int cd_rply = 0;
1110     ndrx_tpcall_cache_ctl_t cachectl;
1111     int cache_used = EXFALSE;
1112     TPTRANID tranid, *p_tranid;
1113     
1114     NDRX_LOG(log_debug, "%s: enter flags=%ld tx=%p xa_flags_sys=%ld "
1115             "idata=%p ilen=%ld odata=%p *odata=%p olen=%p",
1116             __func__, flags, G_atmi_tls->G_atmi_xa_curtx.txinfo,
1117              G_atmi_env.xa_flags_sys, idata, ilen, odata, *odata, olen);
1118     
1119     cachectl.should_cache = EXFALSE;
1120     cachectl.cached_rsp = EXFALSE;
1121 
1122     /* In case if not no tran and have global tran */
1123     if (    !(flags & TPNOTRAN) &&  G_atmi_tls->G_atmi_xa_curtx.txinfo &&
1124             (
1125                 /* if forced suspend */
1126                 (flags & TPTRANSUSPEND) ||
1127                     /* Or not marked as no join and not marked as no-suspend*/
1128                     (
1129                         !(G_atmi_env.xa_flags_sys & NDRX_XA_FLAG_SYS_NOJOIN)
1130                         &&  
1131                         !(G_atmi_env.xa_flags_sys & NDRX_XA_FLAG_SYS_NOSUSPEND)
1132                     )
1133             )
1134        )
1135     {
1136         /* mark finally that suspend shall happen */
1137         flags|=TPTRANSUSPEND;
1138         memset(&tranid, 0, sizeof(tranid));
1139         p_tranid = &tranid;
1140     }
1141     else
1142     {
1143         p_tranid = NULL;
1144     }
1145     
1146     if (ndrx_cache_used())
1147     {
1148         cache_used = EXTRUE;
1149         memset(&cachectl, 0, sizeof(cachectl));
1150         
1151         cachectl.odata = odata;
1152         cachectl.olen = olen;
1153     }
1154     
1155     /* use same setting for reply processing */
1156     if (EXFAIL==(cd_req=ndrx_tpacall (svc, idata, ilen, flags, extradata, 
1157             dest_node, ex_flags, p_tranid, user1, user2, user3, user4,
1158             (cache_used?&cachectl:NULL) )))
1159     {
1160         NDRX_LOG(log_error, "_tpacall to %s failed", svc);
1161         ret=EXFAIL;
1162         goto out;
1163     } 
1164     
1165     if (cachectl.cached_rsp)
1166     {
1167         NDRX_LOG(log_debug, "Reply from cache");
1168         
1169         NDRX_LOG(log_info, "Response read form cache!");
1170         G_atmi_tls->M_svc_return_code = cachectl.saved_tpurcode;
1171 
1172         if (0!=cachectl.saved_tperrno)
1173         {
1174             ndrx_TPset_error_msg(cachectl.saved_tperrno, "Cached error response");
1175             ret=EXFAIL;
1176         }
1177         
1178         /*  We are already in cache! */
1179         goto out;
1180     }
1181 
1182     /* Support #259 Do this only after tpacall, because we might do
1183      * non blocked requests, but responses we way in blocked mode.
1184      */
1185     flags&=~TPNOBLOCK; /* we are working in sync (blocked) mode
1186                         * because we do want answer back! */
1187     
1188     /* event posting might be done without reply... */
1189     if (!(flags & TPNOREPLY))
1190     {
1191         if (EXSUCCEED!=(ret=ndrx_tpgetrply(&cd_rply, cd_req, odata, olen, flags, 
1192                 p_tranid)))
1193         {
1194             NDRX_LOG(log_error, "_tpgetrply to %s failed", svc);
1195             goto out;
1196         }
1197 
1198         /*
1199          * Did we get back what we asked for?
1200          */
1201         if (cd_req!=cd_rply)
1202         {
1203             ret=EXFAIL;
1204             ndrx_TPset_error_fmt(TPEPROTO, "%s: Got invalid reply! cd_req: %d, cd_rply: %d",
1205                                              __func__, cd_req, cd_rply);
1206             goto out;
1207         }
1208     }
1209 
1210 out:
1211 
1212     /* Bug #560 */
1213     if (EXSUCCEED!=ret && TPETIME==tperrno)
1214     {
1215         ndrx_tpcancel(cd_req);
1216     }
1217 
1218     /* tpcall cache implementation: add to cache if required */
1219     if (!(flags & TPNOCACHEADD) && cachectl.should_cache && !cachectl.cached_rsp)
1220     {
1221         int ret2;
1222         
1223         /* lookup cache, what about tperrno?*/
1224         if (EXSUCCEED!=(ret2=ndrx_cache_save (svc, *odata, 
1225             *olen, tperrno, G_atmi_tls->M_svc_return_code, 
1226                 G_atmi_env.our_nodeid, flags, EXFAIL, EXFAIL, EXFALSE)))
1227         {
1228             /* return error if failed to cache? */
1229             if (NDRX_TPCACHE_ENOCACHE!=ret2)
1230             {
1231                 userlog("Failed to save service [%s] cache results: %s", svc,
1232                     tpstrerror(tperrno));
1233             }
1234         }
1235     }
1236 
1237     NDRX_LOG(log_debug, "%s: return %d cd %d odata=%p *odata=%p olen=%p *olen=%ld",
1238                 __func__, ret, cd_rply, odata, *odata, olen, *olen);
1239     return ret;
1240 }
1241 
1242 /**
1243  * Internal version to tpcancel
1244  * @param
1245  * @return
1246  */
1247 expublic int ndrx_tpcancel (int cd)
1248 {
1249     int ret=EXSUCCEED;
1250     tpmemq_t *el, *elt;
1251     char *data = NULL;
1252     long len;
1253     ATMI_TLS_ENTRY;
1254     
1255     NDRX_LOG(log_debug, "tpcancel issued for %d", cd);
1256 
1257     if (cd<1||cd>=MAX_ASYNC_CALLS)
1258     {
1259         ndrx_TPset_error_fmt(TPEBADDESC, "%s: Invalid call descriptor %d, should be 0<cd<%d",
1260                                          __func__, cd, MAX_ASYNC_CALLS);
1261         ret=EXFAIL;
1262         goto out;
1263     }
1264 
1265     
1266     /* just receive something from the Q
1267      * so that if we do the cancel always and never tpgetrply
1268      * that might block the responsers to us
1269      * ignore any error...
1270      */
1271     ndrx_tpgetrply(&cd, cd, &data, &len, TPNOBLOCK|TPNOABORT, NULL);
1272     if (NULL!=data)
1273     {
1274         tpfree(data);
1275     }
1276     
1277     /* search for matched cd and clean any queued messages... */
1278     DL_FOREACH_SAFE(G_atmi_tls->memq, el, elt)
1279     {
1280         tp_command_call_t *rply = (tp_command_call_t *)el->buf;
1281         if (rply->cd==cd)
1282         {    
1283             NDRX_SYSBUF_FREE(el->buf);
1284             NDRX_FPFREE(el);
1285         }
1286     }
1287     
1288     /* Mark call as cancelled, so that we could re-use it later. */
1289     G_atmi_tls->G_call_state[cd].status = CALL_CANCELED;
1290 
1291 out:
1292     return ret;
1293 }
1294 
1295 
1296 /**
1297  * ATMI standard
1298  * @return - pointer to int holding error code?
1299  */
1300 expublic long * _exget_tpurcode_addr (void)
1301 {
1302     ATMI_TLS_ENTRY;
1303     return &G_atmi_tls->M_svc_return_code;
1304 }
1305 
1306 
1307 /* vim: set ts=4 sw=4 et smartindent: */