Back to home page

Enduro/X

 
 

    


0001 /**
0002  * @brief API functions for Events:
0003  *   - tpsubscribe
0004  *   - tpunsubscribe
0005  *   - tppost
0006  *
0007  * @file tpevents.c
0008  */
0009 /* -----------------------------------------------------------------------------
0010  * Enduro/X Middleware Platform for Distributed Transaction Processing
0011  * Copyright (C) 2009-2016, ATR Baltic, Ltd. All Rights Reserved.
0012  * Copyright (C) 2017-2023, Mavimax, Ltd. All Rights Reserved.
0013  * This software is released under one of the following licenses:
0014  * AGPL (with Java and Go exceptions) or Mavimax's license for commercial use.
0015  * See LICENSE file for full text.
0016  * -----------------------------------------------------------------------------
0017  * AGPL license:
0018  *
0019  * This program is free software; you can redistribute it and/or modify it under
0020  * the terms of the GNU Affero General Public License, version 3 as published
0021  * by the Free Software Foundation;
0022  *
0023  * This program is distributed in the hope that it will be useful, but WITHOUT ANY
0024  * WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR A
0025  * PARTICULAR PURPOSE. See the GNU Affero General Public License, version 3
0026  * for more details.
0027  *
0028  * You should have received a copy of the GNU Affero General Public License along 
0029  * with this program; if not, write to the Free Software Foundation, Inc.,
0030  * 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
0031  *
0032  * -----------------------------------------------------------------------------
0033  * A commercial use license is available from Mavimax, Ltd
0034  * contact@mavimax.com
0035  * -----------------------------------------------------------------------------
0036  */
0037 #include <string.h>
0038 #include <stdio.h>
0039 #include <stdlib.h>
0040 #include <memory.h>
0041 
0042 #include <ubf.h>
0043 #include <atmi.h>
0044 #include <atmi_int.h>
0045 #include <ndebug.h>
0046 #include <ndrstandard.h>
0047 #include <Exfields.h>
0048 #include <tperror.h>
0049 /*---------------------------Externs------------------------------------*/
0050 /*---------------------------Macros-------------------------------------*/
0051 /*---------------------------Enums--------------------------------------*/
0052 /*---------------------------Typedefs-----------------------------------*/
0053 /*---------------------------Globals------------------------------------*/
0054 /*---------------------------Statics------------------------------------*/
0055 /*---------------------------Prototypes---------------------------------*/
0056 
0057 
0058 /**
0059  * Internal version of tpsubscribe.
0060  * Actually do the main logic of the function
0061  */
0062 expublic long ndrx_tpsubscribe(char *eventexpr, char *filter, TPEVCTL *ctl, long flags)
0063 {
0064     long ret=EXSUCCEED;
0065     UBFH *p_ub = NULL;
0066     char *ret_buf = NULL;
0067     long ret_len;
0068     short nodeid = (short)tpgetnodeid();
0069     char tmpsvc[MAXTIDENT+1];
0070     
0071     NDRX_LOG(log_debug, "%s enter", __func__);
0072 
0073     if (NULL==eventexpr || EXEOS==eventexpr[0])
0074     {
0075         ndrx_TPset_error_fmt(TPEINVAL, "eventexpr cannot be null/empty!");
0076         ret=EXFAIL;
0077         goto out;
0078     }
0079     /* Check the lenght */
0080     if (strlen(eventexpr)>255)
0081     {
0082         ndrx_TPset_error_fmt(TPEINVAL, "eventexpre longer than 255 bytes!");
0083         ret=EXFAIL;
0084         goto out;
0085     }
0086 
0087     if (NULL==ctl)
0088     {
0089         ndrx_TPset_error_fmt(TPEINVAL, "ctl cannot be null/empty!");
0090         ret=EXFAIL;
0091         goto out;
0092     }
0093 
0094     if (EXEOS==ctl->name1[0])
0095     {
0096         ndrx_TPset_error_fmt(TPEINVAL, "ctl->name1 cannot be null!");
0097         ret=EXFAIL;
0098         goto out;
0099     }
0100 
0101     /* All ok, we can now allocate the buffer... */
0102     if (NULL==(p_ub = (UBFH *)tpalloc("UBF", NULL, 1024)))
0103     {
0104         NDRX_LOG(log_error, "%s: failed to allocate 1024", __func__);
0105         ret=EXFAIL;
0106         goto out;
0107     }
0108 
0109     /* Set up paramters */
0110     if (EXFAIL==Badd(p_ub, EV_MASK, (char *)eventexpr, 0L))
0111     {
0112         ndrx_TPset_error_fmt(TPESYSTEM, "Failed to set EV_MASK/eventexpr: [%s]", Bstrerror(Berror));
0113         ret=EXFAIL;
0114         goto out;
0115     }
0116 
0117     /* Check the len */
0118     if (NULL!=filter && EXEOS!=filter[0] && strlen(filter)>255)
0119     {
0120         ndrx_TPset_error_fmt(TPEINVAL, "filter longer than 255 bytes!");
0121         ret=EXFAIL;
0122     }
0123 
0124     /* setup filter argument is set */
0125     if (NULL!=filter && EXEOS!=filter[0] &&
0126             EXFAIL==Badd(p_ub, EV_FILTER, filter, 0L))
0127     {
0128         ndrx_TPset_error_fmt(TPESYSTEM, "Failed to set EV_FILTER/filter: [%s]",
0129                                             Bstrerror(Berror));
0130         ret=EXFAIL;
0131         goto out;
0132     }
0133 
0134 
0135     if (EXFAIL==CBadd(p_ub, EV_FLAGS, (char *)&ctl->flags, 0L, BFLD_LONG))
0136     {
0137         ndrx_TPset_error_fmt(TPESYSTEM, "Failed to set EV_FLAGS/flags: [%s]",
0138                                             Bstrerror(Berror));
0139         ret=EXFAIL;
0140         goto out;
0141     }
0142 
0143     if (EXFAIL==CBadd(p_ub, EV_SRVCNM, ctl->name1, 0L, BFLD_STRING))
0144     {
0145         ndrx_TPset_error_fmt(TPESYSTEM, "Failed to set EV_SRVCNM/name1: [%s]",
0146                                             Bstrerror(Berror));
0147         ret=EXFAIL;
0148         goto out;
0149     }
0150     
0151     snprintf(tmpsvc, sizeof(tmpsvc), NDRX_SYS_SVC_PFX EV_TPEVSUBS, nodeid);
0152     
0153     if (EXFAIL!=(ret=ndrx_tpcall(tmpsvc, (char *)p_ub, 0L, &ret_buf, &ret_len, 
0154             flags|TPNOTRAN, NULL, 0, 0, 0, 0, 0, 0)))
0155     {
0156         ret=tpurcode; /* Return code - count of events applied */
0157     }
0158 
0159 out:
0160     /* free up any allocated resources */
0161     if (NULL!=p_ub)
0162     {
0163         atmi_error_t err;
0164         /* Save the original error/needed later! */
0165         ndrx_TPsave_error(&err);
0166         tpfree((char*)p_ub);
0167         ndrx_TPrestore_error(&err);
0168     }
0169 
0170     NDRX_LOG(log_debug, "%s returns %ld", __func__, ret);
0171     
0172     return ret;
0173 }
0174 
0175 /**
0176  * Internal version of tpunsubscribe.
0177  * Actually do the main logic of the function
0178  */
0179 expublic long ndrx_tpunsubscribe(long subscription, long flags)
0180 {
0181     long ret=EXSUCCEED;
0182     UBFH *p_ub = NULL;
0183     char *ret_buf = NULL;
0184     long ret_len;
0185     short nodeid = (short)tpgetnodeid();
0186     char tmpsvc[MAXTIDENT+1];
0187 
0188     NDRX_LOG(log_debug, "%s enter", __func__);
0189 
0190     /* All ok, we can now allocate the buffer... */
0191     if (NULL==(p_ub = (UBFH *)tpalloc("UBF", NULL, 512)))
0192     {
0193         NDRX_LOG(log_error, "%s: failed to allocate 512", __func__);
0194         ret=EXFAIL;
0195         goto out;
0196     }
0197 
0198     /* Check the validity of arguments */
0199     if (subscription<-1)
0200     {
0201         ndrx_TPset_error_fmt(TPEINVAL, "%s: subscription %ld cannot be < -1",
0202                                      __func__, subscription);
0203         ret=EXFAIL;
0204         goto out;
0205     }
0206     
0207     if (EXFAIL==CBadd(p_ub, EV_SUBSNR, (char *)&subscription, 0L, BFLD_LONG))
0208     {
0209         ndrx_TPset_error_fmt(TPESYSTEM, "Failed to set EV_SUBSNR/flags: [%s]",
0210                                             Bstrerror(Berror));
0211         ret=EXFAIL;
0212         goto out;
0213     }
0214 
0215     snprintf(tmpsvc, sizeof(tmpsvc), NDRX_SYS_SVC_PFX EV_TPEVUNSUBS, nodeid);
0216     if (EXFAIL!=(ret=ndrx_tpcall(tmpsvc, (char *)p_ub, 0L, 
0217             &ret_buf, &ret_len, flags|TPNOTRAN, NULL, 0, 0, 0, 0, 0, 0)))
0218     {
0219         ret=tpurcode; /* Return code - count of events applied */
0220     }
0221 
0222 out:
0223     /* free up any allocated resources */
0224     if (NULL!=p_ub)
0225     {
0226         atmi_error_t err;
0227         /* Save the original error/needed later! */
0228         ndrx_TPsave_error(&err);
0229         tpfree((char*)p_ub);
0230         ndrx_TPrestore_error(&err);
0231     }
0232 
0233     NDRX_LOG(log_debug, "%s returns %ld", __func__, ret);
0234     return ret;
0235 }
0236 
0237 /**
0238  * Internal version of tppost
0239  * @param eventname
0240  * @param data
0241  * @param len
0242  * @param flags
0243  * @param user1 user data field 1
0244  * @param user2 user data field 2
0245  * @return
0246  */
0247 expublic int ndrx_tppost(char *eventname, char *data, long len, long flags,
0248             int user1, long user2, int user3, long user4)
0249 {
0250     int ret=EXSUCCEED;
0251     char *ret_buf = NULL; /* This works as tpalloc'd NULL buffer! */
0252     long ret_len;
0253     short nodeid = (short)tpgetnodeid();
0254     char tmpsvc[MAXTIDENT+1];
0255     
0256     NDRX_LOG(log_debug, "%s enter", __func__);
0257 
0258     if (NULL==eventname || EXEOS==eventname[0])
0259     {
0260         ndrx_TPset_error_fmt(TPEINVAL, "%s: eventname cannot be null/empty", __func__);
0261         ret=EXFAIL;
0262         goto out;
0263     }
0264 
0265     /* Post the */
0266     snprintf(tmpsvc, sizeof(tmpsvc), NDRX_SYS_SVC_PFX EV_TPEVPOST, nodeid);
0267     if (EXFAIL!=(ret=ndrx_tpcall(tmpsvc, data, len, &ret_buf, &ret_len, flags, 
0268             eventname, EXFAIL, 0, user1, user2, user3, user4)))
0269     {
0270         ret=tpurcode; /* Return code - count of events applied */
0271     }
0272 
0273 out:
0274 
0275     NDRX_LOG(log_debug, "%s returns %d", __func__, ret);
0276     return ret;
0277 }
0278 /* vim: set ts=4 sw=4 et smartindent: */