Back to home page

Enduro/X

 
 

    


0001 /**
0002  * @brief Postgres C XA Switch emulation
0003  *
0004  * @file pgswitch.c
0005  */
0006 /* -----------------------------------------------------------------------------
0007  * Enduro/X Middleware Platform for Distributed Transaction Processing
0008  * Copyright (C) 2009-2016, ATR Baltic, Ltd. All Rights Reserved.
0009  * Copyright (C) 2017-2023, Mavimax, Ltd. All Rights Reserved.
0010  * This software is released under one of the following licenses:
0011  * AGPL (with Java and Go exceptions) or Mavimax's license for commercial use.
0012  * See LICENSE file for full text.
0013  * -----------------------------------------------------------------------------
0014  * AGPL license:
0015  *
0016  * This program is free software; you can redistribute it and/or modify it under
0017  * the terms of the GNU Affero General Public License, version 3 as published
0018  * by the Free Software Foundation;
0019  *
0020  * This program is distributed in the hope that it will be useful, but WITHOUT ANY
0021  * WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR A
0022  * PARTICULAR PURPOSE. See the GNU Affero General Public License, version 3
0023  * for more details.
0024  *
0025  * You should have received a copy of the GNU Affero General Public License along 
0026  * with this program; if not, write to the Free Software Foundation, Inc.,
0027  * 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
0028  *
0029  * -----------------------------------------------------------------------------
0030  * A commercial use license is available from Mavimax, Ltd
0031  * contact@mavimax.com
0032  * -----------------------------------------------------------------------------
0033  */
0034 #include <string.h>
0035 #include <stdio.h>
0036 #include <stdlib.h>
0037 
0038 #include <ndrstandard.h>
0039 #include <ndebug.h>
0040 #include <atmi.h>
0041 
0042 #include "atmi_shm.h"
0043 #include <atmi_int.h>
0044 #include "utlist.h"
0045 
0046 #include <xa.h>
0047 #include <pgxa.h>
0048 #include <thlock.h>
0049 /*---------------------------Externs------------------------------------*/
0050 /*---------------------------Macros-------------------------------------*/
#define CONN_CLOSED 0   /**< M_status value: interface is not open            */
#define CONN_OPEN   1   /**< M_status value: interface is open                */
#define TRAN_NOT_FOUND  "42704" /**< SQLSTATE handled as "transaction not found" */
0054 /*---------------------------Enums--------------------------------------*/
0055 /*---------------------------Typedefs-----------------------------------*/
0056 
0057 /* we need a list of XIDs for keeping the recovery BTID copies: */
0058 
0059 /**
0060  * List of xid left for fetch
0061  */
0062 typedef struct ndrx_xid_list ndrx_xid_list_t;
0063 struct ndrx_xid_list
0064 {
0065     XID xid;
0066 
0067     ndrx_xid_list_t *next, *prev;
0068 };
0069 
0070 
0071 /*---------------------------Globals------------------------------------*/
0072 expublic __thread char ndrx_G_PG_conname[65]={EXEOS}; /**< connection name    */
0073 /*---------------------------Statics------------------------------------*/
0074 
0075 exprivate MUTEX_LOCKDECL(M_open_lock);
0076 exprivate ndrx_pgconnect_t M_conndata; /**< parsed connection data            */
0077 exprivate int M_conndata_ok = EXFALSE; /**< Is connection parsed ok & cached? */
0078 
0079 /* threaded data: */
0080 exprivate __thread PGconn * M_conn = NULL;   /**< Actual connection           */ 
0081 exprivate __thread int M_status = CONN_CLOSED;  /**< thread based status      */
0082 exprivate __thread ndrx_xid_list_t *M_list = NULL; /**< XID list of cur recov */
0083 
0084 /*---------------------------Prototypes---------------------------------*/
0085 
0086 exprivate int xa_open_entry_stat(char *xa_info, int rmid, long flags);
0087 exprivate int xa_close_entry_stat(char *xa_info, int rmid, long flags);
0088 exprivate int xa_start_entry_stat(XID *xid, int rmid, long flags);
0089 exprivate int xa_end_entry_stat(XID *xid, int rmid, long flags);
0090 exprivate int xa_rollback_entry_stat(XID *xid, int rmid, long flags);
0091 exprivate int xa_prepare_entry_stat(XID *xid, int rmid, long flags);
0092 exprivate int xa_commit_entry_stat(XID *xid, int rmid, long flags);
0093 exprivate int xa_recover_entry_stat(XID *xid, long count, int rmid, long flags);
0094 exprivate int xa_forget_entry_stat(XID *xid, int rmid, long flags);
0095 exprivate int xa_complete_entry_stat(int *handle, int *retval, int rmid, long flags);
0096 
0097 exprivate int xa_open_entry(struct xa_switch_t *sw, char *xa_info, int rmid, long flags);
0098 exprivate int xa_close_entry(struct xa_switch_t *sw, char *xa_info, int rmid, long flags);
0099 exprivate int xa_start_entry(struct xa_switch_t *sw, XID *xid, int rmid, long flags);
0100 exprivate int xa_end_entry(struct xa_switch_t *sw, XID *xid, int rmid, long flags);
0101 exprivate int xa_rollback_entry(struct xa_switch_t *sw, XID *xid, int rmid, long flags);
0102 exprivate int xa_prepare_entry(struct xa_switch_t *sw, XID *xid, int rmid, long flags);
0103 exprivate int xa_commit_entry(struct xa_switch_t *sw, XID *xid, int rmid, long flags);
0104 exprivate int xa_recover_entry(struct xa_switch_t *sw, XID *xid, long count, int rmid, long flags);
0105 exprivate int xa_forget_entry(struct xa_switch_t *sw, XID *xid, int rmid, long flags);
0106 exprivate int xa_complete_entry(struct xa_switch_t *sw, int *handle, int *retval, int rmid, long flags);
0107 
0108 exprivate int xa_rollback_local(XID *xid, long flags);
0109 
0110 expublic NDRX_API_EXPORT struct xa_switch_t ndrxpgsw = 
0111 { 
0112     .name = "ndrxpgsw",
0113     .flags = TMNOMIGRATE,
0114     .version = 0,
0115     .xa_open_entry = xa_open_entry_stat,
0116     .xa_close_entry = xa_close_entry_stat,
0117     .xa_start_entry = xa_start_entry_stat,
0118     .xa_end_entry = xa_end_entry_stat,
0119     .xa_rollback_entry = xa_rollback_entry_stat,
0120     .xa_prepare_entry = xa_prepare_entry_stat,
0121     .xa_commit_entry = xa_commit_entry_stat,
0122     .xa_recover_entry = xa_recover_entry_stat,
0123     .xa_forget_entry = xa_forget_entry_stat,
0124     .xa_complete_entry = xa_complete_entry_stat
0125 };
0126 
0127 /**
0128  * Add entry to list
0129  * @param xid to put on list
0130  * @return EXSUCCEED/EXFAIL(OOM)
0131  */
0132 exprivate int xid_list_add(XID *xid)
0133 {
0134     int ret = EXSUCCEED;
0135     ndrx_xid_list_t *el;
0136     
0137     el = NDRX_CALLOC(1, sizeof(ndrx_xid_list_t));
0138     
0139     if (NULL==el)
0140     {
0141         int err = errno;
0142         NDRX_LOG(log_error, "Failed to calloc: %d bytes: %s", 
0143                 sizeof(ndrx_xid_list_t), strerror(err));
0144         userlog("Failed to calloc: %d bytes: %s", 
0145                 sizeof(ndrx_xid_list_t), strerror(err));
0146         EXFAIL_OUT(ret);
0147     }
0148     
0149     memcpy((char *)&(el->xid), (char *)xid, sizeof(XID));
0150     
0151     
0152     DL_APPEND(M_list, el);
0153     
0154 out:
0155     return ret;
0156 }
0157 
0158 /**
0159  * Fetch the next xid
0160  * @param xid
0161  * @return EXTRUE - have next / EXFALSE - we have EOF
0162  */
0163 exprivate int xid_list_get_next(XID *xid)
0164 {
0165     ndrx_xid_list_t *tmp;
0166     if (NULL!=M_list)
0167     {
0168         memcpy((char *)xid, (char *)&M_list->xid, sizeof(XID));
0169         
0170         tmp = M_list;
0171         DL_DELETE(M_list, M_list);
0172         NDRX_FREE((char *)tmp);
0173         return EXTRUE;
0174     }
0175     
0176     return EXFALSE;
0177 }
0178 
0179 /**
0180  * Remove all from XID list
0181  */
0182 exprivate void xid_list_free(void)
0183 {
0184     ndrx_xid_list_t *el, *elt;
0185     
0186     DL_FOREACH_SAFE(M_list,el,elt)
0187     {
0188         DL_DELETE(M_list,el);
0189         NDRX_FREE(el);
0190     }
0191     
0192 }
0193 
0194 /**
0195  * Get connection handler callback
0196  * i.e. backend of tpconnect() for C PG/ECPG driver
0197  * @return ptr to connection (for current thread)
0198  */
0199 exprivate void *ndrx_pg_getconn(void)
0200 {
0201     return (void *)M_conn;
0202 }
0203 
0204 /**
0205  * API entry of loading the driver
0206  * @param symbol
0207  * @param descr
0208  * @return XA switch or null
0209  */
0210 struct xa_switch_t *ndrx_get_xa_switch(void)
0211 {
0212     /* configure other flags */
0213     
0214     /* prase the config and share it between threads */
0215     
0216     ndrx_xa_nostartxid(EXTRUE);
0217     ndrx_xa_setloctxabort(xa_rollback_local);
0218     ndrx_xa_setgetconnn(ndrx_pg_getconn);
0219     
0220     return &ndrxpgsw;
0221 }
0222 
0223 /**
0224  * Open API.
0225  * This is called per thread.
0226  * The config is written in JSON.
0227  * Syntax:
0228  * { "url":"unix:postgresql://sql.mydomain.com:5432/mydb", "user":"test", "password":"test1", "compat":"INFORMIX|INFORMIX_SE|PGSQL"}
0229  * @param switch 
0230  * @param xa_info
0231  * @param rmid
0232  * @param flags
0233  * @return 
0234  */
0235 exprivate int xa_open_entry(struct xa_switch_t *sw, char *xa_info, int rmid, long flags)
0236 {
0237     int ret = XA_OK;
0238     static int conn_counter = 0;
0239     static int first = EXTRUE;
0240     int connid;
0241     
0242     if (CONN_OPEN==M_status)
0243     {
0244         NDRX_LOG(log_error, "Connection is already open");
0245         ret=XAER_PROTO;
0246         goto out;
0247     }
0248     
0249     /* mark that join is not supported */
0250     if (first)
0251     {
0252         MUTEX_LOCK_V(M_open_lock);
0253         if (first)
0254         {
0255             /* no join pls... */
0256             ndrx_xa_nojoin(EXTRUE);
0257             first=EXFALSE;
0258         }
0259         MUTEX_UNLOCK_V(M_open_lock);
0260     }
0261 
0262     /* try parse config */
0263     if (!M_conndata_ok)
0264     {
0265         MUTEX_LOCK_V(M_open_lock);
0266         
0267         if (!M_conndata_ok)
0268         {
0269             if (EXSUCCEED!=ndrx_pg_xa_cfgparse(xa_info, &M_conndata))
0270             {
0271                 NDRX_LOG(log_error, "Failed to parse Open string!");
0272                 MUTEX_UNLOCK_V(M_open_lock);
0273                 ret = XAER_INVAL;
0274                 goto out;
0275             }
0276             
0277             M_conndata_ok = EXTRUE;
0278             MUTEX_UNLOCK_V(M_open_lock);
0279         }
0280     }
0281     
0282     /* generate connection name. The format is following:
0283      * YYYYMMDD-HHMISSFFF-%d 
0284      */
0285     if (EXEOS==ndrx_G_PG_conname[0])
0286     {
0287         long date;
0288         long time;
0289         long usec;
0290         
0291         /* use the same lock for connection naming.. */
0292         MUTEX_LOCK_V(M_open_lock);
0293         connid = conn_counter;
0294         
0295         conn_counter++;
0296         
0297         if (conn_counter > 16000)
0298         {
0299             conn_counter = 0;
0300         }
0301         
0302         MUTEX_UNLOCK_V(M_open_lock);
0303         
0304         ndrx_get_dt_local(&date, &time, &usec);
0305         
0306         snprintf(ndrx_G_PG_conname, sizeof(ndrx_G_PG_conname), "%ld-%ld%ld-%d",
0307                 date, time, (long)(usec / 1000), connid);
0308     }
0309     
0310     NDRX_LOG(log_debug, "Connection name: [%s]", ndrx_G_PG_conname);
0311     
0312     M_conn = ndrx_pg_connect(&M_conndata, ndrx_G_PG_conname);
0313     
0314     if (NULL==M_conn)
0315     {
0316         NDRX_LOG(log_error, "Postgres error: failed to get PQ connection!");
0317         ret = XAER_RMERR;
0318         goto out;
0319     }
0320     
0321     M_status = CONN_OPEN;
0322     NDRX_LOG(log_info, "Connection [%s] is open %p", ndrx_G_PG_conname, M_conn);
0323     
0324 out:
0325     return ret;
0326 }
0327 
0328 /**
0329  * Close entry.
0330  * @param sw xa switch
0331  * @param xa_info close string
0332  * @param rmid RM id
0333  * @param flags flags
0334  * @return xa err
0335  */
0336 exprivate int xa_close_entry(struct xa_switch_t *sw, char *xa_info, int rmid, long flags)
0337 {
0338     int ret = XA_OK;
0339     
0340     if (CONN_OPEN!=M_status)
0341     {
0342         NDRX_LOG(log_debug, "XA Already closed");
0343         goto out;
0344     }
0345 
0346     if (EXSUCCEED!=ndrx_pg_disconnect(M_conn, ndrx_G_PG_conname))
0347     {
0348         NDRX_LOG(log_error, "ndrx_pg_disconnect failed: %s", 
0349                 PQerrorMessage(M_conn));
0350         return XAER_RMERR;
0351     }
0352     
0353     M_conn = NULL;
0354     M_status = CONN_CLOSED;
0355     
0356     NDRX_LOG(log_info, "Connection closed");
0357     
0358 out:
0359     return ret;
0360 }
0361 
0362 /**
0363  * Just start the transaction.
0364  * @param xa_info
0365  * @param rmid
0366  * @param flags
0367  * @return 
0368  */
0369 exprivate int xa_start_entry(struct xa_switch_t *sw, XID *xid, int rmid, long flags)
0370 {
0371     int ret = XA_OK;
0372     PGresult *res = NULL;
0373     
0374     if (CONN_OPEN!=M_status)
0375     {
0376         NDRX_LOG(log_debug, "XA Not open");
0377         ret = XAER_PROTO;
0378         goto out;
0379     }
0380     
0381     if (TMNOFLAGS != flags)
0382     {
0383         NDRX_LOG(log_error, "Flags not TMNOFLAGS (%ld), passed to start_entry", flags);
0384         ret = XAER_INVAL;
0385         goto out;
0386     }
0387 
0388     /* start PG transaction */
0389     res = PQexec(M_conn, "BEGIN");
0390     if (PGRES_COMMAND_OK != PQresultStatus(res))
0391     {
0392         NDRX_LOG(log_error, "Failed to begin transaction: %s", PQerrorMessage(M_conn));
0393         ret = XAER_RMERR;
0394         goto out;
0395     }
0396     
0397 out:
0398     
0399     PQclear(res);
0400 
0401     return ret;
0402 }
0403 
0404 /**
0405  * Terminate transaction in progress
0406  * @param sw
0407  * @param xid
0408  * @param rmid
0409  * @param flags
0410  * @return 
0411  */
0412 exprivate int xa_end_entry(struct xa_switch_t *sw, XID *xid, int rmid, long flags)
0413 {
0414     int ret = XA_OK;
0415     long accepted_flags = TMSUCCESS|TMFAIL;
0416     
0417     if (CONN_OPEN!=M_status)
0418     {
0419         NDRX_LOG(log_debug, "XA Not open");
0420         ret = XAER_PROTO;
0421         goto out;
0422     }
0423     
0424     /* check the flags */
0425     if ( (flags | accepted_flags) != accepted_flags)
0426     {
0427         NDRX_LOG(log_error, "Accepted flags are: TMSUCCESS|TMFAIL, but got %ld",
0428                 flags);
0429         ret = XAER_INVAL;
0430         goto out;
0431     }
0432 
0433     
0434     NDRX_LOG(log_debug, "END OK");
0435 out:
0436     
0437     return ret;
0438 }
0439 
0440 /**
0441  * Common transaction management routine
0442  * @param sw XA Switch
0443  * @param sql_cmd SQL Command to execute
0444  * @param dbg_msg debug message related with command
0445  * @param xid transaction XID
0446  * @param rmid resource manager ID
0447  * @param flags currently not flags are supported
0448  * @param[in] is_prep is this prepare statement call
0449  * @return XA error code or XA_OK
0450  */
0451 exprivate int xa_tran_entry(struct xa_switch_t *sw, char *sql_cmd, char *dbg_msg,
0452         XID *xid, int rmid, long flags, int is_prep)
0453 {
0454     int ret = XA_OK;
0455     char stmt[1024];
0456     char pgxid[NDRX_PG_STMTBUFSZ];
0457     PGresult *res = NULL;
0458     
0459     if (CONN_OPEN!=M_status)
0460     {
0461         NDRX_LOG(log_debug, "XA Not open");
0462         ret = XAER_PROTO;
0463         goto out;
0464     }
0465     
0466     if (TMNOFLAGS != flags)
0467     {
0468         NDRX_LOG(log_error, "Flags not TMNOFLAGS (%ld), passed to %s", 
0469                 flags, dbg_msg);
0470         ret = XAER_INVAL;
0471         goto out;
0472     }
0473     
0474     if (EXSUCCEED!=ndrx_pg_xid_to_db(xid, pgxid, sizeof(pgxid)))
0475     {
0476         NDRX_DUMP(log_error, "Failed to convert XID to pg string", xid, sizeof(*xid));
0477         ret = XAER_INVAL;
0478         goto out;
0479     }
0480     
0481     snprintf(stmt, sizeof(stmt), "%s '%s';", sql_cmd, pgxid);
0482     
0483     NDRX_LOG(log_info, "Exec: [%s]", stmt);
0484     
0485     res = PQexec(M_conn, stmt);
0486     if (PGRES_COMMAND_OK != PQresultStatus(res)) 
0487     {
0488         char *state = PQresultErrorField(res, PG_DIAG_SQLSTATE);
0489         
0490         if (0==strcmp(TRAN_NOT_FOUND, state))
0491         {
0492             NDRX_LOG(log_info, "Transaction not found (probably read-only branch)");
0493         }
0494         else
0495         {
0496             NDRX_LOG(log_error, "SQL STATE %s: Failed to %s transaction by [%s]: %s",
0497                     state, dbg_msg, stmt, PQerrorMessage(M_conn));
0498 
0499             if (is_prep)
0500             {
0501                 NDRX_LOG(log_error, "Work is rolled back automatically by PG");
0502                 ret = XA_RBROLLBACK;
0503             }
0504         }
0505     }
0506     
0507     NDRX_LOG(log_debug, "%s OK", dbg_msg);
0508 out:
0509     
0510     PQclear(res);
0511 
0512     return ret;
0513 }
0514 
0515 /**
0516  * We rollback only prepared transaction.
0517  * Thus every transaction will get prepare call
0518  * @param sw
0519  * @param xid
0520  * @param rmid
0521  * @param flags
0522  * @return 
0523  */
0524 exprivate int xa_rollback_entry(struct xa_switch_t *sw, XID *xid, int rmid, long flags)
0525 {
0526     return xa_tran_entry(sw, "ROLLBACK PREPARED", "ROLLBACK", 
0527             xid, rmid, flags, EXFALSE);
0528 }
0529 
0530 /**
0531  * Prepare transaction. Any error from statement exec makes the current
0532  * job to be rolled back.
0533  * @param sw
0534  * @param xid
0535  * @param rmid
0536  * @param flags
0537  * @return 
0538  */
0539 exprivate int xa_prepare_entry(struct xa_switch_t *sw, XID *xid, int rmid, long flags)
0540 {
0541     return xa_tran_entry(sw, "PREPARE TRANSACTION", "PREPARE", 
0542             xid, rmid, flags, EXTRUE);
0543 }
0544 
0545 /**
0546  * If XID is not found then we shall assume that transaction prepare failed
0547  * for some reason, and we shall return XA_HEURRB.
0548  * @param sw
0549  * @param xid
0550  * @param rmid
0551  * @param flags
0552  * @return 
0553  */
0554 exprivate int xa_commit_entry(struct xa_switch_t *sw, XID *xid, int rmid, long flags)
0555 {
0556     return xa_tran_entry(sw, "COMMIT PREPARED", "COMMIT", 
0557             xid, rmid, flags, EXFALSE);
0558 }
0559 
0560 /**
0561  * Return list of trans.
0562  * Java returns full list, but our buffer is limited.
0563  * Thus we load the `count' number of items, but we will return the actual
0564  * number of java items
0565  * @param sw xa switch
0566  * @param xid buffer where to load xids
0567  * @param count number of xid buffer size
0568  * @param rmid resourcemanager id
0569  * @param flags flags
0570  * @return XA_OK, XERR
0571  */
0572 exprivate int xa_recover_entry(struct xa_switch_t *sw, XID *xid, long count, 
0573         int rmid, long flags)
0574 {
0575     int ret = XA_OK;
0576     long accepted_flags = TMSTARTRSCAN|TMENDRSCAN|TMNOFLAGS;
0577     PGresult   *res = NULL;
0578     int i;
0579     int nrtx;
0580     
0581     /* check the flags */
0582     if ( (flags | accepted_flags) != accepted_flags)
0583     {
0584         NDRX_LOG(log_error, "Accepted flags are: TMSTARTRSCAN|TMENDRSCAN|TMNOFLAGS, but got %ld",
0585                 flags);
0586         ret = XAER_INVAL;
0587         goto out;
0588     }
0589     
0590     if (CONN_OPEN!=M_status)
0591     {
0592         NDRX_LOG(log_debug, "XA Not open");
0593         ret = XAER_PROTO;
0594         goto out;
0595     }
0596     
0597     
0598     if (flags & TMSTARTRSCAN)
0599     {
0600         /* remove any old scans... */
0601         xid_list_free();
0602         
0603         /* Start a transaction block */
0604         res = PQexec(M_conn, "BEGIN");
0605         if (PQresultStatus(res) != PGRES_COMMAND_OK)
0606         {
0607             NDRX_LOG(log_error, "BEGIN command failed: %s", 
0608                     PQerrorMessage(M_conn));
0609             PQclear(res);
0610             ret = XAER_RMERR;
0611             goto out;
0612         }
0613         
0614         PQclear(res);
0615         
0616         
0617         /* as local transaction processor at tmsrv will scan the XIDs
0618          * and perform commit/abort/(forget), that cannot be done in transaction
0619          * thus we need to scan and fill local linked list of XIDs fetched
0620          */
0621 
0622         /*
0623          * Fetch rows from pg_database, the system catalog of databases
0624          */
0625         res = PQexec(M_conn, "DECLARE ndrx_pq_list_xids "
0626                  "CURSOR  FOR SELECT gid FROM pg_prepared_xacts ORDER BY prepared;");
0627 
0628         if (PQresultStatus(res) != PGRES_COMMAND_OK)
0629         {
0630             NDRX_LOG(log_error, "DECLARE CURSOR failed: %s", PQerrorMessage(M_conn));
0631             PQclear(res);
0632             ret = XAER_RMERR;
0633             goto out;
0634         }
0635 
0636         PQclear(res);
0637 
0638         res = PQexec(M_conn, "FETCH ALL in ndrx_pq_list_xids;");
0639         if (PQresultStatus(res) != PGRES_TUPLES_OK)
0640         {
0641             NDRX_LOG(log_error, "FETCH ALL failed: %s", PQerrorMessage(M_conn));
0642             PQclear(res);
0643             ret = XAER_RMERR;
0644             goto out;
0645         }
0646 
0647         /* Read xids into linked list? */
0648         nrtx = PQntuples(res);
0649 
0650         NDRX_LOG(log_info, "Recovered %d transactions", nrtx);
0651         for (i = 0; i < nrtx; i++)
0652         {
0653             char *btid = PQgetvalue(res, i, 0);
0654             XID xid_fetch;
0655             
0656             NDRX_LOG(log_debug, "Got BTID: [%s] - try parse", btid);
0657             if (EXSUCCEED!=ndrx_pg_db_to_xid(btid, &xid_fetch))
0658             {
0659                 continue;
0660             }
0661             
0662             /* Add to DL */
0663             if (EXSUCCEED!=xid_list_add(&xid_fetch))
0664             {
0665                 NDRX_LOG(log_error, "Failed to add BTID to list!");
0666                 PQclear(res);
0667                 EXFAIL_OUT(ret);
0668             }
0669         }
0670         PQclear(res);
0671         
0672         /* close the scan */
0673         res = PQexec(M_conn, "CLOSE ndrx_pq_list_xids;");
0674         PQclear(res);
0675 
0676         /* end the transaction */
0677         res = PQexec(M_conn, "END;");
0678         PQclear(res);
0679         
0680     } /* TMSTARTRSCAN */
0681     
0682     /* load transactions into list (as much as we have...) */
0683     nrtx = 0;
0684     
0685     for (i=0; i<count; i++)
0686     {
0687         if (EXTRUE==xid_list_get_next(&xid[i]))
0688         {
0689             nrtx++;
0690         }
0691         else
0692         {
0693             break;
0694         }
0695     }
0696     
0697     ret = nrtx;
0698     
0699     if (TMENDRSCAN & flags)
0700     {
0701         xid_list_free();
0702     }
0703     
0704 out:    
0705     
0706     NDRX_LOG(log_info, "Returning %d", ret);
0707     return ret;
0708 }
0709 
0710 /**
0711  * Forget transaction
0712  * @param sw xa switch
0713  * @param xid XID
0714  * @param rmid RM ID
0715  * @param flags flags
0716  * @return XA_OK, XERR
0717  */
0718 exprivate int xa_forget_entry(struct xa_switch_t *sw, XID *xid, int rmid, long flags)
0719 {
0720     return xa_tran_entry(sw, "ROLLBACK PREPARED", "FORGET", 
0721             xid, rmid, flags, EXFALSE);
0722 }
0723 
0724 /**
0725  * CURRENTLY NOT USED!!!
0726  * @param sw
0727  * @param handle
0728  * @param retval
0729  * @param rmid
0730  * @param flags
0731  * @return 
0732  */
0733 exprivate int xa_complete_entry(struct xa_switch_t *sw, int *handle, int *retval, int rmid, long flags)
0734 {
0735     return EXFAIL;
0736 }
0737 
0738 /**
0739  * Abort local transaction, with out XID
0740  * Use for xa_end so that we do not enter in prepared state if we know that
0741  * we are doing abort
0742  * @param xid transaction xid, not used
0743  * @param flags flags, not used
0744  * @return XA_OK, XE_ERR
0745  */
0746 exprivate int xa_rollback_local(XID *xid, long flags)
0747 {
0748     int ret = XA_OK;
0749     char stmt[1024];
0750     char pgxid[NDRX_PG_STMTBUFSZ];
0751     PGresult *res = NULL;
0752     
0753     if (CONN_OPEN!=M_status)
0754     {
0755         NDRX_LOG(log_debug, "XA Not open");
0756         ret = XAER_PROTO;
0757         goto out;
0758     }
0759     
0760     if (TMNOFLAGS != flags)
0761     {
0762         NDRX_LOG(log_error, "Flags not TMNOFLAGS (%ld)", 
0763                 flags);
0764         ret = XAER_INVAL;
0765         goto out;
0766     }
0767     
0768     NDRX_STRCPY_SAFE(stmt, "ROLLBACK");
0769     
0770     NDRX_LOG(log_info, "Exec: [%s]", stmt);
0771     
0772     res = PQexec(M_conn, stmt);
0773     
0774     if (PGRES_COMMAND_OK != PQresultStatus(res)) 
0775     {
0776         char *state = PQresultErrorField(res, PG_DIAG_SQLSTATE);
0777         
0778         if (0==strcmp(TRAN_NOT_FOUND, state))
0779         {
0780             NDRX_LOG(log_info, "Transaction not found");
0781         }
0782         else
0783         {
0784             ret = XAER_RMERR;
0785         }
0786     }
0787     
0788     NDRX_LOG(log_debug, "%s OK", stmt);
0789     
0790 out:
0791     
0792     PQclear(res);
0793 
0794     return ret;
0795 }
0796 
0797 /* Static entries */
0798 exprivate int xa_open_entry_stat( char *xa_info, int rmid, long flags)
0799 {
0800     return xa_open_entry(&ndrxpgsw, xa_info, rmid, flags);
0801 }
0802 exprivate int xa_close_entry_stat(char *xa_info, int rmid, long flags)
0803 {
0804     return xa_close_entry(&ndrxpgsw, xa_info, rmid, flags);
0805 }
0806 exprivate int xa_start_entry_stat(XID *xid, int rmid, long flags)
0807 {
0808     return xa_start_entry(&ndrxpgsw, xid, rmid, flags);
0809 }
0810 
0811 exprivate int xa_end_entry_stat(XID *xid, int rmid, long flags)
0812 {
0813     return xa_end_entry(&ndrxpgsw, xid, rmid, flags);
0814 }
0815 exprivate int xa_rollback_entry_stat(XID *xid, int rmid, long flags)
0816 {
0817     return xa_rollback_entry(&ndrxpgsw, xid, rmid, flags);
0818 }
0819 exprivate int xa_prepare_entry_stat(XID *xid, int rmid, long flags)
0820 {
0821     return xa_prepare_entry(&ndrxpgsw, xid, rmid, flags);
0822 }
0823 
0824 exprivate int xa_commit_entry_stat(XID *xid, int rmid, long flags)
0825 {
0826     return xa_commit_entry(&ndrxpgsw, xid, rmid, flags);
0827 }
0828 
0829 exprivate int xa_recover_entry_stat(XID *xid, long count, int rmid, long flags)
0830 {
0831     return xa_recover_entry(&ndrxpgsw, xid, count, rmid, flags);
0832 }
0833 exprivate int xa_forget_entry_stat(XID *xid, int rmid, long flags)
0834 {
0835     return xa_forget_entry(&ndrxpgsw, xid, rmid, flags);
0836 }
0837 exprivate int xa_complete_entry_stat(int *handle, int *retval, int rmid, long flags)
0838 {
0839     return xa_complete_entry(&ndrxpgsw, handle, retval, rmid, flags);
0840 }
0841 
0842 
0843 /* vim: set ts=4 sw=4 et smartindent: */