0001
0002
0003
0004
0005
0006
0007
0008
0009
0010
0011
0012
0013
0014
0015
0016
0017
0018
0019
0020
0021
0022
0023
0024
0025
0026
0027
0028
0029
0030
0031
0032
0033
0034
0035
0036
0037 #include <stdio.h>
0038 #include <stdlib.h>
0039 #include <string.h>
0040 #include <errno.h>
0041 #include <regex.h>
0042 #include <utlist.h>
0043
0044 #include <ndebug.h>
0045 #include <atmi.h>
0046 #include <sys_unix.h>
0047 #include <atmi_int.h>
0048 #include <typed_buf.h>
0049 #include <ndrstandard.h>
0050 #include <ubf.h>
0051 #include <Exfields.h>
0052 #include <atmi_shm.h>
0053 #include <exregex.h>
0054 #include "tpcachesv.h"
0055 #include <atmi_cache.h>
0056 #include <ubfutil.h>
0057
0058
0059
0060
0061
0062
0063
0064
0065
0066
0067
0068
0069 void CACHEEV (TPSVCINFO *p_svc)
0070 {
0071 int ret=EXSUCCEED;
0072 tp_command_call_t * last_call;
0073 char *extradata;
0074 char *extradata_p;
0075 char nodeidstr[3+1];
0076 char *op;
0077 int nodeid;
0078 char *flags;
0079 char *svcnm;
0080 int len;
0081 char type[XATMI_SUBTYPE_LEN+1];
0082
0083
0084
0085 if (NULL!=p_svc->data)
0086 {
0087 if (EXFAIL==tptypes(p_svc->data, type, NULL))
0088 {
0089 NDRX_LOG(log_error, "Faileld to get incoming buffer type: %s",
0090 tpstrerror(tperrno));
0091 EXFAIL_OUT(ret);
0092 }
0093
0094 if (0==strcmp(type, "UBF"))
0095 {
0096 ndrx_debug_dump_UBF(log_debug, "Received UBF:", (UBFH *)p_svc->data);
0097 }
0098 }
0099
0100
0101
0102
0103 last_call=ndrx_get_G_last_call();
0104
0105
0106
0107 extradata = extradata_p = NDRX_STRDUP(last_call->extradata);
0108
0109 if (NULL==extradata)
0110 {
0111 NDRX_LOG(log_error, "strdup failed: %s", tpstrerror(tperrno));
0112 EXFAIL_OUT(ret);
0113 }
0114
0115 NDRX_LOG(log_info, "Received event op: [%s]", extradata);
0116
0117 if (NULL==(op = ndrx_strsep(&extradata, "/")))
0118 {
0119 NDRX_LOG(log_error, "Invalid event [%s] received - failed to get 'operation'",
0120 last_call->extradata);
0121 EXFAIL_OUT(ret);
0122 }
0123
0124 len = strlen(op);
0125
0126 if (len != NDRX_CACHE_EV_PFXLEN)
0127 {
0128 NDRX_LOG(log_error, "Invalid event prefix, expected len: %d, got: %d",
0129 NDRX_CACHE_EV_PFXLEN, len);
0130 EXFAIL_OUT(ret);
0131 }
0132
0133 nodeidstr[0] = op[3];
0134 nodeidstr[1] = op[3+1];
0135 nodeidstr[2] = op[3+2];
0136 nodeidstr[3] = EXEOS;
0137
0138 nodeid = atoi(nodeidstr);
0139
0140 if (nodeid<=0)
0141 {
0142 NDRX_LOG(log_error, "Invalid node id received [%d] must be > 0!",
0143 nodeid);
0144 EXFAIL_OUT(ret);
0145 }
0146
0147
0148
0149 if (nodeid == tpgetnodeid())
0150 {
0151 NDRX_LOG(log_debug, "Event received from our node (%d) - skip processing",
0152 nodeid);
0153 goto out;
0154 }
0155
0156
0157
0158
0159
0160
0161
0162 if (NULL==(flags = ndrx_strsep(&extradata, "/")))
0163 {
0164 NDRX_LOG(log_error, "Invalid event [%s] received - failed to get 'flags'",
0165 last_call->extradata);
0166 EXFAIL_OUT(ret);
0167 }
0168
0169 if (NULL==(svcnm = ndrx_strsep(&extradata, "/")))
0170 {
0171 NDRX_LOG(log_error, "Invalid event [%s] received - failed to get "
0172 "'service name' for cache op", last_call->extradata);
0173 EXFAIL_OUT(ret);
0174 }
0175
0176 NDRX_LOG(log_info, "Received operation [%s] flags [%s] for service [%s]",
0177 op, flags, svcnm);
0178
0179
0180 if (0==strncmp(op, NDRX_CACHE_EV_PUTCMD, NDRX_CACHE_EV_LEN))
0181 {
0182 NDRX_LOG(log_debug, "performing put (save to cache)...");
0183
0184
0185 if (EXSUCCEED!=ndrx_cache_save (svcnm, p_svc->data,
0186 p_svc->len, last_call->user3, last_call->user4, nodeid, 0L,
0187
0188 last_call->rval, last_call->rcode, EXTRUE))
0189 {
0190 NDRX_LOG(log_error, "Failed to save cache data: %s",
0191 tpstrerror(tperrno));
0192 EXFAIL_OUT(ret);
0193 }
0194 }
0195 else if (0==strncmp(op, NDRX_CACHE_EV_DELCMD, NDRX_CACHE_EV_LEN))
0196 {
0197 NDRX_LOG(log_debug, "Delete record by data");
0198
0199
0200
0201 if (EXSUCCEED!=ndrx_cache_inval_by_data(svcnm, p_svc->data, p_svc->len, flags))
0202 {
0203 NDRX_LOG(log_error, "Failed to save cache data: %s",
0204 tpstrerror(tperrno));
0205 EXFAIL_OUT(ret);
0206 }
0207 }
0208 else if (0==strncmp(op, NDRX_CACHE_EV_KILCMD, NDRX_CACHE_EV_LEN))
0209 {
0210 NDRX_LOG(log_debug, "Drop cache event");
0211
0212
0213
0214 if (EXSUCCEED!=ndrx_cache_drop(svcnm, nodeid))
0215 {
0216 NDRX_LOG(log_error, "Failed to drop cache: %s", tpstrerror(tperrno));
0217 EXFAIL_OUT(ret);
0218 }
0219 }
0220 else if (0==strncmp(op, NDRX_CACHE_EV_MSKDELCMD, NDRX_CACHE_EV_LEN))
0221 {
0222 int deleted;
0223 char keyexpr[NDRX_CACHE_OPEXPRMAX+1];
0224 UBFH *p_ub = (UBFH *)p_svc->data;
0225 BFLDLEN len = sizeof(keyexpr);
0226
0227 if (EXSUCCEED!=Bget(p_ub, EX_CACHE_OPEXPR, 0, keyexpr, &len))
0228 {
0229 NDRX_CACHE_TPERROR(TPENOENT, "Missing expression for mask delete "
0230 "for [%s] db!", svcnm);
0231 EXFAIL_OUT(ret);
0232 }
0233
0234 if (0 > (deleted = ndrx_cache_inval_by_expr(svcnm, keyexpr, nodeid)))
0235 {
0236 NDRX_LOG(log_error, "Failed to delete db [%s] by key expression [%s]",
0237 svcnm, keyexpr);
0238 EXFAIL_OUT(ret);
0239 }
0240
0241 NDRX_LOG(log_info, "Delete %ld cache records matching key expression [%s]",
0242 deleted, keyexpr);
0243 }
0244 else if (0==strncmp(op, NDRX_CACHE_EV_KEYDELCMD, NDRX_CACHE_EV_LEN))
0245 {
0246 int deleted;
0247 char key[NDRX_CACHE_OPEXPRMAX+1];
0248 UBFH *p_ub = (UBFH *)p_svc->data;
0249 BFLDLEN len = sizeof(key);
0250
0251 if (EXSUCCEED!=Bget(p_ub, EX_CACHE_OPEXPR, 0, key, &len))
0252 {
0253 NDRX_CACHE_TPERROR(TPENOENT, "Missing expression for mask delete "
0254 "for [%s] db!: %s", svcnm, Bstrerror(Berror));
0255 EXFAIL_OUT(ret);
0256 }
0257
0258 if (0 > (deleted = ndrx_cache_inval_by_key(svcnm, NULL, key, nodeid,
0259 NULL, EXFALSE)))
0260 {
0261 NDRX_LOG(log_error, "Failed to delete db [%s] by key [%s]: %s",
0262 svcnm, key, Bstrerror(Berror));
0263 EXFAIL_OUT(ret);
0264 }
0265
0266 NDRX_LOG(log_info, "Delete %ld cache records matching key [%s]",
0267 deleted, key);
0268 }
0269 else
0270 {
0271 NDRX_LOG(log_error, "Unsupported cache command received [%s]",
0272 op);
0273 EXFAIL_OUT(ret);
0274 }
0275
0276 out:
0277
0278 if (NULL!=extradata_p)
0279 {
0280 NDRX_FREE(extradata_p);
0281 }
0282
0283 tpreturn( ret==EXSUCCEED?TPSUCCESS:TPFAIL,
0284 0,
0285 NULL,
0286 0L,
0287 0L);
0288 }
0289
0290
0291
0292
0293 int NDRX_INTEGRA(tpsvrinit)(int argc, char **argv)
0294 {
0295 int ret=EXSUCCEED;
0296 char cachesvc[MAXTIDENT+1];
0297 char mgmtsvc[MAXTIDENT+1];
0298 long nodeid;
0299 string_list_t *list = NULL, *el;
0300 TPEVCTL evctl;
0301
0302 NDRX_LOG(log_debug, "%s called", __func__);
0303
0304 memset(&evctl, 0, sizeof(evctl));
0305
0306 nodeid = tpgetnodeid();
0307
0308 if (EXFAIL==nodeid)
0309 {
0310 NDRX_LOG(log_error, "Failed to get nodeid: %s", tpstrerror(tperrno));
0311 EXFAIL_OUT(ret);
0312 }
0313
0314 snprintf(cachesvc, sizeof(cachesvc), NDRX_CACHE_EVSVC, nodeid);
0315
0316 if (EXSUCCEED!=tpadvertise(cachesvc, CACHEEV))
0317 {
0318 NDRX_LOG(log_error, "Failed to initialize [%s]!", cachesvc);
0319 EXFAIL_OUT(ret);
0320 }
0321
0322 snprintf(mgmtsvc, sizeof(mgmtsvc), NDRX_CACHE_MGSVC, nodeid);
0323
0324 if (EXSUCCEED!=tpadvertise(mgmtsvc, CACHEMG))
0325 {
0326 NDRX_LOG(log_error, "Failed to initialize [%s]!", mgmtsvc);
0327 EXFAIL_OUT(ret);
0328 }
0329
0330
0331
0332 if (EXSUCCEED!=ndrx_cache_events_get(&list))
0333 {
0334 NDRX_LOG(log_error, "Failed to get list of events to subscribe to!");
0335 EXFAIL_OUT(ret);
0336 }
0337
0338
0339
0340 LL_FOREACH(list, el)
0341 {
0342 NDRX_STRCPY_SAFE(evctl.name1, cachesvc);
0343 evctl.flags|=TPEVSERVICE;
0344
0345 if (EXFAIL==tpsubscribe(el->qname, NULL, &evctl, 0L))
0346 {
0347 NDRX_LOG(log_error, "Failed to subscribe to event: [%s] "
0348 "target service: [%s]", el->qname, evctl.name1);
0349 EXFAIL_OUT(ret);
0350 }
0351 }
0352
0353 out:
0354
0355 if (NULL!=list)
0356 {
0357 ndrx_string_list_free(list);
0358 }
0359
0360 return ret;
0361 }
0362
0363 void NDRX_INTEGRA(tpsvrdone) (void)
0364 {
0365 NDRX_LOG(log_debug, "%s called", __func__);
0366 }
0367