Back to home page

Enduro/X

 
 

    


0001 /**
0002  * @brief This file contains routines for bridge/cluster support.
0003  *   Basically, if a server reports itself as a bridge, we add it to the
0004  *   linked list & hash (just like we do for process model server ids).
0005  *   After that, the list is used to send service advertisements to the bridges.
0006  *
0007  * @file bridge.c
0008  */
0009 /* -----------------------------------------------------------------------------
0010  * Enduro/X Middleware Platform for Distributed Transaction Processing
0011  * Copyright (C) 2009-2016, ATR Baltic, Ltd. All Rights Reserved.
0012  * Copyright (C) 2017-2023, Mavimax, Ltd. All Rights Reserved.
0013  * This software is released under one of the following licenses:
0014  * AGPL (with Java and Go exceptions) or Mavimax's license for commercial use.
0015  * See LICENSE file for full text.
0016  * -----------------------------------------------------------------------------
0017  * AGPL license:
0018  *
0019  * This program is free software; you can redistribute it and/or modify it under
0020  * the terms of the GNU Affero General Public License, version 3 as published
0021  * by the Free Software Foundation;
0022  *
0023  * This program is distributed in the hope that it will be useful, but WITHOUT ANY
0024  * WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR A
0025  * PARTICULAR PURPOSE. See the GNU Affero General Public License, version 3
0026  * for more details.
0027  *
0028  * You should have received a copy of the GNU Affero General Public License along 
0029  * with this program; if not, write to the Free Software Foundation, Inc.,
0030  * 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
0031  *
0032  * -----------------------------------------------------------------------------
0033  * A commercial use license is available from Mavimax, Ltd
0034  * contact@mavimax.com
0035  * -----------------------------------------------------------------------------
0036  */
0037 #include <string.h>
0038 #include <stdio.h>
0039 #include <stdlib.h>
0040 #include <memory.h>
0041 #include <libxml/xmlreader.h>
0042 #include <errno.h>
0043 
0044 #include <ndrstandard.h>
0045 #include <ndrxd.h>
0046 
0047 #include <ndebug.h>
0048 #include <nstdutil.h>
0049 #include <exhash.h>
0050 #include <bridge_int.h>
0051 #include <nstopwatch.h>
0052 #include <cmd_processor.h>
0053 #include <atmi_shm.h>
0054 /*---------------------------Externs------------------------------------*/
0055 /*---------------------------Macros-------------------------------------*/
0056 /*---------------------------Enums--------------------------------------*/
0057 /*---------------------------Typedefs-----------------------------------*/
0058 /*---------------------------Globals------------------------------------*/
0059 bridgedef_t *G_bridge_hash = NULL; /* Hash table of bridges, keyed by nodeid */
0060 bridgedef_svcs_t *G_bridge_svc_hash = NULL; /* Our full list of local services! */
0061 
0062 bridgedef_svcs_t *G_bridge_svc_diff= NULL; /* Service diff to be sent to nodes, 
0063                                             * processed before G_bridge_svc_hash, 
0064                                             * because we need the point from which 
0065                                             * to make a diff */
0066 exprivate int M_build_diff = EXFALSE;           /* Diff mode enabled        */
0067 /*---------------------------Statics------------------------------------*/
0068 /*---------------------------Prototypes---------------------------------*/
0069 
0070 
0071 /**
0072  * Remove bridge service hash & remove stuff from shared mem.
0073  * @param 
0074  */
0075 expublic void brd_remove_bridge_services(bridgedef_t *cur)
0076 {
0077     bridgedef_svcs_t *r = NULL;
0078     bridgedef_svcs_t *rtmp = NULL;
0079     
0080     EXHASH_ITER(hh, cur->theyr_services, r, rtmp)
0081     {
0082         EXHASH_DEL(cur->theyr_services, r);
0083         /* Remove stuff from shared mem. */
0084         brd_lock_and_update_shm(cur->nodeid, r->svc_nm, 0, BRIDGE_REFRESH_MODE_FULL);
0085         NDRX_FREE(r);
0086     }
0087     
0088 }
0089 
0090 /**
0091  * Remove bridge from hash.
0092  * @param nodeid
0093  * @return 
0094  */
0095 expublic int brd_del_bridge(int nodeid)
0096 {
0097     bridgedef_t* cur = brd_get_bridge(nodeid);
0098     
0099     if (NULL==cur)
0100     {
0101         NDRX_LOG(log_error, "Failed to remove bridge %d - "
0102                 "node not found", nodeid);
0103         return EXFAIL;
0104     }
0105     else
0106     {
0107         NDRX_LOG(log_debug, "Removing bridge %d", nodeid);
0108         brd_remove_bridge_services(cur);
0109         EXHASH_DEL(G_bridge_hash, cur);
0110         NDRX_FREE(cur);
0111     }
0112     
0113     return EXSUCCEED;
0114 }
0115 
0116 /**
0117  * Add bridge server....
0118  * @param srvinfo
0119  * @return 
0120  */
0121 expublic int brd_addupd_bridge(srv_status_t * srvinfo)
0122 {
0123     int ret=EXSUCCEED;
0124     char *fn = "brd_addupd_bridge";
0125     bridgedef_t *cur;
0126     int add=EXFALSE;
0127     if (NULL!=(cur=brd_get_bridge(srvinfo->srvinfo.nodeid)))
0128     {
0129         NDRX_LOG(log_warn, "Bridge %d already exists - updating", 
0130                  srvinfo->srvinfo.nodeid);
0131     }
0132     else
0133     {
0134         NDRX_LOG(log_warn, "Bridge %d does not exists - registering", 
0135                  srvinfo->srvinfo.nodeid);
0136         cur = NDRX_CALLOC(1, sizeof(bridgedef_t));
0137         if (NULL==cur)
0138         {
0139             
0140             NDRX_LOG(log_error, "Failed to allocate %d bytes: %s", 
0141                     sizeof(bridgedef_t), strerror(errno));
0142             EXFAIL_OUT(ret);
0143         }
0144         
0145         cur->nodeid = srvinfo->srvinfo.nodeid;
0146         add=EXTRUE;
0147     }
0148     
0149     cur->srvid = srvinfo->srvinfo.srvid;
0150     cur->flags = srvinfo->srvinfo.flags;
0151     
0152     if (add)
0153     {
0154         /* here nodeid is hash field: */
0155         EXHASH_ADD_INT(G_bridge_hash, nodeid, cur);
0156     }
0157     
0158 out:
0159 
0160     NDRX_LOG(log_error, "%s return %d", fn, ret);
0161 
0162     return ret;
0163 }
0164 
0165 /**
0166  * Get bridge node (find in hash)
0167  * @param nodeid - node id.
0168  * @return 
0169  */
0170 expublic bridgedef_t* brd_get_bridge(int nodeid)
0171 {
0172     bridgedef_t *ret = NULL;
0173     
0174     EXHASH_FIND_INT(G_bridge_hash, &nodeid, ret);
0175     
0176     return ret;
0177 }
0178 
0179 /**
0180  * Build full refresh message.
0181  * Buffer size should match the max ATMI msg size.
0182  * @param buf where to put the message.
0183  * @return 
0184  */
0185 expublic int brd_build_refresh_msg(bridgedef_svcs_t *svcs, 
0186                 bridge_refresh_t *ref, char mode)
0187 {
0188     bridgedef_svcs_t *r = NULL;
0189     bridgedef_svcs_t *rtmp = NULL;
0190     
0191     /* This is global.
0192      * Thing is that on full diffs other side might want to search for services
0193      * what they actually do not have!
0194      */
0195     ref->mode = mode;
0196     
0197     EXHASH_ITER(hh, svcs, r, rtmp)
0198     {
0199         ref->svcs[ref->count].count = r->count;
0200         NDRX_STRCPY_SAFE(ref->svcs[ref->count].svc_nm, r->svc_nm);
0201         ref->svcs[ref->count].mode = mode;
0202         
0203         NDRX_LOG(log_debug, "Built refresh line: count: %d svc_nm: [%s] mode: %c",
0204                 ref->svcs[ref->count].count,
0205                 ref->svcs[ref->count].svc_nm,
0206                 ref->svcs[ref->count].mode);
0207         ref->count++;
0208     }
0209     
0210     return EXSUCCEED;
0211 }
0212 
0213 /**
0214  * Send refresh to node
0215  * @param refresh
0216  * @return 
0217  */
0218 expublic int brd_send_update(int nodeid, bridgedef_t *cur, bridge_refresh_t *refresh)
0219 {
0220     int ret=EXSUCCEED;
0221     int send_size;
0222     bridgedef_t *br = NULL;
0223     pm_node_t *srv = NULL;
0224     char *fn = "brd_send_update";
0225     char *buf=NULL;
0226     size_t buf_len;
0227     bridge_refresh_t *tmp_refresh;
0228     /* default use all */
0229     bridge_refresh_t *p_refresh = refresh;
0230     
0231     NDRX_SYSBUF_MALLOC_OUT(buf, buf_len, ret);
0232     
0233     tmp_refresh= (bridge_refresh_t *)buf;
0234     
0235     if ( NULL!=cur )
0236     {
0237         br = cur;
0238     }
0239     else
0240     {
0241         br = brd_get_bridge(nodeid);
0242     }
0243     
0244     if (NULL==br)
0245     {
0246         NDRX_LOG(log_error, "Invalid nodeid %d!!!", nodeid);
0247         ret=EXFAIL;
0248         goto out;
0249     }
0250     
0251     srv = get_pm_from_srvid(br->srvid);
0252     
0253     if (NULL==srv)
0254     {
0255         NDRX_LOG(log_error, "Invalid server id %d!!!", br->srvid);
0256         ret=EXFAIL;
0257         goto out;
0258     }
0259     
0260     /* Check is per server filter enabled? */
0261     if (EXEOS!=srv->conf->exportsvcs[0] || EXEOS!=srv->conf->blacklistsvcs[0])
0262     {
0263         int i;
0264         NDRX_LOG(6, "filtering by exportsvcs or blacklistsvcs");
0265         
0266         memset(tmp_refresh, 0, sizeof(*tmp_refresh));
0267         /* Initialize the list */
0268         tmp_refresh->mode = refresh->mode;
0269         
0270         /* We should export only services in list. */
0271         
0272         for (i=0; i<refresh->count; i++)
0273         {
0274             char search_svc[MAXTIDENT+3];
0275             search_svc[0]=',';
0276             search_svc[1]=EXEOS;
0277             
0278             NDRX_STRCAT_S(search_svc, sizeof(search_svc), refresh->svcs[i].svc_nm);
0279             NDRX_STRCAT_S(search_svc, sizeof(search_svc), ",");
0280             
0281             /*
0282              * If blacklist set, then filter out blacklisted services
0283              */
0284             if (EXEOS!=srv->conf->blacklistsvcs[0] && strstr(srv->conf->blacklistsvcs, 
0285                     search_svc))
0286             {
0287                 NDRX_LOG(6, "svc %s blacklisted for export", refresh->svcs[i].svc_nm);
0288                 continue;
0289             }
0290             /* If export list empty - export all
0291              * If export list set, then only those in the list
0292              */
0293             else if (EXEOS==srv->conf->exportsvcs[0] || strstr(srv->conf->exportsvcs, 
0294                     search_svc))
0295             {
0296                 NDRX_LOG(6, "svc %s ok for export", refresh->svcs[i].svc_nm);
0297                 /* Copy the service data off */
0298                 memcpy(&tmp_refresh->svcs[tmp_refresh->count], &refresh->svcs[i], 
0299                         sizeof(refresh->svcs[0]));
0300                 tmp_refresh->count++;
0301             }
0302         }
0303         /* Swith pointers over */
0304         p_refresh = tmp_refresh;
0305     }
0306     
0307     send_size = sizeof(bridge_refresh_t);
0308     send_size+=p_refresh->count*sizeof(bridge_refresh_svc_t);
0309     
0310     /* Call the server */
0311     ret = cmd_generic_call(NDRXD_COM_BRREFERSH_RQ, NDRXD_SRC_ADMIN,
0312             NDRXD_CALL_TYPE_BRIDGESVCS,
0313             (command_call_t *)p_refresh, send_size,
0314             G_command_state.listenq_str,
0315             G_command_state.listenq,
0316             (mqd_t)EXFAIL,
0317             get_srv_admin_q(srv),
0318             0, NULL,
0319             NULL,
0320             NULL,
0321             NULL,
0322             EXFALSE);
0323     
0324 out:
0325 
0326             
0327     if (NULL!=buf)
0328     {
0329         NDRX_SYSBUF_FREE(buf);
0330     }
0331 
0332     NDRX_LOG(log_debug, "%s return %d", fn, ret);
0333     return ret;
0334 }
0335 
0336 /**
0337  * Bridge have been connected.
0338  * @param nodeid
0339  * @return 
0340  */
0341 expublic int brd_connected(int nodeid)
0342 {
0343     int ret=EXSUCCEED;
0344     bridgedef_t *cur = brd_get_bridge(nodeid);
0345     char *buf = NULL;
0346     size_t buf_len;
0347     bridge_refresh_t *refresh;
0348     
0349     NDRX_SYSBUF_MALLOC_OUT(buf, buf_len, ret);
0350     refresh= (bridge_refresh_t *)buf;
0351     memset(refresh, 0, sizeof(bridge_refresh_t));
0352     
0353     if (NULL!=cur)
0354     {
0355         pm_node_t * p_pm = get_pm_from_srvid(cur->srvid);
0356         cur->connected = EXTRUE;
0357         p_pm->flags|=SRV_KEY_FLAGS_CONNECTED;
0358         
0359         if (cur->flags & SRV_KEY_FLAGS_SENDREFERSH)
0360         {
0361             NDRX_LOG(log_debug, "About to send to node %d full refresh", nodeid);
0362             if (EXSUCCEED==brd_build_refresh_msg(G_bridge_svc_hash, refresh, 
0363                     BRIDGE_REFRESH_MODE_FULL))
0364             {
0365                 if (EXSUCCEED==(ret = brd_send_update(nodeid, cur, refresh)))
0366                 {
0367                     /* Reset full refresh timer */
0368                     cur->lastrefresh_sent = SANITY_CNT_START;
0369                 }
0370             }
0371         }
0372         else
0373         {
0374             NDRX_LOG(log_debug, "node %d does not require refresh", nodeid);
0375         }
0376     }
0377     else
0378     {
0379         NDRX_LOG(log_warn, "Unknown bridge nodeid %d!!!", nodeid);
0380     }
0381     
0382     
0383 out:
0384 
0385     if (NULL!=buf)
0386     {
0387         NDRX_SYSBUF_FREE(buf);
0388     }
0389 
0390     return ret;
0391 }
0392 
0393 /**
0394  * Bridge have been disconnected - remove any stuff from shared mem.
0395  * @param nodeid
0396  * @return 
0397  */
0398 expublic int brd_discconnected(int nodeid)
0399 {
0400     bridgedef_t *cur = brd_get_bridge(nodeid);
0401     bridgedef_svcs_t *r = NULL;
0402     bridgedef_svcs_t *rtmp = NULL;
0403         
0404     if (NULL!=cur)
0405     {   pm_node_t * p_pm = get_pm_from_srvid(cur->srvid);
0406         cur->connected = EXFALSE;
0407         /* Clear connected flag. */
0408         p_pm->flags&=(~SRV_KEY_FLAGS_CONNECTED);
0409         
0410         /* ###################### CRITICAL SECTION ###################### */
0411         /* So we make this part critical... */
0412         if (EXSUCCEED!=ndrx_lock_svc_op(__func__))
0413         {
0414             goto out;
0415         }
0416         
0417         EXHASH_ITER(hh, cur->theyr_services, r, rtmp)
0418         {
0419             /* Remove from shm */
0420             ndrx_shm_install_svc_br(r->svc_nm, 0, 
0421                         EXTRUE, nodeid, 0, BRIDGE_REFRESH_MODE_FULL, 0);
0422             
0423             /* Delete from hash */
0424             EXHASH_DEL(cur->theyr_services, r);
0425             /* Free up memory */
0426             NDRX_FREE(r);
0427         }
0428         
0429         /* Remove the lock! */
0430         ndrx_unlock_svc_op(__func__);
0431         /* ###################### CRITICAL SECTION, END ################# */
0432         
0433     }
0434     else
0435     {
0436         NDRX_LOG(log_warn, "Unknown bridge nodeid %d!!!", nodeid);
0437     }
0438     
0439 out:
0440     return EXSUCCEED;
0441 }
0442 
0443 /**
0444  * Our service to be removed from bridge hash..
0445  * TODO: In diff mode if item does not exists, it should add and run the stuff 
0446  * into minuses. So that we get the diff finally!!
0447  * 
0448  * @param svc
0449  * @return 
0450  */
0451 expublic int brd_del_svc_from_hash_g(bridgedef_svcs_t ** svcs, char *svc, int diff)
0452 {
0453     int ret=EXSUCCEED;
0454     bridgedef_svcs_t *r=NULL;
0455     
0456     EXHASH_FIND_STR( *svcs, svc, r);
0457     if (NULL!=r)
0458     {
0459         if (diff || r->count>1)
0460         {
0461             r->count--;
0462             NDRX_LOG(log_debug, "bridge view: svc [%s] count: %d", 
0463                     svc, r->count);
0464         }
0465         else
0466         {
0467             NDRX_LOG(log_debug, "bridge view: svc [%s] removed", 
0468                     svc);
0469             EXHASH_DEL(*svcs, r);
0470             NDRX_FREE(r);
0471         }
0472     }
0473     else
0474     {
0475         if (diff)
0476         {
0477             NDRX_LOG(log_debug, "Service [%s] does not exists in "
0478                     "diff hash - adding", svc);
0479             r = NDRX_CALLOC(1, sizeof(*r));
0480             if (NULL==r)
0481             {
0482                 NDRX_LOG(log_error, "Failed to allocate %d bytes: %s", 
0483                         sizeof(*r), strerror(errno));
0484                 ret=EXFAIL;
0485                 goto out;
0486             }
0487             r->count=-1; /* should be 1 */
0488             NDRX_STRCPY_SAFE(r->svc_nm, svc);
0489             EXHASH_ADD_STR( *svcs, svc_nm, r );
0490         }
0491         else
0492         {
0493             NDRX_LOG(log_debug, "WARN: called del svc %s, but no "
0494                     "entry in hash!", svc);
0495         }
0496     }
0497 out:
0498     return ret;
0499 }
0500 
0501 /**
0502  * Delete service from global view
0503  * This also builds the diff
0504  * @param svc
0505  */
0506 expublic void brd_del_svc_from_hash(char *svc)
0507 {
0508     char *fn = "brd_del_svc_from_hash";
0509     /* Nothing to do if not clustered... 
0510     if (!G_atmi_env.is_clustered)
0511         return;
0512      * we need the list for pq stats..
0513      */
0514     
0515     /* We ignore special services like bridge.. */
0516     if (0==strncmp(svc, NDRX_SVC_BRIDGE, NDRX_SVC_BRIDGE_STATLEN) ||
0517             0==strcmp(svc, NDRX_SYS_SVC_PFX EV_TPEVSUBS) ||
0518             0==strcmp(svc, NDRX_SYS_SVC_PFX EV_TPEVUNSUBS) ||
0519             0==strcmp(svc, NDRX_SYS_SVC_PFX EV_TPEVPOST) ||
0520             0==strcmp(svc, NDRX_SYS_SVC_PFX EV_TPEVDOPOST) ||
0521             /* no reason to export on other node */
0522             0==strcmp(svc, NDRX_SVC_TMIB)
0523         )
0524     {
0525         NDRX_LOG(log_debug, "del: IGNORING %s", svc);
0526         return;
0527     }
0528     
0529     NDRX_LOG(log_debug, "%s - enter", fn);
0530     
0531     if (M_build_diff)
0532     {
0533         brd_del_svc_from_hash_g(&G_bridge_svc_diff, svc, EXTRUE);
0534     }
0535     
0536     brd_del_svc_from_hash_g(&G_bridge_svc_hash, svc, EXFALSE);
0537 }
0538 
0539 /**
0540  * Modify global view of our services...
0541  * This also builds the diffs
0542  * @param svc
0543  * @return 
0544  */
0545 expublic int brd_add_svc_to_hash(char *svc)
0546 {
0547     int ret=EXSUCCEED;
0548     
0549     /* We ignore special services like bridge.. 
0550      * but we will export DOPOST as it will be called by local event dispatcher
0551      */
0552     if (0==strncmp(svc, NDRX_SVC_BRIDGE, NDRX_SVC_BRIDGE_STATLEN) ||
0553             0==strncmp(svc, NDRX_SYS_SVC_PFX EV_TPEVSUBS, 
0554                                 strlen(NDRX_SYS_SVC_PFX EV_TPEVSUBS)) ||
0555             0==strncmp(svc, NDRX_SYS_SVC_PFX EV_TPEVUNSUBS, 
0556                                 strlen(NDRX_SYS_SVC_PFX EV_TPEVUNSUBS)) ||
0557             0==strncmp(svc, NDRX_SYS_SVC_PFX EV_TPEVPOST, 
0558                                 strlen(NDRX_SYS_SVC_PFX EV_TPEVPOST)) ||
0559             /* Also have to skip recovery service for bridges. */
0560             0==strcmp(svc, NDRX_SYS_SVC_PFX TPRECOVERSVC) ||
0561             
0562             0==strcmp(svc, NDRX_SVC_TMIB)
0563     )
0564     {
0565         NDRX_LOG(log_debug, "IGNORING %s", svc);
0566         return EXSUCCEED;
0567     }
0568     
0569     NDRX_LOG(log_debug, "ADDING %s", svc);
0570     
0571     if (EXSUCCEED!=brd_add_svc_to_hash_g(&G_bridge_svc_diff, svc))
0572     {
0573         EXFAIL_OUT(ret);
0574     }
0575     
0576     ret=brd_add_svc_to_hash_g(&G_bridge_svc_hash, svc);
0577     
0578 out:
0579     return EXSUCCEED;
0580 }
0581 
0582 /**
0583  * Add our service to hash list
0584  * @param svc
0585  * @param sh
0586  * @return 
0587  */
0588 expublic int brd_add_svc_to_hash_g(bridgedef_svcs_t ** svcs, char *svc)
0589 {
0590     int ret=EXSUCCEED;
0591     bridgedef_svcs_t *r=NULL;
0592     
0593     /* Try to get, if stuff in hash, increase counter */
0594     EXHASH_FIND_STR( *svcs, svc, r);
0595     if (NULL!=r)
0596     {
0597         r->count++;
0598     }
0599     /* If not in hash, then add new struct with count 1 */
0600     else
0601     {
0602         r = NDRX_CALLOC(1, sizeof(*r));
0603         if (NULL==r)
0604         {
0605             NDRX_LOG(log_error, "Failed to allocate %d bytes: %s", 
0606                     sizeof(*r), strerror(errno));
0607             ret=EXFAIL;
0608             goto out;
0609         }
0610         r->count++; /* should be 1 */
0611         NDRX_STRCPY_SAFE(r->svc_nm, svc);
0612         EXHASH_ADD_STR( *svcs, svc_nm, r );
0613     }
0614     
0615     NDRX_LOG(log_error, "Number of services of [%s] = %d", svc, r->count);
0616     
0617 out:
0618     return ret;
0619 }
0620 
0621 /**
0622  * Erase service hash list...
0623  */
0624 expublic void brd_erase_svc_hash_g(bridgedef_svcs_t *svcs)
0625 {
0626     bridgedef_svcs_t *cur, *tmp;
0627     
0628     EXHASH_ITER(hh, svcs, cur, tmp)
0629     {
0630         EXHASH_DEL(svcs,cur);
0631         NDRX_FREE(cur);
0632     }
0633 }
0634 
0635 /**
0636  * Add svc to bridge svc hash 
0637  */
0638 expublic int brd_add_svc_brhash(bridgedef_t *cur, char *svc, int count)
0639 {
0640     int ret=EXSUCCEED;
0641     bridgedef_svcs_t *r=NULL;
0642     bridgedef_svcs_t *tmp=NULL;
0643     
0644     /* Try to get, if stuff in hash, increase counter */
0645     EXHASH_FIND_STR( cur->theyr_services, svc, r);
0646     if (NULL!=r)
0647     {
0648         NDRX_LOG(log_error, "Service [%s] already exists in bridge's %d hash!",
0649                                 svc, cur->nodeid);
0650         r->count = count;
0651     }
0652     else
0653     {
0654         r = NDRX_CALLOC(1, sizeof(*r));
0655         if (NULL==r)
0656         {
0657             NDRX_LOG(log_error, "Failed to allocate %d bytes: %s", 
0658                     sizeof(*r), strerror(errno));
0659             ret=EXFAIL;
0660             goto out;
0661         }
0662         NDRX_STRCPY_SAFE(r->svc_nm, svc);
0663         r->count = count;
0664         EXHASH_ADD_STR( cur->theyr_services, svc_nm, r );
0665     }
0666     
0667 out:
0668     return ret;
0669 }
0670 
0671 /**
0672  * Remove svc from bridge svc hash
0673  * @param cur - might be a null, then lookup will be made by svc.
0674  * @param s
0675  * @param svc
0676  * @param count
0677  * @return 
0678  */
0679 expublic void brd_del_svc_brhash(bridgedef_t *cur, bridgedef_svcs_t *s, char *svc)
0680 {
0681     bridgedef_svcs_t *r=NULL;
0682     
0683     if (NULL!=cur)
0684         r = s;
0685     else
0686         EXHASH_FIND_STR( cur->theyr_services, svc, r);
0687     
0688     if (NULL!=r)
0689     {
0690         EXHASH_DEL(cur->theyr_services, r);
0691         NDRX_FREE(r);
0692     }
0693     else
0694     {
0695         NDRX_LOG(log_debug, "WARN: called del svc %s, but no "
0696                 "entry in bridge %d hash!", svc, cur->nodeid);
0697     }
0698 }
0699 
0700 /**
0701  * Get service entry of the bridge hash
0702  * @param cur
0703  * @param svc
0704  * @return 
0705  */
0706 expublic bridgedef_svcs_t * brd_get_svc_brhash(bridgedef_t *cur, char *svc)
0707 {
0708     bridgedef_svcs_t *r=NULL;
0709     EXHASH_FIND_STR( cur->theyr_services, svc, r);
0710     
0711     return r;
0712 }
0713 
0714 
0715 /**
0716  * Get service from service hash list.
0717  * @param cur
0718  * @param svc
0719  * @return 
0720  */
0721 expublic bridgedef_svcs_t * brd_get_svc(bridgedef_svcs_t * svcs, char *svc)
0722 {
0723     bridgedef_svcs_t *r=NULL;
0724     EXHASH_FIND_STR( svcs, svc, r);
0725     return r;
0726 }
0727 
0728 
0729 
0730 /**
0731  * remove service diff
0732  */
0733 exprivate void brd_clear_diff(void)
0734 {
0735     bridgedef_svcs_t *cur, *tmp;
0736     
0737     EXHASH_ITER(hh, G_bridge_svc_diff, cur, tmp)
0738     {
0739         EXHASH_DEL(G_bridge_svc_diff,cur);
0740         NDRX_FREE(cur);
0741     }
0742     
0743 }
0744 
0745 /**
0746  * Begin diff build
0747  */
0748 expublic void brd_begin_diff(void)
0749 {
0750     if (!ndrx_get_G_atmi_env()->is_clustered)
0751         return;
0752     
0753     M_build_diff = EXTRUE;
0754     /* clear hashlist */
0755     brd_clear_diff();
0756 }
0757     
0758 /**
0759  * dispatch the diff to all nodes.
0760  */
0761 expublic void brd_end_diff(void)
0762 {
0763     int ret = EXSUCCEED;
0764     bridgedef_t *r = NULL;
0765     bridgedef_t *rtmp = NULL;
0766     char *buf = NULL;
0767     size_t buf_len;
0768     bridge_refresh_t *refresh;
0769     int first = EXTRUE;
0770     
0771     /* Nothing to do. */
0772     if (!ndrx_get_G_atmi_env()->is_clustered)
0773         return;
0774     
0775     NDRX_SYSBUF_MALLOC_OUT(buf, buf_len, ret);
0776     refresh= (bridge_refresh_t *)buf;
0777     
0778     M_build_diff = EXFALSE;
0779     
0780     /* TODO: Send stuff to everybody */
0781     EXHASH_ITER(hh, G_bridge_hash, r, rtmp)
0782     {
0783         NDRX_LOG(log_debug, "Processing: nodeid %d, is connected=%s, flags=%d",
0784                                 r->nodeid, r->connected?"Yes":"No", r->flags);
0785         
0786         if (r->connected && SRV_KEY_FLAGS_SENDREFERSH&r->flags)
0787         {
0788             NDRX_LOG(log_debug, "Sending refresh...");
0789             /* First time, build the diff message */
0790             if (first)
0791             {
0792                 memset(buf, 0, sizeof(bridge_refresh_t));
0793 
0794                 brd_build_refresh_msg(G_bridge_svc_diff, refresh, 
0795                         BRIDGE_REFRESH_MODE_DIFF);
0796                 first=EXFALSE;
0797                 
0798                 if (0==refresh->count)
0799                 {
0800                     NDRX_LOG(log_debug, "Zero count of service "
0801                             "diff - nothing to do;");
0802                     break;
0803                 }
0804             }
0805             
0806             if (EXSUCCEED!=brd_send_update(r->nodeid, r, refresh))
0807             {
0808                 NDRX_LOG(log_warn, "Failed to send update to node %d", 
0809                         r->nodeid);
0810             }
0811         }
0812         else
0813         {
0814             NDRX_LOG(log_debug, "Not Sending refresh - not connected "
0815                     "or refresh not required");
0816         }
0817         
0818     }
0819     
0820     /* Clear hashlist */
0821     brd_clear_diff();
0822     
0823 out:
0824     if (NULL!=buf)
0825     {
0826         NDRX_SYSBUF_FREE(buf);
0827     }
0828 }
0829 
0830 /**
0831  * Periodic houseekping function
0832  */
0833 expublic void brd_send_periodrefresh(void)
0834 {
0835     int ret = EXSUCCEED;
0836     bridgedef_t *cur = NULL;
0837     bridgedef_t *rtmp = NULL;
0838     char *buf=NULL;
0839     size_t buf_len;
0840     bridge_refresh_t *refresh;
0841     
0842     /* Nothing to do. */
0843     if (!ndrx_get_G_atmi_env()->is_clustered || 0==G_app_config->brrefresh)
0844         return;
0845     
0846     NDRX_SYSBUF_MALLOC_OUT(buf, buf_len, ret);
0847     refresh= (bridge_refresh_t *)buf;
0848     /* Reset the buffer otherwise it keeps growing!!! */
0849     memset(refresh, 0, sizeof(bridge_refresh_t));
0850     
0851     EXHASH_ITER(hh, G_bridge_hash, cur, rtmp)
0852     {
0853         /* Using sanity as step interval */
0854         cur->lastrefresh_sent++;
0855         
0856         if (cur->connected && cur->flags & SRV_KEY_FLAGS_SENDREFERSH
0857                 && cur->lastrefresh_sent >= G_app_config->brrefresh)
0858         {
0859             NDRX_LOG(log_debug, "About to send to node %d full refresh", 
0860                     cur->nodeid);
0861             
0862             if (EXSUCCEED==brd_build_refresh_msg(G_bridge_svc_hash, refresh, 
0863                     BRIDGE_REFRESH_MODE_FULL))
0864             {
0865                 if (EXSUCCEED == brd_send_update(cur->nodeid, cur, refresh))
0866                 {
0867                     /* Reset full refresh timer */
0868                     cur->lastrefresh_sent = SANITY_CNT_START;
0869                 }
0870             }
0871         } /* If connected */
0872     } /* EXHASH_ITER */
0873     
0874 out:
0875                                     
0876     if (NULL!=buf)
0877     {
0878         NDRX_SYSBUF_FREE(buf);
0879     }
0880 }
0881 /* vim: set ts=4 sw=4 et smartindent: */