Back to home page

Enduro/X

 
 

    


0001 /**
0002  * @brief Cache sanity daemon - this will remove expired records from db
0003  *   We shall move all scanning to RO mode. Build the list for removal
0004  *   and process the writes in separate run (if requied). Also duplicate processing
0005  *   shall be left only for scandup flag - this will simplify the code.
0006  *
0007  * @file tpcached.c
0008  */
0009 /* -----------------------------------------------------------------------------
0010  * Enduro/X Middleware Platform for Distributed Transaction Processing
0011  * Copyright (C) 2009-2016, ATR Baltic, Ltd. All Rights Reserved.
0012  * Copyright (C) 2017-2023, Mavimax, Ltd. All Rights Reserved.
0013  * This software is released under one of the following licenses:
0014  * AGPL (with Java and Go exceptions) or Mavimax's license for commercial use.
0015  * See LICENSE file for full text.
0016  * -----------------------------------------------------------------------------
0017  * AGPL license:
0018  *
0019  * This program is free software; you can redistribute it and/or modify it under
0020  * the terms of the GNU Affero General Public License, version 3 as published
0021  * by the Free Software Foundation;
0022  *
0023  * This program is distributed in the hope that it will be useful, but WITHOUT ANY
0024  * WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR A
0025  * PARTICULAR PURPOSE. See the GNU Affero General Public License, version 3
0026  * for more details.
0027  *
0028  * You should have received a copy of the GNU Affero General Public License along 
0029  * with this program; if not, write to the Free Software Foundation, Inc.,
0030  * 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
0031  *
0032  * -----------------------------------------------------------------------------
0033  * A commercial use license is available from Mavimax, Ltd
0034  * contact@mavimax.com
0035  * -----------------------------------------------------------------------------
0036  */
0037 #include <string.h>
0038 #include <stdio.h>
0039 #include <stdlib.h>
0040 #include <memory.h>
0041 #include <signal.h>
0042 #include <unistd.h>    /* for getopt */
0043 
0044 #include <atmi.h>
0045 #include <atmi_int.h>
0046 #include <atmi_shm.h>
0047 #include <ndrstandard.h>
0048 #include <Exfields.h>
0049 #include <ubf.h>
0050 #include <ubf_int.h>
0051 #include <ndebug.h>
0052 
0053 #include <atmi_cache.h>
0054 #include "tpcached.h"
0055 /*---------------------------Externs------------------------------------*/
0056 /*---------------------------Macros-------------------------------------*/    
0057 /*---------------------------Enums--------------------------------------*/
0058 /*---------------------------Typedefs-----------------------------------*/
0059 /*---------------------------Globals------------------------------------*/
0060 /*---------------------------Statics------------------------------------*/
0061 
0062 
exprivate int M_sleep = 5;          /* scan interval in seconds; overridden by -i in init() */

/* set to EXTRUE by main() once SIGINT/SIGTERM is seen, which ends the scan loop */
exprivate int M_shutdown = EXFALSE;  /* do we have shutdown requested?    */

