0001
0002
0003
0004
0005
0006
0007
0008
0009
0010
0011
0012
0013
0014
0015
0016
0017
0018
0019
0020
0021
0022
0023
0024
0025
0026
0027
0028
0029
0030
0031
0032
0033
0034
0035
0036
0037
0038
0039
0040
0041
0042
0043
0044
0045
0046
0047
0048
0049
0050
0051
0052
0053
0054
0055
0056
0057
0058
0059
0060
0061
0062
0063
0064
0065 #include <string.h>
0066 #include <stdio.h>
0067 #include <stdlib.h>
0068 #include <memory.h>
0069 #include <math.h>
0070 #include <errno.h>
0071 #include <dirent.h>
0072 #include <sys/stat.h>
0073 #include <assert.h>
0074
0075 #include <atmi.h>
0076 #include <ubf.h>
0077 #include <ndebug.h>
0078 #include <ndrstandard.h>
0079 #include <nstopwatch.h>
0080
0081 #include <xa.h>
0082 #include <atmi_int.h>
0083 #include <unistd.h>
0084
0085 #include "userlog.h"
0086 #include "tmqueue.h"
0087 #include "nstdutil.h"
0088 #include "Exfields.h"
0089 #include "atmi_tls.h"
0090 #include "tmqd.h"
0091 #include <qcommon.h>
0092 #include <xa_cmn.h>
0093 #include <ubfutil.h>
0094 #include "qtran.h"
0095 #include <singlegrp.h>
0096 #include <sys_test.h>
0097
0098
0099
0100
0101
0102
0103 expublic char ndrx_G_qspace[XATMI_SERVICE_NAME_LENGTH+1];
0104 expublic char ndrx_G_qspacesvc[XATMI_SERVICE_NAME_LENGTH+1];
0105
0106
0107 exprivate char M_folder[PATH_MAX+1] = {EXEOS};
0108 exprivate char M_folder_active[PATH_MAX+1] = {EXEOS};
0109 exprivate char M_folder_prepared[PATH_MAX+1] = {EXEOS};
0110 exprivate char M_folder_committed[PATH_MAX+1] = {EXEOS};
0111 exprivate int volatile M_folder_set = EXFALSE;
0112 exprivate MUTEX_LOCKDECL(M_folder_lock);
0113 exprivate MUTEX_LOCKDECL(M_init);
0114
0115 exprivate int M_is_tmqueue = EXFALSE;
0116
0117 exprivate int (*M_p_tmq_setup_cmdheader_dum)(tmq_cmdheader_t *hdr, char *qname,
0118 short nodeid, short srvid, char *qspace, long flags);
0119 exprivate int (*M_p_tmq_dum_add)(char *tmxid);
0120 exprivate int (*M_p_tmq_unlock_msg)(union tmq_upd_block *b);
0121
0122 exprivate int (*M_p_tmq_msgid_exists)(char *msgid_str);
0123 exprivate void (*M_p_tpexit)(void);
0124
0125
0126 exprivate ndrx_stopwatch_t M_chkdisk_stopwatch;
0127 exprivate MUTEX_LOCKDECL(M_chkdisk_stopwatch_lock);
0128
0129
0130
0131 expublic int xa_open_entry_stat(char *xa_info, int rmid, long flags);
0132 expublic int xa_close_entry_stat(char *xa_info, int rmid, long flags);
0133 expublic int xa_start_entry_stat(XID *xid, int rmid, long flags);
0134 expublic int xa_end_entry_stat(XID *xid, int rmid, long flags);
0135 expublic int xa_rollback_entry_stat(XID *xid, int rmid, long flags);
0136 expublic int xa_prepare_entry_stat(XID *xid, int rmid, long flags);
0137 expublic int xa_commit_entry_stat(XID *xid, int rmid, long flags);
0138 expublic int xa_recover_entry_stat(XID *xid, long count, int rmid, long flags);
0139 expublic int xa_forget_entry_stat(XID *xid, int rmid, long flags);
0140 expublic int xa_complete_entry_stat(int *handle, int *retval, int rmid, long flags);
0141
0142 expublic int xa_open_entry_dyn(char *xa_info, int rmid, long flags);
0143 expublic int xa_close_entry_dyn(char *xa_info, int rmid, long flags);
0144 expublic int xa_start_entry_dyn(XID *xid, int rmid, long flags);
0145 expublic int xa_end_entry_dyn(XID *xid, int rmid, long flags);
0146 expublic int xa_rollback_entry_dyn(XID *xid, int rmid, long flags);
0147 expublic int xa_prepare_entry_dyn(XID *xid, int rmid, long flags);
0148 expublic int xa_commit_entry_dyn(XID *xid, int rmid, long flags);
0149 expublic int xa_recover_entry_dyn(XID *xid, long count, int rmid, long flags);
0150 expublic int xa_forget_entry_dyn(XID *xid, int rmid, long flags);
0151 expublic int xa_complete_entry_dyn(int *handle, int *retval, int rmid, long flags);
0152
0153 expublic int xa_open_entry(struct xa_switch_t *sw, char *xa_info, int rmid, long flags);
0154 expublic int xa_close_entry(struct xa_switch_t *sw, char *xa_info, int rmid, long flags);
0155 expublic int xa_start_entry(struct xa_switch_t *sw, XID *xid, int rmid, long flags);
0156 expublic int xa_end_entry(struct xa_switch_t *sw, XID *xid, int rmid, long flags);
0157 expublic int xa_rollback_entry(struct xa_switch_t *sw, XID *xid, int rmid, long flags);
0158 expublic int xa_prepare_entry(struct xa_switch_t *sw, XID *xid, int rmid, long flags);
0159 expublic int xa_commit_entry(struct xa_switch_t *sw, XID *xid, int rmid, long flags);
0160 expublic int xa_recover_entry(struct xa_switch_t *sw, XID *xid, long count, int rmid, long flags);
0161 expublic int xa_forget_entry(struct xa_switch_t *sw, XID *xid, int rmid, long flags);
0162 expublic int xa_complete_entry(struct xa_switch_t *sw, int *handle, int *retval, int rmid, long flags);
0163
0164 exprivate int read_tx_block(FILE *f, char *block, int len, char *fname, char *dbg_msg,
0165 int *err, int *tmq_err);
0166 exprivate int read_tx_from_file(char *fname, char *block, int len, int *err,
0167 int *tmq_err);
0168
0169 exprivate void dirent_free(struct dirent **namelist, int n);
0170
0171 exprivate int xa_rollback_entry_tmq(char *tmxid, long flags);
0172 exprivate int xa_prepare_entry_tmq(char *tmxid, long flags);
0173 exprivate int xa_commit_entry_tmq(char *tmxid, long flags);
0174 exprivate int write_to_tx_file(char *block, int len, char *cust_tmxid, int *int_diag);
0175 exprivate void tmq_chkdisk_th(void *ptr, int *p_finish_off);
0176 exprivate int tmq_check_prepared_exists_on_disk(char *tmxid);
0177
0178 struct xa_switch_t ndrxqstatsw =
0179 {
0180 .name = "ndrxqstatsw",
0181 .flags = TMNOFLAGS,
0182 .version = 0,
0183 .xa_open_entry = xa_open_entry_stat,
0184 .xa_close_entry = xa_close_entry_stat,
0185 .xa_start_entry = xa_start_entry_stat,
0186 .xa_end_entry = xa_end_entry_stat,
0187 .xa_rollback_entry = xa_rollback_entry_stat,
0188 .xa_prepare_entry = xa_prepare_entry_stat,
0189 .xa_commit_entry = xa_commit_entry_stat,
0190 .xa_recover_entry = xa_recover_entry_stat,
0191 .xa_forget_entry = xa_forget_entry_stat,
0192 .xa_complete_entry = xa_complete_entry_stat
0193 };
0194
0195 struct xa_switch_t ndrxqdynsw =
0196 {
0197 .name = "ndrxqdynsw",
0198 .flags = TMREGISTER,
0199 .version = 0,
0200 .xa_open_entry = xa_open_entry_dyn,
0201 .xa_close_entry = xa_close_entry_dyn,
0202 .xa_start_entry = xa_start_entry_dyn,
0203 .xa_end_entry = xa_end_entry_dyn,
0204 .xa_rollback_entry = xa_rollback_entry_dyn,
0205 .xa_prepare_entry = xa_prepare_entry_dyn,
0206 .xa_commit_entry = xa_commit_entry_dyn,
0207 .xa_recover_entry = xa_recover_entry_dyn,
0208 .xa_forget_entry = xa_forget_entry_dyn,
0209 .xa_complete_entry = xa_complete_entry_dyn
0210 };
0211
0212
0213
0214
0215
0216 expublic void tmq_set_tmqueue(
0217 int setting
0218 , int (*p_tmq_setup_cmdheader_dum)(tmq_cmdheader_t *hdr, char *qname,
0219 short nodeid, short srvid, char *qspace, long flags)
0220 , int (*p_tmq_dum_add)(char *tmxid)
0221 , int (*p_tmq_unlock_msg)(union tmq_upd_block *b)
0222 , void (**p_tmq_chkdisk_th)(void *ptr, int *p_finish_off)
0223 , int (*p_tmq_msgid_exists)(char *msgid_str)
0224 , void (*p_tpexit)(void)
0225 )
0226 {
0227 M_is_tmqueue = setting;
0228 M_p_tmq_setup_cmdheader_dum = p_tmq_setup_cmdheader_dum;
0229 M_p_tmq_dum_add = p_tmq_dum_add;
0230 M_p_tmq_unlock_msg = p_tmq_unlock_msg;
0231
0232 M_p_tmq_msgid_exists=p_tmq_msgid_exists;
0233 M_p_tpexit=p_tpexit;
0234
0235 NDRX_LOG(log_debug, "qdisk_xa config: M_is_tmqueue=%d "
0236 "M_p_tmq_setup_cmdheader_dum=%p M_p_tmq_dum_add=%p M_p_tmq_unlock_msg=%p "
0237 "M_p_tmq_msgid_exists=%p M_p_tpexit=%p",
0238 M_is_tmqueue, M_p_tmq_setup_cmdheader_dum, M_p_tmq_dum_add, M_p_tmq_unlock_msg,
0239 M_p_tmq_msgid_exists, M_p_tpexit);
0240
0241
0242 *p_tmq_chkdisk_th = tmq_chkdisk_th;
0243
0244 }
0245
0246
0247
0248
0249
0250
0251
0252 exprivate char *set_filename_base(XID *xid)
0253 {
0254 atmi_xa_serialize_xid(xid, G_atmi_tls->qdisk_tls->filename_base);
0255 return G_atmi_tls->qdisk_tls->filename_base;
0256 }
0257
0258
0259
0260
0261
0262
0263 exprivate char *set_filename_base_tmxid(char *tmxid)
0264 {
0265
0266 if (tmxid!=(char *)G_atmi_tls->qdisk_tls->filename_base)
0267 {
0268 NDRX_STRCPY_SAFE(G_atmi_tls->qdisk_tls->filename_base, tmxid);
0269 }
0270
0271 return G_atmi_tls->qdisk_tls->filename_base;
0272 }
0273
0274
0275
0276
0277
0278
0279 exprivate int set_filenames(int *p_seqno)
0280 {
0281
0282
0283
0284
0285 int ret = EXSUCCEED;
0286 int locke=EXFALSE;
0287 int seqno;
0288 qtran_log_t * p_tl = tmq_log_get_entry(G_atmi_tls->qdisk_tls->filename_base,
0289 NDRX_LOCK_WAIT_TIME, &locke);
0290
0291 if (NULL==p_tl)
0292 {
0293 NDRX_LOG(log_error, "Transaction [%s] not found",
0294 G_atmi_tls->qdisk_tls->filename_base);
0295 EXFAIL_OUT(ret);
0296 }
0297
0298 seqno = tmq_log_next(p_tl);
0299
0300 snprintf(G_atmi_tls->qdisk_tls->filename_active, sizeof(G_atmi_tls->qdisk_tls->filename_active),
0301 "%s/%s-%03d", M_folder_active, G_atmi_tls->qdisk_tls->filename_base, seqno);
0302
0303 snprintf(G_atmi_tls->qdisk_tls->filename_prepared, sizeof(G_atmi_tls->qdisk_tls->filename_prepared),
0304 "%s/%s-%03d", M_folder_prepared, G_atmi_tls->qdisk_tls->filename_base, seqno);
0305
0306 NDRX_LOG(log_info, "Filenames set to: [%s] [%s] (base: [%s])",
0307 G_atmi_tls->qdisk_tls->filename_active,
0308 G_atmi_tls->qdisk_tls->filename_prepared,
0309 G_atmi_tls->qdisk_tls->filename_base);
0310
0311 *p_seqno = seqno;
0312 out:
0313
0314
0315 if (NULL!=p_tl && !locke)
0316 {
0317 tmq_log_unlock(p_tl);
0318 }
0319
0320 return ret;
0321 }
0322
0323
0324
0325
0326
0327 exprivate int get_filenames_max(void)
0328 {
0329 int i=0;
0330 char filename_active[PATH_MAX+1];
0331 char filename_prepared[PATH_MAX+1];
0332
0333 while(1)
0334 {
0335 snprintf(filename_active, sizeof(filename_active), "%s/%s-%03d",
0336 M_folder_active, G_atmi_tls->qdisk_tls->filename_base, i+1);
0337 snprintf(filename_prepared, sizeof(filename_prepared), "%s/%s-%03d",
0338 M_folder_prepared, G_atmi_tls->qdisk_tls->filename_base, i+1);
0339 NDRX_LOG(log_debug, "Testing act: [%s] prep: [%s]", filename_active,
0340 filename_prepared);
0341 if (ndrx_file_exists(filename_active) ||
0342 ndrx_file_exists(filename_prepared))
0343 {
0344 i++;
0345 }
0346 else
0347 {
0348 break;
0349 }
0350 }
0351
0352 NDRX_LOG(log_info, "max file names %d", i);
0353 return i;
0354 }
0355
0356
0357
0358
0359
0360
0361
0362 exprivate char *get_filename_i(int i, char *folder, int slot)
0363 {
0364 static __thread char filename[2][PATH_MAX+1];
0365
0366 snprintf(filename[slot], sizeof(filename[0]), "%s/%s-%03d", folder,
0367 G_atmi_tls->qdisk_tls->filename_base, i);
0368
0369 return filename[slot];
0370 }
0371
0372
0373
0374
0375
0376
0377 exprivate char *get_file_name_final(char *fname)
0378 {
0379 static __thread char buf[PATH_MAX+1];
0380
0381 snprintf(buf, sizeof(buf), "%s/%s", M_folder_committed, fname);
0382 NDRX_LOG(log_debug, "Filename built: %s", buf);
0383
0384 return buf;
0385 }
0386
0387
0388
0389
0390
0391
0392
0393
0394
0395 exprivate int file_move(int i, char *from_folder, char *to_folder)
0396 {
0397 int ret = EXSUCCEED;
0398 char *f;
0399 char *t;
0400
0401
0402 f = get_filename_i(i, from_folder, 0);
0403 t = get_filename_i(i, to_folder, 1);
0404
0405 NDRX_LOG(log_info, "Rename [%s]->[%s]",
0406 f,t);
0407
0408
0409 if (ndrx_G_systest_lockloss || EXSUCCEED!=rename(f, t))
0410 {
0411 NDRX_LOG(log_error, "Failed to rename [%s]->[%s]: %s",
0412 f,t,
0413 strerror(errno));
0414 EXFAIL_OUT(ret);
0415 }
0416
0417 out:
0418 return ret;
0419 }
0420
0421
0422
0423
0424
0425
0426
0427 exprivate char * file_move_final_names(char *from_filename, char *to_filename_only)
0428 {
0429 int ret = EXSUCCEED;
0430
0431 char *to_filename = get_file_name_final(to_filename_only);
0432
0433 NDRX_LOG(log_debug, "Rename [%s] -> [%s]", from_filename, to_filename);
0434
0435 return to_filename;
0436 }
0437
0438
0439
0440
0441
0442
0443
0444
0445
0446
0447
0448
0449
0450
0451
0452 exprivate int tmq_finalize_file(union tmq_upd_block *p_upd, char *fname1,
0453 char *fname2, char fcmd, qtran_log_cmd_t *tcmd)
0454 {
0455
0456 int ret = EXSUCCEED;
0457 BFLDOCC occ;
0458 char name1[PATH_MAX+1]="";
0459 char name2[PATH_MAX+1]="";
0460 char *p;
0461 char *files[2];
0462
0463
0464 if (NULL!=fname1)
0465 {
0466 NDRX_STRCPY_SAFE(name1, fname1);
0467 files[0]=name1;
0468 }
0469 else
0470 {
0471 files[0]=NULL;
0472 }
0473
0474 if (NULL!=fname2)
0475 {
0476 NDRX_STRCPY_SAFE(name2, fname2);
0477 files[1]=name2;
0478 }
0479 else
0480 {
0481 files[1]=NULL;
0482 }
0483
0484
0485
0486
0487
0488
0489 if (TMQ_FILECMD_UNLINK==fcmd)
0490 {
0491 for (occ=0; occ<N_DIM(files) && NULL!=files[occ]; occ++)
0492 {
0493 NDRX_LOG(log_debug, "Unlinking file [%s]", files[occ]);
0494
0495
0496 if (ndrx_G_systest_lockloss || EXSUCCEED!=unlink(files[occ]))
0497 {
0498 if (ENOENT!=errno)
0499 {
0500 int err = errno;
0501 NDRX_LOG(log_error, "Failed to unlinking file [%s] occ %d: %s",
0502 files[occ], occ, strerror(err));
0503 userlog("Failed to unlinking file [%s] occ %d: %s",
0504 files[occ], occ, strerror(err));
0505
0506 if (0==occ)
0507 {
0508 ret=XAER_RMERR;
0509 goto out;
0510 }
0511
0512 }
0513 }
0514
0515 if (0==occ)
0516 {
0517
0518 p=strrchr(files[occ], '/');
0519
0520 if (NULL!=p)
0521 {
0522 *p=EXEOS;
0523 }
0524
0525
0526 if (ndrx_G_systest_lockloss || EXSUCCEED!=ndrx_fsync_dsync(files[occ], G_atmi_env.xa_fsync_flags))
0527 {
0528 NDRX_LOG(log_error, "Failed to dsync [%s]", files[occ]);
0529 ret=XAER_RMERR;
0530 goto out;
0531 }
0532 }
0533 }
0534 }
0535 else if (TMQ_FILECMD_RENAME==fcmd)
0536 {
0537 if (NULL==files[0] || NULL==files[1])
0538 {
0539 NDRX_LOG(log_error, "File 1 or 2 is NULL %p %p - cannot rename",
0540 files[0], files[1]);
0541 ret=XAER_RMERR;
0542 goto out;
0543 }
0544
0545 NDRX_LOG(log_debug, "About to rename: [%s] -> [%s]",
0546 name1, name2);
0547
0548
0549 if (ndrx_G_systest_lockloss|| EXSUCCEED!=rename(name1, name2))
0550 {
0551 int err = errno;
0552
0553 if (ENOENT==err && ndrx_file_exists(name2))
0554 {
0555 NDRX_LOG(log_error, "Failed to rename file [%s] -> [%s] occ %d: "
0556 "%s, but dest exists - assume retry",
0557 name1, name2, occ, strerror(err));
0558 }
0559 else
0560 {
0561 NDRX_LOG(log_error, "Failed to rename file [%s] -> [%s] occ %d: %s",
0562 name1, name2, occ, strerror(err));
0563 userlog("Failed to rename file [%s] -> [%s] occ %d: %s",
0564 name1, name2, occ, strerror(err));
0565 ret=XAER_RMERR;
0566 goto out;
0567 }
0568 }
0569
0570
0571 p=strrchr(name2, '/');
0572
0573 if (NULL!=p)
0574 {
0575 *p=EXEOS;
0576 }
0577
0578
0579 if (ndrx_G_systest_lockloss || EXSUCCEED!=ndrx_fsync_dsync(name2, G_atmi_env.xa_fsync_flags))
0580 {
0581 NDRX_LOG(log_error, "Failed to dsync [%s]", name2);
0582 ret=XAER_RMERR;
0583 goto out;
0584 }
0585 }
0586 else
0587 {
0588 NDRX_LOG(log_error, "Unsupported file command %c", fcmd);
0589 ret=XAER_RMERR;
0590 goto out;
0591 }
0592
0593
0594 if (!tcmd->no_unlock && EXSUCCEED!=M_p_tmq_unlock_msg(p_upd))
0595 {
0596 ret=XAER_RMERR;
0597 goto out;
0598 }
0599
0600 out:
0601 return ret;
0602 }
0603
0604
0605
0606
0607
0608
0609
0610
0611
0612
0613 exprivate int tmq_finalize_files_upd(tmq_msg_upd_t *p_upd, char *fname1,
0614 char *fname2, char fcmd, qtran_log_cmd_t *tcmd)
0615 {
0616 union tmq_upd_block block;
0617
0618 memset(&block, 0, sizeof(block));
0619
0620 memcpy(&block.upd, p_upd, sizeof(*p_upd));
0621
0622 return tmq_finalize_file(&block, fname1, fname2, fcmd, tcmd);
0623 }
0624
0625
0626
0627
0628
0629
0630
0631
0632
0633
0634 exprivate int tmq_finalize_files_hdr(tmq_cmdheader_t *p_hdr, char *fname1,
0635 char *fname2, char fcmd, qtran_log_cmd_t *tcmd)
0636 {
0637 union tmq_upd_block block;
0638
0639 memset(&block, 0, sizeof(block));
0640 memcpy(&block.hdr, p_hdr, sizeof(*p_hdr));
0641
0642 return tmq_finalize_file(&block, fname1, fname2, fcmd, tcmd);
0643 }
0644
0645
0646
0647
0648
0649
0650 expublic int xa_open_entry_mkdir(char *xa_info)
0651 {
0652 int ret;
0653
0654 NDRX_STRNCPY(M_folder, xa_info, sizeof(M_folder)-2);
0655 M_folder[sizeof(M_folder)-1] = EXEOS;
0656
0657 NDRX_LOG(log_info, "Q data directory: [%s]", xa_info);
0658
0659
0660 NDRX_STRNCPY(M_folder_active, xa_info, sizeof(M_folder_active)-8);
0661 M_folder_active[sizeof(M_folder_active)-7] = EXEOS;
0662 NDRX_STRCAT_S(M_folder_active, sizeof(M_folder_active), "/active");
0663
0664 NDRX_STRNCPY(M_folder_prepared, xa_info, sizeof(M_folder_prepared)-10);
0665 M_folder_prepared[sizeof(M_folder_prepared)-9] = EXEOS;
0666 NDRX_STRCAT_S(M_folder_prepared, sizeof(M_folder_prepared), "/prepared");
0667
0668 NDRX_STRNCPY(M_folder_committed, xa_info, sizeof(M_folder_committed)-11);
0669 M_folder_committed[sizeof(M_folder_committed)-10] = EXEOS;
0670 NDRX_STRCAT_S(M_folder_committed, sizeof(M_folder_committed), "/committed");
0671
0672
0673 if (EXSUCCEED!=(ret=mkdir(M_folder, NDRX_DIR_PERM)) && ret!=EEXIST )
0674 {
0675 int err = errno;
0676
0677 if (err!=EEXIST)
0678 {
0679 NDRX_LOG(log_error, "xa_open_entry() Q driver: failed to create directory "
0680 "[%s] - [%s]!", M_folder, strerror(err));
0681
0682 userlog("xa_open_entry() Q driver: failed to create directory "
0683 "[%s] - [%s]!", M_folder, strerror(err));
0684 return XAER_RMERR;
0685 }
0686 else
0687 {
0688 NDRX_LOG(log_info, "xa_open_entry() Q driver: failed to create directory "
0689 "[%s] - [%s]!", M_folder, strerror(err));
0690 }
0691 }
0692
0693 if (EXSUCCEED!=(ret=mkdir(M_folder_active, NDRX_DIR_PERM)) && ret!=EEXIST )
0694 {
0695 int err = errno;
0696
0697 if (err!=EEXIST)
0698 {
0699 NDRX_LOG(log_error, "xa_open_entry() Q driver: failed to create directory "
0700 "[%s] - [%s]!", M_folder_active, strerror(err));
0701
0702 userlog("xa_open_entry() Q driver: failed to create directory "
0703 "[%s] - [%s]!", M_folder_active, strerror(err));
0704 return XAER_RMERR;
0705 }
0706 else
0707 {
0708 NDRX_LOG(log_info, "xa_open_entry() Q driver: failed to create directory "
0709 "[%s] - [%s]!", M_folder_active, strerror(err));
0710 }
0711 }
0712
0713 if (EXSUCCEED!=(ret=mkdir(M_folder_prepared, NDRX_DIR_PERM)) && ret!=EEXIST )
0714 {
0715 int err = errno;
0716
0717 if (err!=EEXIST)
0718 {
0719 NDRX_LOG(log_error, "xa_open_entry() Q driver: failed to create directory "
0720 "[%s] - [%s]!", M_folder_prepared, strerror(err));
0721 userlog("xa_open_entry() Q driver: failed to create directory "
0722 "[%s] - [%s]!", M_folder_prepared, strerror(err));
0723 return XAER_RMERR;
0724 }
0725 else
0726 {
0727 NDRX_LOG(log_info, "xa_open_entry() Q driver: failed to create directory "
0728 "[%s] - [%s]!", M_folder_prepared, strerror(err));
0729 }
0730 }
0731
0732 if (EXSUCCEED!=(ret=mkdir(M_folder_committed, NDRX_DIR_PERM)) && ret!=EEXIST )
0733 {
0734 int err = errno;
0735
0736 if (err!=EEXIST)
0737 {
0738 NDRX_LOG(log_error, "xa_open_entry() Q driver: failed to create directory "
0739 "[%s] - [%s]!", M_folder_committed, strerror(err));
0740 userlog("xa_open_entry() Q driver: failed to create directory "
0741 "[%s] - [%s]!", M_folder_committed, strerror(err));
0742 return XAER_RMERR;
0743 }
0744 else
0745 {
0746 NDRX_LOG(log_info, "xa_open_entry() Q driver: failed to create directory "
0747 "[%s] - [%s]!", M_folder_committed, strerror(err));
0748 }
0749 }
0750
0751 NDRX_LOG(log_info, "Prepared M_folder=[%s]", M_folder);
0752 NDRX_LOG(log_info, "Prepared M_folder_active=[%s]", M_folder_active);
0753 NDRX_LOG(log_info, "Prepared M_folder_prepared=[%s]", M_folder_prepared);
0754 NDRX_LOG(log_info, "Prepared M_folder_committed=[%s]", M_folder_committed);
0755
0756
0757 return XA_OK;
0758 }
0759
0760
0761
0762
0763
0764
0765
0766
0767
0768
0769 expublic int xa_open_entry(struct xa_switch_t *sw, char *xa_info, int rmid, long flags)
0770 {
0771 int ret = XA_OK, err, i;
0772 static int first = EXTRUE;
0773 char *info_tmp = NULL;
0774 char *p, *val;
0775
0776
0777 if (first)
0778 {
0779 MUTEX_LOCK_V(M_init);
0780 if (first)
0781 {
0782 ndrx_xa_nosuspend(EXTRUE);
0783 first=EXFALSE;
0784 }
0785 MUTEX_UNLOCK_V(M_init);
0786 }
0787
0788 if (G_atmi_tls->qdisk_is_open)
0789 {
0790 NDRX_LOG(log_warn, "xa_open_entry() - already open!");
0791 return XA_OK;
0792 }
0793
0794 G_atmi_tls->qdisk_tls=NDRX_FPMALLOC(sizeof(ndrx_qdisk_tls_t), 0);
0795
0796 if (NULL==G_atmi_tls->qdisk_tls)
0797 {
0798 int err=errno;
0799
0800 NDRX_LOG(log_warn, "xa_open_entry() - failed to malloc TLS data: %s",
0801 strerror(err));
0802 return XAER_RMERR;
0803 }
0804
0805 G_atmi_tls->qdisk_tls->is_reg=EXFALSE;
0806 G_atmi_tls->qdisk_tls->filename_base[0]=EXEOS;
0807 G_atmi_tls->qdisk_tls->filename_active[0]=EXEOS;
0808 G_atmi_tls->qdisk_tls->filename_prepared[0]=EXEOS;
0809
0810 G_atmi_tls->qdisk_tls->recover_namelist = NULL;
0811 G_atmi_tls->qdisk_tls->recover_open=EXFALSE;
0812 G_atmi_tls->qdisk_tls->recover_i=EXFAIL;
0813 G_atmi_tls->qdisk_tls->recover_last_loaded=EXFALSE;
0814
0815 G_atmi_tls->qdisk_is_open = EXTRUE;
0816 G_atmi_tls->qdisk_rmid = rmid;
0817
0818 #define UNLOCK_OUT MUTEX_UNLOCK_V(M_folder_lock);\
0819 ret=XAER_RMERR;\
0820 goto out;
0821
0822
0823 if (!M_folder_set)
0824 {
0825
0826 MUTEX_LOCK_V(M_folder_lock);
0827
0828 if (!M_folder_set)
0829 {
0830
0831 info_tmp = NDRX_STRDUP(xa_info);
0832
0833 if (NULL==info_tmp)
0834 {
0835 err=errno;
0836 NDRX_LOG(log_error, "Failed to strdup: %s", strerror(err));
0837 userlog("Failed to strdup: %s", strerror(err));
0838 UNLOCK_OUT;
0839 }
0840
0841 #define ARGS_DELIM ","
0842 #define ARGS_QUOTE "'\""
0843 #define ARG_DIR "datadir"
0844 #define ARG_QSPACE "qspace"
0845
0846
0847 for (p = ndrx_strtokblk ( info_tmp, ARGS_DELIM, ARGS_QUOTE), i=0;
0848 NULL!=p;
0849 p = ndrx_strtokblk (NULL, ARGS_DELIM, ARGS_QUOTE), i++)
0850 {
0851 if (NULL!=(val = strchr(p, '=')))
0852 {
0853 *val = EXEOS;
0854 val++;
0855 }
0856
0857
0858 if (0==strcmp(ARG_DIR, p))
0859 {
0860
0861 ret=xa_open_entry_mkdir(val);
0862
0863 if (EXSUCCEED!=ret)
0864 {
0865 NDRX_LOG(log_error, "Failed to prepare data directory [%s]", val);
0866 UNLOCK_OUT;
0867 }
0868 }
0869 else if (0==strcmp(ARG_QSPACE, p))
0870 {
0871 NDRX_STRCPY_SAFE(ndrx_G_qspace, val);
0872 }
0873
0874 }
0875
0876 if (EXEOS==ndrx_G_qspace[0])
0877 {
0878 NDRX_LOG(log_error, "[%s] setting not found in open string!", ARG_QSPACE);
0879 UNLOCK_OUT;
0880 }
0881
0882 if (EXEOS==M_folder[0])
0883 {
0884 NDRX_LOG(log_error, "[%s] setting not found in open string!", ARG_DIR);
0885 UNLOCK_OUT;
0886 }
0887
0888 snprintf(ndrx_G_qspacesvc, sizeof(ndrx_G_qspacesvc),
0889 NDRX_SVC_QSPACE, ndrx_G_qspace);
0890
0891 NDRX_LOG(log_debug, "Qspace set to: [%s]", ndrx_G_qspace);
0892 NDRX_LOG(log_debug, "Qspace svc set to: [%s]", ndrx_G_qspacesvc);
0893 M_folder_set=EXTRUE;
0894
0895 }
0896 MUTEX_UNLOCK_V(M_folder_lock);
0897
0898 }
0899 out:
0900
0901 if (NULL!=info_tmp)
0902 {
0903 NDRX_FREE(info_tmp);
0904 }
0905 return ret;
0906 }
0907
0908
0909
0910
0911
0912
0913
0914
0915 expublic int xa_close_entry(struct xa_switch_t *sw, char *xa_info, int rmid, long flags)
0916 {
0917 NDRX_LOG(log_info, "xa_close_entry() called");
0918
0919 if (NULL!=G_atmi_tls->qdisk_tls)
0920 {
0921 NDRX_FPFREE(G_atmi_tls->qdisk_tls);
0922 G_atmi_tls->qdisk_tls=NULL;
0923 }
0924
0925 G_atmi_tls->qdisk_is_open = EXFALSE;
0926
0927 return XA_OK;
0928 }
0929
0930
0931
0932
0933
0934 exprivate int xa_start_entry_tmq(char *tmxid, long flags)
0935 {
0936 int locke = EXFALSE;
0937 qtran_log_t * p_tl = NULL;
0938 int ret = XA_OK;
0939
0940 set_filename_base_tmxid(tmxid);
0941
0942
0943 p_tl = tmq_log_get_entry(tmxid, NDRX_LOCK_WAIT_TIME, &locke);
0944
0945 if ( (flags & TMJOIN) || (flags & TMRESUME) )
0946 {
0947 if (NULL==p_tl && !locke)
0948 {
0949 NDRX_LOG(log_error, "Xid [%s] TMJOIN/TMRESUME but tran not found",
0950 tmxid);
0951 ret = XAER_NOTA;
0952 goto out;
0953 }
0954
0955 NDRX_LOG(log_info, "Xid [%s] join OK", tmxid);
0956 }
0957 else
0958 {
0959
0960 if (NULL!=p_tl || locke)
0961 {
0962 NDRX_LOG(log_error, "Cannot start Xid [%s] already in progress",
0963 tmxid);
0964 ret = XAER_DUPID;
0965 goto out;
0966 }
0967 else
0968 {
0969
0970 if (EXSUCCEED!=tmq_log_start(tmxid))
0971 {
0972 NDRX_LOG(log_error, "Failed to start transaction for tmxid [%s]",
0973 tmxid);
0974 ret = XAER_RMERR;
0975 goto out;
0976 }
0977 NDRX_LOG(log_info, "Queue transaction Xid [%s] started OK", tmxid);
0978 }
0979 }
0980
0981 out:
0982
0983 if (NULL!=p_tl && !locke)
0984 {
0985 tmq_log_unlock(p_tl);
0986 }
0987
0988 return ret;
0989 }
0990
0991
0992
0993
0994
0995
0996
0997 exprivate int ndrx_xa_qminicall(char *tmxid, char cmd)
0998 {
0999 long rsplen;
1000 UBFH *p_ub = NULL;
1001 long ret = XA_OK;
1002 short nodeid = (short)tpgetnodeid();
1003
1004 p_ub = (UBFH *)tpalloc("UBF", "", 1024 );
1005
1006 if (NULL==p_ub)
1007 {
1008 NDRX_LOG(log_error, "Failed to allocate notif buffer");
1009 ret = XAER_RMERR;
1010 goto out;
1011 }
1012
1013 if (EXSUCCEED!=Bchg(p_ub, EX_QCMD, 0, &cmd, 0L))
1014 {
1015 NDRX_LOG(log_error, "Failed to setup EX_QMSGID!");
1016 ret = XAER_RMERR;
1017 goto out;
1018 }
1019
1020 if (EXSUCCEED!=Bchg(p_ub, TMXID, 0, tmxid, 0L))
1021 {
1022 NDRX_LOG(log_error, "Failed to setup TMXID!");
1023 ret = XAER_RMERR;
1024 goto out;
1025 }
1026
1027 if (EXSUCCEED!=Bchg(p_ub, TMNODEID, 0, (char *)&nodeid, 0L))
1028 {
1029 NDRX_LOG(log_error, "Failed to setup TMNODEID!");
1030 ret = XAER_RMERR;
1031 goto out;
1032 }
1033
1034 NDRX_LOG(log_info, "Calling QSPACE [%s] for tmxid [%s], command %c",
1035 ndrx_G_qspacesvc, tmxid, cmd);
1036
1037 ndrx_debug_dump_UBF(log_info, "calling Q space with", p_ub);
1038
1039 if (EXFAIL == tpcall(ndrx_G_qspacesvc, (char *)p_ub, 0L, (char **)&p_ub,
1040 &rsplen, TPNOTRAN))
1041 {
1042 NDRX_LOG(log_error, "%s failed: %s", ndrx_G_qspacesvc, tpstrerror(tperrno));
1043
1044
1045 ret = XAER_RMFAIL;
1046
1047
1048 }
1049
1050 ndrx_debug_dump_UBF(log_info, "Reply from RM", p_ub);
1051
1052
1053 if (Bpres(p_ub, TMTXRMERRCODE, 0) &&
1054 EXSUCCEED!=Bget(p_ub, TMTXRMERRCODE, 0, (char *)&ret, 0L))
1055 {
1056 NDRX_LOG(log_debug, "Failed to get TMTXRMERRCODE: %s", Bstrerror(Berror));
1057 ret = XAER_RMERR;
1058 }
1059
1060 out:
1061
1062 if (NULL!=p_ub)
1063 {
1064 tpfree((char *)p_ub);
1065 }
1066
1067 NDRX_LOG(log_info, "returns %d", ret);
1068
1069 return ret;
1070 }
1071
1072
1073
1074
1075
1076
1077
1078 expublic int ndrx_xa_qminiservce(UBFH *p_ub, char cmd)
1079 {
1080 long ret = XA_OK;
1081 char tmxid[NDRX_XID_SERIAL_BUFSIZE+1];
1082 short nodeid, nodeid_loc;
1083
1084 BFLDLEN len = sizeof(tmxid);
1085
1086
1087
1088
1089
1090
1091 if (EXSUCCEED!=Bget(p_ub, TMXID, 0, tmxid, &len))
1092 {
1093 NDRX_LOG(log_error, "Failed to get TMXID!");
1094 ret = XAER_INVAL;
1095 goto out;
1096 }
1097
1098 if (EXSUCCEED!=Bget(p_ub, TMNODEID, 0, (char *)&nodeid, 0L))
1099 {
1100 NDRX_LOG(log_error, "Failed to get TMNODEID!");
1101 ret = XAER_INVAL;
1102 goto out;
1103 }
1104
1105 if (G_atmi_env.procgrp_no && ndrx_sg_is_singleton(G_atmi_env.procgrp_no)
1106
1107 && TMQ_CMD_STARTTRAN!=cmd)
1108 {
1109
1110
1111
1112
1113 nodeid_loc=tpgetnodeid();
1114 if (nodeid!=nodeid_loc)
1115 {
1116 NDRX_LOG(log_error, "ndrx_xa_qminiservce: singleton group tmqueue+tmsrv must be on "
1117 "the same node but our node is %hd rcvd call from %hd - XAER_RMFAIL",
1118 nodeid_loc, nodeid);
1119 userlog("ndrx_xa_qminiservce: singleton group tmqueue+tmsrv must be on "
1120 "the same node but our node is %hd rcvd call from %hd - XAER_RMFAIL",
1121 nodeid_loc, nodeid);
1122 ret = XAER_RMFAIL;
1123 goto out;
1124 }
1125 }
1126
1127 switch (cmd)
1128 {
1129 case TMQ_CMD_STARTTRAN:
1130 ret = xa_start_entry_tmq(tmxid, 0);
1131 break;
1132 case TMQ_CMD_ABORTTRAN:
1133 ret = xa_rollback_entry_tmq(tmxid, 0);
1134 break;
1135 case TMQ_CMD_PREPARETRAN:
1136 ret = xa_prepare_entry_tmq(tmxid, 0);
1137 break;
1138 case TMQ_CMD_COMMITRAN:
1139 ret = xa_commit_entry_tmq(tmxid, 0);
1140 break;
1141
1142 case TMQ_CMD_CHK_MEMLOG:
1143 case TMQ_CMD_CHK_MEMLOG2:
1144
1145 ret = tmq_log_exists_entry(tmxid);
1146
1147 if (EXTRUE==ret)
1148 {
1149 ret=XA_OK;
1150 }
1151 else
1152 {
1153 ret=XAER_NOTA;
1154 }
1155
1156 if (TMQ_CMD_CHK_MEMLOG2==cmd)
1157 {
1158 NDRX_LOG(log_error, "Expected transaction to exist in "
1159 "log [%s] but not found - restarting", tmxid);
1160 userlog("Expected transaction to exist in "
1161 "log [%s] but not found - restarting", tmxid);
1162 M_p_tpexit();
1163 }
1164
1165 break;
1166 default:
1167 NDRX_LOG(log_error, "Invalid command code [%c]", cmd);
1168 ret = XAER_INVAL;
1169 break;
1170 }
1171
1172 out:
1173
1174 NDRX_LOG(log_info, "returns XA status: %d", ret);
1175
1176 if (EXSUCCEED!=Bchg(p_ub, TMTXRMERRCODE, 0, (char *)&ret, 0L))
1177 {
1178 NDRX_LOG(log_error, "Failed to setup TMTXRMERRCODE: %s",
1179 Bstrerror(Berror));
1180 ret = XAER_RMERR;
1181 }
1182
1183 return ret;
1184 }
1185
1186
1187
1188
1189
1190
1191
1192
1193
1194
1195 expublic int xa_start_entry(struct xa_switch_t *sw, XID *xid, int rmid, long flags)
1196 {
1197 char *tmxid;
1198 int ret = XA_OK;
1199
1200 if (!G_atmi_tls->qdisk_is_open)
1201 {
1202 NDRX_LOG(log_error, "ERROR! xa_start_entry() - XA not open!");
1203 ret = XAER_RMERR;
1204 goto out;
1205 }
1206
1207 tmxid = set_filename_base(xid);
1208
1209
1210
1211
1212
1213
1214 if (M_is_tmqueue)
1215 {
1216 ret = xa_start_entry_tmq(tmxid, flags);
1217 }
1218 else if (! ( (flags & TMJOIN) || (flags & TMRESUME) ))
1219 {
1220
1221
1222
1223
1224 ret=ndrx_xa_qminicall(tmxid, TMQ_CMD_STARTTRAN);
1225 }
1226
1227 out:
1228
1229 return ret;
1230 }
1231
1232
1233
1234
1235
1236
1237
1238
1239
1240 expublic int xa_end_entry(struct xa_switch_t *sw, XID *xid, int rmid, long flags)
1241 {
1242 if (!G_atmi_tls->qdisk_is_open)
1243 {
1244 NDRX_LOG(log_error, "ERROR! xa_end_entry() - XA not open!");
1245 return XAER_RMERR;
1246 }
1247
1248 if (G_atmi_tls->qdisk_tls->is_reg)
1249 {
1250 if (EXSUCCEED!=ax_unreg(rmid, 0))
1251 {
1252 NDRX_LOG(log_error, "ERROR! xa_end_entry() - "
1253 "ax_unreg() fail!");
1254 return XAER_RMERR;
1255 }
1256
1257 G_atmi_tls->qdisk_tls->is_reg = EXFALSE;
1258 }
1259
1260
1261
1262 out:
1263
1264 return XA_OK;
1265 }
1266
1267
1268
1269
1270
1271
1272
1273 exprivate int xa_rollback_entry_tmq(char *tmxid, long flags)
1274 {
1275 union tmq_upd_block b;
1276 char *fn = "xa_rollback_entry_tmq";
1277 qtran_log_cmd_t *el, *elt;
1278 int ret=XA_OK;
1279
1280 int locke = EXFALSE;
1281 qtran_log_t * p_tl = NULL;
1282
1283 if (!G_atmi_tls->qdisk_is_open)
1284 {
1285 NDRX_LOG(log_error, "ERROR! xa_rollback_entry() - XA not open!");
1286 return XAER_RMERR;
1287 }
1288
1289 set_filename_base_tmxid(tmxid);
1290
1291
1292 p_tl = tmq_log_get_entry(tmxid, NDRX_LOCK_WAIT_TIME, &locke);
1293
1294 if (NULL==p_tl)
1295 {
1296 if (locke)
1297 {
1298 NDRX_LOG(log_error, "Q transaction [%s] locked", tmxid);
1299 return XAER_RMFAIL;
1300 }
1301 else
1302 {
1303 NDRX_LOG(log_info, "Q transaction [%s] does not exists", tmxid);
1304
1305
1306
1307 ret=tmq_check_prepared_exists_on_disk(tmxid);
1308
1309 if (EXTRUE==ret)
1310 {
1311
1312
1313
1314
1315
1316 NDRX_LOG(log_error, "(rollback) Integrity problem, transaction [%s] "
1317 "exists on disk, but not in mem-log - restarting", tmxid);
1318 userlog("(rollback) Integrity problem, transaction [%s] "
1319 "exists on disk, but not in mem-log - restarting", tmxid);
1320
1321 M_p_tpexit();
1322 return XAER_RMFAIL;
1323 }
1324 else if (EXFAIL==ret)
1325 {
1326 return XAER_RMFAIL;
1327 }
1328 else
1329 {
1330 return XAER_NOTA;
1331 }
1332 }
1333 }
1334
1335 p_tl->txstage = XA_TX_STAGE_ABORTING;
1336 p_tl->is_abort_only=EXTRUE;
1337
1338 #ifdef TXN_TRACE
1339 userlog("ABORT: tmxid=[%s] seqno=%d", tmxid, p_tl->seqno);
1340 NDRX_LOG(log_error, "ABORT: tmxid=[%s] seqno=%d", tmxid, p_tl->seqno);
1341 #endif
1342
1343
1344 DL_FOREACH_SAFE(p_tl->cmds, el, elt)
1345 {
1346 char *fname = NULL;
1347
1348 #ifdef TXN_TRACE
1349 userlog("ABORT_ENT: tmxid=[%s] command_code=[%c]",
1350 tmxid, el->b.hdr.command_code);
1351 NDRX_LOG(log_error, "ABORT_ENT: tmxid=[%s] command_code=[%c]",
1352 tmxid, el->b.hdr.command_code);
1353 #endif
1354
1355 if (XA_RM_STATUS_ACTIVE==el->cmd_status)
1356 {
1357
1358 fname = get_filename_i(el->seqno, M_folder_active, 0);
1359 }
1360 else if (XA_RM_STATUS_PREP==el->cmd_status)
1361 {
1362
1363 fname = get_filename_i(el->seqno, M_folder_prepared, 0);
1364 }
1365 else
1366 {
1367 NDRX_LOG(log_error, "Invalid QCMD status %c", el->cmd_status);
1368 userlog("Invalid QCMD status %c", el->cmd_status);
1369 continue;
1370 }
1371
1372 memcpy(&b, &el->b, sizeof(b));
1373
1374
1375 if (TMQ_STORCMD_DUM == el->b.hdr.command_code)
1376 {
1377
1378 }
1379 else if (TMQ_STORCMD_NEWMSG == el->b.hdr.command_code)
1380 {
1381 NDRX_LOG(log_info, "%s: issuing delete command...", fn);
1382 b.hdr.command_code = TMQ_STORCMD_DEL;
1383 }
1384 else
1385 {
1386 NDRX_LOG(log_info, "%s: unlock command...", fn);
1387
1388 b.hdr.command_code = TMQ_STORCMD_UNLOCK;
1389 }
1390
1391
1392
1393
1394 if (EXSUCCEED!=tmq_finalize_files_hdr(&b.hdr, fname,
1395 NULL, TMQ_FILECMD_UNLINK, el))
1396 {
1397 NDRX_LOG(log_error, "Failed to unlink [%s]", fname);
1398 continue;
1399 }
1400
1401
1402 DL_DELETE(p_tl->cmds, el);
1403 NDRX_FPFREE(el);
1404 NDRX_LOG(log_debug, "Abort [%s] OK", fname);
1405
1406 }
1407
1408 if (NULL!=p_tl->cmds)
1409 {
1410 NDRX_LOG(log_error, "Failed to abort Q transaction [%s] -> commands exists",
1411 tmxid);
1412 return XAER_RMERR;
1413 }
1414
1415
1416 tmq_remove_logfree(p_tl, EXTRUE);
1417
1418 return XA_OK;
1419 }
1420
1421
1422
1423
1424
1425 expublic int xa_rollback_entry(struct xa_switch_t *sw, XID *xid, int rmid, long flags)
1426 {
1427 char *tmxid;
1428 int ret = XA_OK;
1429
1430 if (!G_atmi_tls->qdisk_is_open)
1431 {
1432 NDRX_LOG(log_error, "ERROR! xa_rollback_entry() - XA not open!");
1433 ret = XAER_RMERR;
1434 goto out;
1435 }
1436
1437 tmxid = set_filename_base(xid);
1438
1439 if (M_is_tmqueue)
1440 {
1441 ret = xa_rollback_entry_tmq(tmxid, flags);
1442 }
1443 else
1444 {
1445 ret=ndrx_xa_qminicall(tmxid, TMQ_CMD_ABORTTRAN);
1446 }
1447
1448 out:
1449
1450 return ret;
1451 }
1452
1453
1454
1455
1456
1457 exprivate int xa_prepare_entry_tmq(char *tmxid, long flags)
1458 {
1459 int locke = EXFALSE;
1460 qtran_log_t * p_tl = NULL;
1461 int ret = XA_OK;
1462 int did_move=EXFALSE;
1463 qtran_log_cmd_t *el, *elt;
1464
1465 if (!G_atmi_tls->qdisk_is_open)
1466 {
1467 NDRX_LOG(log_error, "ERROR! xa_prepare_entry_tmq() - XA not open!");
1468 ret=XAER_RMERR;
1469 goto out;
1470 }
1471
1472 set_filename_base_tmxid(tmxid);
1473
1474
1475 p_tl = tmq_log_get_entry(tmxid, NDRX_LOCK_WAIT_TIME, &locke);
1476
1477 if (NULL==p_tl)
1478 {
1479 if (locke)
1480 {
1481 NDRX_LOG(log_error, "Q transaction [%s] locked", tmxid);
1482 ret=XAER_RMFAIL;
1483 goto out;
1484 }
1485 else
1486 {
1487 NDRX_LOG(log_error, "Q transaction [%s] does not exists", tmxid);
1488 ret=XAER_NOTA;
1489 goto out;
1490 }
1491 }
1492
1493 if (p_tl->is_abort_only)
1494 {
1495 NDRX_LOG(log_error, "Q transaction [%s] is abort only!", tmxid);
1496 ret = XAER_RMERR;
1497 goto out;
1498 }
1499
1500 if (XA_TX_STAGE_ACTIVE!=p_tl->txstage)
1501 {
1502 NDRX_LOG(log_error, "Q transaction [%s] expected stage %hd (active) got %hd",
1503 tmxid, XA_TX_STAGE_ACTIVE, p_tl->txstage);
1504
1505 ret = XAER_RMERR;
1506 p_tl->is_abort_only=EXTRUE;
1507 goto out;
1508 }
1509
1510
1511 tmq_log_checkpointseq(p_tl);
1512
1513 p_tl->txstage = XA_TX_STAGE_PREPARING;
1514
1515
1516
1517
1518
1519
1520
1521
1522
1523
1524
1525
1526
1527
1528 if (NULL==p_tl->cmds)
1529 {
1530
1531 if (EXSUCCEED!=M_p_tmq_dum_add(p_tl->tmxid))
1532 {
1533 ret = XAER_RMERR;
1534 p_tl->is_abort_only=EXTRUE;
1535 goto out;
1536 }
1537
1538 NDRX_LOG(log_debug, "Dummy marker added");
1539
1540 }
1541
1542
1543 DL_FOREACH_SAFE(p_tl->cmds, el, elt)
1544 {
1545 if (EXSUCCEED!=file_move(el->seqno, M_folder_active, M_folder_prepared))
1546 {
1547 NDRX_LOG(log_error, "Q tran tmxid [%s] seq %d failed to prepare (file move)",
1548 tmxid, el->seqno);
1549 p_tl->is_abort_only=EXTRUE;
1550 ret=XAER_RMERR;
1551 goto out;
1552 }
1553
1554 el->cmd_status = XA_RM_STATUS_PREP;
1555 NDRX_LOG(log_info, "tmxid [%s] seq %d prepared OK", tmxid, el->seqno);
1556 did_move=EXTRUE;
1557 }
1558
1559 if (did_move)
1560 {
1561
1562
1563 if (ndrx_G_systest_lockloss || EXSUCCEED!=ndrx_fsync_dsync(M_folder_prepared, G_atmi_env.xa_fsync_flags))
1564 {
1565 NDRX_LOG(log_error, "Failed to dsync [%s]", M_folder_prepared);
1566
1567
1568 p_tl->is_abort_only=EXTRUE;
1569 ret=XAER_RMERR;
1570 goto out;
1571 }
1572 }
1573
1574
1575 p_tl->txstage = XA_TX_STAGE_PREPARED;
1576
1577 out:
1578
1579 if (NULL!=p_tl && !locke)
1580 {
1581 tmq_log_unlock(p_tl);
1582 }
1583
1584 return ret;
1585 }
1586
1587
1588
1589
1590
1591
1592
1593
1594 expublic int xa_prepare_entry(struct xa_switch_t *sw, XID *xid, int rmid, long flags)
1595 {
1596 char *tmxid;
1597 int ret = XA_OK;
1598
1599 if (!G_atmi_tls->qdisk_is_open)
1600 {
1601 NDRX_LOG(log_error, "ERROR! xa_prepare_entry() - XA not open!");
1602 ret = XAER_RMERR;
1603 goto out;
1604 }
1605
1606 tmxid = set_filename_base(xid);
1607
1608 if (M_is_tmqueue)
1609 {
1610 ret = xa_prepare_entry_tmq(tmxid, flags);
1611 }
1612 else
1613 {
1614 ret=ndrx_xa_qminicall(tmxid, TMQ_CMD_PREPARETRAN);
1615 }
1616
1617 out:
1618
1619 return ret;
1620 }
1621
1622
1623
1624
1625
1626
1627 exprivate int tmq_check_prepared_exists_on_disk(char *tmxid)
1628 {
1629 char tmp[PATH_MAX+1];
1630 int i;
1631 int ret=EXSUCCEED;
1632 DIR *dir=NULL;
1633 struct dirent *entry;
1634 int len;
1635
1636 dir = opendir(M_folder_prepared);
1637
1638 if (dir == NULL) {
1639
1640 NDRX_LOG(log_error, "opendir [%s] failed: %s", M_folder_prepared, strerror(errno));
1641 EXFAIL_OUT(ret);
1642 }
1643
1644 snprintf(tmp, sizeof(tmp), "%s-", tmxid);
1645 len = strlen(tmp);
1646
1647 while ((entry = readdir(dir)) != NULL)
1648 {
1649 if (0==strncmp(entry->d_name, tmp, len))
1650 {
1651 ret=EXTRUE;
1652 break;
1653 }
1654 }
1655
1656 out:
1657 if (NULL!=dir)
1658 {
1659 closedir(dir);
1660 }
1661
1662 return ret;
1663 }
1664
1665
1666
1667
1668
1669
1670
1671
1672 exprivate int xa_commit_entry_tmq(char *tmxid, long flags)
1673 {
1674 char msgid_str[TMMSGIDLEN_STR+1];
1675 FILE *f = NULL;
1676 char *fname;
1677 char *fname_msg;
1678 int err, tmq_err;
1679 int locke = EXFALSE;
1680 qtran_log_t * p_tl = NULL;
1681 int ret = XA_OK;
1682 qtran_log_cmd_t *el, *elt;
1683
1684 if (!G_atmi_tls->qdisk_is_open)
1685 {
1686 NDRX_LOG(log_error, "ERROR! xa_commit_entry() - XA not open!");
1687 return XAER_RMERR;
1688 }
1689
1690 set_filename_base_tmxid(tmxid);
1691
1692
1693 p_tl = tmq_log_get_entry(tmxid, NDRX_LOCK_WAIT_TIME, &locke);
1694
1695 if (NULL==p_tl)
1696 {
1697 if (locke)
1698 {
1699 NDRX_LOG(log_error, "Q transaction [%s] locked", tmxid);
1700 ret=XAER_RMFAIL;
1701 goto out;
1702 }
1703 else
1704 {
1705 NDRX_LOG(log_error, "Q transaction [%s] does not exists (in mem log)", tmxid);
1706
1707 ret=tmq_check_prepared_exists_on_disk(tmxid);
1708
1709 if (EXTRUE==ret)
1710 {
1711
1712
1713
1714
1715
1716 NDRX_LOG(log_error, "(commit) Integrity problem, transaction [%s] "
1717 "exists on disk, but not in mem-log - restarting", tmxid);
1718 userlog("(commit) Integrity problem, transaction [%s] "
1719 "exists on disk, but not in mem-log - restarting", tmxid);
1720
1721 ret=XAER_RMFAIL;
1722
1723 M_p_tpexit();
1724 goto out;
1725 }
1726 else if (EXFAIL==ret)
1727 {
1728 ret=XAER_RMFAIL;
1729 goto out;
1730 }
1731 else
1732 {
1733 ret=XAER_NOTA;
1734 goto out;
1735 }
1736 }
1737 }
1738
1739 if (p_tl->is_abort_only)
1740 {
1741 NDRX_LOG(log_error, "Q transaction [%s] is abort only!", tmxid);
1742 ret = XAER_RMERR;
1743 goto out;
1744 }
1745
1746
1747 if (XA_TX_STAGE_PREPARED!=p_tl->txstage &&
1748 XA_TX_STAGE_COMMITTING!=p_tl->txstage)
1749 {
1750 NDRX_LOG(log_error, "Q transaction [%s] expected stage %hd (prepared) or %hd (committing) got %hd",
1751 tmxid, XA_TX_STAGE_PREPARED, XA_TX_STAGE_COMMITTING, p_tl->txstage);
1752
1753 ret = XAER_RMERR;
1754 p_tl->is_abort_only=EXTRUE;
1755 goto out;
1756 }
1757
1758 p_tl->txstage = XA_TX_STAGE_COMMITTING;
1759
1760
1761 #ifdef TXN_TRACE
1762
1763
1764
1765
1766
1767
1768
1769
1770
1771
1772
1773
1774
1775 userlog("COMMIT: tmxid=[%s] seqno=%d", tmxid, p_tl->seqno);
1776 NDRX_LOG(log_error, "COMMIT: tmxid=[%s] seqno=%d", tmxid, p_tl->seqno);
1777 #endif
1778 DL_FOREACH_SAFE(p_tl->cmds, el, elt)
1779 {
1780
1781 fname = get_filename_i(el->seqno, M_folder_prepared, 0);
1782
1783 #ifdef TXN_TRACE
1784 userlog("COMMIT_ENT: tmxid=[%s] command_code=[%c] fname=[%s]",
1785 tmxid, el->b.hdr.command_code, fname);
1786 NDRX_LOG(log_error, "COMMIT_ENT: tmxid=[%s] command_code=[%c] fname=[%s]",
1787 tmxid, el->b.hdr.command_code, fname);
1788 #endif
1789
1790
1791 if (TMQ_STORCMD_NEWMSG == el->b.hdr.command_code)
1792 {
1793 char *to_filename;
1794
1795
1796
1797
1798
1799 to_filename = file_move_final_names(fname,
1800 tmq_msgid_serialize(el->b.hdr.msgid, msgid_str));
1801
1802 if (EXSUCCEED!=tmq_finalize_files_hdr(&el->b.hdr, fname,
1803 to_filename, TMQ_FILECMD_RENAME, el))
1804 {
1805 ret = XAER_RMFAIL;
1806 goto out;
1807 }
1808 }
1809 else if (TMQ_STORCMD_UPD == el->b.hdr.command_code)
1810 {
1811 tmq_msg_t msg_to_upd;
1812 int ret_len;
1813
1814 fname_msg = get_file_name_final(tmq_msgid_serialize(el->b.hdr.msgid,
1815 msgid_str));
1816 NDRX_LOG(log_info, "Updating message file: [%s]", fname_msg);
1817
1818 if (ndrx_G_systest_lockloss || NULL==(f = NDRX_FOPEN(fname_msg, "r+b")))
1819 {
1820 int err = errno;
1821 NDRX_LOG(log_error, "ERROR! xa_commit_entry() - failed to open file[%s]: %s!",
1822 fname_msg, strerror(err));
1823
1824 userlog( "ERROR! xa_commit_entry() - failed to open file[%s]: %s!",
1825 fname_msg, strerror(err));
1826 ret = XAER_RMFAIL;
1827 goto out;
1828 }
1829
1830 ndrx_fadvise_donotneed(fileno(f), TMQ_FSCACHE_LEN);
1831
1832 if (EXFAIL==read_tx_block(f, (char *)&msg_to_upd, sizeof(msg_to_upd),
1833 fname_msg, "xa_commit_entry", &err, &tmq_err))
1834 {
1835 NDRX_LOG(log_error, "ERROR! xa_commit_entry() - failed to read data block!");
1836 ret = XAER_RMFAIL;
1837 goto out;
1838 }
1839
1840
1841 if (EXSUCCEED!=fseek (f, 0 , SEEK_SET ))
1842 {
1843 NDRX_LOG(log_error, "Seekset failed: %s", strerror(errno));
1844 ret = XAER_RMFAIL;
1845 goto out;
1846 }
1847
1848 UPD_MSG((&msg_to_upd), (&el->b.upd));
1849
1850
1851
1852 if (ndrx_G_systest_lockloss || sizeof(msg_to_upd)!=(ret_len=fwrite((char *)&msg_to_upd, 1,
1853 sizeof(msg_to_upd), f)))
1854 {
1855 int err = errno;
1856 NDRX_LOG(log_error, "ERROR! Failed to write to msg file [%s]: "
1857 "req_len=%d, written=%d: %s", fname_msg,
1858 sizeof(msg_to_upd), ret_len, strerror(err));
1859
1860 userlog("ERROR! Failed to write to msg file[%s]: req_len=%d, "
1861 "written=%d: %s",
1862 fname_msg, sizeof(msg_to_upd), ret_len, strerror(err));
1863
1864 ret = XAER_RMFAIL;
1865 goto out;
1866 }
1867 NDRX_FCLOSE(f);
1868 f = NULL;
1869
1870
1871 NDRX_LOG(log_info, "Removing update command file: [%s]", fname);
1872
1873 if (EXSUCCEED!=tmq_finalize_files_upd(&el->b.upd, fname, NULL,
1874 TMQ_FILECMD_UNLINK, el))
1875 {
1876 ret = XAER_RMFAIL;
1877 goto out;
1878 }
1879
1880 }
1881 else if (TMQ_STORCMD_DEL == el->b.hdr.command_code)
1882 {
1883 fname_msg = get_file_name_final(tmq_msgid_serialize(el->b.hdr.msgid,
1884 msgid_str));
1885
1886 NDRX_LOG(log_info, "Message file to remove: [%s]", fname_msg);
1887 NDRX_LOG(log_info, "Command file to remove: [%s]", fname);
1888
1889
1890
1891 if (EXSUCCEED!=tmq_finalize_files_hdr(&el->b.hdr, fname_msg, fname,
1892 TMQ_FILECMD_UNLINK, el))
1893 {
1894 ret = XAER_RMFAIL;
1895 goto out;
1896 }
1897 }
1898 else if (TMQ_STORCMD_DUM == el->b.hdr.command_code)
1899 {
1900 NDRX_LOG(log_info, "Dummy command to remove: [%s]", fname);
1901
1902
1903
1904 if (EXSUCCEED!=tmq_finalize_files_hdr(&el->b.hdr, fname, NULL,
1905 TMQ_FILECMD_UNLINK, el))
1906 {
1907 ret = XAER_RMFAIL;
1908 goto out;
1909 }
1910 }
1911 else
1912 {
1913 NDRX_LOG(log_error, "ERROR! xa_commit_entry() - invalid command [%c]!",
1914 el->b.hdr.command_code);
1915
1916 ret = XAER_RMERR;
1917 goto out;
1918 }
1919
1920
1921 NDRX_LOG(log_debug, "Commit [%s] sequence OK", tmxid, el->seqno);
1922 DL_DELETE(p_tl->cmds, el);
1923 NDRX_FPFREE(el);
1924
1925 }
1926
1927
1928 tmq_remove_logfree(p_tl, EXTRUE);
1929 p_tl = NULL;
1930
1931 NDRX_LOG(log_info, "tmxid [%s] all sequences committed ok", tmxid);
1932
1933 out:
1934
1935 NDRX_LOG(log_info, "Commit returns %d", ret);
1936
1937
1938 if (NULL!=p_tl && !locke)
1939 {
1940 tmq_log_unlock(p_tl);
1941 }
1942
1943 return ret;
1944 }
1945
1946
1947
1948
1949
1950
1951 expublic int xa_commit_entry(struct xa_switch_t *sw, XID *xid, int rmid, long flags)
1952 {
1953 char *tmxid;
1954 int ret = XA_OK;
1955
1956 if (!G_atmi_tls->qdisk_is_open)
1957 {
1958 NDRX_LOG(log_error, "ERROR! xa_commit_entry() - XA not open!");
1959 ret = XAER_RMERR;
1960 goto out;
1961 }
1962
1963 tmxid = set_filename_base(xid);
1964
1965 if (M_is_tmqueue)
1966 {
1967 ret = xa_commit_entry_tmq(tmxid, flags);
1968 }
1969 else
1970 {
1971 ret=ndrx_xa_qminicall(tmxid, TMQ_CMD_COMMITRAN);
1972 }
1973
1974 out:
1975
1976 return ret;
1977 }
1978
1979 #define READ_BLOCK(BUF, LEN) do {\
1980 if (LEN!=(act_read=fread(BUF, 1, LEN, f)))\
1981 {\
1982 *err = ferror(f);\
1983 \
1984 if (EXSUCCEED==*err)\
1985 {\
1986 *tmq_err = TMQ_ERR_EOF;\
1987 }\
1988 NDRX_LOG(log_error, "ERROR! Failed to read tx file (%s: %s): req_read=%d, read=%d: %s, tmq_err: %d",\
1989 dbg_msg, fname, LEN, act_read, (*err==0?"EOF":strerror(*err)), *tmq_err);\
1990 \
1991 userlog("ERROR! Failed to read tx file (%s: %s): req_read=%d, read=%d: %s, tmq_err: %d",\
1992 dbg_msg, fname, LEN, act_read, (*err==0?"EOF":strerror(*err)), *tmq_err);\
1993 EXFAIL_OUT(ret);\
1994 }\
1995 } while (0)
1996
1997
1998 #define VERIFY_MAGIC(FIELD, MAGIC_VAL, LEN, DESCR, ERROR_CODE) do { \
1999 \
2000 if (0!=strncmp(FIELD, MAGIC_VAL, LEN))\
2001 {\
2002 NDRX_LOG(log_error, "ERROR! file: [%s] invalid %s: expected: [%s] got: [%.*s] command: [%x] error_code: %d", \
2003 fname, DESCR, MAGIC_VAL, LEN, FIELD, (unsigned int)p_hdr->command_code, ERROR_CODE);\
2004 NDRX_DUMP(log_error, "Invalid header", p_hdr, sizeof(tmq_cmdheader_t));\
2005 userlog("ERROR! file: [%s] invalid %s: expected: [%s] got: [%.*s] command: [%x] error_code: %d", \
2006 fname, DESCR, MAGIC_VAL, LEN, FIELD, (unsigned int)p_hdr->command_code, ERROR_CODE);\
2007 *tmq_err = ERROR_CODE;\
2008 EXFAIL_OUT(ret);\
2009 }\
2010 } while(0)
2011
2012
2013
2014
2015
2016
2017
2018
2019
2020
2021
2022
2023
2024
2025 exprivate int read_tx_block(FILE *f, char *block, int len, char *fname,
2026 char *dbg_msg, int *err,
2027 int *tmq_err)
2028 {
2029 int act_read;
2030 int read = 0;
2031 int ret = EXSUCCEED;
2032 int offset = 0;
2033 tmq_cmdheader_t *p_hdr;
2034
2035 *err=0;
2036 *tmq_err = 0;
2037
2038 if (0==ftell(f))
2039 {
2040 assert(len >= sizeof(tmq_cmdheader_t));
2041
2042 READ_BLOCK(block, sizeof(tmq_cmdheader_t));
2043
2044 read+=sizeof(tmq_cmdheader_t);
2045
2046
2047 p_hdr = (tmq_cmdheader_t *)block;
2048
2049
2050 VERIFY_MAGIC(p_hdr->magic, TMQ_MAGICBASE, TMQ_MAGICBASE_LEN, "base magic1", TMQ_ERR_CORRUPT);
2051 VERIFY_MAGIC(p_hdr->magic2, TMQ_MAGICBASE2, TMQ_MAGICBASE_LEN, "base magic2", TMQ_ERR_CORRUPT);
2052 VERIFY_MAGIC(p_hdr->magic, TMQ_MAGIC, TMQ_MAGIC_LEN, "magic1", TMQ_ERR_VERSION);
2053 VERIFY_MAGIC(p_hdr->magic2, TMQ_MAGIC2, TMQ_MAGIC_LEN, "magic2", TMQ_ERR_VERSION);
2054
2055
2056 switch (p_hdr->command_code)
2057 {
2058 case TMQ_STORCMD_NEWMSG:
2059 NDRX_LOG(log_debug, "Command: New message");
2060
2061 len = len - sizeof(tmq_cmdheader_t);
2062 break;
2063 case TMQ_STORCMD_UPD:
2064
2065 NDRX_LOG(log_debug, "Command: Update message");
2066 if (len > sizeof(tmq_cmdheader_t))
2067 {
2068 len = sizeof(tmq_msg_upd_t) - sizeof(tmq_cmdheader_t);
2069 }
2070 break;
2071 case TMQ_STORCMD_DEL:
2072
2073 NDRX_LOG(log_debug, "Command: Delete message");
2074
2075 if (len > sizeof(tmq_cmdheader_t))
2076 {
2077 len = sizeof(tmq_msg_del_t) - sizeof(tmq_cmdheader_t);
2078 }
2079 break;
2080 case TMQ_STORCMD_UNLOCK:
2081
2082 NDRX_LOG(log_debug, "Command: Unlock message");
2083
2084 if (len > sizeof(tmq_cmdheader_t))
2085 {
2086 len = sizeof(tmq_msg_unl_t) - sizeof(tmq_cmdheader_t);
2087 }
2088 break;
2089 case TMQ_STORCMD_DUM:
2090
2091 NDRX_LOG(log_debug, "Command: dummy message");
2092
2093 if (len > sizeof(tmq_cmdheader_t))
2094 {
2095 len = sizeof(tmq_msg_dum_t) - sizeof(tmq_cmdheader_t);
2096 }
2097 break;
2098 default:
2099 NDRX_LOG(log_error, "ERROR! file [%s] invalid command code [%x]",
2100 fname, (unsigned int)p_hdr->command_code);
2101 NDRX_DUMP(log_error, "Invalid header", p_hdr, sizeof(tmq_cmdheader_t));
2102 userlog("ERROR! file [%s] invalid command code [%x]",
2103 fname, (unsigned int)p_hdr->command_code);
2104 *tmq_err = TMQ_ERR_CORRUPT;
2105 EXFAIL_OUT(ret);
2106
2107 break;
2108 }
2109
2110 offset = sizeof(tmq_cmdheader_t);
2111 }
2112
2113 if (len > 0)
2114 {
2115 READ_BLOCK((block+offset), len);
2116
2117 read+=len;
2118 }
2119
2120
2121 out:
2122
2123 if (EXSUCCEED==ret)
2124 {
2125 return read;
2126 }
2127 else
2128 {
2129 return ret;
2130 }
2131 }
2132
2133
2134
2135
2136
2137
2138
2139
2140
2141
2142 exprivate int read_tx_from_file(char *fname, char *block, int len, int *err,
2143 int *tmq_err)
2144 {
2145 int ret = EXSUCCEED;
2146 FILE *f = NULL;
2147
2148 *err=0;
2149 if (ndrx_G_systest_lockloss || NULL==(f = NDRX_FOPEN(fname, "r+b")))
2150 {
2151 *err = errno;
2152 NDRX_LOG(log_error, "ERROR! xa_commit_entry() - failed to open file[%s]: %s!",
2153 fname, strerror(*err));
2154
2155 userlog( "ERROR! xa_commit_entry() - failed to open file[%s]: %s!",
2156 fname, strerror(*err));
2157 EXFAIL_OUT(ret);
2158 }
2159
2160 ndrx_fadvise_donotneed(fileno(f), TMQ_FSCACHE_LEN);
2161
2162 ret = read_tx_block(f, block, len, fname, "read_tx_from_file", err, tmq_err);
2163
2164 out:
2165
2166 if (NULL!=f)
2167 {
2168 NDRX_FCLOSE(f);
2169 }
2170
2171 return ret;
2172 }
2173
2174
2175 #define WRITE_TO_DISK(PTR, OFFSET, LEN) \
2176 ret_len = 0;\
2177 if (ndrx_G_systest_lockloss || G_atmi_env.test_qdisk_write_fail || LEN!=(ret_len=fwrite( ((char *)PTR) + OFFSET, 1, LEN, f)))\
2178 {\
2179 int err = errno;\
2180 \
2181 if (G_atmi_env.test_qdisk_write_fail)\
2182 {\
2183 NDRX_LOG(log_error, "test point: test_qdisk_write_fail TRUE");\
2184 err = ENOSPC;\
2185 }\
2186 NDRX_LOG(log_error, "ERROR! Failed to write to msgblock/tx file [%s]: "\
2187 "offset=%d req_len=%d, written=%d: %s",\
2188 G_atmi_tls->qdisk_tls->filename_active,\
2189 OFFSET, LEN, ret_len, strerror(err));\
2190 userlog("ERROR! Failed to write msgblock/tx file [%s]: "\
2191 "offset=%d req_len=%d, written=%d: %s",\
2192 G_atmi_tls->qdisk_tls->filename_active,\
2193 OFFSET, LEN, ret_len, strerror(err));\
2194 EXFAIL_OUT(ret);\
2195 }
2196
2197
2198 #define WRITE_FLUSH \
2199 ret_len = 0;\
2200 if (ndrx_G_systest_lockloss || EXSUCCEED!=fflush(f))\
2201 {\
2202 int err = errno;\
2203 NDRX_LOG(log_error, "ERROR! fflush() on [%s] failed: %s", \
2204 G_atmi_tls->qdisk_tls->filename_active, strerror(err));\
2205 userlog("ERROR! fflush() on [%s] failed: %s", \
2206 G_atmi_tls->qdisk_tls->filename_active, strerror(err));\
2207 EXFAIL_OUT(ret);\
2208 }
2209
2210 #define WRITE_REWIND \
2211 if (EXSUCCEED!=fseek(f, 0L, SEEK_SET))\
2212 {\
2213 int err = errno;\
2214 NDRX_LOG(log_error, "ERROR! fseek() rewind on [%s] failed: %s", \
2215 G_atmi_tls->qdisk_tls->filename_active, strerror(err));\
2216 userlog("ERROR! fseek() rewind on [%s] failed: %s", \
2217 G_atmi_tls->qdisk_tls->filename_active, strerror(err));\
2218 EXFAIL_OUT(ret);\
2219 }
2220
2221
2222
2223
2224
2225
2226
2227
2228
2229
2230 exprivate int write_to_tx_file(char *block, int len, char *cust_tmxid, int *int_diag)
2231 {
2232 int ret = EXSUCCEED;
2233 XID xid;
2234 size_t ret_len;
2235 FILE *f = NULL;
2236 int ax_ret;
2237 char mode_str[16];
2238 tmq_cmdheader_t dum;
2239 int seqno;
2240 long xaflags=0;
2241 char *tmxid=NULL;
2242
2243 NDRX_STRCPY_SAFE(mode_str, "wb");
2244
2245 if (ndrx_get_G_atmi_env()->xa_sw->flags & TMREGISTER && !G_atmi_tls->qdisk_tls->is_reg)
2246 {
2247 ax_ret = ax_reg(G_atmi_tls->qdisk_rmid, &xid, 0);
2248
2249 if (TM_JOIN!=ax_ret && TM_OK!=ax_ret)
2250 {
2251 if (NULL!=int_diag)
2252 {
2253 *int_diag|=TMQ_INT_DIAG_EJOIN;
2254 }
2255
2256 NDRX_LOG(log_error, "ERROR! xa_reg() failed!");
2257 EXFAIL_OUT(ret);
2258 }
2259
2260 if (TM_JOIN==ax_ret)
2261 {
2262 xaflags = TMJOIN;
2263 }
2264
2265
2266 if (XA_OK!=xa_start_entry(ndrx_get_G_atmi_env()->xa_sw, &xid, G_atmi_tls->qdisk_rmid, xaflags))
2267 {
2268
2269 if (NULL!=int_diag)
2270 {
2271 *int_diag|=TMQ_INT_DIAG_EJOIN;
2272 }
2273
2274 NDRX_LOG(log_error, "ERROR! xa_start_entry() failed!");
2275 EXFAIL_OUT(ret);
2276 }
2277
2278 G_atmi_tls->qdisk_tls->is_reg = EXTRUE;
2279 }
2280
2281
2282
2283
2284 if (EXSUCCEED!=set_filenames(&seqno))
2285 {
2286 EXFAIL_OUT(ret);
2287 }
2288
2289 if (NULL!=cust_tmxid)
2290 {
2291 tmxid = cust_tmxid;
2292 }
2293 else
2294 {
2295
2296 tmxid = G_atmi_tls->qdisk_tls->filename_base;
2297 }
2298
2299
2300
2301 NDRX_LOG(log_info, "Writing command file: [%s] mode: [%s] (seqno: %d)",
2302 G_atmi_tls->qdisk_tls->filename_active, mode_str, seqno);
2303
2304
2305 if (ndrx_G_systest_lockloss || NULL==(f = NDRX_FOPEN(G_atmi_tls->qdisk_tls->filename_active, mode_str)))
2306 {
2307 int err = errno;
2308 NDRX_LOG(log_error, "ERROR! write_to_tx_file() - failed to open file[%s]: %s!",
2309 G_atmi_tls->qdisk_tls->filename_active, strerror(err));
2310
2311 userlog( "ERROR! write_to_tx_file() - failed to open file[%s]: %s!",
2312 G_atmi_tls->qdisk_tls->filename_active, strerror(err));
2313 EXFAIL_OUT(ret);
2314 }
2315
2316 ndrx_fadvise_donotneed(fileno(f), TMQ_FSCACHE_LEN);
2317
2318
2319
2320
2321 WRITE_TO_DISK(block, 0, len);
2322
2323 WRITE_FLUSH;
2324
2325
2326
2327
2328 if (ndrx_G_systest_lockloss || EXSUCCEED!=ndrx_fsync_fsync(f, G_atmi_env.xa_fsync_flags))
2329 {
2330 NDRX_LOG(log_error, "failed to fsync");
2331 EXFAIL_OUT(ret);
2332 }
2333
2334
2335
2336
2337 if (EXSUCCEED!=tmq_log_addcmd(tmxid, seqno, block, XA_RM_STATUS_ACTIVE))
2338 {
2339 NDRX_LOG(log_error, "Failed to add [%s] seqno %d to transaction log",
2340 G_atmi_tls->qdisk_tls->filename_base, seqno);
2341 userlog("Failed to add [%s] seqno %d to transaction log",
2342 G_atmi_tls->qdisk_tls->filename_base, seqno);
2343 EXFAIL_OUT(ret);
2344 }
2345
2346 out:
2347
2348 if (NULL!=f)
2349 {
2350
2351 if (EXSUCCEED!=ret)
2352 {
2353 NDRX_LOG(log_error, "Unlink: [%s]", G_atmi_tls->qdisk_tls->filename_active);
2354
2355
2356 if (ndrx_G_systest_lockloss
2357 || EXSUCCEED!=unlink(G_atmi_tls->qdisk_tls->filename_active))
2358 {
2359 int err = errno;
2360 NDRX_LOG(log_error, "Failed to unlink [%s]: %s",
2361 G_atmi_tls->qdisk_tls->filename_active, strerror(err));
2362 userlog("Failed to unlink [%s]: %s",
2363 G_atmi_tls->qdisk_tls->filename_active, strerror(err));
2364 }
2365 }
2366
2367 NDRX_FCLOSE(f);
2368 }
2369
2370
2371 if (EXSUCCEED!=ret && NULL!=tmxid)
2372 {
2373 tmq_log_set_abort_only(tmxid);
2374 }
2375
2376 return ret;
2377 }
2378
2379
2380
2381
2382
2383
2384
2385
2386
2387
2388
2389
2390
2391
2392
2393
2394 expublic int tmq_storage_write_cmd_newmsg(tmq_msg_t *msg, int *int_diag)
2395 {
2396 int ret = EXSUCCEED;
2397 char tmp[TMMSGIDLEN_STR+1];
2398 size_t len;
2399
2400
2401
2402
2403
2404
2405
2406
2407
2408 len = tmq_get_block_len((char *)msg);
2409
2410 NDRX_DUMP(log_debug, "Writing new message to disk",
2411 (char *)msg, len);
2412
2413 if (EXSUCCEED!=write_to_tx_file((char *)msg, len, NULL, int_diag))
2414 {
2415 NDRX_LOG(log_error, "tmq_storage_write_cmd_newmsg() failed for msg %s",
2416 tmq_msgid_serialize(msg->hdr.msgid, tmp));
2417 EXFAIL_OUT(ret);
2418 }
2419
2420
2421
2422
2423 NDRX_LOG(log_info, "Message [%s] written ok to active TX file",
2424 tmq_msgid_serialize(msg->hdr.msgid, tmp));
2425
2426 out:
2427
2428 return ret;
2429 }
2430
2431
2432
2433
2434
2435
2436 expublic size_t tmq_get_block_len(char *data)
2437 {
2438 size_t ret = 0;
2439
2440 tmq_cmdheader_t *p_hdr = (tmq_cmdheader_t *)data;
2441 tmq_msg_t *p_msg;
2442 tmq_msg_del_t *p_del;
2443 tmq_msg_upd_t *p_upd;
2444 tmq_msg_unl_t *p_unl;
2445 tmq_msg_dum_t *p_dum;
2446
2447 switch (p_hdr->command_code)
2448 {
2449 case TMQ_STORCMD_NEWMSG:
2450 p_msg = (tmq_msg_t *)data;
2451 ret = sizeof(*p_msg) + p_msg->len;
2452 break;
2453 case TMQ_STORCMD_UPD:
2454 ret = sizeof(*p_upd);
2455 break;
2456 case TMQ_STORCMD_DEL:
2457 ret = sizeof(*p_del);
2458 break;
2459 case TMQ_STORCMD_UNLOCK:
2460 ret = sizeof(*p_unl);
2461 break;
2462 case TMQ_STORCMD_DUM:
2463 ret = sizeof(*p_dum);
2464 break;
2465 default:
2466 NDRX_LOG(log_error, "Unknown command code: %c", p_hdr->command_code);
2467 break;
2468 }
2469
2470 return ret;
2471 }
2472
2473
2474
2475
2476
2477
2478
2479
2480 expublic int tmq_storage_write_cmd_block(char *data, char *descr, char *cust_tmxid, int *int_diag)
2481 {
2482 int ret = EXSUCCEED;
2483 char msgid_str[TMMSGIDLEN_STR+1];
2484 size_t len;
2485 tmq_cmdheader_t *p_hdr = (tmq_cmdheader_t *)data;
2486
2487
2488 len = tmq_get_block_len((char *)data);
2489
2490 NDRX_LOG(log_info, "Writing command block: %s msg [%s]", descr,
2491 tmq_msgid_serialize(p_hdr->msgid, msgid_str));
2492
2493 NDRX_DUMP(log_debug, "Writing command block to disk",
2494 (char *)data, len);
2495
2496 if (EXSUCCEED!=write_to_tx_file((char *)data, len, cust_tmxid, int_diag))
2497 {
2498 NDRX_LOG(log_error, "tmq_storage_write_cmd_block() failed for msg %s",
2499 tmq_msgid_serialize(p_hdr->msgid, msgid_str));
2500 EXFAIL_OUT(ret);
2501 }
2502
2503 out:
2504 return ret;
2505
2506 }
2507
2508
2509
2510
2511
2512
2513
2514 exprivate int tmq_get_msgid_from_filename(char *filename_in, char *msgid_out)
2515 {
2516 int ret = EXSUCCEED;
2517
2518 tmq_msgid_deserialize(filename_in, msgid_out);
2519
2520 out:
2521 return ret;
2522 }
2523
2524
2525 #define DIRENT_CONTINUE \
2526 NDRX_FREE(namelist[n]);\
2527 continue;\
2528
2529
2530
2531
2532
2533
2534
2535 expublic int tmq_storage_get_blocks(int (*process_block)(char *tmxid,
2536 union tmq_block **p_block, int state, int seqno), short nodeid, short srvid)
2537 {
2538 int ret = EXSUCCEED;
2539 struct dirent **namelist = NULL;
2540 int n, seqno;
2541 int j;
2542 char *p;
2543 union tmq_block *p_block = NULL;
2544 FILE *f = NULL;
2545 char filename[PATH_MAX+1];
2546 char tmxid[PATH_MAX+1];
2547 int err, tmq_err;
2548 int read;
2549 int state;
2550 qtran_log_t *p_tl;
2551
2552
2553
2554
2555
2556
2557
2558
2559
2560
2561 char *folders[] = {M_folder_committed, M_folder_prepared, M_folder_active};
2562 short msg_nodeid, msg_srvid;
2563 char msgid[TMMSGIDLEN];
2564
2565 if (!G_atmi_tls->qdisk_is_open)
2566 {
2567 NDRX_LOG(log_error, "ERROR! tmq_storage_get_blocks() - XA not open!");
2568 return XAER_RMERR;
2569 }
2570
2571 for (j = 0; j < N_DIM(folders); j++)
2572 {
2573
2574 switch (j)
2575 {
2576 case 0:
2577
2578 state=TMQ_TXSTATE_COMMITTED;
2579 break;
2580 case 1:
2581 case 3:
2582
2583 state=TMQ_TXSTATE_PREPARED;
2584 break;
2585 case 2:
2586
2587 state=TMQ_TXSTATE_ACTIVE;
2588 break;
2589 }
2590
2591 n = scandir(folders[j], &namelist, 0, alphasort);
2592 if (n < 0)
2593 {
2594 NDRX_LOG(log_error, "Failed to scan q directory [%s]: %s",
2595 folders[j], strerror(errno));
2596 EXFAIL_OUT(ret);
2597 }
2598
2599 while (n--)
2600 {
2601 if (0==strcmp(namelist[n]->d_name, ".") ||
2602 0==strcmp(namelist[n]->d_name, ".."))
2603 {
2604 DIRENT_CONTINUE;
2605 }
2606
2607
2608 if (0==j)
2609 {
2610
2611 if (EXSUCCEED!=tmq_get_msgid_from_filename(namelist[n]->d_name, msgid))
2612 {
2613 EXFAIL_OUT(ret);
2614 }
2615
2616 tmq_msgid_get_info(msgid, &msg_nodeid, &msg_srvid);
2617
2618 NDRX_LOG(log_info, "our nodeid/srvid %hd/%hd msg: %hd/%hd",
2619 nodeid, srvid, msg_nodeid, msg_srvid);
2620
2621 if (nodeid!=msg_nodeid || srvid!=msg_srvid)
2622 {
2623 NDRX_LOG(log_warn, "our nodeid/srvid %hd/%hd msg: %hd/%hd - IGNORE",
2624 nodeid, srvid, msg_nodeid, msg_srvid);
2625 DIRENT_CONTINUE;
2626 }
2627 }
2628
2629 snprintf(filename, sizeof(filename), "%s/%s", folders[j],
2630 namelist[n]->d_name);
2631
2632
2633
2634
2635
2636
2637
2638
2639 NDRX_LOG(log_warn, "Loading [%s]", filename);
2640
2641
2642 if (NULL==(p_block = NDRX_MALLOC(sizeof(*p_block))))
2643 {
2644 NDRX_LOG(log_error, "Failed to alloc [%s]: %s",
2645 filename, strerror(errno));
2646 EXFAIL_OUT(ret);
2647 }
2648
2649
2650 if (j>0)
2651 {
2652 NDRX_STRCPY_SAFE(tmxid, namelist[n]->d_name);
2653
2654 p = strrchr(tmxid, '-');
2655
2656 if (NULL==p)
2657 {
2658 NDRX_LOG(log_error, "Invalid file name [%s] missing dash!",
2659 filename);
2660 userlog("Invalid file name [%s] missing dash!",
2661 filename);
2662 NDRX_FREE((char *)p_block);
2663 p_block=NULL;
2664 DIRENT_CONTINUE;
2665 }
2666
2667 *p=EXEOS;
2668 p++;
2669
2670 seqno = atoi(p);
2671
2672
2673 p_tl = tmq_log_start_or_get(tmxid);
2674
2675 if (NULL==p_tl)
2676 {
2677 NDRX_LOG(log_error, "Failed to get transaction object for [%s] seqno %d",
2678 tmxid, seqno);
2679 NDRX_FREE((char *)p_block);
2680 p_block=NULL;
2681 EXFAIL_OUT(ret);
2682 }
2683
2684 if (2==j)
2685 {
2686
2687 p_tl->is_abort_only=EXTRUE;
2688 p_tl->txstage=XA_TX_STAGE_ACTIVE;
2689 }
2690 else
2691 {
2692 p_tl->txstage=XA_TX_STAGE_PREPARED;
2693 }
2694
2695
2696 tmq_log_unlock(p_tl);
2697 }
2698
2699 if (j<2)
2700 {
2701 if (ndrx_G_systest_lockloss || NULL==(f=NDRX_FOPEN(filename, "rb")))
2702 {
2703 err = errno;
2704 NDRX_LOG(log_error, "Failed to open for read [%s]: %s",
2705 filename, strerror(err));
2706 userlog("Failed to open for read [%s]: %s",
2707 filename, strerror(err));
2708 NDRX_FREE((char *)p_block);
2709 p_block=NULL;
2710
2711 EXFAIL_OUT(ret);
2712 }
2713
2714 ndrx_fadvise_donotneed(fileno(f), TMQ_FSCACHE_LEN);
2715
2716
2717
2718
2719 if (EXFAIL==(read=read_tx_block(f, (char *)p_block, sizeof(*p_block),
2720 filename, "tmq_storage_get_blocks", &err, &tmq_err)))
2721 {
2722 NDRX_LOG(log_error, "ERROR! Failed to read [%s] hdr (%d bytes) tmqerr: %d: %s - cannot start, resolve manually",
2723 filename, sizeof(*p_block), tmq_err, (err==0?"EOF":strerror(err)) );
2724 userlog("ERROR! Failed to read [%s] hdr (%d bytes) tmqerr: %d: %s - cannot start, resolve manually",
2725 filename, sizeof(*p_block), tmq_err, (err==0?"EOF":strerror(err)) );
2726
2727
2728 NDRX_FCLOSE(f);
2729 f=NULL;
2730
2731 NDRX_FREE((char *)p_block);
2732 p_block = NULL;
2733
2734 EXFAIL_OUT(ret);
2735 }
2736
2737
2738
2739
2740
2741
2742
2743 if (nodeid!=p_block->hdr.nodeid || srvid!=p_block->hdr.srvid)
2744 {
2745 NDRX_LOG(log_warn, "our nodeid/srvid %hd/%hd msg: %hd/%hd - IGNORE",
2746 nodeid, srvid, p_block->hdr.nodeid, p_block->hdr.srvid);
2747
2748 NDRX_FREE((char *)p_block);
2749 p_block = NULL;
2750 NDRX_FCLOSE(f);
2751 f=NULL;
2752
2753 DIRENT_CONTINUE;
2754 }
2755
2756 NDRX_DUMP(log_debug, "Got command block", p_block, read);
2757
2758
2759 if (TMQ_STORCMD_NEWMSG==p_block->hdr.command_code)
2760 {
2761 int bytes_extra;
2762 int bytes_to_read;
2763 if (NULL==(p_block = NDRX_REALLOC(p_block, sizeof(tmq_msg_t) + p_block->msg.len)))
2764 {
2765 NDRX_LOG(log_error, "Failed to alloc [%d]: %s",
2766 (sizeof(tmq_msg_t) + p_block->msg.len), strerror(errno));
2767 EXFAIL_OUT(ret);
2768 }
2769
2770
2771
2772
2773
2774
2775
2776
2777
2778 bytes_extra = sizeof(*p_block)-EXOFFSET(tmq_msg_t, msg);
2779 bytes_to_read = p_block->msg.len - bytes_extra;
2780
2781 NDRX_LOG(log_info, "bytes_extra=%d bytes_to_read=%d",
2782 bytes_extra, bytes_to_read);
2783
2784 if (bytes_to_read > 0)
2785 {
2786 if (EXFAIL==read_tx_block(f,
2787 p_block->msg.msg+bytes_extra,
2788 bytes_to_read, filename, "tmq_storage_get_blocks 2", &err, &tmq_err))
2789 {
2790 NDRX_LOG(log_error, "ERROR! Failed to read [%s] %d bytes: %s - cannot start, resolve manually",
2791 filename, bytes_to_read, strerror(err));
2792
2793 userlog("ERROR! Failed to read [%s] %d bytes: %s - cannot start, resolve manually",
2794 filename, bytes_to_read, strerror(err));
2795
2796 NDRX_FCLOSE(f);
2797 f=NULL;
2798 NDRX_FREE((char *)p_block);
2799 p_block = NULL;
2800
2801 EXFAIL_OUT(ret);
2802 }
2803 }
2804 else
2805 {
2806 NDRX_LOG(log_info, "Full message already read by command block!");
2807 }
2808
2809
2810 if (0!=j)
2811 {
2812
2813 p_block->msg.lockthreadid = ndrx_gettid();
2814 }
2815 else
2816 {
2817 p_block->msg.lockthreadid = 0;
2818 }
2819
2820 NDRX_DUMP(6, "Read message from disk",
2821 p_block->msg.msg, p_block->msg.len);
2822 }
2823
2824 NDRX_FCLOSE(f);
2825 f=NULL;
2826 }
2827 else
2828 {
2829
2830 M_p_tmq_setup_cmdheader_dum(&p_block->hdr, NULL, nodeid,
2831 0, ndrx_G_qspace, 0);
2832 p_block->hdr.command_code = TMQ_STORCMD_DUM;
2833 }
2834
2835
2836
2837
2838 if (EXSUCCEED!=process_block(tmxid, &p_block, state, seqno))
2839 {
2840 NDRX_LOG(log_error, "Failed to process block!");
2841 EXFAIL_OUT(ret);
2842 }
2843
2844 NDRX_FREE(namelist[n]);
2845 }
2846 NDRX_FREE(namelist);
2847 namelist = NULL;
2848 }
2849
2850 out:
2851
2852
2853 if (NULL!=namelist)
2854 {
2855 dirent_free(namelist, n);
2856 namelist=NULL;
2857 }
2858
2859 if (NULL!=p_block)
2860 {
2861 NDRX_FREE((char *)p_block);
2862 }
2863
2864
2865 if (NULL!=f)
2866 {
2867 NDRX_FCLOSE(f);
2868 }
2869 return ret;
2870 }
2871
2872
2873
2874
2875
2876 exprivate void dirent_free(struct dirent **namelist, int n)
2877 {
2878 while (n>=0)
2879 {
2880 NDRX_FREE(namelist[n]);
2881 n--;
2882 }
2883 NDRX_FREE(namelist);
2884 }
2885
2886
2887
2888
2889
2890
2891
2892
2893
2894
2895 expublic int tmq_check_prepared(char *tmxid, char *fname)
2896 {
2897 int ret = EXSUCCEED;
2898 char tmp[PATH_MAX+1];
2899
2900
2901
2902 if (XA_OK!=(ret=ndrx_xa_qminicall(tmxid, TMQ_CMD_CHK_MEMLOG)))
2903 {
2904 snprintf(tmp, sizeof(tmp), "%s/%s", M_folder_prepared, fname);
2905
2906 if (ndrx_file_exists(tmp))
2907 {
2908
2909 ndrx_xa_qminicall(tmxid, TMQ_CMD_CHK_MEMLOG2);
2910
2911 NDRX_LOG(log_error, "Storage integrity problem. File [%s] exists, "
2912 "but transaction not - XAER_RMFAIL (tmqueue server will reboot)!",
2913 tmp);
2914 userlog("Integrity problem. File [%s] exists, "
2915 "but transaction not - XAER_RMFAIL (tmqueue server will reboot)!",
2916 tmp);
2917
2918 EXFAIL_OUT(ret);
2919 }
2920 else
2921 {
2922 NDRX_LOG(log_debug, "File [%s] does not exists any more", tmp);
2923 }
2924 }
2925 out:
2926 return ret;
2927 }
2928
2929
2930 #define RECOVER_CONTINUE \
2931 NDRX_FREE(G_atmi_tls->qdisk_tls->recover_namelist[G_atmi_tls->qdisk_tls->recover_i]);\
2932 continue;\
2933
2934
2935 #define RECOVER_CLOSE_CURSOR \
2936 \
2937 if (NULL!=G_atmi_tls->qdisk_tls->recover_namelist)\
2938 {\
2939 dirent_free(G_atmi_tls->qdisk_tls->recover_namelist, G_atmi_tls->qdisk_tls->recover_i);\
2940 G_atmi_tls->qdisk_tls->recover_namelist = NULL;\
2941 }\
2942 G_atmi_tls->qdisk_tls->recover_open=EXFALSE;\
2943 G_atmi_tls->qdisk_tls->recover_i=EXFAIL;\
2944 G_atmi_tls->qdisk_tls->recover_last_loaded=EXFALSE;\
2945
2946
2947
2948
2949
2950
2951
2952
2953
2954
2955
2956
2957
2958
2959
2960 expublic int xa_recover_entry(struct xa_switch_t *sw, XID *xid, long count, int rmid, long flags)
2961 {
2962 int ret = XA_OK;
2963 int err;
2964 XID xtmp;
2965 char *p, *fname;
2966 char fname_full[PATH_MAX+1];
2967 int current_unload_pos=0;
2968
2969 if (!G_atmi_tls->qdisk_is_open)
2970 {
2971 NDRX_LOG(log_error, "ERROR! xa_recover_entry() - XA not open!");
2972 ret=XAER_PROTO;
2973 goto out;
2974 }
2975
2976 if (NULL==xid && count >0)
2977 {
2978 NDRX_LOG(log_error, "ERROR: xid is NULL and count >0");
2979 ret=XAER_INVAL;
2980 goto out;
2981 }
2982
2983 if (!G_atmi_tls->qdisk_tls->recover_open && ! (flags & TMSTARTRSCAN))
2984 {
2985 NDRX_LOG(log_error, "ERROR: Scan not open and TMSTARTRSCAN not specified");
2986 ret=XAER_INVAL;
2987 goto out;
2988 }
2989
2990
2991 if (flags & TMSTARTRSCAN)
2992 {
2993
2994 RECOVER_CLOSE_CURSOR;
2995 }
2996
2997
2998 if (flags & TMSTARTRSCAN)
2999 {
3000 G_atmi_tls->qdisk_tls->recover_i = scandir(M_folder_prepared,
3001 &G_atmi_tls->qdisk_tls->recover_namelist, 0, alphasort);
3002
3003 if (G_atmi_tls->qdisk_tls->recover_i < 0)
3004 {
3005 err=errno;
3006 NDRX_LOG(log_error, "Failed to scan q directory [%s]: %s",
3007 M_folder_prepared, strerror(err));
3008 userlog("Failed to scan q directory [%s]: %s",
3009 M_folder_prepared, strerror(err));
3010 ret=XAER_RMERR;
3011 goto out;
3012 }
3013
3014 G_atmi_tls->qdisk_tls->recover_open=EXTRUE;
3015 }
3016
3017
3018 if (NULL==G_atmi_tls->qdisk_tls->recover_namelist)
3019 {
3020 ret=0;
3021 goto out;
3022 }
3023
3024
3025 while ((count - current_unload_pos) > 0 &&
3026 G_atmi_tls->qdisk_tls->recover_i--)
3027 {
3028 fname = G_atmi_tls->qdisk_tls->recover_namelist[G_atmi_tls->qdisk_tls->recover_i]->d_name;
3029
3030 if (0==strcmp(fname, ".") ||
3031 0==strcmp(fname, ".."))
3032 {
3033 RECOVER_CONTINUE;
3034 }
3035
3036 p = strchr(fname, '-');
3037
3038
3039 if (NULL==p)
3040 {
3041 NDRX_LOG(log_error, "Invalid prepared name [%s] - skip", fname);
3042 RECOVER_CONTINUE;
3043 }
3044
3045 NDRX_STRCPY_SAFE(fname_full, fname);
3046
3047
3048 *p = EXEOS;
3049 p++;
3050
3051 if (NULL==atmi_xa_deserialize_xid((unsigned char *)fname, &xtmp))
3052 {
3053 NDRX_LOG(log_error, "Failed to deserialize xid: %s - skip", fname);
3054 RECOVER_CONTINUE;
3055 }
3056
3057
3058
3059
3060
3061
3062
3063
3064 if ( (current_unload_pos>0 &&
3065 0==memcmp(&xid[current_unload_pos-1], &xtmp, sizeof(XID)))
3066
3067
3068 || (G_atmi_tls->qdisk_tls->recover_last_loaded
3069 && 0==memcmp(&G_atmi_tls->qdisk_tls->recover_last, &xtmp, sizeof(XID)))
3070 )
3071 {
3072 NDRX_LOG(log_debug, "Got part [%s] of xid [%s]", p, fname);
3073 RECOVER_CONTINUE;
3074 }
3075
3076
3077 memcpy(&xid[current_unload_pos], &xtmp, sizeof(XID));
3078
3079
3080
3081
3082
3083
3084
3085
3086 #if 0
3087 if (EXSUCCEED!=tmq_check_prepared(fname, fname_full))
3088 {
3089
3090 NDRX_FREE(G_atmi_tls->qdisk_tls->recover_namelist[G_atmi_tls->qdisk_tls->recover_i]);
3091 ret=XAER_RMFAIL;
3092 goto out;
3093 }
3094 #endif
3095
3096 NDRX_LOG(log_debug, "Xid [%s] unload to position %d", fname, current_unload_pos);
3097 ret++;
3098 current_unload_pos++;
3099 RECOVER_CONTINUE;
3100 }
3101
3102 if (ret>0)
3103 {
3104
3105 memcpy(&G_atmi_tls->qdisk_tls->recover_last, &xid[ret-1], sizeof(XID));
3106 G_atmi_tls->qdisk_tls->recover_last_loaded=EXTRUE;
3107 }
3108
3109 out:
3110
3111
3112 NDRX_LOG(log_debug, "recover: count=%ld, ret=%d", count, ret);
3113
3114 if ( ret>=0
3115 && ( (flags & TMENDRSCAN) || ret < count)
3116 && G_atmi_tls->qdisk_tls->recover_open
3117 )
3118 {
3119 NDRX_LOG(log_debug, "recover: closing cursor");
3120
3121
3122 RECOVER_CLOSE_CURSOR;
3123 }
3124
3125 return ret;
3126 }
3127
3128
3129
3130
3131
3132
3133
3134
3135
3136 expublic int xa_forget_entry(struct xa_switch_t *sw, XID *xid, int rmid, long flags)
3137 {
3138
3139 if (!G_atmi_tls->qdisk_is_open)
3140 {
3141 NDRX_LOG(log_error, "ERROR! xa_forget_entry() - XA not open!");
3142 return XAER_RMERR;
3143 }
3144
3145 NDRX_LOG(log_error, "ERROR! xa_forget_entry() - not using!!");
3146 return XAER_RMERR;
3147 }
3148
3149
3150
3151
3152
3153
3154
3155
3156
3157
3158 expublic int xa_complete_entry(struct xa_switch_t *sw, int *handle, int *retval, int rmid, long flags)
3159 {
3160 if (!G_atmi_tls->qdisk_is_open)
3161 {
3162 NDRX_LOG(log_error, "ERROR! xa_complete_entry() - XA not open!");
3163 return XAER_RMERR;
3164 }
3165
3166 NDRX_LOG(log_error, "ERROR! xa_complete_entry() - not using!!");
3167 return XAER_RMERR;
3168 }
3169
3170
3171
3172 expublic int xa_open_entry_stat( char *xa_info, int rmid, long flags)
3173 {
3174 return xa_open_entry(&ndrxqstatsw, xa_info, rmid, flags);
3175 }
3176 expublic int xa_close_entry_stat(char *xa_info, int rmid, long flags)
3177 {
3178 return xa_close_entry(&ndrxqstatsw, xa_info, rmid, flags);
3179 }
3180 expublic int xa_start_entry_stat(XID *xid, int rmid, long flags)
3181 {
3182 return xa_start_entry(&ndrxqstatsw, xid, rmid, flags);
3183 }
3184 expublic int xa_end_entry_stat(XID *xid, int rmid, long flags)
3185 {
3186 return xa_end_entry(&ndrxqstatsw, xid, rmid, flags);
3187 }
3188 expublic int xa_rollback_entry_stat(XID *xid, int rmid, long flags)
3189 {
3190 return xa_rollback_entry(&ndrxqstatsw, xid, rmid, flags);
3191 }
3192 expublic int xa_prepare_entry_stat(XID *xid, int rmid, long flags)
3193 {
3194 return xa_prepare_entry(&ndrxqstatsw, xid, rmid, flags);
3195 }
3196 expublic int xa_commit_entry_stat(XID *xid, int rmid, long flags)
3197 {
3198 return xa_commit_entry(&ndrxqstatsw, xid, rmid, flags);
3199 }
3200 expublic int xa_recover_entry_stat(XID *xid, long count, int rmid, long flags)
3201 {
3202 return xa_recover_entry(&ndrxqstatsw, xid, count, rmid, flags);
3203 }
3204 expublic int xa_forget_entry_stat(XID *xid, int rmid, long flags)
3205 {
3206 return xa_forget_entry(&ndrxqstatsw, xid, rmid, flags);
3207 }
3208 expublic int xa_complete_entry_stat(int *handle, int *retval, int rmid, long flags)
3209 {
3210 return xa_complete_entry(&ndrxqstatsw, handle, retval, rmid, flags);
3211 }
3212
3213
3214 expublic int xa_open_entry_dyn( char *xa_info, int rmid, long flags)
3215 {
3216 return xa_open_entry(&ndrxqdynsw, xa_info, rmid, flags);
3217 }
3218 expublic int xa_close_entry_dyn(char *xa_info, int rmid, long flags)
3219 {
3220 return xa_close_entry(&ndrxqdynsw, xa_info, rmid, flags);
3221 }
3222 expublic int xa_start_entry_dyn(XID *xid, int rmid, long flags)
3223 {
3224 return xa_start_entry(&ndrxqdynsw, xid, rmid, flags);
3225 }
3226 expublic int xa_end_entry_dyn(XID *xid, int rmid, long flags)
3227 {
3228 return xa_end_entry(&ndrxqdynsw, xid, rmid, flags);
3229 }
3230 expublic int xa_rollback_entry_dyn(XID *xid, int rmid, long flags)
3231 {
3232 return xa_rollback_entry(&ndrxqdynsw, xid, rmid, flags);
3233 }
3234 expublic int xa_prepare_entry_dyn(XID *xid, int rmid, long flags)
3235 {
3236 return xa_prepare_entry(&ndrxqdynsw, xid, rmid, flags);
3237 }
3238 expublic int xa_commit_entry_dyn(XID *xid, int rmid, long flags)
3239 {
3240 return xa_commit_entry(&ndrxqdynsw, xid, rmid, flags);
3241 }
3242 expublic int xa_recover_entry_dyn(XID *xid, long count, int rmid, long flags)
3243 {
3244 return xa_recover_entry(&ndrxqdynsw, xid, count, rmid, flags);
3245 }
3246 expublic int xa_forget_entry_dyn(XID *xid, int rmid, long flags)
3247 {
3248 return xa_forget_entry(&ndrxqdynsw, xid, rmid, flags);
3249 }
3250 expublic int xa_complete_entry_dyn(int *handle, int *retval, int rmid, long flags)
3251 {
3252 return xa_complete_entry(&ndrxqdynsw, handle, retval, rmid, flags);
3253 }
3254
3255
3256
3257
3258 expublic void tmq_chkdisk_stopwatch_reset(void)
3259 {
3260 MUTEX_LOCK_V(M_chkdisk_stopwatch_lock);
3261 ndrx_stopwatch_reset(&M_chkdisk_stopwatch);
3262 MUTEX_UNLOCK_V(M_chkdisk_stopwatch_lock);
3263
3264 }
3265
3266
3267
3268
3269
3270 expublic long tmq_chkdisk_stopwatch_get_delta_sec(void)
3271 {
3272 long ret;
3273
3274 MUTEX_LOCK_V(M_chkdisk_stopwatch_lock);
3275 ret=ndrx_stopwatch_get_delta_sec(&M_chkdisk_stopwatch);
3276 MUTEX_UNLOCK_V(M_chkdisk_stopwatch_lock);
3277
3278 return ret;
3279 }
3280
3281
3282
3283
3284
3285
3286 exprivate void tmq_chkdisk_th(void *ptr, int *p_finish_off)
3287 {
3288 static int volatile into_func = EXFALSE;
3289 int *p_chkdisk_time = (int *)ptr;
3290
3291 char tmp[PATH_MAX+1];
3292 int i;
3293 int ret=EXSUCCEED;
3294 DIR *dir=NULL;
3295 struct dirent *entry;
3296 int len;
3297 char tranmask[256];
3298
3299 if (EXTRUE==into_func)
3300 {
3301
3302 return;
3303 }
3304
3305
3306
3307
3308 if ( tmq_chkdisk_stopwatch_get_delta_sec() <= *p_chkdisk_time)
3309 {
3310
3311 return;
3312 }
3313
3314 dir = opendir(M_folder_committed);
3315
3316 if (dir == NULL) {
3317
3318 NDRX_LOG(log_error, "opendir [%s] failed: %s",
3319 M_folder_committed, strerror(errno));
3320 EXFAIL_OUT(ret);
3321 }
3322
3323 while ((entry = readdir(dir)) != NULL)
3324 {
3325
3326
3327
3328
3329
3330
3331
3332
3333
3334 if (entry->d_name[0] == '.')
3335 {
3336 continue;
3337 }
3338
3339
3340 if (!M_p_tmq_msgid_exists(entry->d_name))
3341 {
3342 snprintf(tmp, sizeof(tmp), "%s/%s",
3343 M_folder_committed, entry->d_name);
3344
3345 if (ndrx_file_exists(tmp))
3346 {
3347 NDRX_LOG(log_error, "ERROR: Unkown message file "
3348 "exists [%s] (duplicate processes or two Q "
3349 "spaces works in the same directory) - restarting...",
3350 tmp);
3351 userlog("ERROR: Unkown message file exists"
3352 " [%s] (duplicate processes or two Q "
3353 "spaces works in the same directory) - restarting...",
3354 tmp);
3355 M_p_tpexit();
3356 break;
3357 }
3358 }
3359 }
3360
3361 out:
3362 if (NULL!=dir)
3363 {
3364 closedir(dir);
3365 }
3366
3367 }
3368
3369
3370