Back to home page

Enduro/X

 
 

    


0001 /**
0002  * @brief Management routines of cache server
0003  *
0004  * @file mgt.c
0005  */
0006 /* -----------------------------------------------------------------------------
0007  * Enduro/X Middleware Platform for Distributed Transaction Processing
0008  * Copyright (C) 2009-2016, ATR Baltic, Ltd. All Rights Reserved.
0009  * Copyright (C) 2017-2023, Mavimax, Ltd. All Rights Reserved.
0010  * This software is released under one of the following licenses:
0011  * AGPL (with Java and Go exceptions) or Mavimax's license for commercial use.
0012  * See LICENSE file for full text.
0013  * -----------------------------------------------------------------------------
0014  * AGPL license:
0015  *
0016  * This program is free software; you can redistribute it and/or modify it under
0017  * the terms of the GNU Affero General Public License, version 3 as published
0018  * by the Free Software Foundation;
0019  *
0020  * This program is distributed in the hope that it will be useful, but WITHOUT ANY
0021  * WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR A
0022  * PARTICULAR PURPOSE. See the GNU Affero General Public License, version 3
0023  * for more details.
0024  *
0025  * You should have received a copy of the GNU Affero General Public License along 
0026  * with this program; if not, write to the Free Software Foundation, Inc.,
0027  * 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
0028  *
0029  * -----------------------------------------------------------------------------
0030  * A commercial use license is available from Mavimax, Ltd
0031  * contact@mavimax.com
0032  * -----------------------------------------------------------------------------
0033  */
0034 #include <stdio.h>
0035 #include <stdlib.h>
0036 #include <string.h>
0037 #include <errno.h>
0038 #include <regex.h>
0039 #include <utlist.h>
0040 
0041 #include <ndebug.h>
0042 #include <atmi.h>
0043 #include <sys_unix.h>
0044 #include <atmi_int.h>
0045 #include <typed_buf.h>
0046 #include <ndrstandard.h>
0047 #include <ubf.h>
0048 #include <Exfields.h>
0049 #include <atmi_shm.h>
0050 #include <exregex.h>
0051 #include "tpcachesv.h"
0052 #include <atmi_cache.h>
0053 #include <ubfutil.h>
0054 /*---------------------------Externs------------------------------------*/
0055 /*---------------------------Macros-------------------------------------*/
/**
 * Log the rejection and install the error code and message in the UBF buffer.
 * Wrapped in do { } while (0) so the macro expands to exactly one statement
 * and stays correct inside unbraced if/else bodies.
 * Note: TPE and MSG are each evaluated twice (once for logging, once for
 * atmi_reject()), so do not pass expressions with side effects.
 */
#define REJECT(UB, TPE, MSG)\
            do {\
                NDRX_LOG(log_error, "Reject with %d: %s", TPE, MSG);\
                atmi_reject(UB, TPE, MSG);\
            } while (0)