/* SIGINT + SIGTERM: blocked in init(), then polled periodically from main() */
exprivate sigset_t  M_mask;
0069 
0070 /*---------------------------Prototypes---------------------------------*/
0071 
0072 /**
0073  * Perform init (read -i command line argument - interval)
0074  * @return 
0075  */
0076 expublic int init(int argc, char** argv)
0077 {
0078     int ret = EXSUCCEED;
0079     signed char c;
0080     
0081     /* Parse command line  */
0082     while ((c = getopt(argc, argv, "i:")) != -1)
0083     {
0084         NDRX_LOG(log_debug, "%c = [%s]", c, optarg);
0085         switch(c)
0086         {
0087             case 'i': 
0088                 M_sleep = atoi(optarg);
0089                 break;
0090             default:
0091                 /*return FAIL;*/
0092                 break;
0093         }
0094     }
0095 
0096     NDRX_LOG(log_debug, "Periodic sleep: %d secs", M_sleep);
0097     
0098     
0099     /* mask signal */
0100     sigemptyset(&M_mask);
0101     sigaddset(&M_mask, SIGINT);
0102     sigaddset(&M_mask, SIGTERM);
0103 
0104     if (EXSUCCEED!=sigprocmask(SIG_SETMASK, &M_mask, NULL))
0105     {
0106         NDRX_LOG(log_error, "Failed to set SIG_SETMASK: %s", strerror(errno));
0107         EXFAIL_OUT(ret);
0108     }
0109 
0110 out:
0111     return ret;
0112 }
0113 
0114 /**
0115  * Process database by record expiry
0116  * loop over the db and remove expired records.
0117  * This also removes records for which service does not exists (if marked so
0118  * by flags)
0119  * So we need to schedule records here for removal....
0120  * @param db cache database 
0121  * @return EXSUCCED/EXFAIL
0122  */
0123 exprivate int proc_db_expiry_nosvc(ndrx_tpcache_db_t *db)
0124 {
0125     int ret = EXSUCCEED;
0126     EDB_txn *txn = NULL;
0127     int tran_started = EXFALSE;
0128     int cursor_open = EXFALSE;
0129     EDB_cursor *cursor;
0130     EDB_cursor_op op;
0131     EDB_val keydb, val;
0132     long t;
0133     long tusec;
0134     long deleted = 0;
0135     int tmp_is_bridge;
0136     char send_q[NDRX_MAX_Q_SIZE+1];
0137     char prev_key[NDRX_CACHE_KEY_MAX+1] = {EXEOS};
0138     char cur_key[NDRX_CACHE_KEY_MAX+1] = {EXEOS};
0139     
0140     ndrx_tpcached_msglist_t * exp_list = NULL;
0141     int align;        
0142     char *defer_free = NULL;
0143             
0144     ndrx_tpcache_data_t *pdata;
0145     
0146     NDRX_LOG(log_debug, "%s enter dbname=[%s]", __func__, db->cachedb);
0147     
0148     /* start transaction */
0149     if (EXSUCCEED!=(ret=ndrx_cache_edb_begin(db, &txn, 0)))
0150     {
0151         NDRX_LOG(log_error, "%s: failed to start tran: %s", __func__, 
0152                 tpstrerror(tperrno));
0153         goto out;
0154     }
0155     
0156     tran_started = EXTRUE;
0157     
0158     
0159     /* loop over the database */
0160     if (EXSUCCEED!=ndrx_cache_edb_cursor_open(db, txn, &cursor))
0161     {
0162         NDRX_LOG(log_error, "Failed to open cursor");
0163         EXFAIL_OUT(ret);
0164     }
0165     cursor_open = EXTRUE;
0166     ndrx_utc_tstamp2(&t, &tusec);
0167     
0168     /* loop over the db and match records  */
0169     
0170     /* Do not process duplicate record expiries, only new rec only... 
0171      * as killing new rec, old record will be killed too
0172      */
0173     op = EDB_FIRST;
0174     do
0175     {
0176         if (defer_free)
0177         {
0178             NDRX_FREE(defer_free);
0179             defer_free = NULL;
0180         }
0181         
0182         if (EXSUCCEED!=(ret = ndrx_cache_edb_cursor_getfullkey(db, cursor, 
0183                 &keydb, &val, op, &align)))
0184         {
0185             if (EDB_NOTFOUND==ret)
0186             {
0187                 /* this is ok */
0188                 ret = EXSUCCEED;
0189                 break;
0190             }
0191             else
0192             {
0193                 NDRX_LOG(log_error, "Failed to loop over the [%s] db", db->cachedb);
0194                 break;
0195             }
0196         }
0197         
0198         if (align)
0199         {
0200             defer_free = val.mv_data;
0201         }
0202         
0203         /* test is last symbols EOS of data, if not this might cause core dump! */
0204         
0205         NDRX_CACHE_CHECK_DBKEY((&keydb), TPMINVAL);
0206         
0207         pdata = (ndrx_tpcache_data_t *)val.mv_data;
0208         
0209         NDRX_CACHE_CHECK_DBDATA((&val), pdata, keydb.mv_data, TPMINVAL);
0210         
0211         /* we have timestamp for putting record in DB, but we need to calculate
0212          * the difference between 
0213          */
0214         
0215         NDRX_LOG(log_info, "TESTING: Record with key [%s]: current UTC: %ld, "
0216                     "record expiry %ld sec. Record expiry UTC: %ld (delta: %ld)", 
0217                     keydb.mv_data, t,  db->expiry, pdata->t + db->expiry,
0218                     (long)(t - pdata->t));
0219         
0220         if (EXEOS!=prev_key[0] && 0==strcmp(keydb.mv_data, prev_key))
0221         {
0222             NDRX_LOG(log_info, "Duplicate key - skip...");
0223             goto next;
0224         }
0225         else 
0226         {
0227             NDRX_STRCPY_SAFE(prev_key, keydb.mv_data);
0228         }
0229         
0230         /* so either record is expired or service does not exists */
0231         if (   ((db->flags & NDRX_TPCACHE_FLAGS_EXPIRY) && (pdata->t + db->expiry < t))
0232                 ||
0233                 ((db->flags & NDRX_TPCACHE_FLAGS_CLRNOSVC) && 
0234                     EXSUCCEED!=ndrx_shm_get_svc(pdata->svcnm, send_q, &tmp_is_bridge, NULL))
0235             )
0236         {
0237             NDRX_LOG(log_info, "Record with key [%s] expired: current UTC: %ld, "
0238                     "record expiry %ld sec. Record expiry UTC: %ld", 
0239                     keydb.mv_data, t,  db->expiry, pdata->t + db->expiry);
0240             
0241             /* copy is needed because key data might change during group delete */
0242             NDRX_STRCPY_SAFE(cur_key, keydb.mv_data);
0243 
0244             if (EXSUCCEED!=ndrx_tpcached_add_msg(&exp_list, &keydb, NULL))
0245             {
0246                 NDRX_LOG(log_debug, "Failed to add record to removal list!");
0247                 EXFAIL_OUT(ret);
0248             }
0249             
0250         }
0251 next:
0252         if (EDB_FIRST == op)
0253         {
0254             op = EDB_NEXT;
0255         }
0256         
0257     } while (EXSUCCEED==ret);
0258     
0259     /* RO tran abort, no need to commit! */
0260     cursor_open = EXFALSE;
0261     edb_cursor_close(cursor);
0262     ndrx_cache_edb_abort(db, txn);
0263     tran_started=EXFALSE;
0264     
0265     if (NULL!=exp_list)
0266     {
0267         if (0 > (deleted = ndrx_tpcached_kill_list(db, &exp_list)))
0268         {
0269             NDRX_LOG(log_debug, "Failed to remove expired records!");
0270             EXFAIL_OUT(ret);
0271         }
0272         NDRX_LOG(log_info, "Deleted %ld records", deleted);
0273     }
0274     else
0275     {
0276         NDRX_LOG(log_debug, "No records expired");
0277     }
0278 
0279 out:
0280      
0281     if (cursor_open)
0282     {
0283         edb_cursor_close(cursor);
0284     }
0285 
0286     if (tran_started)
0287     {
0288         ndrx_cache_edb_abort(db, txn);
0289     }
0290 
0291     /* free the list in case of failure */
0292     if (NULL!=exp_list)
0293     {
0294         ndrx_tpcached_free_list(&exp_list);
0295     }
0296 
0297     if (defer_free)
0298     {
0299         NDRX_FREE(defer_free);
0300     }
0301 
0302     return ret;
0303 }
0304 
0305 /**
0306  * Compare in reverse 
0307  * @param a
0308  * @param b
0309  * @return 
0310  */
0311 exprivate int cmpfunc_lru (const void * a, const void * b)
0312 {
0313     
0314     ndrx_tpcache_datasort_t **ad = (ndrx_tpcache_datasort_t **)a;
0315     ndrx_tpcache_datasort_t **bd = (ndrx_tpcache_datasort_t **)b;
0316     
0317     /* if records are empty (not filled, malloc'd then numbers will be higher anyway */
0318     
0319     /* to get newer rec first, we change the compare order */
0320     
0321     return ndrx_utc_cmp(&(*bd)->data.hit_t, &(*bd)->data.hit_tusec, 
0322             &(*ad)->data.hit_t, &(*ad)->data.hit_tusec);
0323     
0324 }
0325 /**
0326  * Sort by hits
0327  * @param a
0328  * @param b
0329  * @return 
0330  */
0331 exprivate int cmpfunc_hits (const void * a, const void * b) 
0332 {
0333     ndrx_tpcache_datasort_t **ad = (ndrx_tpcache_datasort_t **)a;
0334     ndrx_tpcache_datasort_t **bd = (ndrx_tpcache_datasort_t **)b;
0335 
0336     long res = (*bd)->data.hits - (*ad)->data.hits;
0337  
0338     /* do some custom work because of int */
0339     if (res < 0)
0340     {
0341         return -1;
0342     }
0343     else if (res > 0)
0344     {
0345         return 1;
0346     }
0347     
0348     return 0;
0349 }
0350 
0351 /**
0352  * Fifo order by date
0353  * @param a
0354  * @param b
0355  * @return 
0356  */
0357 exprivate int cmpfunc_fifo (const void * a, const void * b) 
0358 {
0359     
0360     ndrx_tpcache_datasort_t **ad = (ndrx_tpcache_datasort_t **)a;
0361     ndrx_tpcache_datasort_t **bd = (ndrx_tpcache_datasort_t **)b;
0362     
0363     /* if records are empty (not filled, malloc'd then numbers will be higher anyway */
0364     
0365     /* to get newer rec first, we change the compare order */
0366     
0367     return ndrx_utc_cmp(&(*bd)->data.t, &(*bd)->data.tusec, 
0368             &(*ad)->data.t, &(*ad)->data.tusec);
0369 }
0370 
0371 /**
0372  * Process single db - by limit rule    
0373  * @param db
0374  * @return EXSUCCEED/EXFAIL
0375  */
0376 exprivate int proc_db_limit(ndrx_tpcache_db_t *db)
0377 {
0378     int ret = EXSUCCEED;
0379     EDB_stat stat;
0380     EDB_txn *txn = NULL;
0381     int tran_started = EXFALSE;
0382     int cursor_open = EXFALSE;
0383     ndrx_tpcache_datasort_t **dsort = NULL;
0384     EDB_cursor_op op;
0385     EDB_val keydb, val;
0386     EDB_cursor *cursor;
0387     ndrx_tpcache_data_t *pdata;
0388     long i;
0389     long nodeid = tpgetnodeid();
0390     long deleted=0, dupsdel=0;
0391     ndrx_tpcached_msglist_t * dup_list = NULL;
0392     int align;
0393     char *defer_free = NULL;
0394     
0395     NDRX_LOG(log_debug, "%s enter dbname=[%s]", __func__, db->cachedb);
0396     /* Get size of db */
0397     
0398     /* start transaction */
0399     if (EXSUCCEED!=ndrx_cache_edb_begin(db, &txn, EDB_RDONLY))
0400     {
0401         NDRX_LOG(log_error, "Failed start transaction: %s", 
0402                 tpstrerror(tperrno));
0403         EXFAIL_OUT(ret);
0404     }
0405     
0406     tran_started = EXTRUE;
0407     
0408     
0409     if (EXSUCCEED!=ndrx_cache_edb_stat (db, txn, &stat))
0410     {
0411         NDRX_LOG(log_error, "Failed to get db statistics: %s", 
0412                 tpstrerror(tperrno));
0413         EXFAIL_OUT(ret);
0414     }
0415     
0416     NDRX_LOG(log_debug, "number of keys in db: %ld, limit: %d", 
0417             stat.ms_entries, db->limit);
0418     
0419     if (stat.ms_entries <= db->limit)
0420     {
0421         NDRX_LOG(6, "Under the limit -> no need to delete recs..");
0422         goto out;
0423     }
0424     
0425     /* allocate ptr array of number elements in db */
0426     
0427     /* open cursor firstly... */
0428     if (EXSUCCEED!=ndrx_cache_edb_cursor_open(db, txn, &cursor))
0429     {
0430         NDRX_LOG(log_error, "Failed to open cursor!");
0431         EXFAIL_OUT(ret);
0432     }
0433     cursor_open = EXTRUE;
0434     /* I guess after cursor open entries shall not grow? */
0435     NDRX_CALLOC_OUT(dsort, stat.ms_entries, sizeof(ndrx_tpcache_datasort_t*), 
0436             void);
0437     
0438     for (i=0; i<stat.ms_entries; i++)
0439     {
0440         NDRX_CALLOC_OUT(dsort[i], 1, sizeof(ndrx_tpcache_datasort_t), ndrx_tpcache_datasort_t);
0441         
0442         NDRX_LOG(log_error, "Alloc dsort %ld = %p", i, dsort[i]);
0443     }
0444     
0445     
0446     /* transfer all keys to array (allocate each cell), also got to copy key data/strdup.. */
0447     op = EDB_FIRST;
0448     i = 0;
0449     do
0450     {
0451         if (NULL!=defer_free)
0452         {
0453             NDRX_FREE(defer_free);
0454             defer_free = NULL;
0455         }
0456         
0457         NDRX_LOG(log_debug, "%s: [%s] db loop %ld, entries: %ld",  
0458                         __func__, db->cachedb, i, (long) stat.ms_entries);
0459         
0460         if (EXSUCCEED!=(ret = ndrx_cache_edb_cursor_getfullkey(db, cursor, 
0461                 &keydb, &val, op, &align)))
0462         {
0463             if (EDB_NOTFOUND==ret)
0464             {
0465                 /* this is ok */
0466                 ret = EXSUCCEED;
0467                 break;
0468             }
0469             else
0470             {
0471                 NDRX_LOG(log_error, "Failed to loop over the [%s] db", db->cachedb);
0472                 break;
0473             }
0474         }
0475         
0476         if (align)
0477         {
0478             defer_free = val.mv_data;
0479         }
0480         /* test is last symbols EOS of data, if not this might cause core dump! */
0481         
0482         NDRX_CACHE_CHECK_DBKEY((&keydb), TPMINVAL);
0483         
0484         pdata = (ndrx_tpcache_data_t *)val.mv_data;
0485         
0486         NDRX_CACHE_CHECK_DBDATA((&val), pdata, keydb.mv_data, TPMINVAL);
0487 
0488         
0489         /* check isn't duplicate records in DB? 
0490          * Well we need a test case here... to see how lmdb will act in this
0491          * case will it return only first sorted? Or return all keys in random
0492          * order?
0493          * LMDB sorts ok. no problem where. As first comes the highest order
0494          * entry (higher timestamp), thus second we remove...
0495          */
0496         if ((i>0 && 0!=strcmp(dsort[i-1]->key.mv_data, keydb.mv_data)) || 0==i)
0497         {
0498             /* populate array */
0499             
0500             
0501             NDRX_LOG(log_error, "Adding to qsort... [%ld]", i);
0502             
0503             /* just copy header, not full data...  */
0504             memcpy(&(dsort[i]->data), pdata, sizeof(ndrx_tpcache_data_t));
0505             /*
0506                     
0507             NDRX_DUMP(log_error, "Data", &(dsort[i]->data), sizeof(ndrx_tpcache_data_t));
0508             
0509 */
0510             /* duplicate key data (it is string!) */
0511 
0512             if (NULL==(dsort[i]->key.mv_data = NDRX_STRDUP(keydb.mv_data)))
0513             {
0514                 NDRX_LOG(log_error, "Failed to strdup: %s", keydb.mv_data);
0515                 EXFAIL_OUT(ret);
0516             }
0517 
0518             dsort[i]->key.mv_size = strlen(dsort[i]->key.mv_data)+1;
0519         } 
0520         else 
0521         {
0522             /* this is duplicate record, we will help the system and clean it up */
0523             NDRX_LOG(log_debug, "Removing duplicate: [%s] (mark for del)", 
0524                     keydb.mv_data);
0525             if (EXSUCCEED!=ndrx_tpcached_add_msg(&dup_list, &keydb, &val))
0526             {
0527                 NDRX_LOG(log_debug, "Failed to add record to removal list!");
0528                 EXFAIL_OUT(ret);
0529             }
0530         }
0531         
0532         if (EDB_FIRST == op)
0533         {
0534             op = EDB_NEXT;
0535         }
0536         
0537         i++;
0538         
0539         if (i>=stat.ms_entries)
0540         {
0541             /* TODO: test case when new records appears while one is performing cursor op... */
0542             NDRX_LOG(log_debug, "soft EOF");
0543             break;
0544         }
0545         
0546     } while (EXSUCCEED==ret);
0547     
0548     /* RO tran abort... */
0549     edb_cursor_close(cursor);
0550     cursor_open = EXFALSE;
0551     ndrx_cache_edb_abort(db, txn);
0552     tran_started=EXFALSE;
0553     
0554     
0555     /* sort array to according techniques:
0556      * lru, hits, fifo (tstamp based) */
0557     
0558     if (db->flags & NDRX_TPCACHE_FLAGS_LRU)
0559     {
0560         qsort(dsort, stat.ms_entries, sizeof(ndrx_tpcache_datasort_t*),
0561                   cmpfunc_lru);
0562     }
0563     else if (db->flags & NDRX_TPCACHE_FLAGS_HITS)
0564     {
0565         qsort(dsort, stat.ms_entries, sizeof(ndrx_tpcache_datasort_t*),
0566                   cmpfunc_hits);
0567     }
0568     else if (db->flags & NDRX_TPCACHE_FLAGS_FIFO)
0569     {
0570         qsort(dsort, stat.ms_entries, sizeof(ndrx_tpcache_datasort_t*),
0571                   cmpfunc_fifo);
0572     }
0573     else
0574     {
0575         NDRX_LOG(log_error, "Invalid db flags: %ld", db->flags);
0576         EXFAIL_OUT(ret);
0577     }    
0578     
0579     if (db->limit >= stat.ms_entries)
0580     {
0581         NDRX_LOG(log_debug, "Nothing to delete");
0582         goto out;
0583     }
0584     
0585     /* empty lists are always at the end of the array */
0586     /* then go over the linear array, and remove records which goes over the cache */
0587     /* just print the sorted arrays... */
0588     NDRX_LOG(log_debug, "%s: starting RW tran", __func__);
0589     
0590     if (EXSUCCEED!=ndrx_cache_edb_begin(db, &txn, 0))
0591     {
0592         NDRX_LOG(log_error, "Failed start transaction: %s", 
0593                 tpstrerror(tperrno));
0594         EXFAIL_OUT(ret);
0595     }
0596     tran_started = EXTRUE;
0597     
0598     for (i=db->limit; i<stat.ms_entries; i++)
0599     {
0600         char *p = dsort[i]->key.mv_data;
0601         
0602         
0603         NDRX_LOG(log_debug, "Cache infos: key [%s], last used: %ld.%ld", 
0604                 dsort[i]->key.mv_data, dsort[i]->data.hit_t, dsort[i]->data.hit_tusec);
0605         
0606         
0607         if (EXEOS!=p[0])
0608         {
0609             /* this is ok entry, lets remove it! */        
0610             
0611             NDRX_LOG(log_info, "About to delete: key=[%s]", dsort[i]->key.mv_data);
0612             
0613             if (EXSUCCEED!=ndrx_cache_inval_by_key(db->cachedb, db, 
0614                     dsort[i]->key.mv_data, (short)nodeid, txn, EXTRUE))
0615             {
0616                 NDRX_LOG(log_debug, "Failed to delete record by key [%s]", 
0617                         dsort[i]->key.mv_data);
0618                 EXFAIL_OUT(ret);
0619             }
0620             /* use existing delete func... */
0621             deleted++;
0622         }
0623     }
0624     
0625     NDRX_LOG(log_info, "Deleted %ld records, %ld duplicates del", deleted, dupsdel);
0626 
0627 out:
0628     
0629     if (cursor_open)
0630     {
0631         edb_cursor_close(cursor);
0632     }
0633 
0634     if (tran_started)
0635     {
0636         if (EXSUCCEED==ret)
0637         {
0638             if (EXSUCCEED!=ndrx_cache_edb_commit(db, txn))
0639             {
0640                 NDRX_CACHE_TPERROR(TPESYSTEM, "%s: Failed to commit: %s", 
0641                     __func__, tpstrerror(tperrno));
0642                 ndrx_cache_edb_abort(db, txn);
0643             }
0644         }
0645         else
0646         {
0647             ndrx_cache_edb_abort(db, txn);
0648         }
0649     }
0650 
0651     if (NULL!=dup_list)
0652     {
0653         if (EXSUCCEED==ret)
0654         {
0655             if (0 > (deleted = ndrx_tpcached_kill_list(db, &dup_list)))
0656             {
0657                 NDRX_LOG(log_debug, "Failed to remove duplicate records!");
0658                 ret=EXFAIL;
0659             }
0660             NDRX_LOG(log_info, "Deleted %ld records", deleted);
0661         }
0662     }
0663 
0664     if (NULL!=dup_list)
0665     {
0666         ndrx_tpcached_free_list(&dup_list);
0667     }
0668 
0669     /* kill the list -> free some memory! */
0670     if (NULL!=dsort)
0671     {
0672         for (i=0; i<stat.ms_entries; i++)
0673         {
0674             NDRX_FREE(dsort[i]->key.mv_data);
0675             NDRX_FREE(dsort[i]);
0676         }
0677         
0678         NDRX_FREE(dsort);
0679         dsort = NULL;
0680     }
0681 
0682     if (NULL!=defer_free)
0683     {
0684         NDRX_FREE(defer_free);
0685     }
0686 
0687 
0688     return ret;
0689 }
0690 
0691 /**
0692  * Scan for duplicates and remove them. This could be useful for services
0693  * for which there are lot of caching, but less later accessing. Thus have
0694  * some housekeeping
0695  * @param db
0696  * @return EXSUCCEED/EXFAIL
0697  */
0698 exprivate int proc_db_dups(ndrx_tpcache_db_t *db)
0699 {
0700     int ret = EXSUCCEED;
0701     EDB_txn *txn = NULL;
0702     int tran_started = EXFALSE;
0703     int cursor_open = EXFALSE;
0704     EDB_cursor_op op;
0705     EDB_val keydb, val;
0706     EDB_cursor *cursor = NULL;
0707     long deleted=0;
0708     long i;
0709     ndrx_tpcache_data_t *pdata;
0710     char *prvkey = NULL;
0711     ndrx_tpcached_msglist_t * dup_list = NULL;
0712     int align;
0713     char *defer_free = NULL;
0714     
0715     NDRX_LOG(log_debug, "%s enter dbname=[%s]", __func__, db->cachedb);
0716     /* Get size of db */
0717     
0718     /* start transaction */
0719     if (EXSUCCEED!=ndrx_cache_edb_begin(db, &txn, EDB_RDONLY))
0720     {
0721         NDRX_LOG(log_error, "Failed start transaction: %s", 
0722                 tpstrerror(tperrno));
0723         EXFAIL_OUT(ret);
0724     }
0725     
0726     tran_started = EXTRUE;
0727     
0728     /* open cursor firstly... */
0729     if (EXSUCCEED!=ndrx_cache_edb_cursor_open(db, txn, &cursor))
0730     {
0731         NDRX_LOG(log_error, "Failed to open cursor!");
0732         EXFAIL_OUT(ret);
0733     }
0734     cursor_open = EXTRUE;
0735     
0736     /* transfer all keys to array (allocate each cell), also got to copy key data/strdup.. */
0737     op = EDB_FIRST;
0738     i = 0;
0739     do
0740     {
0741         if (defer_free)
0742         {
0743             NDRX_FREE(defer_free);
0744             defer_free = NULL;
0745         }
0746         
0747         if (EXSUCCEED!=(ret = ndrx_cache_edb_cursor_getfullkey(db, cursor, 
0748                 &keydb, &val, op, &align)))
0749         {
0750             if (EDB_NOTFOUND==ret)
0751             {
0752                 /* this is ok */
0753                 ret = EXSUCCEED;
0754                 break;
0755             }
0756             else
0757             {
0758                 NDRX_LOG(log_error, "Failed to loop over the [%s] db", db->cachedb);
0759                 break;
0760             }
0761         }
0762         
0763         if (align)
0764         {
0765             defer_free = val.mv_data;
0766         }
0767         /* test is last symbols EOS of data, if not this might cause core dump! */
0768         
0769         NDRX_CACHE_CHECK_DBKEY((&keydb), TPMINVAL);
0770         
0771         pdata = (ndrx_tpcache_data_t *)val.mv_data;
0772         
0773         NDRX_CACHE_CHECK_DBDATA((&val), pdata, keydb.mv_data, TPMINVAL);
0774         
0775         /* store prev key */
0776         if (i!=0)
0777         {
0778             /* this is duplicate record, we will help the system and clean it up */
0779             
0780             if (0==strcmp(prvkey, (char *)keydb.mv_data))
0781             {
0782                 NDRX_LOG(log_debug, "Removing duplicate: [%s] (mark for removal)", 
0783                         keydb.mv_data);
0784                 if (EXSUCCEED!=ndrx_tpcached_add_msg(&dup_list, &keydb, &val))
0785                 {
0786                     NDRX_LOG(log_debug, "Failed to add record to removal list!");
0787                     EXFAIL_OUT(ret);
0788                 }
0789             }
0790             else
0791             {
0792                 NDRX_FREE(prvkey);
0793             }
0794         }
0795         
0796         if (NULL==(prvkey = NDRX_STRDUP(keydb.mv_data)))
0797         {
0798             int err = errno;
0799             NDRX_LOG(log_error, "Failed to strdup: %s", strerror(err));
0800             userlog("Failed to strdup: %s", strerror(err));
0801             EXFAIL_OUT(ret);
0802         }
0803         
0804         if (EDB_FIRST == op)
0805         {
0806             op = EDB_NEXT;
0807         }
0808         
0809         i++;
0810         
0811     } while (EXSUCCEED==ret);
0812     
0813     /* RD only abort */
0814     edb_cursor_close(cursor);
0815     cursor_open = EXFALSE;
0816     ndrx_cache_edb_abort(db, txn);
0817     tran_started=EXFALSE;
0818     
0819     
0820     if (NULL!=dup_list)
0821     {
0822         if (0 > (deleted = ndrx_tpcached_kill_list(db, &dup_list)))
0823         {
0824             NDRX_LOG(log_debug, "Failed to remove duplicate records!");
0825             EXFAIL_OUT(ret);
0826         }
0827         NDRX_LOG(log_info, "Deleted %ld records", deleted);
0828     }
0829     else
0830     {
0831         NDRX_LOG(log_debug, "No duplicate expired");
0832     }
0833 
0834 out:
0835                 
0836     if (NULL!=prvkey)
0837     {
0838         NDRX_FREE(prvkey);
0839     }
0840 
0841     if (NULL!=dup_list)
0842     {
0843         ndrx_tpcached_free_list(&dup_list);
0844     }
0845 
0846     if (cursor_open)
0847     {
0848         edb_cursor_close(cursor);
0849     }
0850 
0851     /* rd only */
0852     if (tran_started)
0853     {
0854         ndrx_cache_edb_abort(db, txn);
0855     }
0856 
0857     if (defer_free)
0858     {
0859         NDRX_FREE(defer_free);
0860     }
0861 
0862     return ret;
0863 }
0864 
0865 /**
0866  * Check that we are pending some signals (specially for Apple...)
0867  * @return EXTRUE/EXFALSE
0868  */
0869 exprivate int is_shutdown_pending(void)
0870 {
0871     sigset_t pending;
0872     sigpending(&pending);
0873     if (sigismember(&pending, SIGINT) ||
0874         sigismember(&pending, SIGTERM))
0875     {
0876         return EXTRUE;
0877     }
0878     
0879     return EXFALSE;
0880 }
0881 
0882 /**
0883  * Main entry point for `tpcached' utility
0884  */
0885 expublic int main(int argc, char** argv)
0886 {
0887 
0888     int ret=EXSUCCEED;
0889 
0890     struct timespec timeout;
0891     siginfo_t info;
0892     int result = 0;
0893     int i;
0894     ndrx_tpcache_db_t *dbh, *el, *elt;
0895 
0896     /* local init */
0897     
0898     if (EXSUCCEED!=init(argc, argv))
0899     {
0900         NDRX_LOG(log_error, "Failed to init!");
0901         EXFAIL_OUT(ret);
0902     }
0903     
0904     /* ATMI init */
0905     
0906     if (EXSUCCEED!=tpinit(NULL))
0907     {
0908         NDRX_LOG(log_error, "Failed to init: %s", tpstrerror(tperrno));
0909         EXFAIL_OUT(ret);
0910     }
0911     
0912     /* 
0913      * loop over all databases
0914      * if database is limited (i.e. limit > 0), then do following:
0915      * - Read keys or (header with out data) into memory (linear mem)
0916      * and perform corresponding qsort
0917      * then remove records which we have at tail of the array.
0918      * - sleep configured time
0919      */
0920     
0921     timeout.tv_sec = M_sleep;
0922     timeout.tv_nsec = 0;
0923 
0924 
0925     while (!M_shutdown)
0926     {
0927         /* wait for signal or timeout... */
0928 #if EX_OS_DARWIN
0929         for (i=0; i<M_sleep; i++)
0930         {
0931             if (is_shutdown_pending())
0932             {
0933                 NDRX_LOG(log_debug, "Shutdown requested by signal...");
0934                 M_shutdown = EXTRUE;
0935                 break;
0936             }
0937             else
0938             {
0939                 sleep(1);
0940             }
0941         }
0942         if (M_shutdown)
0943         {
0944             break;
0945         }
0946 #else
0947         result = sigtimedwait( &M_mask, &info, &timeout );
0948 
0949         if (result > 0)
0950         {
0951             if (SIGINT == result || SIGTERM == result)
0952             {
0953                 NDRX_LOG(log_warn, "Signal received: %d - shutting down", result);
0954                 M_shutdown = EXTRUE;
0955                 break;
0956             }
0957             else
0958             {
0959                 NDRX_LOG(log_warn, "Signal received: %d - ignore", result);
0960             }
0961         }
0962         else if (EXFAIL==result)
0963         {
0964             int err = errno;
0965             
0966             if (EAGAIN!=err)
0967             {
0968                 NDRX_LOG(log_error, "sigtimedwait failed: %s", strerror(err));
0969                 EXFAIL_OUT(ret);
0970             }
0971         }
0972 #endif
0973         NDRX_LOG(log_debug, "Scanning...");
0974         /* Get the DBs */
0975         /* interval process */
0976         dbh = ndrx_cache_dbgethash();
0977 
0978         EXHASH_ITER(hh, dbh, el, elt)
0979         {
0980             /* process db */
0981             if ( (el->flags & NDRX_TPCACHE_FLAGS_EXPIRY)
0982                     ||
0983                     (el->flags & NDRX_TPCACHE_FLAGS_CLRNOSVC))
0984             {
0985                 if (EXSUCCEED!=proc_db_expiry_nosvc(el))
0986                 {
0987                    NDRX_LOG(log_error, "Failed to process expiry cache: [%s]", 
0988                            el->cachedb);
0989                    EXFAIL_OUT(ret);
0990                 }
0991             }
0992             
0993             /* Allow expiry messages to be space limited too */
0994             if (
0995                     el->flags & NDRX_TPCACHE_FLAGS_LRU ||
0996                     el->flags & NDRX_TPCACHE_FLAGS_HITS ||
0997                     el->flags & NDRX_TPCACHE_FLAGS_FIFO
0998                  ) 
0999             {
1000                 if (EXSUCCEED!=proc_db_limit(el))
1001                 {
1002                    NDRX_LOG(log_error, "Failed to process limit cache: [%s]", 
1003                            el->cachedb);
1004                    EXFAIL_OUT(ret);
1005                 }
1006             }
1007             
1008             /* And we might search for duplicates in cluster configuration */
1009             if (el->flags & NDRX_TPCACHE_FLAGS_SCANDUP)
1010             {
1011                NDRX_LOG(log_error, "scanning for duplicates");
1012                
1013                 if (EXSUCCEED!=proc_db_dups(el))
1014                 {
1015                    NDRX_LOG(log_error, "Failed to process limit cache: [%s]", 
1016                            el->cachedb);
1017                    EXFAIL_OUT(ret);
1018                 }
1019             }
1020         }
1021     }
1022     
1023 out:
1024     /* un-initialize */
1025     tpterm();
1026     return ret;
1027 }
1028 
1029 /* vim: set ts=4 sw=4 et smartindent: */