0001
0002
0003
0004
0005
0006
0007
0008
0009
0010
0011
0012
0013
0014
0015
0016
0017
0018
0019
0020
0021
0022
0023
0024
0025
0026
0027
0028
0029
0030
0031
0032
0033
0034
0035
0036 #include <stdio.h>
0037 #include <stdlib.h>
0038 #include <string.h>
0039 #include <errno.h>
0040 #include <regex.h>
0041 #include <utlist.h>
0042 #include <dirent.h>
0043 #include <pthread.h>
0044 #include <signal.h>
0045
0046 #include <ndebug.h>
0047 #include <atmi.h>
0048 #include <atmi_int.h>
0049 #include <typed_buf.h>
0050 #include <ndrstandard.h>
0051 #include <ubf.h>
0052 #include <Exfields.h>
0053 #include <tperror.h>
0054 #include <exnet.h>
0055 #include <ndrxdcmn.h>
0056
0057 #include "tmsrv.h"
0058 #include "../libatmisrv/srv_int.h"
0059 #include <xa_cmn.h>
0060 #include <atmi_int.h>
0061 #include <ndrxdiag.h>
0062
0063
/* States of a registry entry describing an on-disk transaction log file */
#define NDRX_TMS_FILE_STATE_INITIAL 0 /* file was noticed by the disk check, load not attempted yet */
#define NDRX_TMS_FILE_STATE_IGNORE 1 /* loading the file failed, it is skipped from now on */



expublic pthread_t G_bacground_thread; /* handle filled by pthread_create() in background_process_init() */
expublic int G_bacground_req_shutdown = EXFALSE; /* background_loop() runs while this stays false */

exprivate MUTEX_LOCKDECL(M_wait_mutex); /* paired with M_wait_cond in thread_sleep()/background_wakeup() */
exprivate pthread_cond_t M_wait_cond = PTHREAD_COND_INITIALIZER; /* signalled to cut the sleep short */

exprivate MUTEX_LOCKDECL(M_background_lock); /* taken by background_lock(), released by background_unlock() */

exprivate ndrx_stopwatch_t M_chkdisk_stopwatch; /* time since the last background_chkdisk() run */

