Back to home page

Enduro/X

 
 

    


0001 /**
0002  * @brief Implements main functions for conversational server
0003  *   Some thoughts:
0004  *   1. Control is on that side which sends the message.
0005  *   2. ?
0006  *   For server incoming call descriptor, there shall be only one descriptor
0007  *   reserved.
0008  *
0009  * @file conversation.c
0010  */
0011 /* -----------------------------------------------------------------------------
0012  * Enduro/X Middleware Platform for Distributed Transaction Processing
0013  * Copyright (C) 2009-2016, ATR Baltic, Ltd. All Rights Reserved.
0014  * Copyright (C) 2017-2023, Mavimax, Ltd. All Rights Reserved.
0015  * This software is released under one of the following licenses:
0016  * AGPL (with Java and Go exceptions) or Mavimax's license for commercial use.
0017  * See LICENSE file for full text.
0018  * -----------------------------------------------------------------------------
0019  * AGPL license:
0020  *
0021  * This program is free software; you can redistribute it and/or modify it under
0022  * the terms of the GNU Affero General Public License, version 3 as published
0023  * by the Free Software Foundation;
0024  *
0025  * This program is distributed in the hope that it will be useful, but WITHOUT ANY
0026  * WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR A
0027  * PARTICULAR PURPOSE. See the GNU Affero General Public License, version 3
0028  * for more details.
0029  *
0030  * You should have received a copy of the GNU Affero General Public License along 
0031  * with this program; if not, write to the Free Software Foundation, Inc.,
0032  * 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
0033  *
0034  * -----------------------------------------------------------------------------
0035  * A commercial use license is available from Mavimax, Ltd
0036  * contact@mavimax.com
0037  * -----------------------------------------------------------------------------
0038  */
0039 #include <string.h>
0040 #include <stdio.h>
0041 #include <stdlib.h>
0042 #include <memory.h>
0043 #include <errno.h>
0044 #include <sys/stat.h>
0045 #include <fcntl.h>
0046 
0047 #include <ndrstandard.h>
0048 #include <ndebug.h>
0049 #include <atmi_int.h>
0050 #include <typed_buf.h>
0051 
0052 #include "../libatmisrv/srv_int.h"
0053 #include "userlog.h"
0054 #include <thlock.h>
0055 #include <xa_cmn.h>
0056 #include <tperror.h>
0057 #include <atmi_shm.h>
0058 #include <atmi_tls.h>
0059 #include <ndrx_ddr.h>
0060 /*---------------------------Externs------------------------------------*/
0061 /*---------------------------Macros-------------------------------------*/
/**
 * Mirror the conversation direction flags for the peer side:
 * TPSENDONLY becomes TPRECVONLY and TPRECVONLY becomes TPSENDONLY.
 * If neither bit is set, the flags are left untouched. Other bits are
 * never modified.
 * @param X pointer expression to a structure having integer `flags' member.
 *  The argument is evaluated several times, thus it must not have
 *  side effects.
 */
#define CONV_TARGET_FLAGS(X)     \
    do\
    {\
        /* Fix up send/receive flags */\
        if ((X)->flags & TPSENDONLY)\
        {\
            (X)->flags&=~TPSENDONLY;\
            (X)->flags|=TPRECVONLY;\
        }\
        else if ((X)->flags & TPRECVONLY)\
        {\
            (X)->flags&=~TPRECVONLY;\
            (X)->flags|=TPSENDONLY;\
        }\
    } \
    while (0)
