Back to home page

Enduro/X

 
 

    


0001 /**
0002  * @brief Send notification message to client
0003  *
0004  * @file tpnotify.c
0005  */
0006 /* -----------------------------------------------------------------------------
0007  * Enduro/X Middleware Platform for Distributed Transaction Processing
0008  * Copyright (C) 2009-2016, ATR Baltic, Ltd. All Rights Reserved.
0009  * Copyright (C) 2017-2023, Mavimax, Ltd. All Rights Reserved.
0010  * This software is released under one of the following licenses:
0011  * AGPL (with Java and Go exceptions) or Mavimax's license for commercial use.
0012  * See LICENSE file for full text.
0013  * -----------------------------------------------------------------------------
0014  * AGPL license:
0015  *
0016  * This program is free software; you can redistribute it and/or modify it under
0017  * the terms of the GNU Affero General Public License, version 3 as published
0018  * by the Free Software Foundation;
0019  *
0020  * This program is distributed in the hope that it will be useful, but WITHOUT ANY
0021  * WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR A
0022  * PARTICULAR PURPOSE. See the GNU Affero General Public License, version 3
0023  * for more details.
0024  *
0025  * You should have received a copy of the GNU Affero General Public License along 
0026  * with this program; if not, write to the Free Software Foundation, Inc.,
0027  * 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
0028  *
0029  * -----------------------------------------------------------------------------
0030  * A commercial use license is available from Mavimax, Ltd
0031  * contact@mavimax.com
0032  * -----------------------------------------------------------------------------
0033  */
0034 
0035 /*---------------------------Includes-----------------------------------*/
0036 #include <stdio.h>
0037 #include <stdarg.h>
0038 #include <memory.h>
0039 #include <stdlib.h>
0040 #include <errno.h>
0041 #include <sys_mqueue.h>
0042 #include <fcntl.h>
0043 
0044 #include <atmi.h>
0045 #include <ndebug.h>
0046 #include <tperror.h>
0047 #include <typed_buf.h>
0048 #include <atmi_int.h>
0049 
0050 #include "../libatmisrv/srv_int.h"
0051 #include "userlog.h"
0052 #include "exregex.h"
0053 #include <thlock.h>
0054 #include <xa_cmn.h>
0055 #include <atmi_shm.h>
0056 #include <atmi_tls.h>
0057 #include <utlist.h>
0058 /*---------------------------Externs------------------------------------*/
0059 /*---------------------------Macros-------------------------------------*/
0060 /*---------------------------Enums--------------------------------------*/
0061 /*---------------------------Typedefs-----------------------------------*/
0062 /*---------------------------Globals------------------------------------*/
0063 /*---------------------------Statics------------------------------------*/
0064 /*---------------------------Prototypes---------------------------------*/
0065 
0066 /**
0067  * tpnotify & tpbroadcast core.
0068  * We will not support TPACK (currently) as it looks like seperate q
0069  * is needed for this and standard reply Q can cause some deadlocks...
0070  * 
0071  * @param clientid optional client id to send data to
0072  * @param p_clientid_myid parsed client id (target to send data to)
0073  * @param data XATMI allocated data or NULL
0074  * @param len data len
0075  * @param flags XATMI flags TPREGEXMATCH - do regexp match over the username, cltname, nodeid
0076  * @param dest_node If doing TPCALL_BRCALL then node id to deliver to
0077  * @param usrname string/regexp or NULL
0078  * @param cltname string/regexp or NULL
0079  * @param nodeid string/regexp or NULL
0080  * @param ex_flags So TPCALL_BROADCAST - when doing broadcast, TPCALL_BRCALL - call bridge
0081  * @return 
0082  */
0083 expublic int ndrx_tpnotify(CLIENTID *clientid, TPMYID *p_clientid_myid, 
0084         char *cltq, /* client q already built by broadcast */
0085         char *data, long len, long flags, 
0086         int dest_node, char *nodeid, char *usrname,  char *cltname,
0087         int ex_flags)
0088 {
0089     int ret=EXSUCCEED;
0090     char *buf=NULL;
0091     size_t buf_len;
0092     tp_notif_call_t *call;
0093     buffer_obj_t *buffer_info;
0094     long data_len = MAX_CALL_DATA_SIZE;
0095     char send_q[NDRX_MAX_Q_SIZE+1];
0096     time_t timestamp;
0097     int is_bridge;
0098     int tpcall_cd;
0099     long local_node = tpgetnodeid();
0100     ATMI_TLS_ENTRY;
0101     
0102     NDRX_LOG(log_debug, "%s enter", __func__);
0103 
0104     NDRX_SYSBUF_MALLOC_WERR_OUT(buf, buf_len, ret);
0105     call = (tp_notif_call_t *)buf;
0106             
0107     /* Might want to remove in future... but it might be dangerous!*/
0108     memset(call, 0, sizeof(tp_notif_call_t));
0109    
0110     /* Set the dest client id */
0111     if (NULL!=clientid)
0112     {
0113         NDRX_STRCPY_SAFE(call->destclient, clientid->clientdata);
0114     }
0115     
0116     call->destnodeid = 0;
0117     /* Check service availability by SHM? 
0118      */
0119     if (ex_flags & TPCALL_BRCALL)
0120     {
0121         call->destnodeid = dest_node;
0122         /* If this is a bridge call, then format the bridge Q */
0123 #if defined(EX_USE_POLL) || defined(EX_USE_SYSVQ)
0124         {
0125             int tmp_is_bridge;
0126             char tmpsvc[MAXTIDENT+1];
0127             
0128             snprintf(tmpsvc, sizeof(tmpsvc), NDRX_SVC_BRIDGE, dest_node);
0129 
0130             if (EXSUCCEED!=ndrx_shm_get_svc(tmpsvc, send_q, &tmp_is_bridge,
0131                 NULL))
0132             {
0133                 NDRX_LOG(log_error, "Failed to get bridge svc: [%s]", 
0134                         tmpsvc);
0135                 EXFAIL_OUT(ret);
0136             }
0137         }
0138 #else
0139         snprintf(send_q, sizeof(send_q), NDRX_SVC_QBRDIGE, 
0140                 G_atmi_tls->G_atmi_conf.q_prefix, dest_node);
0141 #endif
0142         
0143         is_bridge=EXTRUE;
0144     }
0145     else if (NULL!=cltq)
0146     {
0147         /* We have Q build already */
0148         NDRX_STRCPY_SAFE(send_q, cltq);
0149     }
0150     else
0151     {
0152         /* we are sending to client directly thus setup the q properly... 
0153          * But client could resist on different cluster node
0154          * Thus we need to check their node...
0155          */
0156         
0157         if (EXSUCCEED!=ndrx_myid_convert_to_q(p_clientid_myid, send_q, 
0158                 sizeof(send_q)))
0159         {
0160             ndrx_TPset_error_fmt(TPEINVAL, "Failed to translate client data [%s] to Q", 
0161                     clientid->clientdata);
0162             EXFAIL_OUT(ret);
0163         }
0164         
0165         if (p_clientid_myid->nodeid!=local_node)
0166         {
0167             NDRX_LOG(log_info, "The client [%s] resists on another node [%ld], "
0168                     "thus send in cluster", clientid->clientdata, 
0169                     (long)p_clientid_myid->nodeid);
0170             
0171             /* Will do call in recursive way so that cluster code picks up
0172              * this dispatch...
0173              */
0174             ret = ndrx_tpnotify(clientid, p_clientid_myid, 
0175                         cltq, /* client q already built by broadcast */
0176                         data, len, flags, 
0177                         p_clientid_myid->nodeid, 
0178                         nodeid, usrname, cltname,ex_flags | TPCALL_BRCALL);
0179             /* avoid memory leak */
0180             goto out;
0181             
0182         }
0183     }
0184     
0185     if (NULL!=data)
0186     {
0187         if (NULL==(buffer_info = ndrx_find_buffer(data)))
0188         {
0189             ndrx_TPset_error_fmt(TPEINVAL, "Buffer %p not known to system!", __func__);
0190             EXFAIL_OUT(ret);
0191         }
0192     }
0193 
0194     /* prepare buffer for call */
0195     if (EXSUCCEED!=ndrx_mbuf_prepare_outgoing(data, len, call->data, 
0196             &data_len, flags, 0L))
0197     {
0198         /* not good - error should be already set */
0199         EXFAIL_OUT(ret);
0200     }
0201 
0202     /* OK, now fill up the details */
0203     call->data_len = data_len;
0204     
0205     data_len+=sizeof(tp_notif_call_t);
0206 
0207     NDRX_STRCPY_SAFE(call->reply_to, G_atmi_tls->G_atmi_conf.reply_q_str);
0208     
0209     /* If call to bridge, then it is broadcast... */
0210     if (ex_flags & TPCALL_BROADCAST)
0211     {
0212         call->command_id = ATMI_COMMAND_BROADCAST;
0213     }
0214     else
0215     {
0216         call->command_id = ATMI_COMMAND_TPNOTIFY;
0217     }
0218     
0219     call->flags = flags;
0220     timestamp = time(NULL);
0221     
0222     /* lock call descriptor */
0223     if (flags & TPACK)
0224     {
0225         NDRX_LOG(log_warn, "TPACK set but not supported. Ignoring...");
0226         tpcall_cd = 0;
0227     }
0228     else
0229     {
0230         NDRX_LOG(log_debug, "TPACK not set, => cd=0 - no reply needed");
0231         tpcall_cd = 0;
0232     }
0233     
0234     call->cd = tpcall_cd;
0235     call->timestamp = timestamp;
0236     
0237     if (NULL!=usrname)
0238     {
0239         NDRX_STRCPY_SAFE(call->usrname, usrname);
0240     }
0241     else
0242     {
0243         call->usrname[0] = EXEOS;
0244         call->usrname_isnull = EXTRUE;
0245     }
0246     
0247     if (NULL!=cltname)
0248     {
0249         NDRX_STRCPY_SAFE(call->cltname, cltname);
0250     }
0251     else
0252     {
0253         call->cltname[0] = EXEOS;
0254         call->cltname_isnull = EXTRUE;
0255     }
0256     
0257     if (NULL!=nodeid)
0258     {
0259         NDRX_STRCPY_SAFE(call->nodeid, nodeid);
0260     }
0261     else
0262     {
0263         call->nodeid[0] = EXEOS;
0264         call->nodeid_isnull = EXTRUE;
0265     }
0266     
0267     /* Reset call timer */
0268     ndrx_stopwatch_reset(&call->timer);
0269     
0270     NDRX_STRCPY_SAFE(call->my_id, G_atmi_tls->G_atmi_conf.my_id); /* Setup my_id */
0271     
0272     NDRX_LOG(log_debug, "Sending notification request to: [%s] my_id=[%s] "
0273             "reply_to=[%s] cd=%d callseq=%u", 
0274             send_q, call->my_id, call->reply_to, tpcall_cd, call->callseq);
0275     
0276     NDRX_DUMP(log_dump, "Sending away...", (char *)call, data_len);
0277 
0278     if (EXSUCCEED!=(ret=ndrx_generic_q_send(send_q, (char *)call, data_len, flags, 
0279             NDRX_MSGPRIO_NOTIFY)))
0280     {
0281         int err;
0282 
0283         if (ENOENT==ret)
0284         {
0285             err=TPENOENT;
0286         }
0287         else
0288         {
0289             CONV_ERROR_CODE(ret, err);
0290         }
0291         ndrx_TPset_error_fmt(err, "%s: Failed to send, os err: %s", __func__, strerror(ret));
0292         EXFAIL_OUT(ret);
0293     }
0294     
0295 out:
0296     
0297     if (NULL!=buf)
0298     {
0299         NDRX_SYSBUF_FREE(buf);
0300     }
0301 
0302     NDRX_LOG(log_debug, "%s return %d", __func__, ret);
0303     return ret;
0304 }
0305 
0306 /**
0307  * Entry point when notification received...
0308  * @param buf buffer received (full) from Q
0309  * @param len buffer len
0310  */
0311 expublic void ndrx_process_notif(char *buf, ssize_t len)
0312 {
0313     int ret = EXSUCCEED;
0314     char *odata = NULL;
0315     long olen = 0;
0316     expublic buffer_obj_t * typed_buf = NULL;
0317     tp_notif_call_t *notif = (tp_notif_call_t *) buf;
0318     
0319     NDRX_LOG(log_debug, "%s: Got notification from: [%s], data len: %ld: ", 
0320             __func__, notif->my_id, notif->data_len);
0321     
0322     if (NULL==G_atmi_tls->p_unsol_handler)
0323     {
0324         NDRX_LOG(log_warn, "Unsol handler not set - dropping message");
0325         goto out;
0326     }
0327     
0328     if (G_atmi_tls->client_init_data.flags & TPU_IGN)
0329     {
0330         NDRX_LOG(log_warn, "TPU_IGN have been set - dropping message");
0331         goto out;
0332     }
0333     
0334     /* Convert only if we have data */
0335     NDRX_LOG(log_debug, "%s: data received", __func__);
0336     
0337     if (EXSUCCEED==(ndrx_mbuf_prepare_incoming(notif->data,
0338                     notif->data_len,
0339                     &odata,
0340                     &olen,
0341                     0L, 0L)))
0342     {
0343         typed_buf = ndrx_find_buffer(odata);
0344     }
0345     else
0346     {
0347         NDRX_LOG(log_error, "Failed to prepare incoming unsolicited notification");
0348         EXFAIL_OUT(ret);
0349     }
0350     
0351     NDRX_LOG(log_debug, "Unsol handler set to %p - invoking (buffer: %p)", 
0352             G_atmi_tls->p_unsol_handler, odata);
0353     
0354     /* Hmm we shall mark the buffer somehow, that this is ours, so that
0355      * we can later free. As the user can realloc it...
0356      */
0357     G_atmi_tls->p_unsol_handler(odata, olen, 0);
0358 
0359 out:    
0360 
0361     if (NULL!=typed_buf)
0362     {
0363         NDRX_LOG(log_debug, "About to free buffer %p", typed_buf->buf);
0364         tpfree(typed_buf->buf);
0365     }
0366 
0367     return;
0368 }
0369 
0370 /**
0371  * So we get the messages, just check first, as it must be with higher
0372  * priority than zero, it should arrive first. If we get any other type messages
0373  * Just push them to in memory queue which will be later processed by _tpgetrply()
0374  * @param flags additional flags for Q checking
0375  * @return SUCCEED/FAIL
0376  */
0377 expublic int ndrx_tpchkunsol(long flags) 
0378 {
0379     int ret = EXSUCCEED;
0380     char *pbuf = NULL;
0381     size_t pbuf_len;
0382     ssize_t rply_len;
0383     int num_applied = 0;
0384     unsigned prio;
0385     tp_notif_call_t *notif;
0386     
0387     /* Allocate the buffer... to put data into */
0388     NDRX_LOG(log_debug, "Into %s", __func__);
0389     do
0390     {
0391         if (NULL==pbuf)
0392         {
0393             NDRX_SYSBUF_MALLOC_OUT(pbuf, pbuf_len, ret);
0394         }
0395 
0396         /* keep the original settings at re-attempts */
0397         
0398         rply_len = ndrx_generic_q_receive(G_atmi_tls->G_atmi_conf.reply_q, 
0399                 G_atmi_tls->G_atmi_conf.reply_q_str,
0400                 &(G_atmi_tls->G_atmi_conf.reply_q_attr),
0401                 pbuf, pbuf_len, &prio, flags);
0402         
0403         NDRX_LOG(log_debug, "%s: %zd", __func__, (long)rply_len);
0404         
0405         if (rply_len<=0)
0406         {
0407             NDRX_LOG(log_warn, "%s: No message (%zd)", __func__,  rply_len);
0408             goto out;
0409         }
0410 
0411         notif=(tp_notif_call_t *) pbuf;
0412 
0413         /* could use %zu,  but we must be max cross platform... */
0414         NDRX_LOG(log_info, "%s: got message, len: %zd, command id: %d", 
0415                 __func__, rply_len, notif->command_id);
0416 
0417         if (ATMI_COMMAND_TPNOTIFY == notif->command_id ||
0418                 ATMI_COMMAND_BROADCAST == notif->command_id)
0419         {
0420             num_applied++;
0421             NDRX_LOG(log_info, "Got unsol command");
0422             ndrx_process_notif(pbuf, rply_len);
0423         /* are we sure we want to have free here of sysbuf? 
0424             NDRX_FREE(pbuf);
0425             pbuf = NULL;
0426          */
0427         }
0428         else
0429         {
0430             NDRX_LOG(log_info, "got non unsol command - enqueue");
0431             
0432             if (EXSUCCEED!=ndrx_add_to_memq(&pbuf, pbuf_len, rply_len))
0433             {
0434                 EXFAIL_OUT(ret);
0435             }
0436             
0437         }
0438         
0439         /* If not blocking, then on first applied msg we stop.
0440          * In the middle we can get memq messages for which we-reloop to next.
0441          */
0442         if (num_applied && ! (flags & TPNOBLOCK) )
0443         {
0444             break;
0445         }
0446         
0447     } while (1);
0448 out:
0449 
0450     if (NULL!=pbuf)
0451     {
0452         NDRX_SYSBUF_FREE(pbuf);
0453     }
0454 
0455     NDRX_LOG(log_debug, "%s returns %d (applied msgs: %d)", __func__, 
0456             ret, num_applied);
0457 
0458     if (EXSUCCEED==ret)
0459     {
0460         return num_applied;
0461     }
0462     else
0463     {
0464         return ret;
0465     }
0466 }
0467 
0468 /**
0469  * Match the given node against the dispatch arguments
0470  * @param nodeid_str nodeid to test (string)
0471  * @param nodeid nodeid/lmid in tpbroadcast
0472  * @param regexp_nodeid compiled regexp (if TPREGEXMATCH used)
0473  * @param flags flags passed to tpbrodcast
0474  * @return TRUE node accepted, FALSE not accepted
0475  */
0476 exprivate int match_nodeid(char *nodeid_str,  char *nodeid, 
0477         regex_t *regexp_nodeid, long flags)
0478 {
0479     int ret = EXFALSE;
0480     
0481     if (NULL!=nodeid)
0482     {
0483         if (EXEOS==nodeid[0])
0484         {
0485             NDRX_LOG(log_info, "Nodeid %s (nodeid=EOS)", nodeid_str);
0486             ret = EXTRUE;
0487         }
0488         else if ((flags & TPREGEXMATCH ) && 
0489                 EXSUCCEED==ndrx_regexec(regexp_nodeid, nodeid_str))
0490         {
0491             NDRX_LOG(log_info, "Nodeid %s matched local node by regexp",
0492                     nodeid_str);
0493             ret = EXTRUE;
0494         }
0495         else if (0==strcmp(nodeid, nodeid_str))
0496         {
0497             NDRX_LOG(log_info, "Nodeid %s matched by nodeid str param",
0498                     nodeid_str);
0499             ret = EXTRUE;
0500         }
0501         else
0502         {
0503             NDRX_LOG(log_info, "Nodeid %s did not match nodeid param [%s] => "
0504                     "skip node for broadcast",
0505                     nodeid_str, nodeid);
0506         }
0507     }
0508     else
0509     {
0510         NDRX_LOG(log_info, "nodeid param NULL, local node %s matched for broadcast",
0511                     nodeid_str);
0512             ret = EXTRUE;
0513     }
0514     
0515     return ret;
0516 }
0517 /**
0518  * Local broadcast, sends the message to 
0519  * @param nodeid NULL, empty string or regexp of cluster nodes. not used on local
0520  * @param usrname NULL, empty string or regexp of username. Not used.
0521  * @param cltname NULL, empty string or regexp of client name. Used.
0522  * @param data Data to broadcast to process...
0523  * @param len Data len
0524  * @param flags here TPREGEXMATCH flag affects node/usr/clt
0525  * @return SUCCEED/FAIL
0526  */
0527 expublic int ndrx_tpbroadcast_local(char *nodeid, char *usrname, char *cltname, 
0528         char *data,  long len, long flags, int dispatch_local)
0529 {
0530     int ret = EXSUCCEED;
0531     string_list_t* qlist = NULL;
0532     string_list_t* elt = NULL;
0533     int typ;
0534     TPMYID myid;
0535     ndrx_qdet_t qdet;
0536     CLIENTID cltid;
0537     regex_t regexp_nodeid;
0538     int     regexp_nodeid_comp = EXFALSE;
0539     
0540     regex_t regexp_usrname;
0541     int     regexp_usrname_comp = EXFALSE;
0542     
0543     regex_t regexp_cltname;
0544     int     regexp_cltname_comp = EXFALSE;
0545     char nodeid_str[16];
0546     
0547     int local_node_ok = EXFALSE;
0548     int cltname_ok;
0549     
0550     long local_nodeid = tpgetnodeid();
0551     
0552     char connected_nodes[CONF_NDRX_NODEID_COUNT+1] = {EXEOS};
0553     
0554     /* if the username is  */
0555     if (flags & TPREGEXMATCH)
0556     {
0557         /* Setup regexp matches (if any) */
0558         
0559         if (NULL!=nodeid && EXEOS!=nodeid[0])
0560         {
0561             if (EXSUCCEED!=ndrx_regcomp(&regexp_nodeid, nodeid))
0562             {
0563                 ndrx_TPset_error_fmt(TPEINVAL, "Failed to compile nodeid=[%s] regexp",
0564                         __func__, nodeid);
0565                 NDRX_LOG(log_error, "Failed to compile nodeid=[%s]", nodeid);
0566                 EXFAIL_OUT(ret);
0567             }
0568             else
0569             {
0570                 regexp_nodeid_comp = EXTRUE;
0571             }
0572         }
0573         
0574         if (NULL!=usrname && EXEOS!=usrname[0])
0575         {
0576             if (EXSUCCEED!=ndrx_regcomp(&regexp_usrname, usrname))
0577             {
0578                 ndrx_TPset_error_fmt(TPEINVAL, "Failed to compile usrname=[%s] regexp",
0579                         __func__, nodeid);
0580                 NDRX_LOG(log_error, "Failed to compile usrname=[%s]", usrname);
0581                 EXFAIL_OUT(ret);
0582             }
0583             else
0584             {
0585                 regexp_usrname_comp = EXTRUE;
0586             }
0587         }
0588         
0589         if (NULL!=cltname && EXEOS!=cltname[0])
0590         {
0591             if (EXSUCCEED!=ndrx_regcomp(&regexp_cltname, cltname))
0592             {
0593                 ndrx_TPset_error_fmt(TPEINVAL, "Failed to compile cltname=[%s] regexp",
0594                         __func__, cltname);
0595                 NDRX_LOG(log_error, "Failed to compile cltname=[%s]", cltname);
0596                 EXFAIL_OUT(ret);
0597             }
0598             else
0599             {
0600                 regexp_cltname_comp = EXTRUE;
0601             }
0602         }
0603     }
0604     
0605     /* So list all client queues locally
0606      * Match them
0607      * Build client ID
0608      * and run _tpnotify
0609      */
0610     
0611     /* Check our node here */
0612     snprintf(nodeid_str, sizeof(nodeid_str), "%ld", local_nodeid);
0613     
0614     local_node_ok=match_nodeid(nodeid_str,  nodeid,  &regexp_nodeid, flags);
0615     
0616     if (local_node_ok)
0617     {
0618         /* list all queues */
0619         qlist = ndrx_sys_mqueue_list_make(G_atmi_env.qpath, &ret);
0620 
0621         if (EXSUCCEED!=ret)
0622         {
0623             NDRX_LOG(log_error, "posix queue listing failed... continue...!");
0624             ret = EXSUCCEED;
0625             qlist = NULL;
0626         }
0627 
0628         LL_FOREACH(qlist,elt)
0629         {
0630             /* if not print all, then skip this queue */
0631             if (0!=strncmp(elt->qname, 
0632                     G_atmi_env.qprefix_match, G_atmi_env.qprefix_match_len))
0633             {
0634                 continue;
0635             }
0636             
0637             /* currently we will match cltname only and will work on
0638              * server & client reply qs 
0639              * because server can have reply q too... as we know.
0640              */
0641 
0642             typ = ndrx_q_type_get(elt->qname);
0643 
0644             if (NDRX_QTYPE_CLTRPLY==typ)
0645             {
0646                 /* This is our client, lets broadcast to it... 
0647                  * Build client id..
0648                  */
0649                 NDRX_LOG(log_debug, "Got client Q: [%s] - extract CLIENTID",
0650                         elt->qname);
0651 
0652                 /* parse q details */
0653                 if (EXSUCCEED!=ndrx_qdet_parse_cltqstr(&qdet, elt->qname))
0654                 {
0655                     NDRX_LOG(log_error, "Failed to parse Q details!");
0656                     EXFAIL_OUT(ret);
0657                 }
0658 
0659                 /* Check client name */
0660                 cltname_ok = EXFALSE;
0661                 
0662                 if (NULL!=cltname)
0663                 {
0664                     if (EXEOS==cltname[0])
0665                     {
0666                         NDRX_LOG(log_info, "Process matched broadcast [%s] "
0667                                 "(cltname=EOS)", 
0668                                 elt->qname);
0669                         cltname_ok = EXTRUE;
0670                     }
0671                     else if ((flags & TPREGEXMATCH )
0672                         && EXSUCCEED==ndrx_regexec(&regexp_cltname, qdet.binary_name))
0673                     {
0674                         NDRX_LOG(log_info, "Process [%s]/[%s] matched broadcast "
0675                                 "by regexp",
0676                                 elt->qname, qdet.binary_name);
0677                         cltname_ok = EXTRUE;
0678                     }
0679                     else if (0==strcmp(cltname, qdet.binary_name))
0680                     {
0681                         NDRX_LOG(log_info, "Process [%s]/[%s] matched by "
0682                                 "cltname str param [%s]",
0683                                 elt->qname, qdet.binary_name, cltname);
0684                         cltname_ok = EXTRUE;
0685                     }
0686                     else
0687                     {
0688                         NDRX_LOG(log_info, "Process [%s]/[%s] did not match "
0689                                 "cltname param [%s] => "
0690                                 "skip process for broadcast",
0691                                 elt->qname, qdet.binary_name, cltname);
0692                     }
0693                 }
0694                 else
0695                 {
0696                     NDRX_LOG(log_info, "cltname param NULL, process [%s]/[%s] "
0697                             "matched for broadcast",
0698                                 elt->qname, qdet.binary_name);
0699                         cltname_ok = EXTRUE;
0700                 }
0701                 
0702                 if (cltname_ok)
0703                 {
0704                     /* Build myid */
0705                     if (EXSUCCEED!=ndrx_myid_convert_from_qdet(&myid, &qdet, local_nodeid))
0706                     {
0707                         NDRX_LOG(log_error, "Failed to build MYID from QDET!");
0708                         EXFAIL_OUT(ret);
0709                     }
0710 
0711                     /* Build my_id string */
0712                     ndrx_myid_to_my_id_str(&myid, cltid.clientdata);
0713 
0714                     NDRX_LOG(log_info, "Build client id string: [%s]",
0715                             cltid.clientdata);
0716 
0717                     /* keep the api tout values */
0718                     if (EXSUCCEED!=ndrx_tpnotify(&cltid, &myid, elt->qname,
0719                         data, len, flags,  0, nodeid, usrname, cltname, 0))
0720                     {
0721                         NDRX_LOG(log_debug, "Failed to notify [%s] with buffer len: %d", 
0722                                 cltid.clientdata, len);
0723                         userlog("Failed to notify [%s] with buffer len: %d", 
0724                                 cltid.clientdata, len);
0725                     }
0726                 }
0727             } /* if client reply q */
0728         } /* for each q */
0729     } /* local node ok */
0730     
0731     /* Process cluster nodes... */
0732     
0733     /* Check the nodes to broadcast to 
0734      * In future if we choose to route transaction via different nodes
0735      * then we need to broadcast the notification to all machines.
0736      */
0737     if (!dispatch_local)
0738     {
0739         NDRX_LOG(log_info, "Dispatching over any connected nodes");
0740         if (EXSUCCEED==ndrx_shm_birdge_getnodesconnected(connected_nodes))
0741         {
0742             int i;
0743             int len = strlen(connected_nodes);
0744             for (i=0; i<len; i++)
0745             {
0746                 /* Sending stuff to connected nodes (if any matched) */
0747                 snprintf(nodeid_str, sizeof(nodeid_str), "%d", 
0748                         (int)connected_nodes[i]);
0749 
0750                 if (match_nodeid(nodeid_str,  nodeid,  &regexp_nodeid, flags))
0751                 {
0752                     NDRX_LOG(log_debug, "Node id %d accepted for broadcast", 
0753                             (int)connected_nodes[i]);
0754 
0755                     if (EXSUCCEED!=ndrx_tpnotify(NULL, NULL, NULL,
0756                             data, len, flags, 
0757                             (long)connected_nodes[i], nodeid, usrname, cltname, 
0758                             (TPCALL_BRCALL | TPCALL_BROADCAST)))
0759                     {
0760                         NDRX_LOG(log_debug, "Failed to notify [%s] with buffer len: %d", 
0761                                 cltid.clientdata, len);
0762                         userlog("Failed to notify [%s] with buffer len: %d", 
0763                                 cltid.clientdata, len);
0764                     }
0765                 }
0766             } /* for each node */
0767         } /* get cluster connected nodes */
0768     } /* is local dispatch? */
0769     else
0770     {
0771         NDRX_LOG(log_info, "Skip the cluster, local dispatch only...");
0772     }
0773     
0774 out:
0775 
0776     ndrx_string_list_free(qlist);
0777 
0778     if (regexp_nodeid_comp)
0779     {
0780         ndrx_regfree(&regexp_nodeid);
0781     }
0782 
0783     if (regexp_usrname_comp)
0784     {
0785         ndrx_regfree(&regexp_usrname);
0786     }
0787 
0788     if (regexp_cltname_comp)
0789     {
0790         ndrx_regfree(&regexp_cltname);
0791     }
0792 
0793     NDRX_LOG(log_debug, "%s returns %d", __func__, ret);
0794     
0795     return ret;
0796 }
0797 
0798 /* vim: set ts=4 sw=4 et smartindent: */