0001
0002
0003
0004
0005
0006
0007
0008
0009
0010
0011
0012
0013
0014
0015
0016
0017
0018
0019
0020
0021
0022
0023
0024
0025
0026
0027
0028
0029
0030
0031
0032
0033
0034
0035
0036 #include <stdio.h>
0037 #include <stdlib.h>
0038 #include <string.h>
0039 #include <errno.h>
0040 #include <regex.h>
0041 #include <utlist.h>
0042 #include <dirent.h>
0043 #include <pthread.h>
0044 #include <signal.h>
0045
0046 #include <ndebug.h>
0047 #include <atmi.h>
0048 #include <atmi_int.h>
0049 #include <typed_buf.h>
0050 #include <ndrstandard.h>
0051 #include <ubf.h>
0052 #include <Exfields.h>
0053 #include <tperror.h>
0054 #include <exnet.h>
0055 #include <ndrxdcmn.h>
0056
0057 #include "tmsrv.h"
0058 #include "../libatmisrv/srv_int.h"
0059 #include <xa_cmn.h>
0060 #include <atmi_int.h>
0061 #include <ndrxdiag.h>
0062
0063
0064
0065
0066
0067 expublic pthread_t G_bacground_thread;
0068 expublic int G_bacground_req_shutdown = EXFALSE;
0069
0070 exprivate MUTEX_LOCKDECL(M_wait_mutex);
0071 exprivate pthread_cond_t M_wait_cond = PTHREAD_COND_INITIALIZER;
0072
0073 exprivate MUTEX_LOCKDECL(M_background_lock);
0074
0075 exprivate ndrx_stopwatch_t M_chkdisk_stopwatch;
0076
0077
0078 exprivate ndrx_tms_file_registry_t *M_broken_tmxids=NULL;
0079
0080
0081 exprivate ndrx_tms_file_registry_t *M_attempts_tmxids=NULL;
0082
0083
0084
0085 exprivate int background_load_attempts(void);
0086
0087
0088
0089
0090
0091
0092
0093 expublic ndrx_tms_file_registry_t *ndrx_tms_file_registry_get(ndrx_tms_file_registry_t ** hash,
0094 const char *tmxid)
0095 {
0096 ndrx_tms_file_registry_t *p_ret = NULL;
0097
0098 EXHASH_FIND_STR((*hash), tmxid, p_ret);
0099
0100 return p_ret;
0101 }
0102
0103
0104
0105
0106
0107
0108
0109
0110 expublic int ndrx_tms_file_registry_add(ndrx_tms_file_registry_t ** hash,
0111 const char *tmxid, int housekeepable)
0112 {
0113 int ret = EXSUCCEED;
0114 ndrx_tms_file_registry_t *p_ret = NULL;
0115
0116 p_ret = ndrx_tms_file_registry_get(hash, tmxid);
0117
0118 if (NULL==p_ret)
0119 {
0120 p_ret = (ndrx_tms_file_registry_t *)NDRX_FPMALLOC(sizeof(ndrx_tms_file_registry_t), 0);
0121
0122 if (NULL==p_ret)
0123 {
0124 NDRX_LOG(log_error, "Failed to allocate memory for tmxid: [%s] monitoring",
0125 tmxid);
0126 EXFAIL_OUT(ret);
0127 }
0128
0129 NDRX_STRCPY_SAFE(p_ret->tmxid, tmxid);
0130 p_ret->housekeepable=housekeepable;
0131 p_ret->attempts=0;
0132 EXHASH_ADD_STR(*hash, tmxid, p_ret);
0133 }
0134 out:
0135 return ret;
0136 }
0137
0138 expublic int ndrx_tms_file_registry_del(ndrx_tms_file_registry_t ** hash, ndrx_tms_file_registry_t *ent)
0139 {
0140 int ret = EXSUCCEED;
0141
0142 EXHASH_DEL(*hash, ent);
0143 NDRX_FPFREE(ent);
0144
0145 return ret;
0146 }
0147
0148
0149
0150
0151 expublic void ndrx_tms_file_registry_free(ndrx_tms_file_registry_t ** hash)
0152 {
0153 ndrx_tms_file_registry_t *el, *tmp;
0154
0155 EXHASH_ITER(hh, (*hash), el, tmp)
0156 {
0157 ndrx_tms_file_registry_del(hash, el);
0158 }
0159 }
0160
0161
0162
0163
0164 expublic void background_lock(void)
0165 {
0166 MUTEX_LOCK_V(M_background_lock);
0167 }
0168
0169
0170
0171
0172 expublic void background_unlock(void)
0173 {
0174 MUTEX_UNLOCK_V(M_background_lock);
0175 }
0176
0177
0178
0179
0180
0181
0182
0183
0184 expublic int background_read_log(void)
0185 {
0186 int ret=EXSUCCEED;
0187 struct dirent **namelist = NULL;
0188 int n, cnt;
0189 int len;
0190 char tranmask[256];
0191 char fnamefull[PATH_MAX+1];
0192 atmi_xa_log_t *pp_tl = NULL;
0193
0194 snprintf(tranmask, sizeof(tranmask), "TRN-%ld-%hd-%d-", G_tmsrv_cfg.vnodeid,
0195 G_atmi_env.xa_rmid, G_server_conf.srv_id);
0196 len = strlen(tranmask);
0197
0198 cnt = scandir(G_tmsrv_cfg.tlog_dir, &namelist, 0, alphasort);
0199 if (cnt < 0)
0200 {
0201 NDRX_LOG(log_error, "Failed to scan [%s]: %s",
0202 G_tmsrv_cfg.tlog_dir, strerror(errno));
0203 ret=EXFAIL;
0204 goto out;
0205 }
0206 else
0207 {
0208 for (n=0; n<cnt; n++)
0209 {
0210 if (0==strcmp(namelist[n]->d_name, ".") ||
0211 0==strcmp(namelist[n]->d_name, ".."))
0212 {
0213
0214 NDRX_FREE(namelist[n]);
0215 continue;
0216 }
0217
0218
0219 if (0==strncmp(namelist[n]->d_name, tranmask, len))
0220 {
0221 snprintf(fnamefull, sizeof(fnamefull), "%s/%s", G_tmsrv_cfg.tlog_dir,
0222 namelist[n]->d_name);
0223
0224 NDRX_LOG(log_warn, "Resuming transaction: [%s] (enqueue)",
0225 fnamefull);
0226
0227 if (EXSUCCEED!=ndrx_tms_file_registry_add(&M_attempts_tmxids,
0228 namelist[n]->d_name+len, EXFALSE))
0229 {
0230 NDRX_LOG(log_error, "Failed to enqueue tmxid: [%s] to registry (malloc err?)",
0231 namelist[n]->d_name+len);
0232 EXFAIL_OUT(ret);
0233 }
0234 }
0235
0236 NDRX_FREE(namelist[n]);
0237 }
0238
0239 NDRX_FREE(namelist);
0240 namelist = NULL;
0241 }
0242
0243
0244
0245
0246
0247 if (EXSUCCEED!=background_load_attempts())
0248 {
0249 EXFAIL_OUT(ret);
0250 }
0251
0252 out:
0253 if (NULL!=namelist)
0254 {
0255 NDRX_FREE(namelist);
0256 }
0257 return ret;
0258 }
0259
0260
0261
0262
0263
0264 exprivate void thread_sleep(int sleep_sec)
0265 {
0266 struct timespec wait_time;
0267 struct timeval now;
0268 int rt;
0269
0270 gettimeofday(&now,NULL);
0271
0272 wait_time.tv_sec = now.tv_sec+sleep_sec;
0273 wait_time.tv_nsec = now.tv_usec*1000;
0274
0275 MUTEX_LOCK_V(M_wait_mutex);
0276 rt = pthread_cond_timedwait(&M_wait_cond, &M_wait_mutex, &wait_time);
0277 MUTEX_UNLOCK_V(M_wait_mutex);
0278 }
0279
0280
0281
0282
0283 expublic void background_wakeup(void)
0284 {
0285 MUTEX_LOCK_V(M_wait_mutex);
0286 pthread_cond_signal(&M_wait_cond);
0287 MUTEX_UNLOCK_V(M_wait_mutex);
0288 }
0289
0290
0291
0292
0293
0294 expublic int background_chkdisk(void)
0295 {
0296 char tmp[PATH_MAX+1];
0297 int i;
0298 int ret=EXSUCCEED;
0299 DIR *dir=NULL;
0300 struct dirent *entry;
0301 int len;
0302 char tranmask[256];
0303 atmi_xa_log_t *pp_tl = NULL;
0304 char *p_name;
0305
0306 snprintf(tranmask, sizeof(tranmask), "TRN-%ld-%hd-%d-", G_tmsrv_cfg.vnodeid,
0307 G_atmi_env.xa_rmid, G_server_conf.srv_id);
0308 len = strlen(tranmask);
0309
0310 dir = opendir(G_tmsrv_cfg.tlog_dir);
0311
0312 if (dir == NULL)
0313 {
0314
0315 NDRX_LOG(log_error, "opendir [%s] failed: %s",
0316 G_tmsrv_cfg.tlog_dir, strerror(errno));
0317 EXFAIL_OUT(ret);
0318 }
0319
0320 while ((entry = readdir(dir)) != NULL)
0321 {
0322 if (0==strncmp(entry->d_name, tranmask, len))
0323 {
0324
0325 p_name = entry->d_name+len;
0326 NDRX_STRCPY_SAFE(tmp, p_name);
0327
0328 if (!tms_log_exists_entry(tmp))
0329 {
0330 ndrx_tms_file_registry_t *p_reg = NULL;
0331
0332 snprintf(tmp, sizeof(tmp), "%s/%s", G_tmsrv_cfg.tlog_dir,
0333 entry->d_name);
0334
0335 if (ndrx_file_exists(tmp))
0336 {
0337 if (NULL!=(p_reg=ndrx_tms_file_registry_get(&M_broken_tmxids, p_name)))
0338 {
0339 if (tms_housekeep(tmp))
0340 {
0341
0342 ndrx_tms_file_registry_del(&M_broken_tmxids, p_reg);
0343 }
0344 }
0345 else if (NULL==ndrx_tms_file_registry_get(&M_attempts_tmxids, p_name))
0346 {
0347
0348 if (EXSUCCEED!=ndrx_tms_file_registry_add(&M_attempts_tmxids, p_name,
0349 EXFALSE))
0350 {
0351 NDRX_LOG(log_error, "Failed to enqueue tmxid: [%s] to "
0352 "registry (malloc err?)", p_name);
0353 EXFAIL_OUT(ret);
0354 }
0355 }
0356 }
0357 }
0358 }
0359 }
0360
0361 out:
0362 if (NULL!=dir)
0363 {
0364 closedir(dir);
0365 }
0366
0367 return ret;
0368
0369 }
0370
0371
0372
0373
0374
0375 exprivate int background_load_attempts(void)
0376 {
0377 int ret = EXSUCCEED;
0378 ndrx_tms_file_registry_t *el, *tmp;
0379 int is_tout;
0380 atmi_xa_log_t *p_tl;
0381 atmi_xa_tx_info_t xai;
0382 char filename[PATH_MAX+1];
0383 int log_removed, housekeepable;
0384
0385 memset(&xai, 0, sizeof(xai));
0386
0387 EXHASH_ITER(hh, M_attempts_tmxids, el, tmp)
0388 {
0389 snprintf(filename, sizeof(filename), "%s/TRN-%ld-%hd-%d-%s", G_tmsrv_cfg.tlog_dir,
0390 G_tmsrv_cfg.vnodeid, G_atmi_env.xa_rmid, G_server_conf.srv_id, el->tmxid);
0391
0392
0393
0394
0395
0396
0397
0398
0399 if (0!=access(filename, 0) && ENOENT==errno)
0400 {
0401 NDRX_LOG(log_debug, "Transaction: [%s] does not exist anymore",
0402 el->tmxid);
0403 ndrx_tms_file_registry_del(&M_attempts_tmxids, el);
0404 continue;
0405 }
0406 else if (EXSUCCEED==tms_load_logfile(filename, el->tmxid, &p_tl,
0407 &log_removed, &housekeepable))
0408 {
0409
0410 NDRX_LOG(log_info, "Transaction: [%s] loaded successfully",
0411 el->tmxid);
0412 ndrx_tms_file_registry_del(&M_attempts_tmxids, el);
0413 }
0414 else if (log_removed)
0415 {
0416
0417 ndrx_tms_file_registry_del(&M_attempts_tmxids, el);
0418 }
0419 else
0420 {
0421 NDRX_LOG(log_error, "Failed to load transaction: [%s]",
0422 el->tmxid);
0423
0424 el->attempts++;
0425 if (el->attempts>=G_tmsrv_cfg.logparse_attempts)
0426 {
0427 NDRX_LOG(log_error, "Transaction: [%s] failed to load after %d tries "
0428 "- not loading any more",
0429 el->tmxid, el->attempts);
0430
0431
0432
0433
0434
0435 if (EXSUCCEED!=ndrx_tms_file_registry_add(&M_broken_tmxids,
0436 el->tmxid, housekeepable))
0437 {
0438 NDRX_LOG(log_error, "Failed to enqueue tmxid: [%s] to "
0439 "registry (malloc err?)", el->tmxid);
0440 EXFAIL_OUT(ret);
0441 }
0442
0443 ndrx_tms_file_registry_del(&M_attempts_tmxids, el);
0444 }
0445 }
0446 }
0447
0448 out:
0449 return ret;
0450 }
0451
0452
0453
0454
0455
0456
0457 expublic int background_loop(void)
0458 {
0459 int ret = EXSUCCEED;
0460 atmi_xa_log_list_t *tx_list;
0461 atmi_xa_log_list_t *el, *tmp;
0462 atmi_xa_tx_info_t xai;
0463 atmi_xa_log_t *p_tl;
0464
0465 memset(&xai, 0, sizeof(xai));
0466
0467 ndrx_stopwatch_reset(&M_chkdisk_stopwatch);
0468
0469 while(!G_bacground_req_shutdown)
0470 {
0471
0472 if (G_tmsrv_cfg.ping_time > 0)
0473 {
0474 tm_ping_db(NULL, NULL);
0475 }
0476
0477
0478
0479
0480
0481 if (G_tmsrv_cfg.chkdisk_time > 0 &&
0482 ndrx_stopwatch_get_delta_sec(&M_chkdisk_stopwatch) > G_tmsrv_cfg.chkdisk_time)
0483 {
0484
0485 background_chkdisk();
0486 ndrx_stopwatch_reset(&M_chkdisk_stopwatch);
0487 }
0488
0489
0490
0491
0492 background_lock();
0493 tx_list = tms_copy_hash2list(COPY_MODE_BACKGROUND | COPY_MODE_ACQLOCK);
0494
0495 LL_FOREACH_SAFE(tx_list,el,tmp)
0496 {
0497
0498 NDRX_LOG(log_info, "XID: [%s] stage: [%hd]. Try: %ld, max: %d.",
0499 el->p_tl.tmxid, el->p_tl.txstage, el->p_tl.trycount,
0500 G_tmsrv_cfg.max_tries);
0501
0502 if (el->p_tl.trycount>=G_tmsrv_cfg.max_tries)
0503 {
0504 NDRX_LOG(log_warn, "Skipping try %ld of %ld...",
0505 el->p_tl.trycount, G_tmsrv_cfg.max_tries);
0506
0507 LL_DELETE(tx_list, el);
0508 NDRX_FREE(el);
0509 continue;
0510 }
0511
0512
0513 if (NULL!=(p_tl = tms_log_get_entry(el->p_tl.tmxid, 0, NULL)))
0514 {
0515 p_tl->trycount++;
0516
0517 NDRX_LOG(log_info, "XID: [%s] try counter increased to: %d",
0518 el->p_tl.tmxid, p_tl->trycount);
0519
0520
0521 if (EXSUCCEED!=tms_log_checkpointseq(p_tl))
0522 {
0523 EXFAIL_OUT(ret);
0524 }
0525 XA_TX_COPY((&xai), p_tl);
0526
0527
0528
0529
0530 tm_drive(&xai, p_tl, XA_OP_COMMIT, EXFAIL, 0L);
0531 }
0532 else
0533 {
0534 NDRX_LOG(log_debug, "Transaction locked or already "
0535 "processed by foreground...");
0536 }
0537
0538 LL_DELETE(tx_list, el);
0539 NDRX_FREE(el);
0540 }
0541
0542 background_unlock();
0543 NDRX_LOG(log_debug, "background - sleep %d",
0544 G_tmsrv_cfg.scan_time);
0545
0546 if (!G_bacground_req_shutdown)
0547 thread_sleep(G_tmsrv_cfg.scan_time);
0548
0549
0550
0551
0552 background_load_attempts();
0553 }
0554
0555 out:
0556 return ret;
0557 }
0558
0559
0560
0561
0562
0563 expublic void * background_process(void *arg)
0564 {
0565 NDRX_LOG(log_error, "***********BACKGROUND PROCESS START ********");
0566
0567 tm_thread_init();
0568
0569
0570
0571
0572
0573
0574
0575
0576
0577
0578
0579 background_loop();
0580
0581 tm_thread_uninit();
0582
0583 NDRX_LOG(log_error, "***********BACKGROUND PROCESS END **********");
0584
0585 return NULL;
0586 }
0587
0588
0589
0590
0591
0592 expublic int background_process_init(void)
0593 {
0594 int ret=EXSUCCEED;
0595 pthread_attr_t pthread_custom_attr;
0596
0597
0598
0599
0600
0601
0602
0603 if (EXSUCCEED!=background_read_log())
0604 {
0605 NDRX_LOG(log_error, "Failed to recover logs");
0606 userlog("Failed to recover logs");
0607 EXFAIL_OUT(ret);
0608 }
0609
0610 pthread_attr_init(&pthread_custom_attr);
0611
0612
0613
0614
0615 ndrx_platf_stack_set(&pthread_custom_attr);
0616 if (EXSUCCEED!=pthread_create(&G_bacground_thread, &pthread_custom_attr,
0617 background_process, NULL))
0618 {
0619 NDRX_PLATF_DIAG(NDRX_DIAG_PTHREAD_CREATE, errno, "background_process_init");
0620 EXFAIL_OUT(ret);
0621 }
0622 out:
0623 return ret;
0624
0625 }
0626