Back to home page

Enduro/X

 
 

    


0001 /**
0002  * @brief Cache event receiver. Events are subscribed to by mask, so events
0003  *   published by the local node are delivered here too; those are simply ignored.
0004  *   The advertised service names include the node id, so that in future a remote
0005  *   node may call the service directly.
0006  *
0007  * @file tpcachesv.c
0008  */
0009 /* -----------------------------------------------------------------------------
0010  * Enduro/X Middleware Platform for Distributed Transaction Processing
0011  * Copyright (C) 2009-2016, ATR Baltic, Ltd. All Rights Reserved.
0012  * Copyright (C) 2017-2023, Mavimax, Ltd. All Rights Reserved.
0013  * This software is released under one of the following licenses:
0014  * AGPL (with Java and Go exceptions) or Mavimax's license for commercial use.
0015  * See LICENSE file for full text.
0016  * -----------------------------------------------------------------------------
0017  * AGPL license:
0018  *
0019  * This program is free software; you can redistribute it and/or modify it under
0020  * the terms of the GNU Affero General Public License, version 3 as published
0021  * by the Free Software Foundation;
0022  *
0023  * This program is distributed in the hope that it will be useful, but WITHOUT ANY
0024  * WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR A
0025  * PARTICULAR PURPOSE. See the GNU Affero General Public License, version 3
0026  * for more details.
0027  *
0028  * You should have received a copy of the GNU Affero General Public License along 
0029  * with this program; if not, write to the Free Software Foundation, Inc.,
0030  * 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
0031  *
0032  * -----------------------------------------------------------------------------
0033  * A commercial use license is available from Mavimax, Ltd
0034  * contact@mavimax.com
0035  * -----------------------------------------------------------------------------
0036  */
0037 #include <stdio.h>
0038 #include <stdlib.h>
0039 #include <string.h>
0040 #include <errno.h>
0041 #include <regex.h>
0042 #include <utlist.h>
0043 
0044 #include <ndebug.h>
0045 #include <atmi.h>
0046 #include <sys_unix.h>
0047 #include <atmi_int.h>
0048 #include <typed_buf.h>
0049 #include <ndrstandard.h>
0050 #include <ubf.h>
0051 #include <Exfields.h>
0052 #include <atmi_shm.h>
0053 #include <exregex.h>
0054 #include "tpcachesv.h"
0055 #include <atmi_cache.h>
0056 #include <ubfutil.h>
0057 /*---------------------------Externs------------------------------------*/
0058 /*---------------------------Macros-------------------------------------*/
0059 /*---------------------------Enums--------------------------------------*/
0060 /*---------------------------Typedefs-----------------------------------*/
0061 /*---------------------------Globals------------------------------------*/
0062 /*---------------------------Statics------------------------------------*/
0063 /*---------------------------Prototypes---------------------------------*/
0064 
0065 /**
0066  * Process incoming events - put and delete
0067  * @param p_svc
0068  */
0069 void CACHEEV (TPSVCINFO *p_svc)
0070 {
0071     int ret=EXSUCCEED;
0072     tp_command_call_t * last_call;
0073     char *extradata;
0074     char *extradata_p;
0075     char nodeidstr[3+1];
0076     char *op;
0077     int nodeid;
0078     char *flags;
0079     char *svcnm;
0080     int len;
0081     char type[XATMI_SUBTYPE_LEN+1];
0082     
0083     /* dump the buffer, if it is UBF... */
0084     
0085     if (NULL!=p_svc->data)
0086     {
0087         if (EXFAIL==tptypes(p_svc->data, type, NULL))
0088         {
0089             NDRX_LOG(log_error, "Faileld to get incoming buffer type: %s",
0090                     tpstrerror(tperrno));
0091             EXFAIL_OUT(ret);
0092         }
0093         
0094         if (0==strcmp(type, "UBF"))
0095         {
0096             ndrx_debug_dump_UBF(log_debug, "Received UBF:", (UBFH *)p_svc->data);
0097         }
0098     }
0099     
0100     /* now understand what request this was 
0101      * also we need to get timestamps
0102      */
0103     last_call=ndrx_get_G_last_call();
0104     
0105     /* NDRX_STRCPY_SAFE(extradata, last_call->extradata); */
0106     
0107     extradata = extradata_p = NDRX_STRDUP(last_call->extradata);
0108     
0109     if (NULL==extradata)
0110     {
0111         NDRX_LOG(log_error, "strdup failed: %s", tpstrerror(tperrno));
0112         EXFAIL_OUT(ret);
0113     }
0114     
0115     NDRX_LOG(log_info, "Received event op: [%s]", extradata);
0116     
0117     if (NULL==(op = ndrx_strsep(&extradata, "/")))
0118     {
0119         NDRX_LOG(log_error, "Invalid event [%s] received - failed to get 'operation'", 
0120                 last_call->extradata);
0121         EXFAIL_OUT(ret);
0122     }
0123     
0124     len = strlen(op);
0125     
0126     if (len != NDRX_CACHE_EV_PFXLEN)
0127     {
0128         NDRX_LOG(log_error, "Invalid event prefix, expected len: %d, got: %d",
0129                 NDRX_CACHE_EV_PFXLEN, len);
0130         EXFAIL_OUT(ret);
0131     }
0132     
0133     nodeidstr[0] = op[3];
0134     nodeidstr[1] = op[3+1];
0135     nodeidstr[2] = op[3+2];
0136     nodeidstr[3] = EXEOS;
0137     
0138     nodeid = atoi(nodeidstr);
0139     
0140     if (nodeid<=0)
0141     {
0142         NDRX_LOG(log_error, "Invalid node id received [%d] must be > 0!", 
0143                 nodeid);
0144         EXFAIL_OUT(ret);
0145     }
0146     
0147     /* if it is our node then skip update as processed locally by caller */
0148     
0149     if (nodeid == tpgetnodeid())
0150     {
0151         NDRX_LOG(log_debug, "Event received from our node (%d) - skip processing",
0152                 nodeid);
0153         goto out;
0154     }
0155     
0156     /* why? 
0157     op[3] = EXEOS;
0158     */
0159     
0160     /* strtok cannot handle empty fields! it goes to next and we get here 
0161      * service name as flags... thus use strsep() */
0162     if (NULL==(flags = ndrx_strsep(&extradata, "/")))
0163     {
0164         NDRX_LOG(log_error, "Invalid event [%s] received - failed to get 'flags'",
0165                 last_call->extradata);
0166         EXFAIL_OUT(ret);
0167     }
0168     
0169     if (NULL==(svcnm = ndrx_strsep(&extradata, "/")))
0170     {
0171         NDRX_LOG(log_error, "Invalid event [%s] received - failed to get "
0172                 "'service name' for cache op", last_call->extradata);
0173         EXFAIL_OUT(ret);
0174     }
0175     
0176     NDRX_LOG(log_info, "Received operation [%s] flags [%s] for service [%s]",
0177             op, flags, svcnm);
0178     
0179     /* check is op correct? */
0180     if (0==strncmp(op, NDRX_CACHE_EV_PUTCMD, NDRX_CACHE_EV_LEN))
0181     {
0182         NDRX_LOG(log_debug, "performing put (save to cache)...");
0183 
0184         /* ok we shall get the cluster node id of the caller.. */
0185         if (EXSUCCEED!=ndrx_cache_save (svcnm, p_svc->data, 
0186             p_svc->len, last_call->user3, last_call->user4, nodeid, 0L,
0187                 /* user1 & user2: */
0188                 last_call->rval, last_call->rcode, EXTRUE))
0189         {
0190             NDRX_LOG(log_error, "Failed to save cache data: %s", 
0191                     tpstrerror(tperrno));
0192             EXFAIL_OUT(ret);
0193         }
0194     }
0195     else if (0==strncmp(op, NDRX_CACHE_EV_DELCMD, NDRX_CACHE_EV_LEN))
0196     {
0197         NDRX_LOG(log_debug, "Delete record by data");
0198         /* Delete cache according to flags, if FULL specified, then drop all matched 
0199          * cache (no matter of they keys)
0200          */
0201         if (EXSUCCEED!=ndrx_cache_inval_by_data(svcnm, p_svc->data, p_svc->len, flags))
0202         {
0203             NDRX_LOG(log_error, "Failed to save cache data: %s",
0204                     tpstrerror(tperrno));
0205             EXFAIL_OUT(ret);
0206         }
0207     }
0208     else if (0==strncmp(op, NDRX_CACHE_EV_KILCMD, NDRX_CACHE_EV_LEN))
0209     {
0210         NDRX_LOG(log_debug, "Drop cache event");
0211         /*
0212          * In this case the svcnm is database and we remove all records from it
0213          */
0214         if (EXSUCCEED!=ndrx_cache_drop(svcnm, nodeid))
0215         {
0216             NDRX_LOG(log_error, "Failed to drop cache: %s", tpstrerror(tperrno));
0217             EXFAIL_OUT(ret);
0218         }
0219     }
0220     else if (0==strncmp(op, NDRX_CACHE_EV_MSKDELCMD, NDRX_CACHE_EV_LEN))
0221     {
0222         int deleted;
0223         char keyexpr[NDRX_CACHE_OPEXPRMAX+1];
0224         UBFH *p_ub = (UBFH *)p_svc->data;
0225         BFLDLEN len = sizeof(keyexpr);
0226         
0227         if (EXSUCCEED!=Bget(p_ub, EX_CACHE_OPEXPR, 0, keyexpr, &len))
0228         {
0229             NDRX_CACHE_TPERROR(TPENOENT, "Missing expression for mask delete "
0230                     "for [%s] db!", svcnm);
0231             EXFAIL_OUT(ret);
0232         }
0233         
0234         if (0 > (deleted = ndrx_cache_inval_by_expr(svcnm, keyexpr, nodeid)))
0235         {
0236             NDRX_LOG(log_error, "Failed to delete db [%s] by key expression [%s]",
0237                     svcnm, keyexpr);
0238             EXFAIL_OUT(ret);
0239         }
0240         
0241         NDRX_LOG(log_info, "Delete %ld cache records matching key expression [%s]",
0242                 deleted, keyexpr);
0243     }
0244     else if (0==strncmp(op, NDRX_CACHE_EV_KEYDELCMD, NDRX_CACHE_EV_LEN))
0245     {
0246         int deleted;
0247         char key[NDRX_CACHE_OPEXPRMAX+1];
0248         UBFH *p_ub = (UBFH *)p_svc->data;
0249         BFLDLEN len = sizeof(key);
0250         
0251         if (EXSUCCEED!=Bget(p_ub, EX_CACHE_OPEXPR, 0, key, &len))
0252         {
0253             NDRX_CACHE_TPERROR(TPENOENT, "Missing expression for mask delete "
0254                     "for [%s] db!: %s", svcnm, Bstrerror(Berror));
0255             EXFAIL_OUT(ret);
0256         }
0257         
0258         if (0 > (deleted = ndrx_cache_inval_by_key(svcnm, NULL, key, nodeid, 
0259                 NULL, EXFALSE)))
0260         {
0261             NDRX_LOG(log_error, "Failed to delete db [%s] by key [%s]: %s",
0262                     svcnm, key, Bstrerror(Berror));
0263             EXFAIL_OUT(ret);
0264         }
0265         
0266         NDRX_LOG(log_info, "Delete %ld cache records matching key [%s]",
0267                 deleted, key);
0268     }    
0269     else
0270     {
0271         NDRX_LOG(log_error, "Unsupported cache command received [%s]",
0272                 op);
0273         EXFAIL_OUT(ret);
0274     }
0275     
0276 out:
0277 
0278     if (NULL!=extradata_p)
0279     {
0280         NDRX_FREE(extradata_p);
0281     }
0282 
0283     tpreturn(  ret==EXSUCCEED?TPSUCCESS:TPFAIL,
0284                 0,
0285                 NULL,
0286                 0L,
0287                 0L);
0288 }
0289 
0290 /**
0291  * Standard server init
0292  */
0293 int NDRX_INTEGRA(tpsvrinit)(int argc, char **argv)
0294 {
0295     int ret=EXSUCCEED;
0296     char cachesvc[MAXTIDENT+1];
0297     char mgmtsvc[MAXTIDENT+1];
0298     long nodeid;
0299     string_list_t *list = NULL, *el;
0300     TPEVCTL evctl;
0301     
0302     NDRX_LOG(log_debug, "%s called", __func__);
0303     
0304     memset(&evctl, 0, sizeof(evctl));
0305     
0306     nodeid = tpgetnodeid();
0307     
0308     if (EXFAIL==nodeid)
0309     {
0310         NDRX_LOG(log_error, "Failed to get nodeid: %s", tpstrerror(tperrno));
0311         EXFAIL_OUT(ret);
0312     }
0313     
0314     snprintf(cachesvc, sizeof(cachesvc), NDRX_CACHE_EVSVC, nodeid);
0315 
0316     if (EXSUCCEED!=tpadvertise(cachesvc, CACHEEV))
0317     {
0318         NDRX_LOG(log_error, "Failed to initialize [%s]!", cachesvc);
0319         EXFAIL_OUT(ret);
0320     }
0321     
0322     snprintf(mgmtsvc, sizeof(mgmtsvc), NDRX_CACHE_MGSVC, nodeid);
0323     
0324     if (EXSUCCEED!=tpadvertise(mgmtsvc, CACHEMG))
0325     {
0326         NDRX_LOG(log_error, "Failed to initialize [%s]!", mgmtsvc);
0327         EXFAIL_OUT(ret);
0328     }
0329         
0330     /* Process the caches and subscribe to corresponding events */
0331     
0332     if (EXSUCCEED!=ndrx_cache_events_get(&list))
0333     {
0334         NDRX_LOG(log_error, "Failed to get list of events to subscribe to!");
0335         EXFAIL_OUT(ret);
0336     }
0337     
0338     /* loop over the list and subscribe to */
0339     
0340     LL_FOREACH(list, el)
0341     {
0342         NDRX_STRCPY_SAFE(evctl.name1, cachesvc);
0343         evctl.flags|=TPEVSERVICE;
0344         
0345         if (EXFAIL==tpsubscribe(el->qname, NULL, &evctl, 0L))
0346         {
0347             NDRX_LOG(log_error, "Failed to subscribe to event: [%s] "
0348                 "target service: [%s]", el->qname, evctl.name1);
0349             EXFAIL_OUT(ret);
0350         }
0351     }
0352     
0353 out:
0354 
0355     if (NULL!=list)
0356     {
0357         ndrx_string_list_free(list);
0358     }
0359 
0360     return ret;
0361 }
0362 
0363 void NDRX_INTEGRA(tpsvrdone) (void)
0364 {
0365     NDRX_LOG(log_debug, "%s called", __func__);
0366 }
0367 /* vim: set ts=4 sw=4 et smartindent: */