0001
0002
0003
0004
0005
0006
0007
0008
0009
0010
0011
0012
0013
0014
0015
0016
0017
0018
0019
0020
0021
0022
0023
0024
0025
0026
0027
0028
0029
0030
0031
0032
0033
0034
0035
0036
0037 #include <stdio.h>
0038 #include <stdlib.h>
0039 #include <string.h>
0040 #include <errno.h>
0041 #include <regex.h>
0042 #include <utlist.h>
0043 #include <stdarg.h>
0044
0045 #include <ndebug.h>
0046 #include <atmi.h>
0047 #include <atmi_int.h>
0048 #include <typed_buf.h>
0049 #include <ndrstandard.h>
0050 #include <ubf.h>
0051 #include <Exfields.h>
0052 #include <nstdutil.h>
0053
0054 #include "userlog.h"
0055 #include <xa_cmn.h>
0056 #include <exhash.h>
0057 #include <unistd.h>
0058 #include <Exfields.h>
0059 #include "qtran.h"
0060 #include "qcommon.h"
0061
0062
/**
 * Verify that the calling thread is the one recorded as lock owner of the
 * transaction. Expects a variable `p_tl` (qtran_log_t pointer) to be present
 * in the enclosing scope.
 * On mismatch the problem is reported via NDRX_LOG and userlog and the
 * ENCLOSING function returns EXFAIL.
 * The body is wrapped in do { } while (0) so that the macro expands to exactly
 * one statement (safe inside un-braced if/else); invoke it with a trailing
 * semicolon.
 */
