Back to home page

Enduro/X

 
 

    


0001 /**
0002  * @brief Move message from one qspace to another qspace + qname
0003  *   uses local XA connection
0004  *
0005  * @file cmd_mqmv.c
0006  */
0007 /* -----------------------------------------------------------------------------
0008  * Enduro/X Middleware Platform for Distributed Transaction Processing
0009  * Copyright (C) 2009-2016, ATR Baltic, Ltd. All Rights Reserved.
0010  * Copyright (C) 2017-2023, Mavimax, Ltd. All Rights Reserved.
0011  * This software is released under one of the following licenses:
0012  * AGPL (with Java and Go exceptions) or Mavimax's license for commercial use.
0013  * See LICENSE file for full text.
0014  * -----------------------------------------------------------------------------
0015  * AGPL license:
0016  *
0017  * This program is free software; you can redistribute it and/or modify it under
0018  * the terms of the GNU Affero General Public License, version 3 as published
0019  * by the Free Software Foundation;
0020  *
0021  * This program is distributed in the hope that it will be useful, but WITHOUT ANY
0022  * WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR A
0023  * PARTICULAR PURPOSE. See the GNU Affero General Public License, version 3
0024  * for more details.
0025  *
0026  * You should have received a copy of the GNU Affero General Public License along 
0027  * with this program; if not, write to the Free Software Foundation, Inc.,
0028  * 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
0029  *
0030  * -----------------------------------------------------------------------------
0031  * A commercial use license is available from Mavimax, Ltd
0032  * contact@mavimax.com
0033  * -----------------------------------------------------------------------------
0034  */
0035 #include <string.h>
0036 #include <stdio.h>
0037 #include <stdlib.h>
0038 #include <memory.h>
0039 #include <sys/param.h>
0040 #include <errno.h>
0041 
0042 #include <ndrstandard.h>
0043 #include <ndebug.h>
0044 #include <nstdutil.h>
0045 
0046 #include <ndrxdcmn.h>
0047 #include <atmi_int.h>
0048 #include <gencall.h>
0049 #include <utlist.h>
0050 #include <Exfields.h>
0051 
0052 #include "xa_cmn.h"
0053 #include <ndrx.h>
0054 #include <qcommon.h>
0055 #include <nclopt.h>
0056 /*---------------------------Externs------------------------------------*/
0057 /*---------------------------Macros-------------------------------------*/
0058 /*---------------------------Enums--------------------------------------*/
0059 /*---------------------------Typedefs-----------------------------------*/
0060 /*---------------------------Globals------------------------------------*/
0061 /*---------------------------Statics------------------------------------*/
0062 /*---------------------------Prototypes---------------------------------*/
0063 
0064 /**
0065  * Move message to different queue. Also note that caller (xadmin) must be
0066  * be able to work with XA transaction.
0067  * @param p_cmd_map
0068  * @param argc
0069  * @param argv
0070  * @return SUCCEED
0071  */
0072 expublic int cmd_mqmv(cmd_mapping_t *p_cmd_map, int argc, char **argv, int *p_have_next)
0073 {
0074     int ret = EXSUCCEED;
0075     short srvid;
0076     short nodeid;
0077     char msgid_str[TMMSGIDLEN_STR+1];
0078     char msgid[TMMSGIDLEN+1];
0079     char *buf = NULL;
0080     TPQCTL qc;
0081     long len = 1;
0082     char dest_qspace[XATMI_SERVICE_NAME_LENGTH+1];
0083     char dest_qname[TMQNAMELEN+1];
0084     
0085     ncloptmap_t clopt[] =
0086     {
0087         {'n', BFLD_SHORT, (char *)&nodeid, 0,
0088                                 NCLOPT_MAND|NCLOPT_HAVE_VALUE, "Q cluster node id"},
0089         {'i', BFLD_SHORT, (char *)&srvid, 0, 
0090                                 NCLOPT_MAND|NCLOPT_HAVE_VALUE, "Q Server Id"},
0091         {'m', BFLD_STRING, msgid_str, sizeof(msgid_str), 
0092                                 NCLOPT_MAND|NCLOPT_HAVE_VALUE, "Message id"},
0093         {'s', BFLD_STRING, dest_qspace, sizeof(dest_qspace), 
0094                                 NCLOPT_MAND|NCLOPT_HAVE_VALUE, "Dest Q space"},
0095         {'q', BFLD_STRING, dest_qname, sizeof(dest_qname), 
0096                                 NCLOPT_MAND|NCLOPT_HAVE_VALUE, "Dest Q"},
0097         {0}
0098     };
0099     
0100     /* parse command line */
0101     if (nstd_parse_clopt(clopt, EXTRUE,  argc, argv, EXFALSE))
0102     {
0103         fprintf(stderr, XADMIN_INVALID_OPTIONS_MSG);
0104         EXFAIL_OUT(ret);
0105     }
0106     
0107     /* Have a number in FB! */
0108     memset(&qc, 0, sizeof(qc));
0109 
0110     tmq_msgid_deserialize(msgid_str, msgid);
0111     
0112     qc.flags|= TPQGETBYMSGID;
0113     
0114     memcpy(qc.msgid, msgid, TMMSGIDLEN);
0115     
0116     /* we need to init TP subsystem... */
0117     if (EXSUCCEED!=tpinit(NULL))
0118     {
0119         fprintf(stderr, "Failed to tpinit(): %s\n", tpstrerror(tperrno));
0120         EXFAIL_OUT(ret);
0121     }
0122     
0123     /* Connect to XA */
0124     if (EXSUCCEED!=tpopen())
0125     {
0126         fprintf(stderr, "Failed to open XA sub-system: %s\n", tpstrerror(tperrno));
0127         EXFAIL_OUT(ret);
0128     }
0129     
0130     if (EXSUCCEED!=tpbegin(ndrx_get_G_atmi_env()->time_out, 0))
0131     {
0132         fprintf(stderr, "Failed to start XA transaction: %s\n", tpstrerror(tperrno));
0133         EXFAIL_OUT(ret);
0134     }
0135     
0136     
0137     buf = tpalloc("STRING", "", 1);
0138     
0139     if (EXSUCCEED!=tpdequeueex(nodeid, srvid, "*N/A*", &qc, (char **)&buf, &len, 0))
0140     {
0141         fprintf(stderr, "dequeue failed %s diag: %ld:%s\n", 
0142                 tpstrerror(tperrno), qc.diagnostic, qc.diagmsg);
0143         NDRX_LOG(log_error, "failed %s diag: %ld:%s", 
0144                 tpstrerror(tperrno), qc.diagnostic, qc.diagmsg);
0145         
0146         EXFAIL_OUT(ret);
0147     }
0148     
0149     if (EXSUCCEED!=tpenqueue(dest_qspace, dest_qname, &qc, (char *)buf, len, 0))
0150     {
0151         fprintf(stderr, "enqueue failed %s diag: %ld:%s\n", 
0152                 tpstrerror(tperrno), qc.diagnostic, qc.diagmsg);
0153         NDRX_LOG(log_error, "failed %s diag: %ld:%s", 
0154                 tpstrerror(tperrno), qc.diagnostic, qc.diagmsg);
0155         
0156         EXFAIL_OUT(ret);
0157     }
0158     
0159     if (EXSUCCEED!=tpcommit(0))
0160     {
0161         fprintf(stderr, "Commit failed: %s\n", tpstrerror(tperrno));
0162         EXFAIL_OUT(ret);
0163     }
0164     
0165     printf("Committed\n");
0166     
0167 out:
0168 
0169     if (tpgetlev())
0170     {
0171         tpabort(0);
0172     }
0173 
0174     if (NULL!=buf)
0175     {
0176         tpfree(buf);
0177     }
0178     
0179     /* close it anyway... */
0180     tpclose();
0181 
0182     return ret;
0183 }
0184 /* vim: set ts=4 sw=4 et smartindent: */