0059 /*---------------------------Enums--------------------------------------*/
0060 /*---------------------------Typedefs-----------------------------------*/
0061 /*---------------------------Globals------------------------------------*/
0062 /*---------------------------Statics------------------------------------*/
0063 /*---------------------------Prototypes---------------------------------*/
0064 
0065 /**
0066  * Reject request with installing error in buffer
0067  * @param p_svc
0068  */
0069 exprivate void atmi_reject(UBFH *p_ub, long in_tperrno, char *in_msg)
0070 {
0071     if (0!=in_tperrno)
0072     {
0073         Bchg(p_ub, EX_TPERRNO, 0, (char *)&in_tperrno, 0L);
0074         Bchg(p_ub, EX_TPSTRERROR, 0, in_msg, 0L);
0075     }
0076     else
0077     {
0078         in_tperrno = TPESYSTEM;
0079         Bchg(p_ub, EX_TPERRNO, 0, (char *)&in_tperrno, 0L);
0080         Bchg(p_ub, EX_TPSTRERROR, 0, "System error occurred, see logs", 0L);
0081 
0082     }
0083 }
0084 
0085 /**
0086  * Loop over the database and stream results to terminal
0087  * @param pp_ub double ptr to DUBF
0088  * @return 
0089  */
0090 exprivate int cache_show(int cd, UBFH **pp_ub)
0091 {
0092     int ret = EXSUCCEED;
0093     char cachedb[NDRX_CCTAG_MAX+1];
0094     ndrx_tpcache_db_t *db;
0095     char tmp[256];
0096     EDB_txn *txn = NULL;
0097     EDB_cursor *cursor;
0098     EDB_cursor_op op;
0099     EDB_val keydb, val;
0100     int tran_started = EXFALSE;
0101     ndrx_tpcache_data_t *cdata;
0102     long revent;
0103     int cursor_open = EXFALSE;
0104     int align;
0105     char *defer_free = NULL;
0106     
0107     /* ok get db name */
0108     
0109     if (EXSUCCEED!=Bget(*pp_ub, EX_CACHE_DBNAME, 0, cachedb, 0L))
0110     {
0111         NDRX_LOG(log_error, "Missing EX_CACHE_DBNAME: %s", Bstrerror(Berror));
0112         REJECT(*pp_ub, TPESYSTEM, "Failed to get EX_CACHE_DBNAME!");
0113     }
0114     
0115     NDRX_LOG(log_debug, "[%s] db lookup", cachedb);
0116     
0117     if (NULL==(db = ndrx_cache_dbresolve(cachedb, NDRX_TPCACH_INIT_NORMAL)))
0118     {
0119         /* if have ATMI error use, else failed to open db */
0120         
0121         if (ndrx_TPis_error())
0122         {
0123             REJECT(*pp_ub, tperrno, tpstrerror(tperrno));
0124         }
0125         else
0126         {
0127             snprintf(tmp, sizeof(tmp), "Failed to resolve [%s] db", cachedb);
0128             REJECT(*pp_ub, TPENOENT, tmp);
0129         }
0130         
0131         EXFAIL_OUT(ret);
0132     }
0133     
0134     /* OK we have a cache, so loop over (this will include duplicate records too
0135      * so that admin can see the picture of all this)
0136      */
0137 
0138     /* start transaction */
0139     if (EXSUCCEED!=(ret=ndrx_cache_edb_begin(db, &txn, EDB_RDONLY)))
0140     {
0141         NDRX_CACHE_TPERROR(TPESYSTEM, "%s: failed to start tran", __func__);
0142         goto out;
0143     }
0144     
0145     tran_started = EXTRUE;
0146 
0147     /* loop over the database */
0148     if (EXSUCCEED!=ndrx_cache_edb_cursor_open(db, txn, &cursor))
0149     {
0150         NDRX_LOG(log_error, "Failed to open cursor");
0151         EXFAIL_OUT(ret);
0152     }
0153     cursor_open = EXTRUE;
0154     
0155     /* loop over the db and match records  */
0156     
0157     op = EDB_FIRST;
0158     do
0159     {
0160         if (defer_free)
0161         {
0162             NDRX_FREE(defer_free);
0163             defer_free = NULL;
0164         }
0165         
0166         if (EXSUCCEED!=(ret = ndrx_cache_edb_cursor_getfullkey(db, cursor, 
0167                 &keydb, &val, op, &align)))
0168         {
0169             if (EDB_NOTFOUND==ret)
0170             {
0171                 /* this is ok */
0172                 ret = EXSUCCEED;
0173                 break;
0174             }
0175             else
0176             {
0177                 REJECT(*pp_ub, tperrno, tpstrerror(tperrno));
0178                 NDRX_LOG(log_error, "Failed to loop over the [%s] db", cachedb);
0179                 break;
0180             }
0181         }
0182         
0183         if (align)
0184         {
0185             defer_free = val.mv_data;
0186         }
0187         
0188         /* Validate DB rec */
0189         
0190         if (EXEOS!=((char *)keydb.mv_data)[keydb.mv_size-1])
0191         {
0192             NDRX_DUMP(log_error, "Invalid cache key", 
0193                     keydb.mv_data, keydb.mv_size);
0194             
0195             NDRX_LOG(log_error, "%s: Invalid cache key, len: %ld not "
0196                     "terminated with EOS!", __func__, keydb.mv_size);
0197             REJECT(*pp_ub, TPESYSTEM, "Corrupted db - Invalid key format");
0198             EXFAIL_OUT(ret);
0199         }
0200         
0201         if (val.mv_size < sizeof(ndrx_tpcache_data_t))
0202         {
0203             snprintf(tmp, sizeof(tmp), "Corrupted data - invalid minimums size, "
0204                     "expected: %ld, got %ld", 
0205                     (long)sizeof(ndrx_tpcache_data_t), (long)val.mv_size);
0206             REJECT(*pp_ub, TPESYSTEM, tmp);
0207             EXFAIL_OUT(ret);
0208         }
0209         
0210         /* Load record into UBF and send it away... */
0211         
0212         cdata = (ndrx_tpcache_data_t *)val.mv_data;
0213         
0214         
0215         if (NDRX_CACHE_MAGIC!=cdata->magic)
0216         {
0217             snprintf(tmp, sizeof(tmp), "Corrupted data - invalid magic expected: %x got %x",
0218                     cdata->magic, NDRX_CACHE_MAGIC);
0219             REJECT(*pp_ub, TPESYSTEM, tmp);
0220             EXFAIL_OUT(ret);
0221         }
0222         
0223         if (EXSUCCEED!=ndrx_cache_mgt_data2ubf(cdata, keydb.mv_data, pp_ub, EXFALSE))
0224         {
0225             REJECT(*pp_ub, TPESYSTEM, "Failed to load data info UBF!");
0226             EXFAIL_OUT(ret);
0227         }
0228         
0229         ndrx_debug_dump_UBF(log_debug, "Sending packet", *pp_ub);
0230         
0231         /* send stuff away */
0232         if (EXFAIL == tpsend(cd,
0233                             (char *)*pp_ub,
0234                             0L,
0235                             0,
0236                             &revent))
0237         {
0238             
0239             NDRX_LOG(log_error, "Send data failed [%s] %ld",
0240                                 tpstrerror(tperrno), revent);
0241             
0242             REJECT(*pp_ub, tperrno, "Failed to stream reply data");
0243             
0244             EXFAIL_OUT(ret);
0245         }
0246         else
0247         {
0248             NDRX_LOG(log_debug,"sent ok");
0249         }
0250         
0251         if (EDB_FIRST == op)
0252         {
0253             op = EDB_NEXT;
0254         }
0255         
0256     } while (EXSUCCEED==ret);
0257     
0258 out:
0259     
0260     if (cursor_open)
0261     {
0262         edb_cursor_close(cursor);
0263     }
0264 
0265     if (tran_started)
0266     {
0267         ndrx_cache_edb_abort(db, txn);
0268     }
0269 
0270     if (defer_free)
0271     {
0272         NDRX_FREE(defer_free);
0273     }
0274 
0275     return ret;
0276 }
0277 
0278 /**
0279  * Write cache record to UBF
0280  * @param pp_ub
0281  * @return EXSUCCEED/EXFAIL
0282  */
0283 exprivate int cache_dump(UBFH **pp_ub)
0284 {
0285     int ret = EXSUCCEED;
0286     char cachedb[NDRX_CCTAG_MAX+1];
0287     ndrx_tpcache_db_t *db;
0288     char tmp[256];
0289     EDB_txn *txn = NULL;
0290     EDB_cursor *cursor;
0291     int cursor_open = EXFALSE;
0292     EDB_val val;
0293     int tran_started = EXFALSE;
0294     ndrx_tpcache_data_t *cdata;
0295     char *key = NULL;
0296     int align;
0297     char *defer_free = NULL;
0298     
0299     /* ok get db name */
0300     
0301     if (EXSUCCEED!=Bget(*pp_ub, EX_CACHE_DBNAME, 0, cachedb, 0L))
0302     {
0303         NDRX_LOG(log_error, "Missing EX_CACHE_DBNAME: %s", Bstrerror(Berror));
0304         REJECT(*pp_ub, TPESYSTEM, "Failed to get EX_CACHE_DBNAME!");
0305     }
0306     
0307     NDRX_LOG(log_debug, "[%s] db lookup", cachedb);
0308     
0309     if (NULL==(db = ndrx_cache_dbresolve(cachedb, NDRX_TPCACH_INIT_NORMAL)))
0310     {
0311         /* if have ATMI error use, else failed to open db */
0312         
0313         if (ndrx_TPis_error())
0314         {
0315             REJECT(*pp_ub, tperrno, tpstrerror(tperrno));
0316         }
0317         else
0318         {
0319             snprintf(tmp, sizeof(tmp), "Failed to resolve [%s] db", cachedb);
0320             REJECT(*pp_ub, TPENOENT, tmp);
0321         }
0322         
0323         EXFAIL_OUT(ret);
0324     }
0325 
0326     /* start transaction */
0327     if (EXSUCCEED!=(ret=ndrx_cache_edb_begin(db, &txn, EDB_RDONLY)))
0328     {
0329         NDRX_CACHE_TPERROR(TPESYSTEM, "%s: failed to start tran", __func__);
0330         goto out;
0331     }
0332     
0333     tran_started = EXTRUE;
0334 
0335     /* loop over the database */
0336     if (EXSUCCEED!=ndrx_cache_edb_cursor_open(db, txn, &cursor))
0337     {
0338         NDRX_LOG(log_error, "Failed to open cursor");
0339         EXFAIL_OUT(ret);
0340     }
0341 
0342     cursor_open = EXTRUE;
0343     
0344     if (NULL==(key = Bgetalloc(*pp_ub, EX_CACHE_OPEXPR, 0, NULL)))
0345     {
0346         REJECT(*pp_ub, TPESYSTEM, "Failed to get EX_CACHE_OPEXPR");
0347         EXFAIL_OUT(ret);
0348     }
0349     
0350     /* read db record */
0351     if (EXSUCCEED!=ndrx_cache_edb_get(db, txn, key, &val, EXTRUE, &align))
0352     {
0353         REJECT(*pp_ub, tperrno, tpstrerror(tperrno));
0354         EXFAIL_OUT(ret);
0355     }
0356     
0357     if (align)
0358     {
0359         defer_free = val.mv_data;
0360     }
0361             
0362     /* Validate DB rec */
0363 
0364     if (val.mv_size < sizeof(ndrx_tpcache_data_t))
0365     {
0366         snprintf(tmp, sizeof(tmp), "Corrupted data - invalid minimums size, "
0367                 "expected: %ld, got %ld", 
0368                 (long)sizeof(ndrx_tpcache_data_t), (long)val.mv_size);
0369         REJECT(*pp_ub, TPESYSTEM, tmp);
0370         EXFAIL_OUT(ret);
0371     }
0372 
0373     /* Load record into UBF and send it away... */
0374 
0375     cdata = (ndrx_tpcache_data_t *)val.mv_data;
0376 
0377     if (NDRX_CACHE_MAGIC!=cdata->magic)
0378     {
0379         snprintf(tmp, sizeof(tmp), "Corrupted data - invalid magic expected: %x got %x",
0380                 cdata->magic, NDRX_CACHE_MAGIC);
0381         REJECT(*pp_ub, TPESYSTEM, tmp);
0382         EXFAIL_OUT(ret);
0383     }
0384 
0385     if (EXSUCCEED!=ndrx_cache_mgt_data2ubf(cdata, key, pp_ub, EXTRUE))
0386     {
0387         REJECT(*pp_ub, TPESYSTEM, "Failed to load data info UBF!");
0388         EXFAIL_OUT(ret);
0389     }
0390     
0391     ndrx_debug_dump_UBF(log_debug, "Dump buffer built", *pp_ub);
0392     
0393 
0394 out:
0395 
0396     if (cursor_open)
0397     {
0398         edb_cursor_close(cursor);
0399     }
0400 
0401     if (tran_started)
0402     {
0403         ndrx_cache_edb_abort(db, txn);
0404     }
0405 
0406     if (NULL!=key)
0407     {
0408         NDRX_FREE(key);
0409     }
0410 
0411     if (NULL!=defer_free)
0412     {
0413         NDRX_FREE(defer_free);
0414     }
0415 
0416     return ret;
0417 }
0418 
0419 /**
0420  * Delete cache
0421  * - if present only db, then drop full db
0422  * - if present key only, then remove by key
0423  * - if present key with EX_CACHE_FLAGS, then delete 
0424  * @param pp_ub
0425  * @return EXSUCCEED/EXFAIL
0426  */
0427 exprivate int cache_del(UBFH **pp_ub)
0428 {
0429     int ret = EXSUCCEED;
0430     char cachedb[NDRX_CCTAG_MAX+1];
0431     long flags;
0432     char *key = NULL;
0433     char tmp[256];
0434     long deleted;
0435     
0436     /* ok get db name */
0437     
0438     /* resize buffer a bit */
0439     
0440     if (NULL==(*pp_ub = (UBFH *)tprealloc((char *)*pp_ub, Bused(*pp_ub) + 1024)))
0441     {
0442         NDRX_LOG(log_error, "Failed to realloc!");
0443         EXFAIL_OUT(ret);
0444     }
0445     
0446     if (EXSUCCEED!=Bget(*pp_ub, EX_CACHE_DBNAME, 0, cachedb, 0L))
0447     {
0448         NDRX_LOG(log_error, "Missing EX_CACHE_DBNAME: %s", Bstrerror(Berror));
0449         REJECT(*pp_ub, TPESYSTEM, "Failed to get EX_CACHE_DBNAME!");
0450     }
0451     
0452     /* get key if have one */
0453     key = Bgetalloc(*pp_ub, EX_CACHE_OPEXPR, 0, 0L);
0454     
0455     if (Bpres(*pp_ub, EX_CACHE_FLAGS, 0))
0456     {
0457         if (EXSUCCEED!=Bget(*pp_ub, EX_CACHE_FLAGS, 0, (char *)&flags, 0L))
0458         {
0459             snprintf(tmp, sizeof(tmp), "Failed to get EX_CACHE_FLAGS: %s", 
0460                     Bstrerror(Berror));
0461             REJECT(*pp_ub, TPENOENT, tmp);
0462             EXFAIL_OUT(ret);
0463         }
0464     }
0465     
0466     if (NULL==key)
0467     {
0468         /* Drop full DB */
0469         NDRX_LOG(log_info, "Delete full database: [%s]", cachedb);
0470         
0471         if (EXSUCCEED!=ndrx_cache_drop(cachedb, (short)tpgetnodeid()))
0472         {
0473             REJECT(*pp_ub, tperrno, tpstrerror(tperrno));
0474             EXFAIL_OUT(ret);
0475         }
0476         
0477     }
0478     else if (flags & NDRX_CACHE_SVCMDF_DELREG)
0479     {
0480         NDRX_LOG(log_info, "Delete by regular expression: [%s] expr: [%s]", 
0481                 cachedb, key);
0482         
0483         if (0>(deleted=ndrx_cache_inval_by_expr(cachedb, key, (short)tpgetnodeid())))
0484         {
0485             REJECT(*pp_ub, tperrno, tpstrerror(tperrno));
0486             EXFAIL_OUT(ret);
0487         }
0488         
0489         NDRX_LOG(log_info, "Deleted %ld records", deleted);
0490         
0491         if (EXSUCCEED!=Bchg(*pp_ub, EX_CACHE_NRDEL, 0, (char *)&deleted, 0L))
0492         {
0493             snprintf(tmp, sizeof(tmp), "Failed to set EX_CACHE_NRDEL: %s", 
0494                     Bstrerror(Berror));
0495             REJECT(*pp_ub, TPESYSTEM, Bstrerror(Berror));
0496             EXFAIL_OUT(ret);
0497         }
0498     }
0499     else
0500     {
0501         /* delete by key */
0502         NDRX_LOG(log_info, "Delete by key. DB: [%s] Key: [%s]", 
0503                 cachedb, key);
0504         
0505         if (0>(deleted=ndrx_cache_inval_by_key(cachedb, NULL, key, 
0506                 (short)tpgetnodeid(), NULL, EXFALSE)))
0507         {
0508             REJECT(*pp_ub, tperrno, tpstrerror(tperrno));
0509             EXFAIL_OUT(ret);
0510         }
0511         
0512         NDRX_LOG(log_info, "Deleted %ld records", deleted);
0513         
0514         if (EXSUCCEED!=Bchg(*pp_ub, EX_CACHE_NRDEL, 0, (char *)&deleted, 0L))
0515         {
0516             snprintf(tmp, sizeof(tmp), "Failed to set EX_CACHE_NRDEL: %s", 
0517                     Bstrerror(Berror));
0518             REJECT(*pp_ub, TPESYSTEM, Bstrerror(Berror));
0519             EXFAIL_OUT(ret);
0520         }
0521     }
0522 
0523 out:
0524 
0525 
0526     if (NULL!=key)
0527     {
0528         NDRX_FREE(key);
0529     }
0530 
0531     return ret;
0532 }
0533 
0534 /**
0535  * Process management requests (list headers, list all, list 
0536  * single, delete all, delete single).
0537  * The dump shall be provided in tpexport format.
0538  * 
0539  * @param p_svc
0540  */
0541 void CACHEMG (TPSVCINFO *p_svc)
0542 {
0543     int ret = EXSUCCEED;
0544     UBFH *p_ub = (UBFH *)p_svc->data;
0545     ndrx_tpcache_db_t *db;
0546     char cmd;
0547     char tmp[256];
0548     /* Reallocate the p_svc  */
0549     
0550     if (NULL==(p_ub=(UBFH *)tprealloc((char *)p_ub, Bused(p_ub)+1024)))
0551     {
0552         NDRX_LOG(log_error, "Failed to realloc UBF!");
0553         EXFAIL_OUT(ret);
0554     }
0555     
0556     /* get command code */
0557     
0558     if (EXSUCCEED!=Bget(p_ub, EX_CACHE_CMD, 0, &cmd, 0L))
0559     {
0560         REJECT(p_ub, TPESYSTEM, "internal error: missing command field");
0561         EXFAIL_OUT(ret);
0562     }
0563     
0564     NDRX_LOG(log_info, "Management command: %c", cmd);
0565     
0566     switch (cmd)
0567     {
0568         case NDRX_CACHE_SVCMD_CLSHOW:
0569             
0570             if (EXSUCCEED!=cache_show(p_svc->cd, &p_ub))
0571             {
0572                 NDRX_LOG(log_error, "Failed to call cache_show()");
0573                 EXFAIL_OUT(ret);
0574             }
0575             
0576             break;
0577         case NDRX_CACHE_SVCMD_CLCDUMP:
0578             
0579             if (EXSUCCEED!=cache_dump(&p_ub))
0580             {
0581                 NDRX_LOG(log_error, "Failed to call cache_dump()");
0582                 EXFAIL_OUT(ret);
0583             }
0584             
0585             break;
0586         case NDRX_CACHE_SVCMD_CLDEL:
0587             
0588             if (EXSUCCEED!=cache_del(&p_ub))
0589             {
0590                 NDRX_LOG(log_error, "Failed to call cache_del()");
0591                 EXFAIL_OUT(ret);
0592             }
0593             
0594             break;
0595         default:
0596             snprintf(tmp, sizeof(tmp), "Invalid command [%c]", cmd);
0597             REJECT(p_ub, TPESYSTEM, tmp);
0598             break;
0599     }
0600     
0601 out:
0602 
0603     if (EXSUCCEED!=ret && !Bpres(p_ub, EX_TPERRNO, 0))
0604     {
0605         REJECT(p_ub, TPESYSTEM, "Operation failed, see logs");
0606     }
0607 
0608     tpreturn(  ret==EXSUCCEED?TPSUCCESS:TPFAIL,
0609         0L,
0610         (char *)p_ub,
0611         0L,
0612         0L);
0613 
0614 }
0615 
0616 /* vim: set ts=4 sw=4 et smartindent: */