Back to home page

Enduro/X

 
 

    


0001 /**
0002  * @brief ATMI level cache - event processing (distributed cache)
0003  *
0004  * @file atmi_cache_events.c
0005  */
0006 /* -----------------------------------------------------------------------------
0007  * Enduro/X Middleware Platform for Distributed Transaction Processing
0008  * Copyright (C) 2009-2016, ATR Baltic, Ltd. All Rights Reserved.
0009  * Copyright (C) 2017-2023, Mavimax, Ltd. All Rights Reserved.
0010  * This software is released under one of the following licenses:
0011  * AGPL (with Java and Go exceptions) or Mavimax's license for commercial use.
0012  * See LICENSE file for full text.
0013  * -----------------------------------------------------------------------------
0014  * AGPL license:
0015  *
0016  * This program is free software; you can redistribute it and/or modify it under
0017  * the terms of the GNU Affero General Public License, version 3 as published
0018  * by the Free Software Foundation;
0019  *
0020  * This program is distributed in the hope that it will be useful, but WITHOUT ANY
0021  * WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR A
0022  * PARTICULAR PURPOSE. See the GNU Affero General Public License, version 3
0023  * for more details.
0024  *
0025  * You should have received a copy of the GNU Affero General Public License along 
0026  * with this program; if not, write to the Free Software Foundation, Inc.,
0027  * 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
0028  *
0029  * -----------------------------------------------------------------------------
0030  * A commercial use license is available from Mavimax, Ltd
0031  * contact@mavimax.com
0032  * -----------------------------------------------------------------------------
0033  */
0034 
0035 /*---------------------------Includes-----------------------------------*/
0036 #include <stdlib.h>
0037 #include <stdio.h>
0038 #include <errno.h>
0039 #include <string.h>
0040 #include <ndrstandard.h>
0041 #include <atmi.h>
0042 #include <atmi_tls.h>
0043 #include <typed_buf.h>
0044 
0045 #include "thlock.h"
0046 #include "userlog.h"
0047 #include "utlist.h"
0048 #include "exregex.h"
0049 #include <exparson.h>
0050 #include <atmi_cache.h>
0051 #include <Exfields.h>
0052 #include <ubfutil.h>
0053 /*---------------------------Externs------------------------------------*/
0054 /*---------------------------Macros-------------------------------------*/
0055 /*---------------------------Enums--------------------------------------*/
0056 /*---------------------------Typedefs-----------------------------------*/
0057 /*---------------------------Globals------------------------------------*/
0058 /*---------------------------Statics------------------------------------*/
0059 /*---------------------------Prototypes---------------------------------*/
0060 
0061 /**
0062  * Broadcast buffer as event
0063  * @param cache cache on which we perform broadcast
0064  * @param svc service name of cache event occurring or cachedb name (depending on event)
0065  * @param idata input data
0066  * @param ilen input data len
0067  * @param event_type event type, see
0068  * @param flags flags to put in event NDRX_CACHE_BCAST_MODE_*
0069  * @param user1 user data field 1 (microseconds)
0070  * @param user2 user data field 2 (epoch seconds)
0071  * @param user3 user data field 1 (tperrno)
0072  * @param user4 user data field 2 (tpurcode)
0073  * @return EXSUCCEED/EXFAIL, tperror installed if any
0074  */
0075 expublic int ndrx_cache_broadcast(ndrx_tpcallcache_t *cache, 
0076         char *svc, char *idata, long ilen, int event_type, char *flags,
0077         int user1, long user2, int user3, long user4)
0078 {
0079     int ret = EXSUCCEED;
0080     char *fmt;
0081     char event[XATMI_EVENT_MAX+1];
0082     char *odata = NULL;
0083     long olen;
0084     
0085     if (NDRX_CACHE_BCAST_MODE_PUT==event_type)
0086     {
0087         fmt = NDRX_CACHE_EV_PUT;
0088         
0089         odata = idata;
0090         olen = ilen;
0091     }
0092     else if (NDRX_CACHE_BCAST_MODE_DEL==event_type)
0093     {
0094         fmt = NDRX_CACHE_EV_DEL;
0095         
0096         /* prepare projection on what we want to broadcast */
0097         
0098         if (ndrx_G_tpcache_types[cache->buf_type->type_id].pf_cache_del)
0099         {
0100             if (EXSUCCEED!=ndrx_G_tpcache_types[cache->buf_type->type_id].pf_cache_del(
0101                     cache, idata, ilen, &odata, &olen))
0102             {
0103                 NDRX_LOG(log_error, "Failed to prepare broadcast data for delete");
0104                 EXFAIL_OUT(ret);
0105             }
0106         }
0107         else
0108         {
0109             odata = idata;
0110             olen = ilen;
0111         }
0112         
0113     }
0114     else if (NDRX_CACHE_BCAST_MODE_KIL==event_type)
0115     {
0116         fmt = NDRX_CACHE_EV_KILL;
0117         
0118         odata = idata;
0119         olen = ilen;
0120     }
0121     else if (NDRX_CACHE_BCAST_MODE_MSK==event_type)
0122     {
0123         fmt = NDRX_CACHE_EV_MSKDEL;
0124         odata = idata;
0125         olen = ilen;
0126     }
0127     else if (NDRX_CACHE_BCAST_MODE_DKY==event_type)
0128     {
0129         fmt = NDRX_CACHE_EV_KEYDEL;
0130         odata = idata;
0131         olen = ilen;
0132     }       
0133     else
0134     {
0135         NDRX_CACHE_TPERROR(TPESYSTEM, "Invalid broadcast event type: %d", 
0136                 event_type);
0137         EXFAIL_OUT(ret);
0138     }
0139     
0140     /* Build the event to broadcast */
0141     
0142     snprintf(event, sizeof(event), fmt, (int)tpgetnodeid(), flags, svc);
0143     
0144     NDRX_LOG(log_debug, "Broadcasting event: [%s] user1=%d user2=%ld "
0145             "user3=%d user4=%ld", event, user1, user2, user3, user4);
0146     
0147     if (EXFAIL==ndrx_tppost(event, odata, olen, TPNOTRAN|TPNOREPLY, user1, user2,
0148             user3, user4))
0149     {
0150         NDRX_CACHE_ERROR("Failed to send event [%s]: %s", 
0151                 event, tpstrerror(tperrno));
0152         
0153         /* ignore status, unset error */
0154         
0155         ndrx_TPunset_error();
0156         
0157         EXFAIL_OUT(ret);
0158     }
0159     
0160 out:
0161 
0162     if (odata!=NULL && odata!=idata)
0163     {
0164         tpfree(odata);
0165     }
0166 
0167     return ret;
0168 }
0169 
0170 /**
0171  * Return list of event to which subscribe in current CCTAG. Note that we have
0172  * only service instance of cache events. thus we must see all cctags of caches
0173  * Multiple executables may run
0174  * @param list string list of events to subscribe to
0175  * @return EXFAIL/EXSUCCEED (in case of succeed string list free required if not
0176  * NULL)
0177  */
0178 expublic int ndrx_cache_events_get(string_list_t **list)
0179 {
0180     int ret = EXSUCCEED;
0181     ndrx_tpcache_db_t *el, *elt;
0182     
0183     /* loop over all databases and get subscribe events */
0184     
0185     EXHASH_ITER(hh, ndrx_G_tpcache_db, el, elt)
0186     {
0187         if (EXEOS!=el->subscr[0])
0188         {
0189             if (EXSUCCEED!=ndrx_string_list_add(list, el->subscr))
0190             {
0191                 NDRX_LOG(log_error, "failed to add string to list [%s]", 
0192                         el->subscr);
0193                 EXFAIL_OUT(ret);
0194             }
0195         }
0196     }
0197     
0198 out:
0199 
0200     if (EXSUCCEED!=ret && NULL!=*list)
0201     {
0202         ndrx_string_list_free(*list);
0203         *list = NULL;
0204     }
0205 
0206     return ret;
0207 }
0208 /* vim: set ts=4 sw=4 et smartindent: */