Back to home page

Enduro/X

 
 

    


0001 /**
0002  * @brief Event Broker services
0003  *   TODO: We should send only once to services with the same name!!!
0004  *   For each broadcast, if matched, we shall put the made call in a hash list
0005  *   and next time check whether the broadcast was already made or not.
0006  *
0007  * @file tpevsv.c
0008  */
0009 /* -----------------------------------------------------------------------------
0010  * Enduro/X Middleware Platform for Distributed Transaction Processing
0011  * Copyright (C) 2009-2016, ATR Baltic, Ltd. All Rights Reserved.
0012  * Copyright (C) 2017-2023, Mavimax, Ltd. All Rights Reserved.
0013  * This software is released under one of the following licenses:
0014  * AGPL (with Java and Go exceptions) or Mavimax's license for commercial use.
0015  * See LICENSE file for full text.
0016  * -----------------------------------------------------------------------------
0017  * AGPL license:
0018  *
0019  * This program is free software; you can redistribute it and/or modify it under
0020  * the terms of the GNU Affero General Public License, version 3 as published
0021  * by the Free Software Foundation;
0022  *
0023  * This program is distributed in the hope that it will be useful, but WITHOUT ANY
0024  * WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR A
0025  * PARTICULAR PURPOSE. See the GNU Affero General Public License, version 3
0026  * for more details.
0027  *
0028  * You should have received a copy of the GNU Affero General Public License along 
0029  * with this program; if not, write to the Free Software Foundation, Inc.,
0030  * 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
0031  *
0032  * -----------------------------------------------------------------------------
0033  * A commercial use license is available from Mavimax, Ltd
0034  * contact@mavimax.com
0035  * -----------------------------------------------------------------------------
0036  */
0037 #include <stdio.h>
0038 #include <stdlib.h>
0039 #include <string.h>
0040 #include <errno.h>
0041 #include <regex.h>
0042 #include <utlist.h>
0043 
0044 #include <ndebug.h>
0045 #include <atmi.h>
0046 #include <sys_unix.h>
0047 #include <atmi_int.h>
0048 #include <typed_buf.h>
0049 #include <ndrstandard.h>
0050 #include <ubf.h>
0051 #include <Exfields.h>
0052 #include <atmi_shm.h>
0053 #include <exregex.h>
0054 #include "tpevsv.h"
0055 /*---------------------------Externs------------------------------------*/
0056 /*---------------------------Macros-------------------------------------*/
0057 /*---------------------------Enums--------------------------------------*/
0058 /*---------------------------Typedefs-----------------------------------*/
0059 /*---------------------------Globals------------------------------------*/
0060 exprivate event_entry_t *M_subscribers=NULL;
0061 
0062 /** allow MT read, single thread write */
0063 exprivate NDRX_RWLOCK_DECL(M_subscribers_lock);
0064 /*---------------------------Statics------------------------------------*/
0065 /*---------------------------Prototypes---------------------------------*/
0066 
0067 /**
0068  * Remove subscriber
0069  * @param subscription
0070  * @param my_id
0071  * @return
0072  */
0073 exprivate long remove_by_my_id (long subscription, char *my_id)
0074 {
0075     event_entry_t *elt, *tmp;
0076     long deleted  = 0;
0077 
0078     /* Delete the stuff out */
0079     DL_FOREACH_SAFE(M_subscribers,elt,tmp)
0080     {
0081         NDRX_LOG(log_debug, "Checking Nr: %d, my_id: %s",
0082                                 elt->subscriberNr, elt->my_id);
0083 
0084         if ((-1==subscription &&
0085                 !(elt->flags & TPEVPERSIST) &&
0086                 (NULL==my_id || (0==strcmp(elt->my_id, my_id)))
0087              ) ||
0088              subscription==elt->subscriberNr
0089             )
0090         {
0091             NDRX_LOG(log_debug, "Removing subscription %ld", subscription);
0092             /* Un-initalize  */
0093             ndrx_regfree(&elt->re);
0094             /* Delete out it from list */
0095             DL_DELETE(M_subscribers,elt);
0096             NDRX_FPFREE(elt);
0097             deleted++;
0098         }
0099 
0100     }
0101 
0102     return deleted;
0103 }
0104 
0105 /**
0106  * Compile the regular expression rules
0107  * @param regex entry
0108  * @return SUCCEED/FAIL
0109  */
0110 exprivate int compile_eventexpr(event_entry_t *p_ee)
0111 {
0112     return ndrx_regcomp(&(p_ee->re), p_ee->eventexpr);
0113 }
0114 
0115 /**
0116  * Do dispatch over bridges is required or not??
0117  * @param p_svc
0118  * @param dispatch_over_bridges
0119  */
0120 exprivate void process_postage(TPSVCINFO *p_svc, int dispatch_over_bridges)
0121 {
0122     int ret=EXSUCCEED;
0123     char *data = p_svc->data;
0124     event_entry_t *elt, *tmp;
0125     long numdisp = 0;
0126     char tmpsvc[MAXTIDENT+1];
0127     char buf_type[9];
0128     char buf_subtype[17];
0129     long buf_len;
0130     long flags;
0131     int locked = EXFALSE;
0132     tp_command_call_t * last_call;
0133     buffer_obj_t *bo;
0134     
0135     /* Support #279 */
0136     string_hash_t *dup_chk = NULL;
0137     
0138     memset(buf_type, 0, sizeof(buf_type));
0139     memset(buf_subtype, 0, sizeof(buf_subtype));
0140     
0141     NDRX_LOG(log_debug, "process_postage got call");
0142     
0143     if (NULL!=data)
0144     {
0145         buf_len = tptypes(data, buf_type, buf_subtype);
0146         
0147         if (strcmp(buf_type, BUF_TYPE_UBF_STR) && 
0148                 debug_get_ndrx_level() > log_debug)
0149         {
0150             Bfprint((UBFH *)data, stderr);
0151         }
0152     }
0153     
0154     last_call=ndrx_get_G_last_call();
0155 
0156     NDRX_LOG(log_debug, "Posting event [%s] to system", last_call->extradata);
0157     
0158     /* Delete the stuff out */
0159     
0160     /* Lock the dispatch... */
0161     NDRX_RWLOCK_RLOCK_V(M_subscribers_lock);
0162     locked=EXTRUE;
0163     
0164     DL_FOREACH_SAFE(M_subscribers,elt,tmp)
0165     {
0166         /* Get type */
0167         typed_buffer_descr_t *descr;
0168         
0169         /* this must be in place as otherwise service would not receive any data*/
0170         bo = ndrx_find_buffer(p_svc->data);
0171         
0172         descr = &G_buf_descr[bo->type_id];
0173 
0174         NDRX_LOG(log_debug, "Checking Nr: %d, event [%s]",
0175                                 elt->subscriberNr, elt->eventexpr);
0176 
0177         /* Check do we have event match? */
0178         if (EXSUCCEED==regexec(&elt->re, last_call->extradata, (size_t) 0, NULL, 0))
0179         {
0180             NDRX_LOG(log_debug, "Event matched");
0181             
0182             /* Check do we have special rules for this? */
0183             if (EXEOS!=elt->filter[0])
0184             {
0185                 NDRX_LOG(log_debug, "Using filter: [%s]", elt->filter);
0186             }
0187 
0188             /* If filter matched, or no filter, then call the service */
0189             if ((EXEOS!=elt->filter[0] && 
0190                        descr->pf_test(descr, p_svc->data, p_svc->len, elt->filter)) ||
0191                 EXEOS==elt->filter[0])
0192             {
0193                 NDRX_LOG(log_debug, "Dispatching event");
0194                 if (elt->flags & TPEVSERVICE)
0195                 {
0196                     int err;
0197                     NDRX_LOG(log_debug, "Calling service %s/%s in async mode",
0198                                                     elt->name1, elt->my_id);
0199 
0200                     /* todo: Call in async: Do we need to pass there original flags? */
0201                     
0202                     /* Support #279: check for duplicate */
0203                     if (ndrx_string_hash_get(dup_chk, elt->name1))
0204                     {
0205                         NDRX_LOG(log_debug, "Service already called: [%s] - skip dup",
0206                                 elt->name1);
0207                         continue; /* <<<<<<<<<<<<<<< CONTINUE! */
0208                     }
0209                     
0210                     flags = p_svc->flags | TPNOREPLY;
0211                     NDRX_LOG(log_debug, "Calling service %s/%s in async mode flags: 0x%lx (2)",
0212                                                     elt->name1, elt->my_id, flags);
0213                     
0214                     if (EXFAIL==(err=tpacallex (elt->name1, p_svc->data, p_svc->len, 
0215                                     flags, last_call->extradata, 
0216                                     EXFAIL, EXTRUE,
0217                                     /* Pass user data in request via these rsp fields */
0218                                     last_call->rval, last_call->rcode, 
0219                                     last_call->user3, last_call->user4)))
0220                     {
0221                         if (tperrno!=TPEBLOCK)
0222                         {
0223                             NDRX_LOG(log_error, "Failed to call service [%s/%s]: %s"
0224                                     " - unsubscribing %ld",
0225                                     elt->name1, elt->my_id, 
0226                                     tpstrerror(tperrno), elt->subscriberNr);
0227                             
0228                             /* IF NO ENT, THEN UNSUBSCIRBE!!! */
0229                             
0230                             /* Switch to write lock */
0231                             NDRX_RWLOCK_UNLOCK_V(M_subscribers_lock);
0232                             NDRX_RWLOCK_WLOCK_V(M_subscribers_lock);
0233                             
0234                             remove_by_my_id(elt->subscriberNr, NULL);
0235                         }
0236                         else
0237                         {
0238                             NDRX_LOG(log_error, "TPEBLOCK during call "
0239                                     "of service [%s/%s] subscr: %ld - skip",
0240                                     elt->name1, elt->my_id, elt->subscriberNr);
0241                         }
0242                     }
0243                     else
0244                     {
0245                         /* Add to hash */
0246                         if (NULL==ndrx_string_hash_add(&dup_chk, elt->name1))
0247                         {
0248                             NDRX_LOG(log_error, "Failed to add service [%s] to "
0249                                     "dup hash list!", elt->name1);
0250                             EXFAIL_OUT(ret);
0251                         }
0252                         
0253                         numdisp++;
0254             /* free up connection descriptor */
0255                         if (err)
0256                         {
0257                             tpcancel(err);
0258                         }
0259                     }
0260                 }
0261                 else
0262                 {
0263                     NDRX_LOG(log_debug, "Skipping subscriber due to "
0264                                         "unsupported event delivery mechanism!");
0265                 }
0266             }
0267             else
0268             {
0269                 NDRX_LOG(log_debug, "Not dispatching event due to filter");
0270             }
0271         }
0272     }
0273     
0274     NDRX_RWLOCK_UNLOCK_V(M_subscribers_lock);
0275     locked=EXFALSE;
0276     
0277     if (dispatch_over_bridges)
0278     {
0279         char nodes[CONF_NDRX_NODEID_COUNT+1] = {EXEOS};
0280         int i = 0;
0281         long olen;
0282         NDRX_LOG(log_debug, "Dispatching events over the bridges...!");
0283         if (EXSUCCEED==ndrx_shm_birdge_getnodesconnected(nodes))
0284         {
0285             while (nodes[i])
0286             {
0287                 int nodeid = nodes[i];
0288                 char *tmp_data = NULL;
0289                 
0290                 if (NULL!=tmp_data)
0291                 {
0292                     if (buf_len < 0)
0293                     {
0294                         NDRX_LOG(log_error, "Invalid buffer type!");
0295                         break;
0296                     }
0297 
0298                     tmp_data = tpalloc(buf_type, 
0299                             (buf_subtype[0]==EXEOS?NULL:buf_subtype), buf_len);
0300                 }
0301                 else
0302                 {
0303                     /* Ensure that we have output buffer: */
0304                     tmp_data = tpalloc(BUF_TYPE_UBF_STR, NULL, 1024);
0305                 }
0306                 
0307                 if (NULL==tmp_data)
0308                 {
0309                     NDRX_LOG(log_error, "Cannot deliver event to node %c"
0310                             " - tpalloc failed for dest buffer...");
0311                     break;
0312                 }
0313                 
0314                 /* make dopost service */
0315                 snprintf(tmpsvc, sizeof(tmpsvc), NDRX_SYS_SVC_PFX EV_TPEVDOPOST, 
0316                         (short)nodeid);
0317                 
0318                 /* we want some reply back... - Support #527*/
0319                 flags = (p_svc->flags & ~TPNOREPLY);
0320                 
0321                 if (EXFAIL==(tpcallex (tmpsvc, p_svc->data, p_svc->len,  
0322                         &tmp_data, &olen,
0323                         flags, last_call->extradata, nodeid, TPCALL_BRCALL, 
0324                         /* we re-use for requests rval as user1 and rcode as user2 */
0325                         last_call->rval, last_call->rcode,
0326                         last_call->user3, last_call->user4)))
0327                 {
0328                     NDRX_LOG(log_error, "Call bridge %d: [%s]: %s",
0329                                     nodeid, tmpsvc,  tpstrerror(tperrno));
0330                 }
0331                 else
0332                 {
0333                     NDRX_LOG(log_debug, "On node %d applied %d events", 
0334                             nodeid, tpurcode);
0335                     numdisp+=tpurcode;
0336                 }
0337                 
0338                 if (tmp_data)
0339                 {
0340                     tpfree(tmp_data);
0341                 }
0342                 
0343                 i++;
0344             }
0345         }
0346     }
0347     
0348 out:
0349                                 
0350     if (locked) 
0351     {
0352         NDRX_RWLOCK_UNLOCK_V(M_subscribers_lock);
0353     }
0354 
0355     if (NULL!=dup_chk)
0356     {
0357         ndrx_string_hash_free(dup_chk);
0358     }
0359                                 
0360     tpreturn(  ret==EXSUCCEED?TPSUCCESS:TPFAIL,
0361                 numdisp,
0362                 NULL,
0363                 0L,
0364                 0L);
0365 }
0366 
0367 /* TODO: We should have some stuff here. 
0368  * Function if it gets direct call, then dispatch it over the bridges
0369  * and then process locally.
0370  */
0371 
0372 /**
0373  * Do the actual posting here...
0374  * This entry point for processing or entry point for bridged calls.
0375  * @param p_svc
0376  */
0377 void TPEVDOPOST(TPSVCINFO *p_svc)
0378 {
0379     process_postage(p_svc, EXFALSE);
0380 }
0381 
0382 /**
0383  * Post the event to subscribers
0384  * Event name carried in extradata of tpcallex()
0385  * This is entry point for clients/servers doing postage.
0386  * --------------------------------
0387  *
0388  * @param p_svc
0389  */
0390 void TPEVPOST (TPSVCINFO *p_svc)
0391 {
0392     /* We dispatch calls over the all bridges! */
0393     process_postage(p_svc, EXTRUE);
0394 }
0395 
0396 /**
0397  * Subscribe to event
0398  * EV_SUBSNR - subscriberNr
0399  * --------------------------------
0400  *
0401  * @param p_svc
0402  */
0403 void TPEVUNSUBS (TPSVCINFO *p_svc)
0404 {
0405     int ret=EXSUCCEED;
0406     UBFH *p_ub = (UBFH *)p_svc->data;
0407     long subscriberNr = 0;
0408     long deleted = 0;
0409     
0410     NDRX_LOG(log_debug, "EX_EVUNSUBS got call");
0411     Bfprint(p_ub, stderr);
0412 
0413 
0414     /* This field must be here! */
0415     if (EXFAIL==CBget(p_ub, EV_SUBSNR, 0, (char *)&subscriberNr, NULL, BFLD_LONG))
0416     {
0417         NDRX_LOG(log_error, "Failed to get EV_SUBSNR/subscriberNr: %s",
0418                                         Bstrerror(Berror));
0419         ret=EXFAIL;
0420         goto out;
0421     }
0422     NDRX_LOG(log_debug, "About to remove subscription: %ld, my_id: %s",
0423                                         subscriberNr, ndrx_get_G_last_call()->my_id);
0424    /* Delete the subscriptions */
0425    NDRX_RWLOCK_WLOCK_V(M_subscribers_lock);
0426    deleted=remove_by_my_id(subscriberNr, ndrx_get_G_last_call()->my_id);
0427    NDRX_RWLOCK_UNLOCK_V(M_subscribers_lock);
0428     
0429 out:
0430     tpreturn(  ret==EXSUCCEED?TPSUCCESS:TPFAIL,
0431                 deleted,
0432                 NULL,
0433                 0L,
0434                 0L);
0435 }
0436 
0437 /**
0438  * Subscribe to event
0439  * EV_MASK - event mask (NDRX_EVENT_EXPR_MAX)
0440  * EV_FILTER - filter (NDRX_EVENT_EXPR_MAX)
0441  * EV_FLAGS - flags
0442  * -- Part of TPEVCTL --
0443  * EV_SRVCNM - name1 (service name)
0444  * --------------------------------
0445  * 
0446  * @param p_svc
0447  */
0448 void TPEVSUBS (TPSVCINFO *p_svc)
0449 {
0450     int ret=EXSUCCEED;
0451     UBFH *p_ub = (UBFH *)p_svc->data;
0452     event_entry_t *p_ee;
0453     BFLDLEN len;
0454     static long subscriberNr = 0;
0455     
0456     NDRX_LOG(log_debug, "TPEVSUBS got call");
0457     Bfprint(p_ub, stderr);
0458     
0459     if (NULL==(p_ee=NDRX_FPMALLOC(sizeof(event_entry_t), 0)))
0460     {
0461         NDRX_LOG(log_error, "Failed to allocate %d bytes: %s!",
0462                                         sizeof(event_entry_t), strerror(errno));
0463         ret=EXFAIL;
0464         goto out;
0465     }
0466 
0467     memset((char *)p_ee, 0, sizeof(event_entry_t));
0468 
0469     NDRX_STRCPY_SAFE(p_ee->my_id, ndrx_get_G_last_call()->my_id);
0470     len=sizeof(p_ee->eventexpr);
0471     if (Bpres(p_ub, EV_MASK, 0) && EXFAIL==Bget(p_ub, EV_MASK, 0,
0472                             p_ee->eventexpr, &len))
0473     {
0474         NDRX_LOG(log_error, "Failed to get EV_MASK/eventexpr: %s",
0475                                         Bstrerror(Berror));
0476         ret=EXFAIL;
0477         goto out;
0478     }
0479 
0480     len=sizeof(p_ee->filter);
0481     if (Bpres(p_ub, EV_FILTER, 0) && EXFAIL==Bget(p_ub, EV_FILTER, 0,
0482                             p_ee->filter, &len))
0483     {
0484         NDRX_LOG(log_error, "Failed to get EV_FILTER/filter: %s",
0485                                         Bstrerror(Berror));
0486         ret=EXFAIL;
0487         goto out;
0488     }
0489     
0490     /* This field must be here! */
0491     if (EXFAIL==CBget(p_ub, EV_FLAGS, 0, (char *)&p_ee->flags, NULL, BFLD_LONG))
0492     {
0493         NDRX_LOG(log_error, "Failed to get EV_FLAGS/flags: %s",
0494                                         Bstrerror(Berror));
0495         ret=EXFAIL;
0496         goto out;
0497     }
0498     
0499     /* check do we support the call? */
0500     if (p_ee->flags & TPEVSERVICE)
0501     {
0502         len=sizeof(p_ee->name1);
0503         if (EXSUCCEED!=CBget(p_ub, EV_SRVCNM, 0, p_ee->name1, &len, BFLD_STRING))
0504         {
0505             NDRX_LOG(log_error, "Failed to get EV_SRVCNM/name1: %s",
0506                                             Bstrerror(Berror));
0507             ret=EXFAIL;
0508             goto out;
0509         }
0510     }
0511     else
0512     {
0513         NDRX_LOG(log_error, "tpsubscribe() unsupported flags: %ld",
0514                                         p_ee->flags);
0515         ret=EXFAIL;
0516         goto out;
0517     }
0518 
0519     /* Compile the regular expression */
0520     if (EXSUCCEED!=compile_eventexpr(p_ee))
0521     {
0522         ret=EXFAIL;
0523         goto out;
0524     }
0525     p_ee->subscriberNr=subscriberNr; /* start with 0 */
0526 
0527     /* Dump the key info */
0528     NDRX_LOG(log_debug, "Nr: %ld, event [%s], filter [%s], name1 [%s], flags [%ld]",
0529                             p_ee->subscriberNr, p_ee->eventexpr, p_ee->filter,
0530                             p_ee->name1, p_ee->flags);
0531     subscriberNr++; /* Increase subscription's ID for next user */
0532 
0533     /* Register the subscriber */
0534     NDRX_RWLOCK_WLOCK_V(M_subscribers_lock);
0535     DL_APPEND(M_subscribers, p_ee);
0536     NDRX_RWLOCK_UNLOCK_V(M_subscribers_lock);
0537 out:
0538     tpreturn(  ret==EXSUCCEED?TPSUCCESS:TPFAIL,
0539                 p_ee->subscriberNr,
0540                 NULL,
0541                 0L,
0542                 0L);
0543 }
0544 
0545 /*
0546  * Do initialization
0547  */
0548 int tpsvrinit(int argc, char **argv)
0549 {
0550     int ret=EXSUCCEED;
0551     short nodeid = (short)tpgetnodeid();
0552     char tmpsvc[MAXTIDENT+1];
0553     
0554     NDRX_LOG(log_debug, "tpsvrinit called");
0555     
0556     snprintf(tmpsvc, sizeof(tmpsvc), NDRX_SYS_SVC_PFX EV_TPEVSUBS, nodeid);
0557     if (EXSUCCEED!=tpadvertise(tmpsvc, TPEVSUBS))
0558     {
0559         NDRX_LOG(log_error, "Failed to initialize TPEVSUBS!");
0560         ret=EXFAIL;
0561         goto out;
0562     }
0563 
0564     snprintf(tmpsvc, sizeof(tmpsvc), NDRX_SYS_SVC_PFX EV_TPEVUNSUBS, nodeid);
0565     if (EXSUCCEED!=tpadvertise(tmpsvc, TPEVUNSUBS))
0566     {
0567         NDRX_LOG(log_error, "Failed to initialize TPEVUNSUBS!");
0568         ret=EXFAIL;
0569         goto out;
0570     }
0571 
0572     snprintf(tmpsvc, sizeof(tmpsvc), NDRX_SYS_SVC_PFX EV_TPEVPOST, nodeid);
0573     if (EXSUCCEED!=tpadvertise(tmpsvc, TPEVPOST))
0574     {
0575         NDRX_LOG(log_error, "Failed to initialize TPEVPOST!");
0576         ret=EXFAIL;
0577         goto out;
0578     }
0579     
0580     snprintf(tmpsvc, sizeof(tmpsvc), NDRX_SYS_SVC_PFX EV_TPEVDOPOST, nodeid);
0581     if (EXSUCCEED!=tpadvertise(tmpsvc, TPEVDOPOST))
0582     {
0583         NDRX_LOG(log_error, "Failed to initialize TPEVDOPOST!");
0584         ret=EXFAIL;
0585         goto out;
0586     }
0587 
0588 out:
0589     return ret;
0590 }
0591 
/**
 * Server shutdown hook, nothing to do here.
 */
void tpsvrdone(void)
{
    return;
}
0596 
0597 /* Auto generated system advertise table */
0598 expublic struct tmdsptchtbl_t ndrx_G_tmdsptchtbl[] = {
0599     { NULL, NULL, NULL, 0, 0 }
0600 };
0601 
0602 /**
0603  * Main entry for event server, allow mult-threading
0604  */
0605 int main( int argc, char** argv )
0606 {
0607     _tmbuilt_with_thread_option=EXTRUE;
0608     struct tmsvrargs_t tmsvrargs =
0609     {
0610         &tmnull_switch,
0611         &ndrx_G_tmdsptchtbl[0],
0612         0,
0613         tpsvrinit,
0614         tpsvrdone,
0615         NULL,
0616         NULL,
0617         NULL,
0618         NULL,
0619         NULL,
0620         NULL,
0621         NULL
0622     };
0623     
0624     return( _tmstartserver( argc, argv, &tmsvrargs ));
0625     
0626 }
0627 
0628 /* vim: set ts=4 sw=4 et smartindent: */