0078 
0079 
0080 /* #define CONV_USE_ACK 1 */
0081 /*---------------------------Enums--------------------------------------*/
0082 /*---------------------------Typedefs-----------------------------------*/
0083 /*---------------------------Globals------------------------------------*/
0084 int M_had_open_con = EXFALSE;
0085 /*---------------------------Statics------------------------------------*/
0086 /*---------------------------Prototypes---------------------------------*/
0087 exprivate mqd_t open_conv_q(char *q,  struct mq_attr *q_attr);
0088 exprivate mqd_t open_reply_q(char *q, struct mq_attr *q_attr);
0089 exprivate void rcv_hash_delall(tp_conversation_control_t *conv);
0090 exprivate char * rcv_hash_ck(tp_conversation_control_t *conv, unsigned msgseq);
0091 
0092 /**
0093  * Closes any connection made as client.
0094  * This will send disconnect event to them, because normally server should do
0095  * tpreturn!
0096  * Try to close all queues, if have some error, report it
0097  */
0098 expublic int close_open_client_connections(void)
0099 {
0100     int i;
0101     int ret=EXSUCCEED;
0102 
0103     ATMI_TLS_ENTRY;
0104     
0105     /* nothing to do, we do not have opened any client connections! */
0106     if (!M_had_open_con)
0107     {
0108         return EXSUCCEED;
0109     }
0110 
0111     for (i=0; i<MAX_CONNECTIONS; i++)
0112     {
0113         if (CONV_IN_CONVERSATION==G_atmi_tls->G_tp_conversation_status[i].status)
0114         {
0115             if (EXFAIL==ndrx_tpdiscon(G_atmi_tls->G_tp_conversation_status[i].cd))
0116             {
0117                 NDRX_LOG(log_warn, "Failed to close connection [%d]", i);
0118                 ret=EXFAIL;
0119             }
0120         }
0121     }
0122     M_had_open_con = EXFALSE;
0123     
0124     return ret;
0125 }
0126 
0127 
0128 /**
0129  * Return TRUE if we have any open connection
0130  * @return TRUE/FALSE
0131  */
0132 expublic int have_open_connection(void)
0133 {
0134     int i;
0135     int ret=EXFALSE;
0136     ATMI_TLS_ENTRY;
0137     
0138     /* nothing to do, we do not have opened any client connections! */
0139     if (!M_had_open_con)
0140     {
0141         return EXFALSE;
0142     }
0143 
0144     for (i=0; i<MAX_CONNECTIONS; i++)
0145     {
0146         if (CONV_IN_CONVERSATION==G_atmi_tls->G_tp_conversation_status[i].status)
0147         {
0148             ret=EXTRUE;
0149             break;
0150         }
0151     }
0152 
0153     NDRX_LOG(log_debug, "We %s open connections!",
0154                             ret?"have":"do not have");
0155 
0156     return ret;
0157     
0158 }
0159 
0160 /**
0161  * Reject connection opening, due to error
0162  * This will deliver note to client that connection is rejected
0163  * @param err tperrno code
0164  * @return EXSUCCEED/EXFAIL
0165  */
0166 expublic int ndrx_reject_connection(int err)
0167 {
0168     char their_qstr[NDRX_MAX_Q_SIZE+1];
0169     tp_command_call_t *call;
0170     char *buf=NULL;
0171     size_t buf_len;
0172     int ret = EXSUCCEED;
0173     
0174     NDRX_SYSBUF_MALLOC_WERR_OUT(buf, buf_len, ret);
0175     call = (tp_command_call_t *)buf;
0176     
0177     memset(call, 0, sizeof(*call));
0178     
0179     if (0!=G_atmi_tls->G_last_call.callstack[0])
0180     {
0181         br_dump_nodestack(G_atmi_tls->G_last_call.callstack, 
0182                 "Incoming conversation from bridge,"
0183                 "using first node from node stack");
0184 #if defined(EX_USE_POLL) || defined(EX_USE_SYSVQ)
0185         /* poll() mode: */
0186         {
0187             int is_bridge;
0188             char tmpsvc[MAXTIDENT+1];
0189 
0190             snprintf(tmpsvc, sizeof(tmpsvc), NDRX_SVC_BRIDGE, 
0191                     (int)G_atmi_tls->G_last_call.callstack[0]);
0192 
0193             if (EXSUCCEED!=ndrx_shm_get_svc(tmpsvc, their_qstr, &is_bridge,
0194                     NULL))
0195             {
0196                 NDRX_LOG(log_error, "Failed to get bridge svc: [%s]", 
0197                         tmpsvc);
0198                 EXFAIL_OUT(ret);
0199             }
0200         }
0201 #else
0202         snprintf(their_qstr, sizeof(their_qstr),NDRX_SVC_QBRDIGE, 
0203                 G_atmi_tls->G_atmi_conf.q_prefix, 
0204                 (int)G_atmi_tls->G_last_call.callstack[0]);
0205 #endif
0206         
0207     }
0208     else
0209     {
0210         /* Local conversation  */
0211         NDRX_STRCPY_SAFE(their_qstr, G_atmi_tls->G_last_call.reply_to);
0212     }
0213 
0214     /* OK, now fill up the details */
0215     call->data_len = 0;
0216     call->callseq = G_atmi_tls->G_last_call.callseq;
0217     call->msgseq = NDRX_CONF_MSGSEQ_START;
0218     call->command_id = ATMI_COMMAND_CONNRPLY;
0219     call->flags = 0;
0220     call->sysflags|=SYS_FLAG_REPLY_ERROR;
0221     call->rcode = err;
0222     call->clttout = G_atmi_env.time_out;
0223     NDRX_STRCPY_SAFE(call->reply_to, G_atmi_tls->G_last_call.reply_to);
0224     ndrx_stopwatch_reset(&call->timer);
0225 
0226     if (EXSUCCEED!=(ret=ndrx_generic_q_send(their_qstr, (char *)call, sizeof(*call), 
0227             TPNOBLOCK, 0)))
0228     {
0229         NDRX_LOG(log_error, "Failed to deliver reject conn status %d to: [%s]",
0230                 err, their_qstr);
0231         userlog("Failed to deliver reject conn status %d to: [%s]",
0232                 err, their_qstr);
0233         EXFAIL_OUT(ret);
0234     }
0235 
0236 out:
0237                                 
0238     if (NULL!=buf)
0239     {
0240         NDRX_SYSBUF_FREE(buf);
0241     }
0242 
0243     return ret;    
0244     
0245 }
0246 
0247 /**
0248  * This assumes that we have already set last call data.
0249  * TODO: Error if already in conversation?
0250  */
0251 expublic int accept_connection(void)
0252 {
0253     int ret=EXSUCCEED;
0254     tp_conversation_control_t *conv;
0255     long revent;
0256     char their_qstr[NDRX_MAX_Q_SIZE+1];
0257     ATMI_TLS_ENTRY;
0258     
0259     conv= &G_atmi_tls->G_accepted_connection;
0260 /*
0261     char send_q[NDRX_MAX_Q_SIZE+1]; */
0262 
0263     conv->flags = G_atmi_tls->G_last_call.flags; /* Save call flags */
0264     
0265     /* Fix up cd for future use for replies.  
0266      * Lets keep original cd...
0267      */
0268     conv->cd = G_atmi_tls->G_last_call.cd-NDRX_CONV_UPPER_CNT;
0269     /* Change the status, that we have connection open */
0270     conv->status = CONV_IN_CONVERSATION;
0271     conv->msgseqout = NDRX_CONF_MSGSEQ_START;
0272     conv->msgseqin = NDRX_CONF_MSGSEQ_START;
0273     conv->callseq = G_atmi_tls->G_last_call.callseq;
0274     /* 1. Open listening queue */
0275     snprintf(conv->my_listen_q_str, sizeof(conv->my_listen_q_str), 
0276                     NDRX_CONV_SRV_Q,
0277                     G_atmi_tls->G_atmi_conf.q_prefix, 
0278                     G_atmi_tls->G_last_call.my_id, 
0279                     conv->cd,
0280                     /* In accepted connection we put their id */
0281                     G_atmi_tls->G_atmi_conf.my_id
0282                     );
0283     conv->reply_q = (mqd_t)EXFAIL;
0284     
0285     /* TODO: Firstly we should open the queue on which to listen right? */
0286     if ((mqd_t)EXFAIL==(conv->my_listen_q =
0287                     open_conv_q(conv->my_listen_q_str, &conv->my_q_attr)))
0288     {
0289         NDRX_LOG(log_error, "%s: Failed to open listen queue", __func__);
0290         ret=EXFAIL;
0291         goto out;
0292     }
0293 
0294     /* 2. Connect to their reply queue */
0295     NDRX_STRCPY_SAFE(conv->reply_q_str, G_atmi_tls->G_last_call.reply_to);
0296     
0297     /* Check is this coming from bridge. If so the we connect to bridge */
0298     
0299     if (0!=G_atmi_tls->G_last_call.callstack[0])
0300     {
0301         br_dump_nodestack(G_atmi_tls->G_last_call.callstack, 
0302                 "Incoming conversation from bridge,"
0303                 "using first node from node stack");
0304 #if defined(EX_USE_POLL) || defined(EX_USE_SYSVQ)
0305         /* poll() mode: */
0306         {
0307             int is_bridge;
0308             char tmpsvc[MAXTIDENT+1];
0309 
0310             snprintf(tmpsvc, sizeof(tmpsvc), NDRX_SVC_BRIDGE, 
0311                     (int)G_atmi_tls->G_last_call.callstack[0]);
0312 
0313             if (EXSUCCEED!=ndrx_shm_get_svc(tmpsvc, their_qstr, &is_bridge,
0314                     NULL))
0315             {
0316                 NDRX_LOG(log_error, "Failed to get bridge svc: [%s]", 
0317                         tmpsvc);
0318                 EXFAIL_OUT(ret);
0319             }
0320         }
0321 #else
0322         snprintf(their_qstr, sizeof(their_qstr),NDRX_SVC_QBRDIGE, 
0323                 G_atmi_tls->G_atmi_conf.q_prefix, 
0324                 (int)G_atmi_tls->G_last_call.callstack[0]);
0325 #endif
0326         
0327     }
0328     else
0329     {
0330         /* Local conversation  */
0331         NDRX_STRCPY_SAFE(their_qstr, conv->reply_q_str);
0332     }
0333     
0334     NDRX_LOG(log_debug, "Connecting to: [%s]", their_qstr);
0335     
0336     if ((mqd_t)EXFAIL==(conv->reply_q=open_reply_q(their_qstr, &conv->reply_q_attr)))
0337     {
0338         NDRX_LOG(log_error, "Cannot connect to reply queue [%s] - "
0339                                         "cannot accept connection!", 
0340                                         their_qstr);
0341         ret=EXFAIL;
0342         goto out;
0343     }
0344 
0345     /* 3. Send back reply to their queue */
0346     NDRX_LOG(log_debug, "About to send handshake back to client...");
0347     if (EXSUCCEED!=ndrx_tpsend(G_atmi_tls->G_last_call.cd, NULL, 0, 0, &revent,
0348                             ATMI_COMMAND_CONNRPLY))
0349     {
0350         NDRX_LOG(log_error, "Failed to reply for acceptance: %s", tpstrerror(tperrno));
0351         ret=EXFAIL;
0352 
0353     }
0354     
0355 out:
0356 
0357     /* Close down the queue if we fail but queue was opened! */
0358     if (EXSUCCEED!=ret)
0359     {
0360         if ((mqd_t)EXFAIL!=conv->my_listen_q)
0361         {
0362             if (EXFAIL==ndrx_mq_close(conv->my_listen_q))
0363             {
0364                 NDRX_LOG(log_warn, "Failed to close %s:%s",
0365                         conv->my_listen_q_str, strerror(errno));
0366             }
0367             conv->my_listen_q=(mqd_t)EXFAIL;
0368         }
0369     }
0370 
0371     if (EXSUCCEED==ret)
0372     {
0373         conv->handshaked=EXTRUE;
0374     }
0375 
0376     NDRX_LOG(log_debug, "returns %d",  ret);
0377     return ret;
0378 }
0379 
0380 /**
0381  * Return current connection from given cd
0382  * @param cd
0383  * @return NULL/connection control
0384  */
0385 expublic tp_conversation_control_t*  get_current_connection(int cd)
0386 {
0387     tp_conversation_control_t *ret=NULL;
0388     int server=EXFALSE;
0389     ATMI_TLS_ENTRY;
0390     
0391     if (cd>=0 && cd<NDRX_CONV_UPPER_CNT)
0392     {
0393         ret=&G_atmi_tls->G_tp_conversation_status[cd%MAX_CONNECTIONS];
0394     }
0395     else if (cd>=NDRX_CONV_UPPER_CNT)
0396     {
0397         ret=&G_atmi_tls->G_accepted_connection;
0398         server=EXTRUE;
0399     }
0400     else
0401     {
0402         ndrx_TPset_error_fmt(TPEINVAL, "Invalid connection descriptor %d", cd);
0403     }
0404     
0405     
0406     if (NULL!=ret)
0407     {
0408         if (CONV_IN_CONVERSATION!=ret->status)
0409         {
0410             ndrx_TPset_error_fmt(TPEINVAL, "Invalid connection descriptor %d - "
0411                                             "connection closed",  cd);
0412             ret=NULL;
0413         }
0414         else if (ret->cd!=cd && !server)
0415         {
0416             ndrx_TPset_error_fmt(TPEINVAL, "Invalid cd for slot. Slot %d used "
0417                     "by cd=%d but requested cd=%d", cd%MAX_CONNECTIONS, ret->cd, cd);
0418             ret=NULL;
0419         }
0420         else if (ret->cd!=(cd-NDRX_CONV_UPPER_CNT) && server)
0421         {
0422             ndrx_TPset_error_fmt(TPEINVAL, "Invalid cd for server connection."
0423                     "Used cd=%d but requested cd=%d (real: %d)", ret->cd, 
0424                     cd-NDRX_CONV_UPPER_CNT, cd);
0425             ret=NULL;
0426         }
0427     }
0428 
0429     return ret;
0430 }
0431 
0432 /**
0433  * Close connection (Normally!)
0434  * @param [in] dbgmsg debug message
0435  * @return SUCCEED/FAIL
0436  */
0437 expublic int normal_connection_shutdown(tp_conversation_control_t *conv, int killq,
0438         char *dbgmsg)
0439 {
0440     int ret=EXSUCCEED;
0441     ATMI_TLS_ENTRY;
0442     
0443     NDRX_LOG(log_debug, "%s: %s: Closing [%s] killq=%d cd=%d my_listen_q=%p reply_q=%p",
0444          __func__, dbgmsg, conv->my_listen_q_str, killq, conv->cd,
0445         (void *)(long)conv->my_listen_q, (void*)(long)conv->reply_q);
0446 
0447     /* close down the queue */
0448     if ((mqd_t)EXFAIL!=conv->my_listen_q && EXSUCCEED!=ndrx_mq_close(conv->my_listen_q))
0449     {
0450         NDRX_LOG(log_warn, "Failed to ndrx_mq_close [%s]: %s",
0451                                          conv->my_listen_q_str, strerror(errno));
0452         ndrx_TPset_error_fmt(TPEOS, "%s: Failed to ndrx_mq_close [%s]: %s",
0453                             __func__, conv->my_listen_q_str, strerror(errno));
0454        /* ret=FAIL;
0455         goto out; */
0456     }
0457     
0458     /* Remove the queue */
0459     if (killq && EXSUCCEED!=ndrx_mq_unlink(conv->my_listen_q_str))
0460     {
0461         NDRX_LOG(log_warn, "Failed to ndrx_mq_unlink [%s]: %s",
0462                                          conv->my_listen_q_str, strerror(errno));
0463         ndrx_TPset_error_fmt(TPEOS, "%s: Failed to ndrx_mq_unlink [%s]: %s",
0464                             __func__, conv->my_listen_q_str, strerror(errno));
0465         /* ret=FAIL;
0466         goto out; */
0467     }
0468     
0469     /* Kill the reply queue too? */
0470     
0471     NDRX_LOG(log_debug, "Closing [%s]",  conv->reply_q_str);
0472 
0473     /* close down the queue */
0474     if ((mqd_t)EXFAIL!=conv->reply_q && EXSUCCEED!=ndrx_mq_close(conv->reply_q))
0475     {
0476         NDRX_LOG(log_warn, "Failed to ndrx_mq_close [%s]: %s",
0477                                         conv->reply_q_str, strerror(errno));
0478         ndrx_TPset_error_fmt(TPEOS, "%s: Failed to ndrx_mq_close [%s]: %s",
0479                                          __func__, conv->reply_q_str, strerror(errno));
0480        /* ret=FAIL;
0481         goto out; */
0482     }
0483     
0484     /* Remove the queue */
0485     NDRX_LOG(log_warn, "UNLINKING: %s %d", conv->reply_q_str, killq);
0486     if (killq && EXSUCCEED!=ndrx_mq_unlink(conv->reply_q_str))
0487     {
0488         NDRX_LOG(log_warn, "Failed to ndrx_mq_unlink [%s]: %s",
0489                                          conv->reply_q_str, strerror(errno));
0490         ndrx_TPset_error_fmt(TPEOS, "%s: Failed to ndrx_mq_unlink [%s]: %s",
0491                                          __func__, conv->reply_q_str, strerror(errno));
0492         /* ret=FAIL;
0493         goto out; */
0494     }
0495     
0496     /* At this point we reset the CD - free the CD!!  */
0497     /* Unregister CD from global tx */
0498     if (G_atmi_tls->G_atmi_xa_curtx.txinfo)
0499     {
0500         /* try to unregister... anyway (even was called with TPNOTRAN)
0501          * will not find in hash, that's it... 
0502          */
0503         atmi_xa_cd_unreg(&(G_atmi_tls->G_atmi_xa_curtx.txinfo->conv_cds), conv->cd);
0504     }
0505     
0506     rcv_hash_delall(conv); /* Remove all buffers if left... */
0507     
0508     memset(conv, 0, sizeof(*conv));
0509     conv->my_listen_q = (mqd_t)EXFAIL;
0510     conv->reply_q = (mqd_t)EXFAIL;
0511     
0512 out:
0513     return ret;
0514 }
0515 
0516 /**
0517  * Get free connection descriptor
0518  * This function should be synced, so that
0519  * each thread have it's own cd.
0520  * @return
0521  */
0522 exprivate int conv_get_cd(long flags)
0523 {
0524     int slot;
0525     int nr_checked=0;
0526     int cd=EXFAIL;
0527     ATMI_TLS_ENTRY;
0528     
0529     
0530     /* Just take a next number... for better hash. */
0531     
0532     while (nr_checked < MAX_CONNECTIONS && cd==EXFAIL)
0533     {
0534         slot = G_atmi_tls->conv_cd % MAX_CONNECTIONS;
0535         
0536         if (CONV_NO_INITATED== G_atmi_tls->G_tp_conversation_status[slot].status)
0537         {
0538             cd=G_atmi_tls->conv_cd;
0539         }
0540         
0541         nr_checked++;
0542         G_atmi_tls->conv_cd++;
0543         
0544         /* reset conv counter 
0545          * The counter goes much higher than connection limit
0546          * so that we do not re-use the cd very soon.
0547          * That might cause at connection termination / re-start again
0548          * some race condition, that server unlinks queues which have been
0549          * re-opened by new connection. If we recycle cds less often, then
0550          * chance is much smaller for stepping on this race condition.
0551          */
0552         if (G_atmi_tls->conv_cd>=NDRX_CONV_UPPER_CNT)
0553         {
0554             G_atmi_tls->conv_cd=0;
0555         }
0556     }
0557 
0558     if (EXFAIL==cd)
0559     {
0560         NDRX_LOG(log_debug, "All connection descriptors have been taken - FAIL!");
0561     }
0562     else
0563     {
0564         NDRX_LOG(log_debug, "Got free connection descriptor %d", cd);
0565     }
0566     
0567     if (EXFAIL!=cd && 
0568             !(flags & TPNOTRAN) && G_atmi_tls->G_atmi_xa_curtx.txinfo)
0569     {
0570         NDRX_LOG(log_debug, "Registering conv cd=%d under global "
0571                 "transaction!", G_atmi_tls->conv_cd);
0572         if (EXSUCCEED!=atmi_xa_cd_reg(&(G_atmi_tls->G_atmi_xa_curtx.txinfo->conv_cds), 
0573                 cd))
0574         {
0575             cd=EXFAIL;
0576         }
0577     }
0578 
0579     /* return the slot we found */
0580     return cd;
0581 }
0582 
0583 /**
0584  * Open the queue on which we are going to listen for the stuff...
0585  * @param q
0586  * @return SUCCEED/FAIL
0587  */
0588 exprivate mqd_t open_conv_q(char *q,  struct mq_attr *q_attr)
0589 {
0590     mqd_t ret=(mqd_t)EXFAIL;
0591 
0592     ret = ndrx_mq_open_at (q, O_RDWR | O_CREAT, S_IWUSR | S_IRUSR, NULL);
0593 
0594     if ((mqd_t)EXFAIL==ret)
0595     {
0596         ndrx_TPset_error_fmt(TPEOS, "%s:Failed to open queue [%s]: %s",  
0597                 __func__, q, strerror(errno));
0598         goto out;
0599     }
0600 
0601         /* read queue attributes */
0602     if (EXFAIL==ndrx_mq_getattr(ret, q_attr))
0603     {
0604         ndrx_TPset_error_fmt(TPEOS, "%s: Failed to read attributes "
0605                 "for queue [%s] fd %d: %s",
0606                 __func__, q, ret, strerror(errno));
0607         /* close queue */
0608         ndrx_mq_close(ret);
0609         /* unlink the queue */
0610         ndrx_mq_unlink(q);
0611 
0612         ret=(mqd_t)EXFAIL;
0613         goto out;
0614     }
0615 
0616 out:
0617     
0618     return ret;
0619 }
0620 
0621 /**
0622  * Open conversation's reply queue
0623  * @param q
0624  * @param q_attr
0625  * @return
0626  */
0627 exprivate mqd_t open_reply_q(char *q, struct mq_attr *q_attr)
0628 {
0629     mqd_t ret=(mqd_t)EXFAIL;
0630 
0631     ret = ndrx_mq_open_at (q, O_RDWR, S_IWUSR | S_IRUSR, NULL);
0632 
0633     if ((mqd_t)EXFAIL==ret)
0634     {
0635         ndrx_TPset_error_fmt(TPEOS, "Failed to open queue [%s]: %s",  
0636                 q, strerror(errno));
0637         goto out;
0638     }
0639 
0640     /* read queue attributes */
0641     if (EXFAIL==ndrx_mq_getattr(ret, q_attr))
0642     {
0643         ndrx_TPset_error_fmt(TPEOS, "Failed to read attributes for queue [%s] fd %d: %s",
0644                                  q, ret, strerror(errno));
0645 #if 0
0646         /* We cannot remove their Q!!! */
0647         /* close queue */
0648         ndrx_mq_close(ret);
0649         /* unlink the queue */
0650         ndrx_mq_unlink(q);
0651 #endif
0652         ret=(mqd_t)EXFAIL;
0653         goto out;
0654     }
0655     
0656 out:
0657 
0658     return ret;
0659 }
0660 
0661 /**
0662  * Internal implementation of tpconnect.
0663  * So basically after connect, the picture if following (example):
0664  * 
0665  * As client we option this queue: /dom1,cnv,c,clt,atmiclt35,24280,5,1,1, and 
0666  * this is used for us to receive msgs. Thus if atmiclt35 is dead, we can remove
0667  * this q (of corse if we are on node 1).
0668  * 
0669  * When server accepts connection, it builds something like this:
0670  * dom2,cnv,s,clt,atmiclt35,24280,1,1,1,srv,atmisv35,10,24228,0,2 and this is
0671  * used on their side to receive msgs.
0672  * Thus if atmisv35 is dead and we are on node2, we can remove the q.
0673  * 
0674  * @param svc
0675  * @param data
0676  * @param len
0677  * @param flags
0678  * @return
0679  */
0680 expublic int ndrx_tpconnect (char *svc, char *data, long len, long flags)
0681 {
0682     int ret=EXSUCCEED;
0683     int cd=EXFAIL;
0684     char *buf=NULL;
0685     char *queuebuf=NULL;
0686     size_t buf_len;
0687     long data_len = MAX_CALL_DATA_SIZE;
0688     tp_command_call_t *call;
0689     time_t timestamp;
0690     char send_qstr[NDRX_MAX_Q_SIZE+1];
0691     char reply_qstr[NDRX_MAX_Q_SIZE+1]; /* special one for conversation */
0692     char their_qstr[NDRX_MAX_Q_SIZE+1]; /* Their Q (could be bridge service) */
0693     long revent = 0;
0694     short command_id=ATMI_COMMAND_CONNECT;
0695     tp_conversation_control_t *conv;
0696     int is_bridge;
0697     int err;
0698     int prio = NDRX_MSGPRIO_DEFAULT;
0699     char svcddr[XATMI_SERVICE_NAME_LENGTH+1]; /**< routed service name */
0700     ATMI_TLS_ENTRY;
0701     
0702     NDRX_LOG(log_debug, "%s: called", __func__);
0703     
0704     NDRX_STRCPY_SAFE(svcddr, svc);
0705     /* try the DDR */
0706     if (EXFAIL==ndrx_ddr_grp_get(svcddr, sizeof(svcddr), data, len,
0707         &prio))
0708     {
0709         /* error shall be set */
0710         EXFAIL_OUT(ret);
0711     }
0712     
0713     NDRX_SYSBUF_MALLOC_WERR_OUT(buf, buf_len, ret);
0714     call = (tp_command_call_t *)buf;
0715 
0716     /* Check service availability */
0717     if (EXSUCCEED!=ndrx_shm_get_svc(svcddr, send_qstr, &is_bridge, NULL))
0718     {
0719         NDRX_LOG(log_error, "Service is not available %s by shm", 
0720                 svcddr);
0721         ret=EXFAIL;
0722         ndrx_TPset_error_fmt(TPENOENT, "%s: Service is not available %s by shm", 
0723                  __func__, svcddr);
0724         goto out;
0725     }
0726     
0727     /* Get free connection */
0728     if (EXFAIL==(cd=conv_get_cd(flags)))
0729     {
0730         ndrx_TPset_error_msg(TPELIMIT, "Could not get free connection descriptor");
0731         ret=EXFAIL;
0732         goto out;
0733     }
0734 
0735     conv = &G_atmi_tls->G_tp_conversation_status[cd%MAX_CONNECTIONS];
0736     /* Hmm setup cd? */
0737     conv->cd = cd;
0738     /* reset queues... */
0739     conv->my_listen_q = (mqd_t)EXFAIL;
0740     conv->reply_q = (mqd_t)EXFAIL;
0741     conv->reply_q_str[0]=EXEOS;
0742     memset(call, 0, sizeof(*call));
0743 
0744     /* Bug #300 */
0745     call->clttout = G_atmi_env.time_out;
0746     
0747     /*
0748      * Prepare some data if have something to prepare
0749      */
0750 
0751     /* prepare buffer for call */
0752     if (EXSUCCEED!=ndrx_mbuf_prepare_outgoing(data, len, call->data, 
0753             &data_len, flags, 0))
0754     {
0755         /* not good - error should be already set */
0756         ret=EXFAIL;
0757         goto out;
0758     }
0759     /* OK, now fill up the details */
0760     call->data_len = data_len;
0761     data_len+=sizeof(tp_command_call_t);
0762     
0763 
0764     /* Format the conversational reply queue */
0765     snprintf(reply_qstr, sizeof(reply_qstr), NDRX_CONV_INITATOR_Q, 
0766             G_atmi_tls->G_atmi_conf.q_prefix,  G_atmi_tls->G_atmi_conf.my_id, cd);
0767     
0768     NDRX_LOG(log_debug, "%s/%s/%d reply_qstr: [%s]",
0769         G_atmi_tls->G_atmi_conf.q_prefix,  G_atmi_tls->G_atmi_conf.my_id, 
0770                 cd, reply_qstr);
0771     NDRX_STRCPY_SAFE(call->reply_to, reply_qstr);
0772 
0773     /* TODO: Firstly we should open the queue on which to listen right? */
0774     if ((mqd_t)EXFAIL==(conv->my_listen_q =
0775                     open_conv_q(reply_qstr, &conv->my_q_attr)))
0776     {
0777         NDRX_LOG(log_error, "%s: Failed to open listen queue", __func__);
0778         ret=EXFAIL;
0779         goto out;
0780     }
0781 
0782     NDRX_STRCPY_SAFE(conv->my_listen_q_str, reply_qstr);
0783 
0784     call->command_id = ATMI_COMMAND_CONNECT;
0785 
0786     NDRX_STRCPY_SAFE(call->name, svcddr);
0787     
0788     call->flags = flags | TPCONV; /* This is conversational call... */
0789     /* Prepare role flags */
0790     
0791     NDRX_STRCPY_SAFE(call->my_id, G_atmi_tls->G_atmi_conf.my_id);
0792     
0793     /* TODO: lock call descriptor - possibly add to connection descriptor structs */
0794     timestamp = time(NULL);
0795     call->cd = cd;
0796     /* Generate flags for target now. */
0797     CONV_TARGET_FLAGS(call);
0798     call->timestamp = timestamp;
0799     call->callseq = ndrx_get_next_callseq_shared();
0800     call->msgseq = NDRX_CONF_MSGSEQ_START; /* start with NDRX_CONF_MSGSEQ_START */
0801     
0802     /* Add global transaction info to call (if needed & tx available) */
0803     if (!(call->flags & TPNOTRAN) && G_atmi_tls->G_atmi_xa_curtx.txinfo)
0804     {
0805         NDRX_LOG(log_info, "Current process in global transaction (%s) - "
0806                 "prepare call", G_atmi_tls->G_atmi_xa_curtx.txinfo->tmxid);
0807         
0808         atmi_xa_cpy_xai_to_call(call, G_atmi_tls->G_atmi_xa_curtx.txinfo);   
0809     }
0810     /* Reset call timer...! */
0811     ndrx_stopwatch_reset(&call->timer);
0812 
0813     /*
0814     NDRX_LOG(log_debug, "Sending request to: %s, callseq: %u", 
0815       
0816      *       send_qstr, call->callseq, );
0817 */
0818     NDRX_LOG(log_debug, "Sending request to: [%s]: "
0819                         "cd: %d, timestamp :%d, callseq: %u, reply from [%s]",
0820                         send_qstr, call->cd, call->timestamp, call->callseq, call->reply_to);
0821     /* And then we call out the service. */
0822     if (EXSUCCEED!=(ret=ndrx_generic_q_send(send_qstr, (char *)call, data_len, flags, prio)))
0823     {
0824         int err;
0825         
0826         if (ENOENT==ret)
0827         {
0828             err=TPENOENT;
0829         }
0830         else
0831         {
0832             CONV_ERROR_CODE(ret, err);
0833         }
0834 
0835         ndrx_TPset_error_fmt(err, "%s: Failed to send, os err: %s",  __func__, strerror(ret));
0836         ret=EXFAIL;
0837 
0838         goto out;
0839     }
0840     else
0841     {
0842         conv->status = CONV_IN_CONVERSATION;
0843         conv->timestamp = timestamp;
0844         conv->callseq = call->callseq;
0845         conv->msgseqout = call->msgseq;
0846         conv->msgseqin = NDRX_CONF_MSGSEQ_START; /* start with 1 */
0847 
0848         /* Save the flags, alright? */
0849         conv->flags |= (flags & TPSENDONLY);
0850         conv->flags |= (flags & TPRECVONLY);
0851         
0852     }
0853 
0854     /* So now, we shall receive back handshake, by receiving private queue 
0855      * id on which we can reach the server!
0856      */
0857     if (EXSUCCEED!=ndrx_tprecv(cd, (char **)&queuebuf, &data_len, 0L, &revent, 
0858             &command_id, EXFALSE))
0859     {
0860         /* should' error be already set? */
0861         EXFAIL_OUT(ret);
0862     }
0863     
0864     /* We should have */
0865     if (ATMI_COMMAND_CONNRPLY!=command_id)
0866     {
0867         ndrx_TPset_error_fmt(TPESYSTEM, "%s: Invalid connect handshake reply %d", 
0868                              __func__, command_id);
0869         ret=EXFAIL;
0870         goto out;
0871     }
0872     
0873     /* EOS is already included, where q name is set by ndrx_tpsend() */ 
0874     NDRX_STRCPY_SAFE(conv->reply_q_str, queuebuf);
0875     if (is_bridge)
0876     {
0877         NDRX_LOG(log_warn, "Service is bridge");
0878         NDRX_STRCPY_SAFE(their_qstr, send_qstr);
0879     }
0880     else
0881     {
0882         NDRX_STRCPY_SAFE(their_qstr, conv->reply_q_str);
0883     }
0884     
0885     NDRX_LOG(log_debug, "Got reply queue for conversation: [%s] - trying to open [%s]",
0886                                     conv->reply_q_str, their_qstr);
0887     
0888     if ((mqd_t)EXFAIL==(conv->reply_q=open_reply_q(their_qstr, &conv->reply_q_attr)))
0889     {
0890         NDRX_LOG(log_error, "Cannot establish connection - failed to "
0891                                     "open reply queue [%s]", conv->reply_q_str);
0892         ret=EXFAIL;
0893         goto out;
0894     }
0895 
0896     /* Put down the flag that we have open connection */
0897     if (EXSUCCEED==ret)
0898     {
0899         M_had_open_con = EXTRUE;
0900     }
0901 
0902     conv->handshaked=EXTRUE;
0903     
0904 out:
0905                                 
0906     if (NULL!=buf)
0907     {
0908         NDRX_SYSBUF_FREE(buf);
0909     }
0910 
0911     if (NULL!=queuebuf)
0912     {
0913         tpfree(queuebuf);
0914     }
0915 
0916     /* Kill conversation if FAILED!!!! */
0917     if (cd!=EXFAIL && EXFAIL==ret)
0918     {
0919         err=tperrno;
0920         ndrx_tpdiscon(cd);
0921         tperrno=err;
0922     }
0923 
0924     NDRX_LOG(log_debug, "%s: ret= %d cd=%d",  __func__, ret, cd);
0925 
0926     if (EXFAIL!=ret)
0927         return cd;
0928     else
0929         return ret;
0930 }
0931 
0932 /**
0933  * Add message to hash
0934  * If remove message if exists...
0935  * @param conv
0936  * @param msgseq
0937  * @param buf
0938  * @return 
0939  */
0940 exprivate int rcv_hash_add(tp_conversation_control_t *conv,
0941            unsigned msgseq, char *buf)
0942 {
0943     
0944     int ret = EXSUCCEED;
0945     char *tmp;
0946     tpconv_buffer_t * el = NDRX_FPMALLOC(sizeof(tpconv_buffer_t), 0);
0947     
0948     if (NULL!=(tmp=rcv_hash_ck(conv, msgseq)))
0949     {
0950         NDRX_LOG(log_error, "Dropping existing out of order conversation "
0951                 "msgseq: %u, ptr: %p",
0952                 msgseq, tmp);
0953         userlog("Dropping existing out of order conversation "
0954                 "msgseq: %u, ptr: %p",
0955                 msgseq, tmp);
0956         NDRX_FREE(tmp);
0957     }
0958     
0959     if (NULL==el)
0960     {
0961         ndrx_TPset_error_fmt(TPESYSTEM, "%s: Failed to allocate mem: %s", 
0962                 __func__, strerror(errno));
0963         EXFAIL_OUT(ret);
0964     }
0965     el->size=0;
0966     el->msgseq = msgseq;
0967     el->buf = buf;
0968     
0969     EXHASH_ADD_INT( conv->out_of_order_msgs, msgseq, el );
0970     
0971 out:
0972                                 
0973     return ret;
0974 }
0975 
0976 /**
0977  * Check the message within hash, if found, remove hash entry
0978  * @param conv
0979  * @param msgseq
0980  * @return NULL if item not found, buffer if found
0981  */
0982 exprivate char * rcv_hash_ck(tp_conversation_control_t *conv, unsigned msgseq)
0983 {
0984     char *ret = NULL;
0985     tpconv_buffer_t * el;
0986     unsigned seq =  msgseq;
0987     
0988     EXHASH_FIND_INT( conv->out_of_order_msgs, &seq, el);
0989     
0990     if (NULL!=el)
0991     {
0992         ret = el->buf;
0993         EXHASH_DEL(conv->out_of_order_msgs, el);
0994         NDRX_FPFREE(el);
0995     }
0996     
0997     return ret;
0998 }
0999 
1000 /**
1001  * Delete all hash
1002  * @param conv
1003  * @param msgseq
1004  * @return 
1005  */
1006 exprivate void rcv_hash_delall(tp_conversation_control_t *conv)
1007 {
1008     tpconv_buffer_t *el = NULL;
1009     tpconv_buffer_t *elt = NULL;
1010     
1011     /* Iterate over the hash! */
1012     EXHASH_ITER(hh, conv->out_of_order_msgs, el, elt)
1013     {
1014         EXHASH_DEL(conv->out_of_order_msgs, el);
1015         NDRX_SYSBUF_FREE(el->buf); /* dealloc system buffer */
1016         NDRX_FPFREE(el); /* free hash item */
1017     }
1018     
1019 }
1020 
1021 /**
1022  * Receive message from conversation
1023  * TODO: Add stopwatch for timeout...
1024  * 
1025  * @param cd
1026  * @param data
1027  * @param len
1028  * @param flags
1029  * @param revent
1030  * @param ign_blkerr Ignore blocking error (do not add to logs)
1031  * @return
1032  */
1033 expublic int ndrx_tprecv (int cd, char **data, 
1034                         long *len, long flags, long *revent,
1035                         short *command_id, int ign_blkerr)
1036 {
1037     int ret=EXSUCCEED;
1038     ssize_t rply_len;
1039     unsigned prio;
1040     size_t rply_bufsz;
1041     char *rply_buf = NULL; /* Allocate dynamically! */
1042     tp_command_call_t *rply;
1043     int answ_ok = EXFALSE;
1044     long len_int = 0;
1045     tp_conversation_control_t *conv;
1046     ndrx_stopwatch_t t;
1047     ATMI_TLS_ENTRY;
1048     NDRX_LOG(log_debug, "%s enter", __func__);
1049 
1050     /* reset return code ...
1051     G_atmi_tls->M_svc_return_code=0;
1052     ?
1053     */
1054 
1055     /* Enduro/X allows len to be NULL */
1056     if (NULL==len)
1057     {
1058         len = &len_int;
1059     }
1060     *revent = 0;
1061     
1062     if (!(flags & TPNOTIME))
1063     {
1064         ndrx_stopwatch_reset(&t);
1065     }
1066     
1067     /* choose the connection */
1068     if (NULL==(conv=get_current_connection(cd)))
1069     {
1070         ndrx_TPset_error_fmt(TPEINVAL, "%s: Invalid connection descriptor %d",  
1071                 __func__, cd);
1072         EXFAIL_OUT(ret);
1073     }
1074 
1075     /* Check are we allowed to receive? */
1076     if (ATMI_COMMAND_CONVDATA==*command_id && conv->flags & TPSENDONLY)
1077     {
1078         ndrx_TPset_error_fmt(TPEPROTO, "%s: Not allowed to receive "
1079                                     "because flags say: TPSENDONLY!", __func__);
1080         EXFAIL_OUT(ret);
1081     }
1082 
1083     /* Change the attributes of the queue to match required */
1084     if (EXSUCCEED!=ndrx_setup_queue_attrs(&conv->my_q_attr, conv->my_listen_q,
1085                                     conv->my_listen_q_str, flags))
1086     {
1087         EXFAIL_OUT(ret);
1088     }
1089     
1090     /* Check the message in hash?! */
1091     if (NULL!=(rply_buf = rcv_hash_ck(conv, conv->msgseqin)))
1092     {
1093         NDRX_LOG(log_info, "Message with sequence already in memory: %u - injecting",
1094                 conv->msgseqin);
1095         rply = (tp_command_call_t *)rply_buf;
1096         goto inject_message;
1097     }
1098 
1099     /* Allocate Enduro/X system buffer */
1100     NDRX_SYSBUF_MALLOC_WERR_OUT(rply_buf, rply_bufsz, ret);
1101     rply = (tp_command_call_t *)rply_buf;
1102     
1103     /* TODO: If we keep linked list with call descriptors and if there is
1104      * none, then we should return something back - FAIL/proto, not? */
1105 
1106     /**
1107      * We will drop any answers not registered for this call
1108      */
1109     while (!answ_ok)
1110     {
1111         long spent;
1112         if (!(flags & TPNOTIME) && 
1113                 (spent=ndrx_stopwatch_get_delta_sec(&t)) > G_atmi_env.time_out)
1114         {
1115             NDRX_LOG(log_error, "%s: call expired (spent: %ld sec, tout: %ld sec)", 
1116                     __func__, spent, G_atmi_env.time_out);
1117             
1118             ndrx_TPset_error_fmt(TPETIME, "%s: call expired (spent: %ld sec, tout: %ld sec)", 
1119                     __func__, spent, G_atmi_env.time_out);
1120             
1121             EXFAIL_OUT(ret);
1122         }
1123 
1124         /* receive the reply back */
1125         rply_len = ndrx_generic_q_receive(conv->my_listen_q, NULL, NULL, 
1126                 rply_buf, rply_bufsz, &prio, flags);
1127 
1128         
1129         if (GEN_QUEUE_ERR_NO_DATA==rply_len)
1130         {
1131             /* there is no data in reply, nothing to do & nothing to return */
1132             /* how about TPEBLOCK ? */
1133             
1134             if (!ign_blkerr)
1135             {
1136                 /* Generate blocking error */
1137                 NDRX_LOG(log_info, "TPENOBLOCK was specified in flags and "
1138                     "no message is in queue");
1139                 ndrx_TPset_error_msg(TPEBLOCK, "TPENOBLOCK was specified in flags and "
1140                     "no message is in queue");
1141             }
1142             
1143             EXFAIL_OUT(ret);
1144         }
1145         else if (EXFAIL==rply_len)
1146         {
1147             /* we have failed */
1148             NDRX_LOG(log_error, "%s failed to receive answer (%s)", __func__, conv->my_listen_q_str);
1149             ret=EXFAIL;
1150             goto out;
1151         }
1152         else
1153         {
1154             /* if answer is not expected, then we receive again! */
1155             if (conv->cd!=rply->cd)
1156             {
1157                 NDRX_LOG(log_warn, "Dropping incoming message (not expected): "
1158                         "expected cd: %d, cd: %d, timestamp :%d, callseq: %u, reply from [%s]",
1159                         conv->cd, rply->cd, rply->timestamp, rply->callseq, rply->reply_to);
1160                 /* clear the attributes we got... 
1161                 memset(rply_buf, 0, sizeof(*rply_buf));
1162                  * Really need a memset?
1163                  * */
1164                 continue; /* Wait for next message! */
1165             }
1166             
1167 inject_message:
1168             /* we have an answer - prepare buffer */
1169             /* Check the sequence number */
1170             if (rply->msgseq!=conv->msgseqin)
1171             {
1172                 answ_ok=EXFALSE;
1173                 NDRX_LOG(log_info, "Message out of sequence, expected: %u, "
1174                         "got: %hu - suspending to hash",
1175                         conv->msgseqin, rply->msgseq);
1176                 
1177                 /* TODO: Add reply message to the hash of the waiting msgs... */
1178                 if (EXSUCCEED!=rcv_hash_add(conv, rply->msgseq, rply_buf))
1179                 {
1180                     EXFAIL_OUT(ret);
1181                 }
1182                 
1183                 /* Realloc system buffer */
1184                 NDRX_SYSBUF_MALLOC_WERR_OUT(rply_buf, rply_bufsz, ret);
1185                 /* switch the ptrs... */
1186                 rply = (tp_command_call_t *)rply_buf;
1187                 
1188                 continue;
1189             }
1190             else
1191             {
1192                 answ_ok=EXTRUE;
1193                 conv->msgseqin++;
1194                 NDRX_LOG(log_info, "msgseq %u received as expected", 
1195                         rply->msgseq);
1196             }
1197 
1198             *command_id=rply->command_id;
1199 
1200             /* Save the last return codes */
1201             conv->rcode=rply->rcode;
1202             conv->rval=rply->rval;
1203 
1204             if (rply->sysflags & SYS_FLAG_REPLY_ERROR)
1205             {
1206                 if (rply->rcode==TPESVCERR)
1207                 {
1208                     conv->revent = *revent = TPEV_SVCERR; /* Server failed. */
1209                     ndrx_TPset_error(TPEEVENT); /* We have event! */
1210                 }
1211                 else
1212                 {
1213                     ndrx_TPset_error_msg(rply->rcode, "Server failed to generate reply");
1214                     /*conv->revent = *revent = TPEV_SVCERR;  Server failed. should we? */
1215                 }
1216                 
1217                 /* Shutdown connection*/
1218                 normal_connection_shutdown(conv, EXTRUE, 
1219                         "tprecv got SYS_FLAG_REPLY_ERROR");
1220                 
1221                 ret=EXFAIL;
1222                 goto out;
1223             } /* Forced close received. */
1224             else if (ATMI_COMMAND_DISCONN==rply->command_id)
1225             {
1226                 conv->revent = *revent=TPEV_DISCONIMM;
1227                 if (EXFAIL==normal_connection_shutdown(conv, EXFALSE, 
1228                         "tprecv got ATMI_COMMAND_DISCONN"))
1229                 {
1230                     NDRX_LOG(log_error, "Failed to close conversation");
1231                     ret=EXFAIL;
1232                     goto out;
1233                 }
1234                 ret=EXFAIL; /* anyway! */
1235                 ndrx_TPset_error(TPEEVENT); /* We have event! */
1236             }
1237             else
1238             {
1239                 /*ATMI_COMMAND_CONNRPLY:
1240                  *  We have reply queue already in
1241                  * This is really used only for handshaking internally
1242                  * so we do not use buffer management here!
1243                  */
1244                 ret=ndrx_mbuf_prepare_incoming(rply->data,
1245                                 rply->data_len,
1246                                 data,
1247                                 len,
1248                                 flags, 0L);
1249 
1250                 /* TODO: Check buffer acceptance or do it inside of prepare_incoming? */
1251                 if (ret==EXFAIL)
1252                 {
1253                     goto out;
1254                 }
1255                 
1256                 /* Normal connection shutdown */
1257                 if (ATMI_COMMAND_TPREPLY==rply->command_id)
1258                 {
1259                     /* Basically we close the connection (normally)
1260                      * This means we also needs to close & unlink queues
1261                      */
1262                     NDRX_LOG(log_info, "Server did tpreturn - closing conversation!");
1263 
1264                     /* Save the rcode returned. */
1265                     G_atmi_tls->M_svc_return_code = rply->rcode;
1266 
1267                     if (TPSUCCESS!=rply->rval)
1268                     {
1269                         conv->revent = *revent = TPEV_SVCFAIL;
1270                     }
1271                     else
1272                     {
1273                         conv->revent = *revent = TPEV_SVCSUCC;
1274                     }
1275 
1276                     /*
1277                      * Gracefully shutdown the connection
1278                      */
1279                     if (EXSUCCEED!=normal_connection_shutdown(conv, EXTRUE, 
1280                             "tprecv got TPREPLY"))
1281                     {
1282                         ret=EXFAIL;
1283                         goto out;
1284                     }
1285                     ret=EXFAIL; /* anyway! */
1286                     ndrx_TPset_error(TPEEVENT); /* We have event! */
1287                     goto out;
1288                 }
1289 
1290                 if (rply->flags & TPSENDONLY)
1291                 {
1292                     NDRX_LOG(log_debug, "Sender program issued TPRECVONLY "
1293                                     "- so we become TPSENDONLY!");
1294                     ret=EXFAIL;
1295                     conv->revent = *revent = TPEV_SENDONLY;
1296 
1297                     ndrx_TPset_error_fmt(TPEEVENT, "Got event TPEV_SENDONLY");
1298                     
1299                     /* Set our conversation as senders */
1300                     conv->flags|=TPSENDONLY;
1301                     conv->flags&=~TPRECVONLY;
1302                 }
1303             } /* else - normal reply/data from send! */
1304         } /* reply received ok */
1305     }
1306 out:
1307     NDRX_LOG(log_debug, "%s return %d",  __func__, ret);
1308 
1309     if (G_atmi_tls->G_atmi_xa_curtx.txinfo)
1310     {
1311         if ( (TPEV_DISCONIMM == *revent ||  TPEV_SVCERR == *revent ||  
1312                 TPEV_SVCFAIL == *revent) && 
1313                 /* Only if not already aborted. */
1314                 !(G_atmi_tls->G_atmi_xa_curtx.txinfo->tmtxflags & TMTXFLAGS_IS_ABORT_ONLY)
1315             )
1316         {
1317              NDRX_LOG(log_warn, "tprcv error (revent=%ld) - mark "
1318                      "transaction as abort only!", *revent);
1319             /* later should be handled by transaction initiator! */
1320             G_atmi_tls->G_atmi_xa_curtx.txinfo->tmtxflags |= TMTXFLAGS_IS_ABORT_ONLY;
1321         }
1322         /* If we got any reply, and it contains abort only flag
1323          * (for current transaction, then we abort it...)
1324          */
1325         if (0==strcmp(G_atmi_tls->G_atmi_xa_curtx.txinfo->tmxid, rply->tmxid))
1326                 
1327         {
1328             if (rply->tmtxflags & TMTXFLAGS_IS_ABORT_ONLY &&
1329                     !(G_atmi_tls->G_atmi_xa_curtx.txinfo->tmtxflags & TMTXFLAGS_IS_ABORT_ONLY)
1330                     )
1331             {
1332                 NDRX_LOG(log_warn, "Mark transaction as abort only from reply!");
1333                 G_atmi_tls->G_atmi_xa_curtx.txinfo->tmtxflags |= TMTXFLAGS_IS_ABORT_ONLY;
1334             }
1335             
1336             /* Update known RMs */
1337             if ( !(rply->tmtxflags & TMTXFLAGS_IS_ABORT_ONLY) &&
1338                     EXEOS!=rply->tmknownrms[0] &&
1339                     EXSUCCEED!=atmi_xa_update_known_rms(
1340                         G_atmi_tls->G_atmi_xa_curtx.txinfo->tmknownrms, 
1341                         rply->tmknownrms))
1342             {
1343                 NDRX_LOG(log_warn, "Failed to update known RMS: mark "
1344                      "transaction as abort only!");
1345                 G_atmi_tls->G_atmi_xa_curtx.txinfo->tmtxflags |= TMTXFLAGS_IS_ABORT_ONLY;
1346                 ret=EXFAIL;
1347             }
1348         }   
1349     }
1350 
1351     /* Bug #389 */
1352     if (NULL!=rply_buf)
1353     {
1354         NDRX_SYSBUF_FREE(rply_buf);
1355     }
1356     
1357     return ret;
1358 }
1359 
1360 /**
1361  * Receive any un-expected messages.
1362  * With this we catch the cases when server have been illegally made tpreturn!
1363  * @param cd current conversation descriptor
1364  * @param p_revent event value returned in case of if 
1365  * @return EXSUCCEED/EXFAIL (event received)
1366  */
1367 exprivate int process_unsolicited_messages(int cd, long *p_revent)
1368 {
1369     short command_id=ATMI_COMMAND_CONNUNSOL;
1370     char *data=NULL;
1371     long len;
1372     long revent=0;
1373     int ret = EXSUCCEED;
1374     
1375     /* Flush down all messages 
1376      * here we ignore TPEBLOCK error
1377      * not sure is it worth to set it up.
1378      */
1379     while (EXSUCCEED==ndrx_tprecv (cd, &data, &len, TPNOBLOCK, 
1380             &revent, &command_id, EXTRUE))
1381     {
1382         NDRX_LOG(log_debug, "Ignoring unsolicited message!");
1383         /* Free up data */
1384         tpfree(data);
1385         data=NULL;
1386     }
1387     
1388     /* no problem if they want to send... ! */
1389     if (TPEEVENT==tperrno && TPEV_SENDONLY!=revent)
1390     {
1391         *p_revent=revent;
1392         ret=EXFAIL;
1393     }
1394     
1395     /* finally if something after the buffer alloc failed */
1396     if (NULL!=data)
1397     {
1398         ndrx_tpfree (data, NULL);
1399     }
1400     
1401     return ret;
1402     
1403 }
1404 
1405 /**
1406  * Send the message to conversation
1407  * @param cd
1408  * @param data
1409  * @param len
1410  * @param flags
1411  * @param revent
1412  * @param command_id
1413  * @return 
1414  */
1415 expublic int ndrx_tpsend (int cd, char *data, long len, long flags, long *revent,
1416                             short command_id)
1417 {
1418     int ret=EXSUCCEED;
1419     char *buf=NULL;
1420     char *data_q = NULL;
1421     size_t buf_len;
1422     long data_len = MAX_CALL_DATA_SIZE;
1423     tp_command_call_t *call;
1424     tp_conversation_control_t *conv;
1425     
1426     ATMI_TLS_ENTRY;
1427 
1428     /* reset return code ...
1429      * ?
1430     G_atmi_tls->M_svc_return_code=0;
1431     */
1432 
1433     NDRX_LOG(log_debug, "%s: called", __func__);
1434     *revent = 0;
1435 
1436     NDRX_SYSBUF_MALLOC_WERR_OUT(buf, buf_len, ret);
1437     call = (tp_command_call_t *)buf;
1438     
1439     /* Check the current connection descriptor */
1440     if (NULL==(conv=get_current_connection(cd)))
1441     {
1442         ndrx_TPset_error_fmt(TPEINVAL, "%s: Invalid connection descriptor %d",  __func__, cd);
1443         EXFAIL_OUT(ret);
1444     }
1445 
1446     /* Check are we allowed to receive? */
1447     if (ATMI_COMMAND_CONVDATA==command_id && conv->flags & TPRECVONLY)
1448     {
1449         ret=EXFAIL;
1450         ndrx_TPset_error_fmt(TPEPROTO, "%s: Not allowed to receive "
1451                                     "because flags say: TPRECVONLY!", __func__);
1452         goto out;
1453     }
1454     
1455     /* Manage the flags for active connection */
1456     if (flags & TPRECVONLY)
1457     {
1458         NDRX_LOG(log_debug, "%s: Program issued TPRECVONLY", __func__);
1459         /* Set our conversation as senders */
1460         conv->flags|=TPRECVONLY;
1461         conv->flags&=~TPSENDONLY;
1462     }
1463 
1464     memset(call, 0, sizeof(*call));
1465 
1466     /* Change the mode in which we run.\
1467      * We may receive message in async mode.
1468      */
1469     if (EXSUCCEED!=ndrx_setup_queue_attrs(&conv->reply_q_attr, conv->reply_q,
1470                                     conv->reply_q_str, flags))
1471     {
1472         ret=EXFAIL;
1473         goto out;
1474     }
1475 
1476     /* Check for any messages & process them
1477      * - if we get any other error than TPEBLOCK
1478      *   then error shall be returned together with revent.
1479      *   how about changing the 
1480      */
1481     if (EXSUCCEED!=process_unsolicited_messages(cd, revent))
1482     {
1483         ret=EXFAIL;
1484         goto out;
1485     }
1486     
1487     /*
1488      * Check are we still in connection?
1489      */
1490     if (CONV_IN_CONVERSATION!=conv->status)
1491     {
1492         ret=EXFAIL;
1493 
1494         /* If it exited with SUCCEED, then it is ERR. */
1495         if (conv->revent == TPEV_SVCSUCC)
1496         {
1497             *revent=TPEV_SVCERR;
1498         }
1499         else /* else we publish the event back (disconnect or SVCFAIL) */
1500         {
1501             *revent=conv->revent;
1502         }
1503 
1504         NDRX_LOG(log_error, "Un-solicited disconnect from server of "
1505                                 "cd %d. Returning event %ld", cd, *revent);
1506 
1507         /* close our listening queue */
1508         normal_connection_shutdown(conv, EXFALSE, "tpsend got closed conversation");
1509         ret=EXFAIL;
1510         ndrx_TPset_error(TPEEVENT); /* Set that we have event for caller! */
1511         goto out;
1512     }
1513     
1514     /*
1515      * Prepare some data if have something to prepare
1516      */
1517     if (ATMI_COMMAND_CONNRPLY==command_id)
1518     {
1519 #if 0
1520         /* TODO: Prepare string outgoing ... . */
1521         /* Indicate that this is string buffer! */
1522         strcpy(call->data, conv->my_listen_q_str);
1523         data_len = strlen(call->data) + 1; /* Include EOS... */
1524 #endif   
1525         len = strlen(conv->my_listen_q_str) + 1;
1526         data_q=data=tpalloc("STRING", NULL, len);
1527         
1528         if (NULL==data)
1529         {
1530             NDRX_LOG(log_error, "Failed to alloc: %s", tpstrerror(tperrno));
1531             EXFAIL_OUT(ret);
1532         }
1533         
1534         strcpy(data, conv->my_listen_q_str);
1535         
1536     }
1537     
1538     /* prepare buffer for call */
1539     if (EXSUCCEED!=ndrx_mbuf_prepare_outgoing(data, len, call->data, 
1540             &data_len, flags, 0L))
1541     {
1542         /* not good - error should be already set */
1543         EXFAIL_OUT(ret);
1544     }
1545    
1546 
1547     /* OK, now fill up the details */
1548     call->data_len = data_len;
1549 
1550     data_len+=sizeof(tp_command_call_t);
1551 
1552     call->callseq = conv->callseq;
1553     call->msgseq = conv->msgseqout;
1554     
1555     
1556     /* So here is little trick for bridge.
1557      * As it needs to know where reply should go, we should leave here original caller
1558      * reply address, but pack in different my_listen_q_str
1559      */
1560     NDRX_LOG(log_debug, "Our address is: [%s], their reply address must be: [%s]. "
1561             "Callseq: %u, msgseq: %u",
1562             conv->my_listen_q_str, conv->reply_q_str, 
1563             call->callseq, call->msgseq);
1564     
1565     NDRX_STRCPY_SAFE(call->reply_to, conv->reply_q_str);
1566      
1567     /*
1568     strcpy(call->reply_to, conv->my_listen_q_str);
1569     */
1570 
1571     call->command_id = command_id;
1572     call->flags = flags;
1573     
1574     /* set call expiry? */
1575     call->clttout = G_atmi_env.time_out;
1576     ndrx_stopwatch_reset(&call->timer);
1577     
1578     /* Fix up send/receive flags */
1579     CONV_TARGET_FLAGS(call);
1580 
1581     /* TODO: lock call descriptor - possibly add to connection descriptor structs */
1582     call->cd = conv->cd;
1583 
1584     /* Fix connection thing */
1585     if (call->cd>=NDRX_CONV_UPPER_CNT)
1586     {
1587         call->cd-=NDRX_CONV_UPPER_CNT;
1588     }
1589 
1590     call->timestamp = conv->timestamp;
1591     
1592     /* Add global transaction info for sending... */
1593     if (G_atmi_tls->G_atmi_xa_curtx.txinfo)
1594     {
1595         NDRX_LOG(log_info, "Current process in global transaction (%s) - "
1596                 "prepare call", G_atmi_tls->G_atmi_xa_curtx.txinfo->tmxid);
1597         
1598         atmi_xa_cpy_xai_to_call(call, G_atmi_tls->G_atmi_xa_curtx.txinfo);
1599     }
1600     
1601     /* And then we call out the service. */
1602     if (EXSUCCEED!=(ret=ndrx_generic_qfd_send(conv->reply_q, (char *)call, data_len, flags)))
1603     {
1604         int err;
1605 
1606         if (ENOENT==ret)
1607         {
1608             err=TPENOENT;
1609         }
1610         else
1611         {
1612             CONV_ERROR_CODE(ret, err);
1613         }
1614 
1615         ndrx_TPset_error_fmt(err, "%s: Failed to send, os err: %s",  __func__, strerror(ret));
1616         ret=EXFAIL;
1617 
1618         goto out;
1619     }
1620     else
1621     {
1622         /* schedule next delivery */
1623         conv->msgseqout++;
1624     }
1625 
1626 out:
1627                                 
1628     if (NULL!=buf)
1629     {
1630         NDRX_SYSBUF_FREE(buf);
1631     }
1632 
1633     if (NULL!=data_q)
1634     {
1635         atmi_error_t err;
1636         ndrx_TPsave_error(&err);
1637         tpfree(data_q);
1638         ndrx_TPrestore_error(&err);
1639     }
1640 
1641     /* TODO: Kill conversation if FAILED!!!! */
1642     NDRX_LOG(log_debug, "%s: return %d",  __func__, ret);
1643     return ret;
1644 }
1645 
1646 
1647 /**
1648  * Close connection immediately
1649  * @param cd
1650  * @return SUCCEED/FAIL
1651  */
1652 expublic int ndrx_tpdiscon (int cd)
1653 {
1654     int ret=EXSUCCEED;
1655     long revent;
1656     tp_conversation_control_t *conv;
1657     
1658     /* Check our role */
1659     if (NULL==(conv=get_current_connection(cd)))
1660     {
1661         ndrx_TPset_error_fmt(TPEINVAL, "%s: Invalid connection descriptor %d",  __func__, cd);
1662         ret=EXFAIL;
1663         goto out;
1664     }
1665 
1666     /* Send disconnect command to server */
1667     if ((mqd_t)EXFAIL!=conv->reply_q && EXFAIL==ndrx_tpsend (cd, NULL, 0L, 0L, 
1668             &revent, ATMI_COMMAND_DISCONN))
1669     {
1670         NDRX_LOG(log_debug, "Failed to send disconnect to server - IGNORE!");
1671     }
1672 
1673     /* Close down then connection (We close down this only if we are server!)*/
1674     
1675     /* default to disconn...? */
1676     
1677     conv->revent =TPEV_DISCONIMM;
1678     if (EXFAIL==normal_connection_shutdown(conv, EXTRUE, "tpdiscon called"))
1679     {
1680         ret=EXFAIL;
1681         goto out;
1682     }
1683     
1684 out:
1685     NDRX_LOG(log_warn, "%s: return %d",  __func__, ret);
1686     return ret;
1687 }
1688 
1689 /* vim: set ts=4 sw=4 et smartindent: */