#define CHK_THREAD_ACCESS do {\
    if (ndrx_gettid()!=p_tl->lockthreadid)\
    {\
        NDRX_LOG(log_error, "Transaction [%s] not locked for thread %" PRIu64 ", but for %" PRIu64,\
                p_tl->tmxid, ndrx_gettid(), p_tl->lockthreadid);\
        userlog("Transaction [%s] not locked for thread %" PRIu64 ", but for %" PRIu64,\
                p_tl->tmxid, ndrx_gettid(), p_tl->lockthreadid);\
        return EXFAIL;\
    }\
} while (0)
0071
0072
0073
0074 exprivate qtran_log_t *M_qtran_hash = NULL;
0075 exprivate MUTEX_LOCKDECL(M_qtran_hash_lock);
0076
0077
0078
0079
0080
0081
0082
0083
0084
0085 expublic int tmq_log_next(qtran_log_t *p_tl)
0086 {
0087 p_tl->seqno++;
0088
0089 return p_tl->seqno;
0090 }
0091
0092
0093
0094
0095
0096
0097 expublic int tmq_log_unlock(qtran_log_t *p_tl)
0098 {
0099 CHK_THREAD_ACCESS;
0100
0101 NDRX_LOG(log_info, "Transaction [%s] unlocked by thread %" PRIu64, p_tl->tmxid,
0102 p_tl->lockthreadid);
0103
0104 MUTEX_LOCK_V(M_qtran_hash_lock);
0105 p_tl->lockthreadid = 0;
0106 MUTEX_UNLOCK_V(M_qtran_hash_lock);
0107
0108 return EXSUCCEED;
0109 }
0110
0111
0112
0113
0114
0115
0116
0117
0118
0119 expublic int tmq_log_abortall(void)
0120 {
0121 qtran_log_t *el, *elt;
0122 int ret = EXSUCCEED;
0123 XID xid;
0124
0125 EXHASH_ITER(hh, M_qtran_hash, el, elt)
0126 {
0127 if (el->is_abort_only)
0128 {
0129 NDRX_LOG(log_error, "Aborting active transaction tmxid [%s]", el->tmxid);
0130
0131 if (NULL==atmi_xa_deserialize_xid((unsigned char *)el->tmxid, &xid))
0132 {
0133 NDRX_LOG(log_error, "Failed to deserialize tmxid [%s]", el->tmxid);
0134 EXFAIL_OUT(ret);
0135 }
0136
0137
0138 if (EXSUCCEED!=atmi_xa_rollback_entry(&xid, 0))
0139 {
0140 NDRX_LOG(log_error, "Failed to abort [%s]", el->tmxid);
0141 EXFAIL_OUT(ret);
0142 }
0143 }
0144 }
0145 out:
0146 return ret;
0147
0148 }
0149
0150
0151
0152
0153
0154
0155 expublic int tmq_log_exists_entry(char *tmxid)
0156 {
0157 int ret = EXFALSE;
0158 qtran_log_t *r = NULL;
0159
0160 MUTEX_LOCK_V(M_qtran_hash_lock);
0161 EXHASH_FIND_STR( M_qtran_hash, tmxid, r);
0162 MUTEX_UNLOCK_V(M_qtran_hash_lock);
0163
0164 if (NULL!=r)
0165 {
0166 ret = EXTRUE;
0167 }
0168
0169 return ret;
0170 }
0171
0172
0173
0174
0175
0176
0177
0178
0179
0180
0181
0182
0183
0184
0185
0186
0187
0188 expublic qtran_log_t * tmq_log_get_entry(char *tmxid, int dowait, int *locke)
0189 {
0190 qtran_log_t *r = NULL;
0191 ndrx_stopwatch_t w;
0192
0193 if (dowait)
0194 {
0195 ndrx_stopwatch_reset(&w);
0196 }
0197
0198 if (NULL!=locke)
0199 {
0200 *locke=EXFALSE;
0201 }
0202
0203 restart:
0204 MUTEX_LOCK_V(M_qtran_hash_lock);
0205 EXHASH_FIND_STR( M_qtran_hash, tmxid, r);
0206
0207 if (NULL!=r)
0208 {
0209 if (r->lockthreadid && r->lockthreadid!=ndrx_gettid())
0210 {
0211 if (dowait && ndrx_stopwatch_get_delta(&w) < dowait)
0212 {
0213 MUTEX_UNLOCK_V(M_qtran_hash_lock);
0214
0215 usleep(100000);
0216 goto restart;
0217
0218 }
0219
0220 NDRX_LOG(log_error, "Q Transaction [%s] already locked for thread_id: %"
0221 PRIu64 " lock time: %d msec",
0222 tmxid, r->lockthreadid, dowait);
0223
0224 userlog("tmqueue: Transaction [%s] already locked for thread_id: %" PRIu64
0225 "lock time: %d msec",
0226 tmxid, r->lockthreadid, dowait);
0227 r = NULL;
0228
0229
0230 if (NULL!=locke)
0231 {
0232 *locke=EXTRUE;
0233 }
0234
0235 }
0236 else if (r->lockthreadid)
0237 {
0238 NDRX_LOG(log_info, "Transaction [%s] sub-locked for thread_id: %" PRIu64,
0239 tmxid, r->lockthreadid);
0240
0241
0242
0243
0244 if (NULL!=locke)
0245 {
0246 *locke=EXTRUE;
0247 }
0248 }
0249 else
0250 {
0251 r->lockthreadid = ndrx_gettid();
0252 NDRX_LOG(log_debug, "Transaction [%s] locked for thread_id: %" PRIu64,
0253 tmxid, r->lockthreadid);
0254 }
0255 }
0256
0257 MUTEX_UNLOCK_V(M_qtran_hash_lock);
0258
0259 return r;
0260 }
0261
0262
0263
0264
0265
0266
0267
0268
0269 expublic int tmq_log_start(char *tmxid)
0270 {
0271 int ret = EXSUCCEED;
0272 int hash_added = EXFALSE;
0273 qtran_log_t *tmp = NULL;
0274
0275
0276 if (NULL==(tmp = NDRX_FPMALLOC(sizeof(qtran_log_t), 0)))
0277 {
0278 NDRX_LOG(log_error, "NDRX_CALLOC() failed: %s", strerror(errno));
0279 ret=EXFAIL;
0280 goto out;
0281 }
0282
0283
0284 memset(tmp, 0, sizeof(*tmp));
0285
0286 tmp->txstage = XA_TX_STAGE_ACTIVE;
0287 tmp->t_start = ndrx_utc_tstamp();
0288 tmp->t_update = ndrx_utc_tstamp();
0289 NDRX_STRCPY_SAFE(tmp->tmxid, tmxid);
0290 ndrx_stopwatch_reset(&tmp->ttimer);
0291
0292
0293 tmq_log_setseq(tmp);
0294
0295
0296
0297
0298
0299
0300
0301
0302
0303 tmp->lockthreadid = ndrx_gettid();
0304
0305 MUTEX_LOCK_V(M_qtran_hash_lock);
0306 EXHASH_ADD_STR( M_qtran_hash, tmxid, tmp);
0307 MUTEX_UNLOCK_V(M_qtran_hash_lock);
0308
0309 hash_added = EXTRUE;
0310
0311 out:
0312
0313
0314 if (EXSUCCEED==ret && NULL!=tmp)
0315 {
0316 tmq_log_unlock(tmp);
0317 }
0318
0319 return ret;
0320 }
0321
0322
0323
0324
0325
0326
0327 expublic qtran_log_t * tmq_log_start_or_get(char *tmxid)
0328 {
0329 int locke;
0330
0331 qtran_log_t * ret = tmq_log_get_entry(tmxid, NDRX_LOCK_WAIT_TIME, &locke);
0332
0333 if (NULL==ret)
0334 {
0335 if (locke)
0336 {
0337 ret=NULL;
0338 }
0339 else if (EXSUCCEED!=tmq_log_start(tmxid))
0340 {
0341 ret=NULL;
0342 }
0343 else
0344 {
0345 ret = tmq_log_get_entry(tmxid, NDRX_LOCK_WAIT_TIME, &locke);
0346 }
0347 }
0348
0349 return ret;
0350 }
0351
0352
0353
0354
0355
0356 expublic void tmq_log_set_abort_only(char *tmxid)
0357 {
0358 int locke=EXFALSE;
0359 qtran_log_t * p_tl = tmq_log_get_entry(tmxid, NDRX_LOCK_WAIT_TIME, &locke);
0360
0361 if (NULL!=p_tl)
0362 {
0363 NDRX_LOG(log_error, "Marking [%s] Q tran as abort only", tmxid);
0364 p_tl->is_abort_only=EXTRUE;
0365 }
0366
0367 if (NULL!=p_tl && !locke)
0368 {
0369
0370 tmq_log_unlock(p_tl);
0371 }
0372 }
0373
0374
0375
0376
0377
0378
0379
0380
0381
0382
0383
0384 expublic int tmq_log_addcmd(char *tmxid, int seqno, char *b, char entry_status)
0385 {
0386 int ret = EXSUCCEED;
0387 qtran_log_t *p_tl= NULL;
0388 qtran_log_cmd_t *cmd=NULL;
0389 size_t len;
0390 int locke;
0391 tmq_cmdheader_t *p_hdr=(tmq_cmdheader_t *)b;
0392
0393 NDRX_LOG(log_info, "Adding Q tran cmd: [%s] seqno: %d, "
0394 "command_code: %c, status: %c",
0395 tmxid, seqno, p_hdr->command_code, entry_status);
0396
0397 if (NULL==(p_tl = tmq_log_get_entry(tmxid, NDRX_LOCK_WAIT_TIME, &locke)))
0398 {
0399 NDRX_LOG(log_error, "No Q transaction/lock timeout under xid_str: [%s]",
0400 tmxid);
0401 ret=EXFAIL;
0402 goto out_nolock;
0403 }
0404
0405
0406 if (NULL==(cmd = NDRX_FPMALLOC(sizeof(qtran_log_cmd_t), 0)))
0407 {
0408 NDRX_LOG(log_error, "Failed to fpmalloc %d bytes: %s",
0409 sizeof(qtran_log_cmd_t), strerror(errno));
0410 userlog("Failed to fpmalloc %d bytes: %s",
0411 sizeof(qtran_log_cmd_t), strerror(errno));
0412 EXFAIL_OUT(ret);
0413 }
0414
0415
0416 ndrx_stopwatch_reset(&p_tl->ttimer);
0417
0418 memset(cmd, 0, sizeof(*cmd));
0419
0420 cmd->seqno=seqno;
0421 cmd->cmd_status = entry_status;
0422 cmd->command_code = p_hdr->command_code;
0423
0424
0425 if (p_tl->seqno<seqno)
0426 {
0427 p_tl->seqno=seqno;
0428 }
0429
0430
0431
0432
0433
0434 if (TMQ_STORCMD_UPD==p_hdr->command_code)
0435 {
0436 len = sizeof(tmq_msg_upd_t);
0437 }
0438 else
0439 {
0440 len = sizeof(tmq_cmdheader_t);
0441 }
0442
0443
0444 memcpy(&cmd->b, b, len);
0445
0446 DL_APPEND(p_tl->cmds, cmd);
0447
0448 out:
0449
0450 if (NULL!=p_tl && !locke)
0451 {
0452 tmq_log_unlock(p_tl);
0453 }
0454
0455 out_nolock:
0456
0457 return ret;
0458 }
0459
0460
0461
0462
0463
0464
0465 expublic void tmq_remove_logfree(qtran_log_t *p_tl, int hash_rm)
0466 {
0467 if (hash_rm)
0468 {
0469 MUTEX_LOCK_V(M_qtran_hash_lock);
0470 EXHASH_DEL(M_qtran_hash, p_tl);
0471 MUTEX_UNLOCK_V(M_qtran_hash_lock);
0472 }
0473
0474 NDRX_FPFREE(p_tl);
0475 }
0476
0477
0478
0479
0480
0481
0482
0483
0484
0485
0486 expublic qtran_log_list_t* tmq_copy_hash2list(int copy_mode)
0487 {
0488 qtran_log_list_t *ret = NULL;
0489 qtran_log_t * r, *rt;
0490 qtran_log_list_t *tmp;
0491
0492 if (copy_mode & COPY_MODE_ACQLOCK)
0493 {
0494 MUTEX_LOCK_V(M_qtran_hash_lock);
0495 }
0496
0497
0498
0499 EXHASH_ITER(hh, M_qtran_hash, r, rt)
0500 {
0501
0502 if (r->is_background && !(copy_mode & COPY_MODE_BACKGROUND))
0503 continue;
0504
0505 if (!r->is_background && !(copy_mode & COPY_MODE_FOREGROUND))
0506 continue;
0507
0508 if (NULL==(tmp = NDRX_FPMALLOC(sizeof(qtran_log_list_t), 0)))
0509 {
0510 NDRX_LOG(log_error, "Failed to fpmalloc %d: %s",
0511 sizeof(qtran_log_list_t), strerror(errno));
0512 goto out;
0513 }
0514
0515
0516
0517
0518
0519
0520 memcpy(&tmp->p_tl, r, sizeof(*r));
0521
0522 LL_APPEND(ret, tmp);
0523 }
0524
0525 out:
0526 if (copy_mode & COPY_MODE_ACQLOCK)
0527 {
0528 MUTEX_UNLOCK_V(M_qtran_hash_lock);
0529 }
0530
0531 return ret;
0532 }
0533
0534
0535
0536
0537 expublic void tmq_tx_hash_lock(void)
0538 {
0539 MUTEX_LOCK_V(M_qtran_hash_lock);
0540 }
0541
0542
0543
0544
0545 expublic void tmq_tx_hash_unlock(void)
0546 {
0547 MUTEX_UNLOCK_V(M_qtran_hash_lock);
0548 }
0549
0550
0551
0552
0553
0554
0555 expublic int tmq_log_setseq(qtran_log_t *p_tl)
0556 {
0557 int ret= EXSUCCEED;
0558 long grp_flags=0;
0559
0560
0561
0562 if (G_atmi_env.procgrp_no)
0563 {
0564 p_tl->sg_sequence=tpsgislocked(G_atmi_env.procgrp_no
0565 , TPPG_SGVERIFY|TPPG_NONSGSUCC
0566 , &grp_flags);
0567
0568 if (EXFAIL==p_tl->sg_sequence)
0569 {
0570 NDRX_LOG(log_error, "tpsgislocked failed %s", tpstrerror(tperrno));
0571 EXFAIL_OUT(ret);
0572 }
0573
0574 if (grp_flags & TPPG_SINGLETON && p_tl->sg_sequence<=0)
0575 {
0576 NDRX_LOG(log_error, "Singleton group %d lock lost (at start) - exit(-1)",
0577 G_atmi_env.procgrp_no);
0578 userlog("Singleton group %d lock lost (at start) - exit(-1)",
0579 G_atmi_env.procgrp_no);
0580
0581 exit(EXFAIL);
0582 }
0583 }
0584
0585 out:
0586 return ret;
0587 }
0588
0589
0590
0591
0592
0593 expublic int tmq_log_checkpointseq(qtran_log_t *p_tl)
0594 {
0595 int ret=EXSUCCEED;
0596 long seq;
0597 long grp_flags=0;
0598
0599
0600
0601 if (G_atmi_env.procgrp_no)
0602 {
0603 seq=tpsgislocked(G_atmi_env.procgrp_no, TPPG_SGVERIFY|TPPG_NONSGSUCC, &grp_flags);
0604
0605 if (seq < 0)
0606 {
0607 NDRX_LOG(log_error, "tpsgislocked returns %s", tpstrerror(tperrno));
0608 EXFAIL_OUT(ret);
0609 }
0610
0611 if ((grp_flags & TPPG_SINGLETON) && 0==seq)
0612 {
0613 NDRX_LOG(log_error, "Singleton group %d on node %ld lock lost - exit(-1)",
0614 G_atmi_env.procgrp_no, tpgetnodeid());
0615 userlog("Singleton group %d on node %ld lock lost - exit(-1)",
0616 G_atmi_env.procgrp_no, tpgetnodeid());
0617
0618 exit(EXFAIL);
0619 }
0620
0621
0622
0623
0624 if ( (grp_flags & TPPG_SINGLETON) && (seq - p_tl->sg_sequence >= G_atmi_env.sglockinc))
0625 {
0626 NDRX_LOG(log_error, "Singleton group %d on node %ld lock lost (tl seq %ld, cur seq %ld) - exit(-1), ",
0627 G_atmi_env.procgrp_no, tpgetnodeid(), p_tl->sg_sequence, seq);
0628 userlog("Singleton group %d on node %ld lock lost (tl seq %ld, cur seq %ld) - exit(-1), ",
0629 G_atmi_env.procgrp_no, tpgetnodeid(), p_tl->sg_sequence, seq);
0630
0631 exit(EXFAIL);
0632 }
0633
0634
0635 p_tl->sg_sequence=seq;
0636 }
0637 out:
0638 return ret;
0639 }
0640
0641
0642
0643