0001
0002
0003
0004
0005
0006
0007
0008
0009
0010
0011
0012
0013
0014
0015
0016
0017
0018
0019
0020
0021
0022
0023
0024
0025
0026
0027
0028
0029
0030
0031
0032
0033
0034 #include <string.h>
0035 #include <stdio.h>
0036 #include <stdlib.h>
0037
0038 #include <ndrstandard.h>
0039 #include <ndebug.h>
0040 #include <atmi.h>
0041
0042 #include "atmi_shm.h"
0043 #include <atmi_int.h>
0044 #include "utlist.h"
0045
0046 #include <xa.h>
0047 #include <pgxa.h>
0048 #include <thlock.h>
0049
0050
0051 #define CONN_CLOSED 0
0052 #define CONN_OPEN 1
0053 #define TRAN_NOT_FOUND "42704"
0054
0055
0056
0057
0058
0059
0060
0061
0062 typedef struct ndrx_xid_list ndrx_xid_list_t;
0063 struct ndrx_xid_list
0064 {
0065 XID xid;
0066
0067 ndrx_xid_list_t *next, *prev;
0068 };
0069
0070
0071
0072 expublic __thread char ndrx_G_PG_conname[65]={EXEOS};
0073
0074
0075 exprivate MUTEX_LOCKDECL(M_open_lock);
0076 exprivate ndrx_pgconnect_t M_conndata;
0077 exprivate int M_conndata_ok = EXFALSE;
0078
0079
0080 exprivate __thread PGconn * M_conn = NULL;
0081 exprivate __thread int M_status = CONN_CLOSED;
0082 exprivate __thread ndrx_xid_list_t *M_list = NULL;
0083
0084
0085
0086 exprivate int xa_open_entry_stat(char *xa_info, int rmid, long flags);
0087 exprivate int xa_close_entry_stat(char *xa_info, int rmid, long flags);
0088 exprivate int xa_start_entry_stat(XID *xid, int rmid, long flags);
0089 exprivate int xa_end_entry_stat(XID *xid, int rmid, long flags);
0090 exprivate int xa_rollback_entry_stat(XID *xid, int rmid, long flags);
0091 exprivate int xa_prepare_entry_stat(XID *xid, int rmid, long flags);
0092 exprivate int xa_commit_entry_stat(XID *xid, int rmid, long flags);
0093 exprivate int xa_recover_entry_stat(XID *xid, long count, int rmid, long flags);
0094 exprivate int xa_forget_entry_stat(XID *xid, int rmid, long flags);
0095 exprivate int xa_complete_entry_stat(int *handle, int *retval, int rmid, long flags);
0096
0097 exprivate int xa_open_entry(struct xa_switch_t *sw, char *xa_info, int rmid, long flags);
0098 exprivate int xa_close_entry(struct xa_switch_t *sw, char *xa_info, int rmid, long flags);
0099 exprivate int xa_start_entry(struct xa_switch_t *sw, XID *xid, int rmid, long flags);
0100 exprivate int xa_end_entry(struct xa_switch_t *sw, XID *xid, int rmid, long flags);
0101 exprivate int xa_rollback_entry(struct xa_switch_t *sw, XID *xid, int rmid, long flags);
0102 exprivate int xa_prepare_entry(struct xa_switch_t *sw, XID *xid, int rmid, long flags);
0103 exprivate int xa_commit_entry(struct xa_switch_t *sw, XID *xid, int rmid, long flags);
0104 exprivate int xa_recover_entry(struct xa_switch_t *sw, XID *xid, long count, int rmid, long flags);
0105 exprivate int xa_forget_entry(struct xa_switch_t *sw, XID *xid, int rmid, long flags);
0106 exprivate int xa_complete_entry(struct xa_switch_t *sw, int *handle, int *retval, int rmid, long flags);
0107
0108 exprivate int xa_rollback_local(XID *xid, long flags);
0109
0110 expublic NDRX_API_EXPORT struct xa_switch_t ndrxpgsw =
0111 {
0112 .name = "ndrxpgsw",
0113 .flags = TMNOMIGRATE,
0114 .version = 0,
0115 .xa_open_entry = xa_open_entry_stat,
0116 .xa_close_entry = xa_close_entry_stat,
0117 .xa_start_entry = xa_start_entry_stat,
0118 .xa_end_entry = xa_end_entry_stat,
0119 .xa_rollback_entry = xa_rollback_entry_stat,
0120 .xa_prepare_entry = xa_prepare_entry_stat,
0121 .xa_commit_entry = xa_commit_entry_stat,
0122 .xa_recover_entry = xa_recover_entry_stat,
0123 .xa_forget_entry = xa_forget_entry_stat,
0124 .xa_complete_entry = xa_complete_entry_stat
0125 };
0126
0127
0128
0129
0130
0131
0132 exprivate int xid_list_add(XID *xid)
0133 {
0134 int ret = EXSUCCEED;
0135 ndrx_xid_list_t *el;
0136
0137 el = NDRX_CALLOC(1, sizeof(ndrx_xid_list_t));
0138
0139 if (NULL==el)
0140 {
0141 int err = errno;
0142 NDRX_LOG(log_error, "Failed to calloc: %d bytes: %s",
0143 sizeof(ndrx_xid_list_t), strerror(err));
0144 userlog("Failed to calloc: %d bytes: %s",
0145 sizeof(ndrx_xid_list_t), strerror(err));
0146 EXFAIL_OUT(ret);
0147 }
0148
0149 memcpy((char *)&(el->xid), (char *)xid, sizeof(XID));
0150
0151
0152 DL_APPEND(M_list, el);
0153
0154 out:
0155 return ret;
0156 }
0157
0158
0159
0160
0161
0162
0163 exprivate int xid_list_get_next(XID *xid)
0164 {
0165 ndrx_xid_list_t *tmp;
0166 if (NULL!=M_list)
0167 {
0168 memcpy((char *)xid, (char *)&M_list->xid, sizeof(XID));
0169
0170 tmp = M_list;
0171 DL_DELETE(M_list, M_list);
0172 NDRX_FREE((char *)tmp);
0173 return EXTRUE;
0174 }
0175
0176 return EXFALSE;
0177 }
0178
0179
0180
0181
0182 exprivate void xid_list_free(void)
0183 {
0184 ndrx_xid_list_t *el, *elt;
0185
0186 DL_FOREACH_SAFE(M_list,el,elt)
0187 {
0188 DL_DELETE(M_list,el);
0189 NDRX_FREE(el);
0190 }
0191
0192 }
0193
0194
0195
0196
0197
0198
0199 exprivate void *ndrx_pg_getconn(void)
0200 {
0201 return (void *)M_conn;
0202 }
0203
0204
0205
0206
0207
0208
0209
0210 struct xa_switch_t *ndrx_get_xa_switch(void)
0211 {
0212
0213
0214
0215
0216 ndrx_xa_nostartxid(EXTRUE);
0217 ndrx_xa_setloctxabort(xa_rollback_local);
0218 ndrx_xa_setgetconnn(ndrx_pg_getconn);
0219
0220 return &ndrxpgsw;
0221 }
0222
0223
0224
0225
0226
0227
0228
0229
0230
0231
0232
0233
0234
0235 exprivate int xa_open_entry(struct xa_switch_t *sw, char *xa_info, int rmid, long flags)
0236 {
0237 int ret = XA_OK;
0238 static int conn_counter = 0;
0239 static int first = EXTRUE;
0240 int connid;
0241
0242 if (CONN_OPEN==M_status)
0243 {
0244 NDRX_LOG(log_error, "Connection is already open");
0245 ret=XAER_PROTO;
0246 goto out;
0247 }
0248
0249
0250 if (first)
0251 {
0252 MUTEX_LOCK_V(M_open_lock);
0253 if (first)
0254 {
0255
0256 ndrx_xa_nojoin(EXTRUE);
0257 first=EXFALSE;
0258 }
0259 MUTEX_UNLOCK_V(M_open_lock);
0260 }
0261
0262
0263 if (!M_conndata_ok)
0264 {
0265 MUTEX_LOCK_V(M_open_lock);
0266
0267 if (!M_conndata_ok)
0268 {
0269 if (EXSUCCEED!=ndrx_pg_xa_cfgparse(xa_info, &M_conndata))
0270 {
0271 NDRX_LOG(log_error, "Failed to parse Open string!");
0272 MUTEX_UNLOCK_V(M_open_lock);
0273 ret = XAER_INVAL;
0274 goto out;
0275 }
0276
0277 M_conndata_ok = EXTRUE;
0278 MUTEX_UNLOCK_V(M_open_lock);
0279 }
0280 }
0281
0282
0283
0284
0285 if (EXEOS==ndrx_G_PG_conname[0])
0286 {
0287 long date;
0288 long time;
0289 long usec;
0290
0291
0292 MUTEX_LOCK_V(M_open_lock);
0293 connid = conn_counter;
0294
0295 conn_counter++;
0296
0297 if (conn_counter > 16000)
0298 {
0299 conn_counter = 0;
0300 }
0301
0302 MUTEX_UNLOCK_V(M_open_lock);
0303
0304 ndrx_get_dt_local(&date, &time, &usec);
0305
0306 snprintf(ndrx_G_PG_conname, sizeof(ndrx_G_PG_conname), "%ld-%ld%ld-%d",
0307 date, time, (long)(usec / 1000), connid);
0308 }
0309
0310 NDRX_LOG(log_debug, "Connection name: [%s]", ndrx_G_PG_conname);
0311
0312 M_conn = ndrx_pg_connect(&M_conndata, ndrx_G_PG_conname);
0313
0314 if (NULL==M_conn)
0315 {
0316 NDRX_LOG(log_error, "Postgres error: failed to get PQ connection!");
0317 ret = XAER_RMERR;
0318 goto out;
0319 }
0320
0321 M_status = CONN_OPEN;
0322 NDRX_LOG(log_info, "Connection [%s] is open %p", ndrx_G_PG_conname, M_conn);
0323
0324 out:
0325 return ret;
0326 }
0327
0328
0329
0330
0331
0332
0333
0334
0335
0336 exprivate int xa_close_entry(struct xa_switch_t *sw, char *xa_info, int rmid, long flags)
0337 {
0338 int ret = XA_OK;
0339
0340 if (CONN_OPEN!=M_status)
0341 {
0342 NDRX_LOG(log_debug, "XA Already closed");
0343 goto out;
0344 }
0345
0346 if (EXSUCCEED!=ndrx_pg_disconnect(M_conn, ndrx_G_PG_conname))
0347 {
0348 NDRX_LOG(log_error, "ndrx_pg_disconnect failed: %s",
0349 PQerrorMessage(M_conn));
0350 return XAER_RMERR;
0351 }
0352
0353 M_conn = NULL;
0354 M_status = CONN_CLOSED;
0355
0356 NDRX_LOG(log_info, "Connection closed");
0357
0358 out:
0359 return ret;
0360 }
0361
0362
0363
0364
0365
0366
0367
0368
0369 exprivate int xa_start_entry(struct xa_switch_t *sw, XID *xid, int rmid, long flags)
0370 {
0371 int ret = XA_OK;
0372 PGresult *res = NULL;
0373
0374 if (CONN_OPEN!=M_status)
0375 {
0376 NDRX_LOG(log_debug, "XA Not open");
0377 ret = XAER_PROTO;
0378 goto out;
0379 }
0380
0381 if (TMNOFLAGS != flags)
0382 {
0383 NDRX_LOG(log_error, "Flags not TMNOFLAGS (%ld), passed to start_entry", flags);
0384 ret = XAER_INVAL;
0385 goto out;
0386 }
0387
0388
0389 res = PQexec(M_conn, "BEGIN");
0390 if (PGRES_COMMAND_OK != PQresultStatus(res))
0391 {
0392 NDRX_LOG(log_error, "Failed to begin transaction: %s", PQerrorMessage(M_conn));
0393 ret = XAER_RMERR;
0394 goto out;
0395 }
0396
0397 out:
0398
0399 PQclear(res);
0400
0401 return ret;
0402 }
0403
0404
0405
0406
0407
0408
0409
0410
0411
0412 exprivate int xa_end_entry(struct xa_switch_t *sw, XID *xid, int rmid, long flags)
0413 {
0414 int ret = XA_OK;
0415 long accepted_flags = TMSUCCESS|TMFAIL;
0416
0417 if (CONN_OPEN!=M_status)
0418 {
0419 NDRX_LOG(log_debug, "XA Not open");
0420 ret = XAER_PROTO;
0421 goto out;
0422 }
0423
0424
0425 if ( (flags | accepted_flags) != accepted_flags)
0426 {
0427 NDRX_LOG(log_error, "Accepted flags are: TMSUCCESS|TMFAIL, but got %ld",
0428 flags);
0429 ret = XAER_INVAL;
0430 goto out;
0431 }
0432
0433
0434 NDRX_LOG(log_debug, "END OK");
0435 out:
0436
0437 return ret;
0438 }
0439
0440
0441
0442
0443
0444
0445
0446
0447
0448
0449
0450
0451 exprivate int xa_tran_entry(struct xa_switch_t *sw, char *sql_cmd, char *dbg_msg,
0452 XID *xid, int rmid, long flags, int is_prep)
0453 {
0454 int ret = XA_OK;
0455 char stmt[1024];
0456 char pgxid[NDRX_PG_STMTBUFSZ];
0457 PGresult *res = NULL;
0458
0459 if (CONN_OPEN!=M_status)
0460 {
0461 NDRX_LOG(log_debug, "XA Not open");
0462 ret = XAER_PROTO;
0463 goto out;
0464 }
0465
0466 if (TMNOFLAGS != flags)
0467 {
0468 NDRX_LOG(log_error, "Flags not TMNOFLAGS (%ld), passed to %s",
0469 flags, dbg_msg);
0470 ret = XAER_INVAL;
0471 goto out;
0472 }
0473
0474 if (EXSUCCEED!=ndrx_pg_xid_to_db(xid, pgxid, sizeof(pgxid)))
0475 {
0476 NDRX_DUMP(log_error, "Failed to convert XID to pg string", xid, sizeof(*xid));
0477 ret = XAER_INVAL;
0478 goto out;
0479 }
0480
0481 snprintf(stmt, sizeof(stmt), "%s '%s';", sql_cmd, pgxid);
0482
0483 NDRX_LOG(log_info, "Exec: [%s]", stmt);
0484
0485 res = PQexec(M_conn, stmt);
0486 if (PGRES_COMMAND_OK != PQresultStatus(res))
0487 {
0488 char *state = PQresultErrorField(res, PG_DIAG_SQLSTATE);
0489
0490 if (0==strcmp(TRAN_NOT_FOUND, state))
0491 {
0492 NDRX_LOG(log_info, "Transaction not found (probably read-only branch)");
0493 }
0494 else
0495 {
0496 NDRX_LOG(log_error, "SQL STATE %s: Failed to %s transaction by [%s]: %s",
0497 state, dbg_msg, stmt, PQerrorMessage(M_conn));
0498
0499 if (is_prep)
0500 {
0501 NDRX_LOG(log_error, "Work is rolled back automatically by PG");
0502 ret = XA_RBROLLBACK;
0503 }
0504 }
0505 }
0506
0507 NDRX_LOG(log_debug, "%s OK", dbg_msg);
0508 out:
0509
0510 PQclear(res);
0511
0512 return ret;
0513 }
0514
0515
0516
0517
0518
0519
0520
0521
0522
0523
0524 exprivate int xa_rollback_entry(struct xa_switch_t *sw, XID *xid, int rmid, long flags)
0525 {
0526 return xa_tran_entry(sw, "ROLLBACK PREPARED", "ROLLBACK",
0527 xid, rmid, flags, EXFALSE);
0528 }
0529
0530
0531
0532
0533
0534
0535
0536
0537
0538
0539 exprivate int xa_prepare_entry(struct xa_switch_t *sw, XID *xid, int rmid, long flags)
0540 {
0541 return xa_tran_entry(sw, "PREPARE TRANSACTION", "PREPARE",
0542 xid, rmid, flags, EXTRUE);
0543 }
0544
0545
0546
0547
0548
0549
0550
0551
0552
0553
0554 exprivate int xa_commit_entry(struct xa_switch_t *sw, XID *xid, int rmid, long flags)
0555 {
0556 return xa_tran_entry(sw, "COMMIT PREPARED", "COMMIT",
0557 xid, rmid, flags, EXFALSE);
0558 }
0559
0560
0561
0562
0563
0564
0565
0566
0567
0568
0569
0570
0571
0572 exprivate int xa_recover_entry(struct xa_switch_t *sw, XID *xid, long count,
0573 int rmid, long flags)
0574 {
0575 int ret = XA_OK;
0576 long accepted_flags = TMSTARTRSCAN|TMENDRSCAN|TMNOFLAGS;
0577 PGresult *res = NULL;
0578 int i;
0579 int nrtx;
0580
0581
0582 if ( (flags | accepted_flags) != accepted_flags)
0583 {
0584 NDRX_LOG(log_error, "Accepted flags are: TMSTARTRSCAN|TMENDRSCAN|TMNOFLAGS, but got %ld",
0585 flags);
0586 ret = XAER_INVAL;
0587 goto out;
0588 }
0589
0590 if (CONN_OPEN!=M_status)
0591 {
0592 NDRX_LOG(log_debug, "XA Not open");
0593 ret = XAER_PROTO;
0594 goto out;
0595 }
0596
0597
0598 if (flags & TMSTARTRSCAN)
0599 {
0600
0601 xid_list_free();
0602
0603
0604 res = PQexec(M_conn, "BEGIN");
0605 if (PQresultStatus(res) != PGRES_COMMAND_OK)
0606 {
0607 NDRX_LOG(log_error, "BEGIN command failed: %s",
0608 PQerrorMessage(M_conn));
0609 PQclear(res);
0610 ret = XAER_RMERR;
0611 goto out;
0612 }
0613
0614 PQclear(res);
0615
0616
0617
0618
0619
0620
0621
0622
0623
0624
0625 res = PQexec(M_conn, "DECLARE ndrx_pq_list_xids "
0626 "CURSOR FOR SELECT gid FROM pg_prepared_xacts ORDER BY prepared;");
0627
0628 if (PQresultStatus(res) != PGRES_COMMAND_OK)
0629 {
0630 NDRX_LOG(log_error, "DECLARE CURSOR failed: %s", PQerrorMessage(M_conn));
0631 PQclear(res);
0632 ret = XAER_RMERR;
0633 goto out;
0634 }
0635
0636 PQclear(res);
0637
0638 res = PQexec(M_conn, "FETCH ALL in ndrx_pq_list_xids;");
0639 if (PQresultStatus(res) != PGRES_TUPLES_OK)
0640 {
0641 NDRX_LOG(log_error, "FETCH ALL failed: %s", PQerrorMessage(M_conn));
0642 PQclear(res);
0643 ret = XAER_RMERR;
0644 goto out;
0645 }
0646
0647
0648 nrtx = PQntuples(res);
0649
0650 NDRX_LOG(log_info, "Recovered %d transactions", nrtx);
0651 for (i = 0; i < nrtx; i++)
0652 {
0653 char *btid = PQgetvalue(res, i, 0);
0654 XID xid_fetch;
0655
0656 NDRX_LOG(log_debug, "Got BTID: [%s] - try parse", btid);
0657 if (EXSUCCEED!=ndrx_pg_db_to_xid(btid, &xid_fetch))
0658 {
0659 continue;
0660 }
0661
0662
0663 if (EXSUCCEED!=xid_list_add(&xid_fetch))
0664 {
0665 NDRX_LOG(log_error, "Failed to add BTID to list!");
0666 PQclear(res);
0667 EXFAIL_OUT(ret);
0668 }
0669 }
0670 PQclear(res);
0671
0672
0673 res = PQexec(M_conn, "CLOSE ndrx_pq_list_xids;");
0674 PQclear(res);
0675
0676
0677 res = PQexec(M_conn, "END;");
0678 PQclear(res);
0679
0680 }
0681
0682
0683 nrtx = 0;
0684
0685 for (i=0; i<count; i++)
0686 {
0687 if (EXTRUE==xid_list_get_next(&xid[i]))
0688 {
0689 nrtx++;
0690 }
0691 else
0692 {
0693 break;
0694 }
0695 }
0696
0697 ret = nrtx;
0698
0699 if (TMENDRSCAN & flags)
0700 {
0701 xid_list_free();
0702 }
0703
0704 out:
0705
0706 NDRX_LOG(log_info, "Returning %d", ret);
0707 return ret;
0708 }
0709
0710
0711
0712
0713
0714
0715
0716
0717
0718 exprivate int xa_forget_entry(struct xa_switch_t *sw, XID *xid, int rmid, long flags)
0719 {
0720 return xa_tran_entry(sw, "ROLLBACK PREPARED", "FORGET",
0721 xid, rmid, flags, EXFALSE);
0722 }
0723
0724
0725
0726
0727
0728
0729
0730
0731
0732
0733 exprivate int xa_complete_entry(struct xa_switch_t *sw, int *handle, int *retval, int rmid, long flags)
0734 {
0735 return EXFAIL;
0736 }
0737
0738
0739
0740
0741
0742
0743
0744
0745
0746 exprivate int xa_rollback_local(XID *xid, long flags)
0747 {
0748 int ret = XA_OK;
0749 char stmt[1024];
0750 char pgxid[NDRX_PG_STMTBUFSZ];
0751 PGresult *res = NULL;
0752
0753 if (CONN_OPEN!=M_status)
0754 {
0755 NDRX_LOG(log_debug, "XA Not open");
0756 ret = XAER_PROTO;
0757 goto out;
0758 }
0759
0760 if (TMNOFLAGS != flags)
0761 {
0762 NDRX_LOG(log_error, "Flags not TMNOFLAGS (%ld)",
0763 flags);
0764 ret = XAER_INVAL;
0765 goto out;
0766 }
0767
0768 NDRX_STRCPY_SAFE(stmt, "ROLLBACK");
0769
0770 NDRX_LOG(log_info, "Exec: [%s]", stmt);
0771
0772 res = PQexec(M_conn, stmt);
0773
0774 if (PGRES_COMMAND_OK != PQresultStatus(res))
0775 {
0776 char *state = PQresultErrorField(res, PG_DIAG_SQLSTATE);
0777
0778 if (0==strcmp(TRAN_NOT_FOUND, state))
0779 {
0780 NDRX_LOG(log_info, "Transaction not found");
0781 }
0782 else
0783 {
0784 ret = XAER_RMERR;
0785 }
0786 }
0787
0788 NDRX_LOG(log_debug, "%s OK", stmt);
0789
0790 out:
0791
0792 PQclear(res);
0793
0794 return ret;
0795 }
0796
0797
0798 exprivate int xa_open_entry_stat( char *xa_info, int rmid, long flags)
0799 {
0800 return xa_open_entry(&ndrxpgsw, xa_info, rmid, flags);
0801 }
0802 exprivate int xa_close_entry_stat(char *xa_info, int rmid, long flags)
0803 {
0804 return xa_close_entry(&ndrxpgsw, xa_info, rmid, flags);
0805 }
0806 exprivate int xa_start_entry_stat(XID *xid, int rmid, long flags)
0807 {
0808 return xa_start_entry(&ndrxpgsw, xid, rmid, flags);
0809 }
0810
0811 exprivate int xa_end_entry_stat(XID *xid, int rmid, long flags)
0812 {
0813 return xa_end_entry(&ndrxpgsw, xid, rmid, flags);
0814 }
0815 exprivate int xa_rollback_entry_stat(XID *xid, int rmid, long flags)
0816 {
0817 return xa_rollback_entry(&ndrxpgsw, xid, rmid, flags);
0818 }
0819 exprivate int xa_prepare_entry_stat(XID *xid, int rmid, long flags)
0820 {
0821 return xa_prepare_entry(&ndrxpgsw, xid, rmid, flags);
0822 }
0823
0824 exprivate int xa_commit_entry_stat(XID *xid, int rmid, long flags)
0825 {
0826 return xa_commit_entry(&ndrxpgsw, xid, rmid, flags);
0827 }
0828
0829 exprivate int xa_recover_entry_stat(XID *xid, long count, int rmid, long flags)
0830 {
0831 return xa_recover_entry(&ndrxpgsw, xid, count, rmid, flags);
0832 }
0833 exprivate int xa_forget_entry_stat(XID *xid, int rmid, long flags)
0834 {
0835 return xa_forget_entry(&ndrxpgsw, xid, rmid, flags);
0836 }
0837 exprivate int xa_complete_entry_stat(int *handle, int *retval, int rmid, long flags)
0838 {
0839 return xa_complete_entry(&ndrxpgsw, handle, retval, rmid, flags);
0840 }
0841
0842
0843