exprivate ndrx_tms_file_registry_t *M_broken_tmxids=NULL; /* hash head of the log file registry, keyed by tmxid */
0080
0081
0082
0083
0084
0085
0086
0087
0088
0089 expublic ndrx_tms_file_registry_t *ndrx_tms_file_registry_get(const char *tmxid)
0090 {
0091 ndrx_tms_file_registry_t *p_ret = NULL;
0092
0093 EXHASH_FIND_STR(M_broken_tmxids, tmxid, p_ret);
0094
0095 return p_ret;
0096 }
0097
0098
0099
0100
0101
0102
0103
0104 expublic int ndrx_tms_file_registry_add(const char *tmxid, int state)
0105 {
0106 int ret = EXSUCCEED;
0107 ndrx_tms_file_registry_t *p_ret = NULL;
0108
0109 p_ret = ndrx_tms_file_registry_get(tmxid);
0110
0111 if (NULL==p_ret)
0112 {
0113 p_ret = (ndrx_tms_file_registry_t *)NDRX_FPMALLOC(sizeof(ndrx_tms_file_registry_t), 0);
0114
0115 if (NULL==p_ret)
0116 {
0117 NDRX_LOG(log_error, "Failed to allocate memory for tmxid: [%s] monitoring",
0118 tmxid);
0119 EXFAIL_OUT(ret);
0120 }
0121
0122 NDRX_STRCPY_SAFE(p_ret->tmxid, tmxid);
0123 p_ret->state = state;
0124
0125 EXHASH_ADD_STR(M_broken_tmxids, tmxid, p_ret);
0126 }
0127 out:
0128 return ret;
0129 }
0130
0131 expublic int ndrx_tms_file_registry_del(ndrx_tms_file_registry_t *ent)
0132 {
0133 int ret = EXSUCCEED;
0134
0135 EXHASH_DEL(M_broken_tmxids, ent);
0136 NDRX_FPFREE(ent);
0137
0138 return ret;
0139 }
0140
0141
0142
0143
0144 expublic void ndrx_tms_file_registry_free(void)
0145 {
0146 ndrx_tms_file_registry_t *el, *tmp;
0147
0148 EXHASH_ITER(hh, M_broken_tmxids, el, tmp)
0149 {
0150 ndrx_tms_file_registry_del(el);
0151 }
0152 }
0153
0154
0155
0156
0157 expublic void background_lock(void)
0158 {
0159 MUTEX_LOCK_V(M_background_lock);
0160 }
0161
0162
0163
0164
0165 expublic void background_unlock(void)
0166 {
0167 MUTEX_UNLOCK_V(M_background_lock);
0168 }
0169
0170
0171
0172
0173
0174
0175
0176
0177 expublic int background_read_log(void)
0178 {
0179 int ret=EXSUCCEED;
0180 struct dirent **namelist = NULL;
0181 int n, cnt;
0182 int len;
0183 char tranmask[256];
0184 char fnamefull[PATH_MAX+1];
0185 atmi_xa_log_t *pp_tl = NULL;
0186
0187 snprintf(tranmask, sizeof(tranmask), "TRN-%ld-%hd-%d-", G_tmsrv_cfg.vnodeid,
0188 G_atmi_env.xa_rmid, G_server_conf.srv_id);
0189 len = strlen(tranmask);
0190
0191 cnt = scandir(G_tmsrv_cfg.tlog_dir, &namelist, 0, alphasort);
0192 if (cnt < 0)
0193 {
0194 NDRX_LOG(log_error, "Failed to scan [%s]: %s",
0195 G_tmsrv_cfg.tlog_dir, strerror(errno));
0196 ret=EXFAIL;
0197 goto out;
0198 }
0199 else
0200 {
0201 for (n=0; n<cnt; n++)
0202 {
0203 if (0==strcmp(namelist[n]->d_name, ".") ||
0204 0==strcmp(namelist[n]->d_name, ".."))
0205 {
0206
0207 NDRX_FREE(namelist[n]);
0208 continue;
0209 }
0210
0211
0212
0213
0214
0215
0216
0217
0218 if (0==strncmp(namelist[n]->d_name, tranmask, len))
0219 {
0220 snprintf(fnamefull, sizeof(fnamefull), "%s/%s", G_tmsrv_cfg.tlog_dir,
0221 namelist[n]->d_name);
0222 NDRX_LOG(log_warn, "Resuming transaction: [%s]",
0223 fnamefull);
0224
0225 if (EXSUCCEED!=tms_load_logfile(fnamefull,
0226 namelist[n]->d_name+len, &pp_tl))
0227 {
0228 NDRX_LOG(log_error, "Failed to resume transaction: [%s]",
0229 fnamefull);
0230
0231
0232
0233
0234 ret=ndrx_tms_file_registry_add(namelist[n]->d_name+len,
0235 NDRX_TMS_FILE_STATE_IGNORE);
0236
0237 NDRX_FREE(namelist[n]);
0238
0239 if (EXSUCCEED!=ret)
0240 {
0241 NDRX_LOG(log_error, "Failed to add tmxid: [%s] to registry (malloc err?)",
0242 namelist[n]->d_name+len);
0243 EXFAIL_OUT(ret);
0244 }
0245 continue;
0246 }
0247
0248 }
0249
0250 NDRX_FREE(namelist[n]);
0251 }
0252
0253 NDRX_FREE(namelist);
0254 namelist = NULL;
0255 }
0256
0257 out:
0258 if (NULL!=namelist)
0259 {
0260 NDRX_FREE(namelist);
0261 }
0262 return ret;
0263 }
0264
0265
0266
0267
0268
0269 exprivate void thread_sleep(int sleep_sec)
0270 {
0271 struct timespec wait_time;
0272 struct timeval now;
0273 int rt;
0274
0275 gettimeofday(&now,NULL);
0276
0277 wait_time.tv_sec = now.tv_sec+sleep_sec;
0278 wait_time.tv_nsec = now.tv_usec*1000;
0279
0280 MUTEX_LOCK_V(M_wait_mutex);
0281 rt = pthread_cond_timedwait(&M_wait_cond, &M_wait_mutex, &wait_time);
0282 MUTEX_UNLOCK_V(M_wait_mutex);
0283 }
0284
0285
0286
0287
0288 expublic void background_wakeup(void)
0289 {
0290 MUTEX_LOCK_V(M_wait_mutex);
0291 pthread_cond_signal(&M_wait_cond);
0292 MUTEX_UNLOCK_V(M_wait_mutex);
0293 }
0294
0295
0296
0297
0298
0299 expublic int background_chkdisk(void)
0300 {
0301 char tmp[PATH_MAX+1];
0302 int i;
0303 int ret=EXSUCCEED;
0304 DIR *dir=NULL;
0305 struct dirent *entry;
0306 int len;
0307 char tranmask[256];
0308 atmi_xa_log_t *pp_tl = NULL;
0309
0310 snprintf(tranmask, sizeof(tranmask), "TRN-%ld-%hd-%d-", G_tmsrv_cfg.vnodeid,
0311 G_atmi_env.xa_rmid, G_server_conf.srv_id);
0312 len = strlen(tranmask);
0313
0314 dir = opendir(G_tmsrv_cfg.tlog_dir);
0315
0316 if (dir == NULL) {
0317
0318 NDRX_LOG(log_error, "opendir [%s] failed: %s",
0319 G_tmsrv_cfg.tlog_dir, strerror(errno));
0320 EXFAIL_OUT(ret);
0321 }
0322
0323 while ((entry = readdir(dir)) != NULL)
0324 {
0325 if (0==strncmp(entry->d_name, tranmask, len))
0326 {
0327
0328 NDRX_STRCPY_SAFE(tmp, entry->d_name+len);
0329
0330 if (!tms_log_exists_entry(tmp))
0331 {
0332 ndrx_tms_file_registry_t *p_reg = NULL;
0333 snprintf(tmp, sizeof(tmp), "%s/%s", G_tmsrv_cfg.tlog_dir,
0334 entry->d_name);
0335 if (ndrx_file_exists(tmp))
0336 {
0337 p_reg=ndrx_tms_file_registry_get(entry->d_name+len);
0338
0339 if (NULL==p_reg)
0340 {
0341 NDRX_LOG(log_error, "ERROR: Unkown transaction log file "
0342 "exists [%s] (duplicate processes?) - enqueue for load",
0343 tmp);
0344
0345 userlog("ERROR: Unkown transaction log file exists"
0346 " [%s] (duplicate processes?) - enqueue for load",
0347 tmp);
0348
0349 if (EXSUCCEED!=ndrx_tms_file_registry_add(entry->d_name+len,
0350 NDRX_TMS_FILE_STATE_INITIAL))
0351 {
0352 NDRX_LOG(log_error, "Failed to add tmxid: [%s] to registry (malloc err?)",
0353 entry->d_name+len);
0354 EXFAIL_OUT(ret);
0355 }
0356 }
0357 else if (NDRX_TMS_FILE_STATE_INITIAL==p_reg->state)
0358 {
0359 NDRX_LOG(log_warn, "Loading transaction log [%s]", tmp);
0360 userlog("Loading transaction log [%s]", tmp);
0361
0362 if (EXSUCCEED!=tms_load_logfile(tmp, entry->d_name+len, &pp_tl))
0363 {
0364 NDRX_LOG(log_error, "Failed to load transaction log [%s] - ignore log", tmp);
0365 userlog("Failed to load transaction log [%s] - ignore log", tmp);
0366
0367 p_reg->state = NDRX_TMS_FILE_STATE_IGNORE;
0368 }
0369 else
0370 {
0371 NDRX_LOG(log_info, "Transaction log [%s] loaded", tmp);
0372
0373 ndrx_tms_file_registry_del(p_reg);
0374 }
0375 }
0376 }
0377 }
0378 }
0379 }
0380
0381 out:
0382 if (NULL!=dir)
0383 {
0384 closedir(dir);
0385 }
0386
0387 return ret;
0388
0389 }
0390
0391
0392
0393
0394
0395
0396 expublic int background_loop(void)
0397 {
0398 int ret = EXSUCCEED;
0399 atmi_xa_log_list_t *tx_list;
0400 atmi_xa_log_list_t *el, *tmp;
0401 atmi_xa_tx_info_t xai;
0402 atmi_xa_log_t *p_tl;
0403
0404 memset(&xai, 0, sizeof(xai));
0405
0406 ndrx_stopwatch_reset(&M_chkdisk_stopwatch);
0407
0408 while(!G_bacground_req_shutdown)
0409 {
0410
0411 if (G_tmsrv_cfg.ping_time > 0)
0412 {
0413 tm_ping_db(NULL, NULL);
0414 }
0415
0416
0417
0418
0419
0420 if (G_tmsrv_cfg.chkdisk_time > 0 &&
0421 ndrx_stopwatch_get_delta_sec(&M_chkdisk_stopwatch) > G_tmsrv_cfg.chkdisk_time)
0422 {
0423
0424 background_chkdisk();
0425 ndrx_stopwatch_reset(&M_chkdisk_stopwatch);
0426 }
0427
0428
0429
0430
0431
0432
0433
0434
0435 background_lock();
0436 tx_list = tms_copy_hash2list(COPY_MODE_BACKGROUND | COPY_MODE_ACQLOCK);
0437
0438 LL_FOREACH_SAFE(tx_list,el,tmp)
0439 {
0440
0441 NDRX_LOG(log_info, "XID: [%s] stage: [%hd]. Try: %ld, max: %d.",
0442 el->p_tl.tmxid, el->p_tl.txstage, el->p_tl.trycount,
0443 G_tmsrv_cfg.max_tries);
0444
0445 if (el->p_tl.trycount>=G_tmsrv_cfg.max_tries)
0446 {
0447 NDRX_LOG(log_warn, "Skipping try %ld of %ld...",
0448 el->p_tl.trycount, G_tmsrv_cfg.max_tries);
0449
0450 LL_DELETE(tx_list, el);
0451 NDRX_FREE(el);
0452 continue;
0453 }
0454
0455
0456 if (NULL!=(p_tl = tms_log_get_entry(el->p_tl.tmxid, 0, NULL)))
0457 {
0458 p_tl->trycount++;
0459
0460 NDRX_LOG(log_info, "XID: [%s] try counter increased to: %d",
0461 el->p_tl.tmxid, p_tl->trycount);
0462
0463
0464 if (EXSUCCEED!=tms_log_checkpointseq(p_tl))
0465 {
0466 EXFAIL_OUT(ret);
0467 }
0468 XA_TX_COPY((&xai), p_tl);
0469
0470
0471
0472
0473 tm_drive(&xai, p_tl, XA_OP_COMMIT, EXFAIL, 0L);
0474 }
0475 else
0476 {
0477 NDRX_LOG(log_debug, "Transaction locked or already "
0478 "processed by foreground...");
0479 }
0480
0481 LL_DELETE(tx_list, el);
0482 NDRX_FREE(el);
0483 }
0484
0485 background_unlock();
0486 NDRX_LOG(log_debug, "background - sleep %d",
0487 G_tmsrv_cfg.scan_time);
0488
0489 if (!G_bacground_req_shutdown)
0490 thread_sleep(G_tmsrv_cfg.scan_time);
0491 }
0492
0493 out:
0494 return ret;
0495 }
0496
0497
0498
0499
0500
0501 expublic void * background_process(void *arg)
0502 {
0503 NDRX_LOG(log_error, "***********BACKGROUND PROCESS START ********");
0504
0505 tm_thread_init();
0506
0507
0508
0509
0510
0511
0512
0513
0514
0515
0516
0517 background_loop();
0518
0519 tm_thread_uninit();
0520
0521 NDRX_LOG(log_error, "***********BACKGROUND PROCESS END **********");
0522
0523 return NULL;
0524 }
0525
0526
0527
0528
0529
0530 expublic int background_process_init(void)
0531 {
0532 int ret=EXSUCCEED;
0533 pthread_attr_t pthread_custom_attr;
0534
0535
0536
0537
0538
0539
0540
0541 if (EXSUCCEED!=background_read_log())
0542 {
0543 NDRX_LOG(log_error, "Failed to recover logs");
0544 userlog("Failed to recover logs");
0545 EXFAIL_OUT(ret);
0546 }
0547
0548 pthread_attr_init(&pthread_custom_attr);
0549
0550
0551
0552
0553 ndrx_platf_stack_set(&pthread_custom_attr);
0554 if (EXSUCCEED!=pthread_create(&G_bacground_thread, &pthread_custom_attr,
0555 background_process, NULL))
0556 {
0557 NDRX_PLATF_DIAG(NDRX_DIAG_PTHREAD_CREATE, errno, "background_process_init");
0558 EXFAIL_OUT(ret);
0559 }
0560 out:
0561 return ret;
0562
0563 }
0564