Back to home page

Enduro/X

 
 

    


0001 /**
0002  * @brief Queue forward processing
0003  *   We will have a separate thread pool for processing automatic queues.
0004  *   the main forward thread will periodically scan the Q and submit the jobs to
0005  *   the threads (if any will be free).
0006  *   During the shutdown we will issue for every pool thread
0007  *
0008  * @file forward.c
0009  */
0010 /* -----------------------------------------------------------------------------
0011  * Enduro/X Middleware Platform for Distributed Transaction Processing
0012  * Copyright (C) 2009-2016, ATR Baltic, Ltd. All Rights Reserved.
0013  * Copyright (C) 2017-2023, Mavimax, Ltd. All Rights Reserved.
0014  * This software is released under one of the following licenses:
0015  * AGPL (with Java and Go exceptions) or Mavimax's license for commercial use.
0016  * See LICENSE file for full text.
0017  * -----------------------------------------------------------------------------
0018  * AGPL license:
0019  *
0020  * This program is free software; you can redistribute it and/or modify it under
0021  * the terms of the GNU Affero General Public License, version 3 as published
0022  * by the Free Software Foundation;
0023  *
0024  * This program is distributed in the hope that it will be useful, but WITHOUT ANY
0025  * WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR A
0026  * PARTICULAR PURPOSE. See the GNU Affero General Public License, version 3
0027  * for more details.
0028  *
0029  * You should have received a copy of the GNU Affero General Public License along 
0030  * with this program; if not, write to the Free Software Foundation, Inc.,
0031  * 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
0032  *
0033  * -----------------------------------------------------------------------------
0034  * A commercial use license is available from Mavimax, Ltd
0035  * contact@mavimax.com
0036  * -----------------------------------------------------------------------------
0037  */
0038 #include <stdio.h>
0039 #include <stdlib.h>
0040 #include <string.h>
0041 #include <errno.h>
0042 #include <regex.h>
0043 #include <utlist.h>
0044 #include <dirent.h>
0045 #include <pthread.h>
0046 #include <signal.h>
0047 
0048 #include <ndebug.h>
0049 #include <atmi.h>
0050 #include <atmi_int.h>
0051 #include <typed_buf.h>
0052 #include <ndrstandard.h>
0053 #include <ubf.h>
0054 #include <Exfields.h>
0055 #include <tperror.h>
0056 #include <exnet.h>
0057 #include <ndrxdcmn.h>
0058 
0059 #include "tmqd.h"
0060 #include "../libatmisrv/srv_int.h"
0061 #include "nstdutil.h"
0062 #include "userlog.h"
0063 #include <xa_cmn.h>
0064 #include <atmi_int.h>
0065 #include <ndrxdiag.h>
0066 #include "qtran.h"
0067 #include <atmi_tls.h>
0068 #include <ndrx_ddr.h>
0069 #include <assert.h>
0070 /*---------------------------Externs------------------------------------*/
0071 /*---------------------------Macros-------------------------------------*/
0072 /*---------------------------Enums--------------------------------------*/
0073 /*---------------------------Typedefs-----------------------------------*/
0074 /*---------------------------Globals------------------------------------*/
0075 expublic pthread_t G_forward_thread;
0076 expublic int volatile G_forward_req_shutdown = EXFALSE;          /**< Is shutdown request? */
0077 expublic int volatile ndrx_G_forward_req_shutdown_ack = EXFALSE; /**< Is shutdown acked?   */
0078 
0079 
0080 exprivate MUTEX_LOCKDECL(M_wait_mutex);
0081 exprivate pthread_cond_t M_wait_cond = PTHREAD_COND_INITIALIZER;
0082 
0083 exprivate __thread int M_is_xa_open = EXFALSE; /* Init flag for thread. */
0084 
0085 
0086 exprivate fwd_qlist_t *M_next_fwd_q_list = NULL;    /**< list of queues to check msgs to fwd */
0087 exprivate fwd_qlist_t *M_next_fwd_q_cur = NULL;     /**< current position in linked list...  */
0088 exprivate int           M_had_msg = EXFALSE;         /**< Did we got the msg previously?      */
0089 exprivate int          M_any_busy = EXFALSE;         /**< Is all queues busy?                 */
0090 exprivate int          M_num_busy = 0;         /**< Number of busy jobs                 */
0091     
0092 exprivate MUTEX_LOCKDECL(M_forward_lock); /* Q Forward operations sync        */
0093 
0094 exprivate int M_force_sleep = EXFALSE;              /**< Shall the sleep be forced in case of error? */
0095 
0096 
0097 /** we are into main sleep */
0098 expublic int volatile ndrx_G_fwd_into_sleep=EXFALSE;
0099 
0100 /** we are into pool sleep */
0101 expublic int volatile ndrx_G_fwd_into_poolsleep=EXFALSE;
0102 
0103 /** we want to wake up the forwarder as new job as arrived */
0104 expublic int volatile ndrx_G_fwd_force_wake = EXFALSE;
0105 
0106 /*---------------------------Statics------------------------------------*/
0107 /*---------------------------Prototypes---------------------------------*/
0108 
0109 /**
0110  * We assume we are locked.
0111  * and check run against time.
0112  * @param mmsg which was enqueued
0113  */
0114 expublic void ndrx_forward_chkrun(tmq_memmsg_t *mmsg)
0115 {
0116     fwd_stats_t *p_stats;
0117     
0118     /* nothing todo */
0119     if (G_tmqueue_cfg.no_chkrun)
0120     {
0121         return;
0122     }
0123     
0124     /* nothing todo, already triggered by other thread */
0125     if (ndrx_G_fwd_force_wake)
0126     {
0127         return;
0128     }
0129     
0130     /* nothing todo */
0131     if (!ndrx_G_fwd_into_sleep && !ndrx_G_fwd_into_poolsleep)
0132     {
0133         return;
0134     }
0135     
0136     if (G_forward_req_shutdown)
0137     {
0138         return;
0139     }
0140 
0141     /* if it's not auto, nothing to check & run */
0142     if (!TMQ_AUTOQ_ISAUTO(mmsg->qconf->autoq))
0143     {
0144         return;
0145     }
0146 
0147     /*
0148      * we can run the message, if it is in the current list
0149      * or in future, but, it's time has come.
0150      */
0151     if ( (mmsg->qstate & NDRX_TMQ_LOC_CURQ
0152 
0153             || mmsg->qstate & NDRX_TMQ_LOC_FUTQ && (mmsg->msg->qctl.flags &TPQTIME_ABS)
0154                 && mmsg->msg->qctl.deq_time <= time(NULL) )  
0155             /* Ignore error of cnt... */
0156             &&mmsg->qconf->workers > tmq_fwd_busy_cnt(mmsg->msg->hdr.qname, &p_stats))
0157     {
0158         
0159         ndrx_G_fwd_force_wake=EXTRUE;
0160         
0161         if (ndrx_G_fwd_into_sleep)
0162         {
0163             /* wakup from main sleep */
0164             pthread_cond_signal(&M_wait_cond);
0165         }
0166         else if (ndrx_G_fwd_into_poolsleep)
0167         {
0168             /* wakup from pool sleep */
0169             ndrx_thpool_signal_one(G_tmqueue_cfg.fwdthpool);
0170         }
0171     }
0172 }
0173 
0174 /**
0175  * Lock background operations
0176  */
0177 expublic void forward_lock(void)
0178 {
0179     MUTEX_LOCK_V(M_forward_lock);
0180 }
0181 
0182 /**
0183  * Un-lock background operations
0184  */
0185 expublic void forward_unlock(void)
0186 {
0187     MUTEX_UNLOCK_V(M_forward_lock);
0188 }
0189 
0190 /**
0191  * Sleep the thread, with option to wake up (by shutdown).
0192  * @param sleep_sec
0193  */
0194 exprivate void thread_sleep(int sleep_sec)
0195 {
0196     struct timespec wait_time;
0197     struct timeval now;
0198     int rt;
0199 
0200     gettimeofday(&now,NULL);
0201 
0202     wait_time.tv_sec = now.tv_sec+sleep_sec;
0203     wait_time.tv_nsec = now.tv_usec*1000;
0204 
0205     MUTEX_LOCK_V(M_wait_mutex);
0206     
0207     /* No wait if request was made here... */
0208     if (!G_forward_req_shutdown)
0209     {
0210         rt = pthread_cond_timedwait(&M_wait_cond, &M_wait_mutex, &wait_time);
0211     }
0212     
0213     MUTEX_UNLOCK_V(M_wait_mutex);
0214 }
0215 
0216 /**
0217  * Wake up the sleeping thread.
0218  */
0219 expublic void forward_shutdown_wake(void)
0220 {
0221     MUTEX_LOCK_V(M_wait_mutex);
0222     pthread_cond_signal(&M_wait_cond);
0223     MUTEX_UNLOCK_V(M_wait_mutex);
0224 }
0225 
0226 /**
0227  * Remove forward queue list before next lookup or at un-init
0228  */
0229 exprivate void fwd_q_list_rm(void)
0230 {
0231     fwd_qlist_t *elt, *tmp;
0232     /* Deallocate the previous DL */
0233     if (NULL!=M_next_fwd_q_list)
0234     {
0235         DL_FOREACH_SAFE(M_next_fwd_q_list,elt,tmp)
0236         {
0237             DL_DELETE(M_next_fwd_q_list,elt);
0238             NDRX_FREE(elt);
0239         }
0240     }
0241 }
0242 
0243 /**
0244  * Get next message to forward
0245  * So basically we iterate over the all Qs, then regenerate the Q list and
0246  * and iterate over again.
0247  * 
0248  * @return 
0249  */
0250 exprivate fwd_msg_t * get_next_msg(void)
0251 {
0252     fwd_msg_t * ret = NULL;
0253     tmq_msg_t * ret_msg = NULL;
0254     long qerr = EXSUCCEED;
0255     char msgbuf[128];
0256     int again;
0257     static unsigned long seq = 0;
0258 
0259     do
0260     {
0261         again=EXFALSE;
0262 
0263         if (NULL==M_next_fwd_q_list || NULL == M_next_fwd_q_cur)
0264         {
0265             /* reset marking, no messages processed yet. */
0266             M_had_msg=EXFALSE;
0267             M_any_busy=EXFALSE;
0268             M_num_busy = 0;
0269             fwd_q_list_rm();
0270 
0271             /* Generate new list */
0272             M_next_fwd_q_list = tmq_get_qlist(EXTRUE, EXFALSE);
0273             
0274             if (NULL!=M_next_fwd_q_list)
0275             {
0276                 M_next_fwd_q_cur = M_next_fwd_q_list;
0277             }
0278         }
0279         
0280         /*
0281          * get the message
0282          */
0283         while (NULL!=M_next_fwd_q_cur)
0284         {
0285             fwd_stats_t *p_stats;
0286             fwd_qlist_t *q_cur = M_next_fwd_q_cur;
0287             
0288             int busy = tmq_fwd_busy_cnt(M_next_fwd_q_cur->qname, &p_stats);
0289             
0290             if (EXFAIL==busy)
0291             {
0292                 NDRX_LOG(log_error, "Failed to get stats for [%s] - memory error",
0293                         M_next_fwd_q_cur->qname);
0294                 userlog("Failed to get stats for [%s] - memory error",
0295                         M_next_fwd_q_cur->qname);
0296                 /* memory error! */
0297                 exit(-1);
0298             }
0299             
0300             NDRX_LOG(log_info, "mon: %s %ld/%ld/%d/%d", 
0301                     M_next_fwd_q_cur->qname
0302                     ,M_next_fwd_q_cur->numenq
0303                     ,M_next_fwd_q_cur->numdeq
0304                     ,busy
0305                     , M_next_fwd_q_cur->workers);
0306             
0307             /* no msg... */
0308             ret=NULL;
0309             ret_msg=NULL;
0310             
0311             /* not all busy... */
0312             M_num_busy+=busy;
0313             
0314             if (busy >= M_next_fwd_q_cur->workers)
0315             {
0316                 /* Queue is busy... nothing todo... */
0317                 M_any_busy=EXTRUE;
0318             }
0319             /* OK, so we peek for a message */
0320             else if (NULL==(ret_msg=tmq_msg_dequeue(M_next_fwd_q_cur->qname, 0, EXTRUE, 
0321                     &qerr, msgbuf, sizeof(msgbuf), NULL, NULL)))
0322             {
0323                 NDRX_LOG(log_debug, "Not messages for dequeue qerr=%ld: %s", 
0324                     qerr, msgbuf);
0325             }
0326             else
0327             {
0328                 NDRX_LOG(log_debug, "Dequeued message");
0329                 M_had_msg=EXTRUE;
0330             }
0331             /* schedule next queue ... */
0332             M_next_fwd_q_cur = M_next_fwd_q_cur->next;
0333         
0334             /* done with this loop if having msg.. 
0335              * Prepare return object
0336              */
0337             if (NULL!=ret_msg)
0338             {
0339                 ret = NDRX_FPMALLOC(sizeof(fwd_msg_t), 0);
0340                 
0341                 if (NULL==ret)
0342                 {
0343                     int err = errno;
0344                     NDRX_LOG(log_error, "Failed to malloc %d bytes: %s - termiante", 
0345                             sizeof(fwd_msg_t), strerror(err));
0346                     userlog("Failed to malloc %d bytes: %s - terminate", 
0347                             sizeof(fwd_msg_t), strerror(err));
0348                     exit(-1);
0349                 }
0350                 ret->msg=ret_msg;
0351                 ret->stats=p_stats;
0352                 ret->sync=q_cur->sync;
0353                 seq++;
0354                 
0355                 ret->seq = seq;
0356                 
0357                 /* add to internal order.. */
0358                 if (ret->sync)
0359                 {
0360                     tmq_fwd_sync_add(ret);
0361                 }
0362                 
0363                 break;
0364             }
0365 
0366         }
0367     
0368         /* if any queue had msgs, then re-scan queue list and retry */
0369         if (NULL==ret)
0370         {
0371             /* read again if had message... 
0372              * WARNING !!!! Please take care about ordering, if had message try
0373              * again only when nothing todo and all was busy, then wait on threads.
0374              */
0375             if (M_had_msg)
0376             {
0377                 NDRX_LOG(log_debug, "Had messages in previous run, scan Qs again");
0378                 again = EXTRUE;
0379             }
0380             else if (M_any_busy)
0381             {
0382                 NDRX_LOG(log_debug, "All Qs/threads busy to the limit wait for slot...");
0383                 
0384                 /* wait on pool 
0385                  * What if new msg is added to queue which is not being processed
0386                  * and all the currently processed queues run very slowly?
0387                  * the minimum would be do to again after the wait time.
0388                  * Also probably wait time shall be set to the same amount
0389                  * as the main sleep, not?
0390                  * 
0391                  * Secondly additional speedup would give if unlocked messages
0392                  * valid for dequeue would trigger that wakups...
0393                  * 
0394                  * Also if having force sleep set, then probably we shall go
0395                  * out of all this to main sleeping routine, not?
0396                  */
0397                 
0398                  /* reset this one */
0399                 ndrx_G_fwd_into_poolsleep=EXTRUE;
0400                 
0401                 ndrx_thpool_timedwait_less(G_tmqueue_cfg.fwdthpool, 
0402                             M_num_busy, G_tmqueue_cfg.scan_time*1000, (int *)&ndrx_G_fwd_force_wake);
0403                 
0404                 /* OK... we are back on the track... */
0405                 ndrx_G_fwd_into_poolsleep=EXFALSE;
0406                 ndrx_G_fwd_force_wake=EXFALSE;
0407                 
0408                 again = EXTRUE;
0409             }
0410         }
0411 
0412         /* &&! Force_sleep */
0413     } while (again && !G_forward_req_shutdown && !M_force_sleep);
0414     
0415 out:
0416     return ret;
0417 }
0418 
/**
 * Take the lock on the transaction log entry again, to be sure that the
 * transaction was not rolled back in the meantime.
 * Looks the entry up by tmxid into p_tl; if it is missing, the failure is
 * logged and EXFAIL_OUT(ret) is invoked.
 * Uses tmxid, p_tl and ret from the expanding scope.
 */
#define RELOCK do {\
        p_tl = tmq_log_get_entry(tmxid, NDRX_LOCK_WAIT_TIME, NULL);\
        if (NULL==p_tl)\
        {\
            NDRX_LOG(log_error, "Fatal error ! Expected to have transaction for [%s]",\
                    tmxid);\
            userlog("Fatal error ! Expected to have transaction for [%s]",\
                    tmxid);\
            EXFAIL_OUT(ret);\
        }\
    } while (0)
0432 
/**
 * Release the transaction log entry held in p_tl and clear the pointer,
 * so that it is not used while unlocked.
 */
#define UNLOCK do {\
        tmq_log_unlock(p_tl);\
        p_tl = NULL;\
    } while (0)
0438 
/**
 * Resolve the current transaction id from the thread's qdisk TLS
 * (filename_base) into tmxid and lock its log entry into p_tl.
 * The lookup and failure handling are exactly those of RELOCK.
 *
 * Open question kept from the original author: shouldn't the transaction
 * be locked here, or locked/unlocked periodically? A timeout may kill the
 * transaction in the middle, and the msg ptr would then become invalid.
 */
#define GET_TL do {\
        tmxid = G_atmi_tls->qdisk_tls->filename_base;\
        RELOCK;\
    } while (0)
0457 
/**
 * Write the delete command block for the message being processed.
 * Sets the command code of cmd_block to TMQ_STORCMD_DEL and writes it to
 * storage. Per the original notes, this adds the msg to the transaction
 * (sub-lock), further handling is done by the timeout setting and there is
 * no manual unlock any more.
 * On write failure the error is logged, the message is unlocked by its
 * msgid (adding to the log is the last step, thus the msg is not in the log
 * and we are still in control) and EXFAIL_OUT(ret) is invoked.
 * Uses cmd_block, msg, msgid_str and ret from the expanding scope.
 */
#define WRITE_DEL do {\
        cmd_block.hdr.command_code = TMQ_STORCMD_DEL;\
        if (EXSUCCEED!=tmq_storage_write_cmd_block((char *)&cmd_block, \
                "Removing completed message...", NULL, NULL))\
        {\
            NDRX_LOG(log_error, "Failed to issue complete/remove command to xa for msgid_str [%s]", \
                    msgid_str);\
            userlog("Failed to issue complete/remove command to xa for msgid_str [%s]", \
                    msgid_str);\
            tmq_unlock_msg_by_msgid(msg->hdr.msgid, 0);\
            EXFAIL_OUT(ret);\
        }\
    } while (0)
0483 /**
0484  * Process of the message
0485  * @param ptr
0486  * @param p_finish_off
0487  */
0488 expublic void thread_process_forward (void *ptr, int *p_finish_off)
0489 {
0490     int ret = EXSUCCEED;
0491     fwd_msg_t * fwd = (fwd_msg_t *)ptr;
0492     tmq_msg_t * msg = fwd->msg;
0493     tmq_qconfig_t qconf;
0494     char *call_buf = NULL;
0495     long call_len = 0;
0496     
0497     char *rply_buf = NULL;
0498     long rply_len = 0;
0499     
0500     typed_buffer_descr_t *descr;
0501     char msgid_str[TMMSGIDLEN_STR+1];
0502     char *fn = "thread_process_forward";
0503     int tperr;
0504     union tmq_block cmd_block;
0505     int tout, tout_autotran;
0506     int sent_ok=EXFALSE;
0507     char svcnm[XATMI_SERVICE_NAME_LENGTH+1];
0508     char qname[TMQNAMELEN+1];
0509     char *tmxid=NULL;
0510     int cd;
0511     int msg_released = EXFALSE;
0512     qtran_log_t *p_tl=NULL;
0513     int autotran=EXFALSE;
0514     unsigned long trantime=0;
0515     
0516     if (!M_is_xa_open)
0517     {
0518         if (EXSUCCEED!=tpopen()) /* init the lib anyway... */
0519         {
0520             NDRX_LOG(log_error, "Failed to tpopen() by worker thread: %s", 
0521                     tpstrerror(tperrno));
0522             userlog("Failed to tpopen() by worker thread: %s", tpstrerror(tperrno));
0523             
0524             /* nothing todo! */
0525             exit(1);
0526         }
0527         else
0528         {
0529             M_is_xa_open = EXTRUE;
0530         }
0531     }
0532     
0533     /* for later statistics release */
0534     NDRX_STRCPY_SAFE(qname, msg->hdr.qname);
0535     tmq_msgid_serialize(msg->hdr.msgid, msgid_str); 
0536 
0537     NDRX_LOG(log_info, "%s enter for msgid_str: [%s]", fn, msgid_str);
0538     
0539     /* Call the Service & and issue XA commands for update or delete
0540      *  + If message failed, forward to dead queue (if defined).
0541      */
0542     if (EXSUCCEED!=tmq_qconf_get_with_default_static(msg->hdr.qname, &qconf))
0543     {
0544         /* might happen if we reconfigure on the fly. */
0545         NDRX_LOG(log_error, "Failed to get qconf for [%s]", msg->hdr.qname);
0546         tmq_unlock_msg_by_msgid(msg->hdr.msgid, 0);
0547         EXFAIL_OUT(ret);
0548     }
0549     
0550     if (qconf.txtout > EXFAIL)
0551     {
0552         tout_autotran = tout = qconf.txtout;
0553         NDRX_LOG(log_info, "txtout set to %d sec", tout);
0554     }
0555     else
0556     {
0557         tout_autotran = tout = G_tmqueue_cfg.dflt_timeout;
0558         NDRX_LOG(log_info, "txtout defaulted to %d sec", tout);
0559     }
0560     
0561     
0562     /* substitute the special service name  */
0563     if (0==strcmp(qconf.svcnm, TMQ_QUEUE_SERVICE))
0564     {
0565         NDRX_STRCPY_SAFE(svcnm, qconf.qname);
0566     }
0567     else
0568     {
0569         NDRX_STRCPY_SAFE(svcnm, qconf.svcnm);
0570     }
0571     
0572     /* in case if dest service have auto-tran enabled and we do not start the
0573      * transaction, then for service call use that particular setting
0574      */
0575     if (TMQ_AUTOQ_AUTOTX!=qconf.autoq)
0576     {
0577         int ddr_ret = ndrx_ddr_service_get(svcnm, &autotran, &trantime);
0578         
0579         if (EXFAIL==ddr_ret)
0580         {
0581             NDRX_LOG(log_always, "Service info failed [%s]", svcnm);
0582             tmq_unlock_msg_by_msgid(msg->hdr.msgid, 0);
0583             EXFAIL_OUT(ret);
0584         }
0585         else if (autotran && trantime > 0)
0586         {    
0587             /* so this will apply to doing pure IPC calls. */
0588             tout_autotran = trantime;
0589             NDRX_LOG(log_debug, "autoq=y, svc [%s] uses auto-tran tout: %d", 
0590                     svcnm, tout_autotran);
0591         }
0592     }
0593     
0594     /* Alloc the buffer of the message type according to size (use prepare incoming?)
0595      */
0596     
0597     /* TODO: Cleanup any ptrs... alloc'd now and received back
0598      * This means that different buffer shall be received back
0599      * and pointers from both buffers must be cleaned up.!!!!!
0600      * !!!!!!
0601      */
0602     if (EXSUCCEED!=ndrx_mbuf_prepare_incoming(msg->msg,
0603                     msg->len,
0604                     &call_buf,
0605                     &call_len,
0606                     0, 0))
0607     {
0608         NDRX_LOG(log_always, "Failed to allocate buffer");
0609         tmq_unlock_msg_by_msgid(msg->hdr.msgid, 0);
0610         EXFAIL_OUT(ret);
0611     }
0612     
0613     memset(&cmd_block, 0, sizeof(cmd_block));
0614     memcpy(&cmd_block.hdr, &msg->hdr, sizeof(cmd_block.hdr));
0615     
0616     if (TMQ_AUTOQ_AUTOTX==qconf.autoq)
0617     {   
0618         NDRX_LOG(log_debug, "Service invocation shall be performed in "
0619                 "transactional mode...");
0620         
0621         /* XATMI Timeout setting: is generic tout */
0622         assert(EXSUCCEED==tpsblktime(tout, TPBLK_ALL));
0623         
0624         if (EXSUCCEED!=tpbegin(tout, 0))
0625         {
0626             userlog("Failed to start tran: %s", tpstrerror(tperrno));
0627             NDRX_LOG(log_error, "Failed to start tran!");
0628             
0629             /* nothing todo with the msg, unlock... */
0630             tmq_unlock_msg_by_msgid(msg->hdr.msgid, 0);
0631             EXFAIL_OUT(ret);
0632         }
0633         
0634         /* Normally we expect service to complete OK
0635          * but lock the particular transaction (first one) 
0636          * as msg non-unlockable
0637          */
0638         WRITE_DEL;
0639         GET_TL;
0640         /* mark transaction as not unlockable. */
0641         p_tl->cmds->no_unlock=EXTRUE;
0642         UNLOCK;
0643     }
0644     else
0645     {
0646         /* XATMI timeout setting: tout_autotran */
0647         assert(EXSUCCEED==tpsblktime(tout_autotran, TPBLK_ALL));
0648     }
0649     
0650     /* call the service */
0651     
0652     /* after acall remove our entry
0653      * if after remove all is empty.... do we need to signal? I guess no
0654      * if there is something, then lock & signal.
0655      */
0656     if (fwd->sync)
0657     {
0658         tmq_fwd_sync_wait(fwd);
0659     }
0660     
0661     NDRX_LOG(log_info, "Sending request to service: [%s] sync_seq=%lu", svcnm, fwd->seq);
0662     
0663     cd = tpacall (svcnm, call_buf, call_len, 0);
0664     
0665     /* release the msg... if acall sync */
0666     if (TMQ_SYNC_TPACALL==fwd->sync)
0667     {
0668         tmq_fwd_sync_notify(fwd);
0669         NDRX_LOG(log_debug, "Sync notified (tpacall) sync_seq=%lu", fwd->seq);
0670         msg_released = EXTRUE;
0671     }
0672     
0673     /* get the reply.. */
0674     if (EXFAIL==cd || EXFAIL==tpgetrply (&cd, (char **)&rply_buf, &rply_len, 0))
0675     {
0676         tperr = tperrno;
0677         NDRX_LOG(log_error, "%s failed: %s", svcnm, tpstrerror(tperr));
0678         
0679         /* Bug #421 if called in transaction, then abort current one
0680          * because need to increment the counters in new transaction
0681      * NOTE! Message is not released / unlocked due to marking.
0682          */
0683         if (tpgetlev())
0684         {
0685             NDRX_LOG(log_error, "Abort current transaction for counter increment");
0686             tpabort(0L);
0687         }
0688     }
0689     else
0690     {
0691         sent_ok=EXTRUE;
0692     }
0693     
0694     NDRX_LOG(log_info, "Service answer %s for %s", (sent_ok?"ok":"fail"), msgid_str);
0695         
0696     /* XATMI Timeout setting: is generic tout 
0697      * as previously was used target service timings.
0698      */
0699     if (tout!=tout_autotran)
0700     {
0701         assert(EXSUCCEED==tpsblktime(tout, TPBLK_ALL));
0702     }
0703     
0704     /* start the transaction 
0705      * Note! message is not yet added to transaction with
0706      */
0707     if (!tpgetlev())
0708     {
0709         if (EXSUCCEED!=tpbegin(tout, 0))
0710         {
0711             userlog("Failed to start tran: %s", tpstrerror(tperrno));
0712             NDRX_LOG(log_error, "Failed to start tran!");
0713             tmq_unlock_msg_by_msgid(msg->hdr.msgid, 0);
0714             EXFAIL_OUT(ret);
0715         }
0716     }
0717         
0718     /* 
0719      * just unlock the message. Increase the counter
0720      */
0721     if (sent_ok)
0722     {
0723         /* new tran was started as not autoq */
0724 
0725         if (TMQ_AUTOQ_AUTOTX==qconf.autoq)
0726         {
0727             /* now we can unlock by transaction  */
0728             GET_TL;
0729             p_tl->cmds->no_unlock=EXFALSE;
0730         }
0731         else
0732         {
0733             WRITE_DEL;
0734             GET_TL;
0735         }
0736     
0737         tmq_update_q_stats(msg->hdr.qname, 1, 0);
0738         
0739        /* Remove the message */
0740         if (msg->qctl.flags & TPQREPLYQ)
0741         {
0742             TPQCTL ctl;
0743         
0744             NDRX_LOG(log_warn, "TPQREPLYQ defined, sending answer buffer to "
0745                     "[%s] q in [%s] namespace", 
0746                     msg->qctl.replyqueue, msg->hdr.qspace);
0747             
0748             /* Send response to reply Q (load the data in FB with call details) */
0749             memset(&ctl, 0, sizeof(ctl));
0750                     
0751             /* this will add msg to our transaction, if all ok
0752              * now futher we do not control
0753              * the transaction here, just let let timeout handler to do the
0754              * job
0755              */
0756             
0757             /* we must release the lock here... */
0758             UNLOCK;
0759             ret = tpenqueue (msg->hdr.qspace, msg->qctl.replyqueue, &ctl, rply_buf, rply_len, 0);
0760             RELOCK;
0761             
0762             if (EXSUCCEED!=ret)
0763             {    
0764                 if (TPEDIAGNOSTIC==tperrno)
0765                 {
0766                     NDRX_LOG(log_error, "Failed to enqueue to replyqueue [%s]: %s diag: %d:%s", 
0767                             msg->qctl.replyqueue, tpstrerror(tperrno),
0768                             msg->qctl.diagnostic, msg->qctl.diagmsg);
0769                     userlog("Failed to enqueue to replyqueue [%s]: %s diag: %d:%s", 
0770                             msg->qctl.replyqueue, tpstrerror(tperrno),
0771                             msg->qctl.diagnostic, msg->qctl.diagmsg);
0772                 }
0773                 else
0774                 {
0775                     NDRX_LOG(log_error, "Failed to enqueue to replyqueue [%s]: %s", 
0776                             msg->qctl.replyqueue, tpstrerror(tperrno));
0777                     userlog("Failed to enqueue to replyqueue [%s]: %s", 
0778                             msg->qctl.replyqueue, tpstrerror(tperrno));
0779                 }
0780                 /* no unlock & sleep as we do not know where the transaction
0781                  * did stuck
0782                  */
0783                 EXFAIL_OUT(ret);
0784             }
0785         }
0786         
0787     }
0788     else
0789     {
0790         /* Increase the counter */
0791         msg->trycounter++;
0792         NDRX_LOG(log_warn, "Message [%s] tries %ld, max: %d", 
0793                 msgid_str, msg->trycounter, qconf.tries);
0794         ndrx_utc_tstamp2(&msg->trytstamp, &msg->trytstamp_usec);
0795         
0796         if (msg->trycounter>=qconf.tries)
0797         {
0798             NDRX_LOG(log_error, "Message [%s] expired", msgid_str);
0799             
0800             /* test before eqn... */
0801             tmq_update_q_stats(msg->hdr.qname, 0, 1);
0802             cmd_block.hdr.command_code = TMQ_STORCMD_DEL;
0803             if (EXSUCCEED!=tmq_storage_write_cmd_block((char *)&cmd_block, 
0804                     "Removing expired message...", NULL, NULL))
0805             {
0806                 NDRX_LOG(log_error, "Failed to issue complete/remove command to xa for msgid_str [%s]", 
0807                         msgid_str);
0808                 userlog("Failed to issue complete/remove command to xa for msgid_str [%s]", 
0809                         msgid_str);
0810                 
0811                 /* unlock the msg, as adding to log is last step, 
0812                  * thus not in log and we are in control
0813                  */
0814                 tmq_unlock_msg_by_msgid(msg->hdr.msgid, 0);
0815                 EXFAIL_OUT(ret);
0816             }
0817             
0818             /* dynamic mode will promote the tmxid */
0819             GET_TL;
0820             
0821             if (msg->qctl.flags & TPQFAILUREQ && NULL!=rply_buf)
0822             {
0823                 TPQCTL ctl;
0824                 NDRX_LOG(log_warn, "TPQFAILUREQ defined and non NULL reply, enqueue answer buffer to "
0825                     "[%s] q in [%s] namespace", 
0826                     msg->qctl.failurequeue, msg->hdr.qspace);
0827                 
0828 
0829                 /* Send response to reply Q (load the data in FB with call details)
0830                  * Keep the original flags... */
0831                 memcpy(&ctl, &msg->qctl, sizeof(ctl));
0832 
0833                 /* if local tran expires, the process will be unable to join
0834                  * transaction 
0835                  */
0836                 UNLOCK;
0837                 ret = tpenqueue (msg->hdr.qspace, msg->qctl.failurequeue, &ctl, rply_buf, rply_len, 0);
0838                 RELOCK;
0839                 
0840                 if (EXSUCCEED!=ret)
0841                 {
0842                     if (TPEDIAGNOSTIC==tperrno)
0843                     {
0844                         NDRX_LOG(log_error, "Failed to enqueue to failurequeue [%s]: %s diag: %d:%s", 
0845                                 msg->qctl.replyqueue, tpstrerror(tperrno),
0846                                 msg->qctl.diagnostic, msg->qctl.diagmsg);
0847                         userlog("Failed to enqueue to failurequeue [%s]: %s diag: %d:%s", 
0848                                 msg->qctl.replyqueue, tpstrerror(tperrno),
0849                                 msg->qctl.diagnostic, msg->qctl.diagmsg);
0850                     }
0851                     else
0852                     {
0853                         NDRX_LOG(log_error, "Failed to enqueue to failurequeue [%s]: %s", 
0854                                 msg->qctl.replyqueue, tpstrerror(tperrno));
0855                         userlog("Failed to enqueue to failurequeue [%s]: %s", 
0856                                 msg->qctl.replyqueue, tpstrerror(tperrno));
0857                     }
0858                     
0859                     /* let timeout/tmsrv to housekeep...
0860                      * only here if global transaction timed-out
0861                      * then will we force the sleep. Thought this would be
0862                      * very rare case.
0863                      */
0864                     EXFAIL_OUT(ret);
0865                 }
0866             }
0867             
0868             if (EXEOS!=qconf.errorq[0])
0869             {
0870                 TPQCTL ctl;
0871                 NDRX_LOG(log_warn, "ERRORQ defined, enqueue request buffer to "
0872                     "[%s] q in [%s] namespace", qconf.errorq, msg->hdr.qspace);
0873                 
0874 
0875                 /* Send org msg to error queue.
0876                  * Keep the original flags... */
0877                 memcpy(&ctl, &msg->qctl, sizeof(ctl));
0878 
0879                 UNLOCK;
0880                 ret = tpenqueue (msg->hdr.qspace, qconf.errorq, &ctl, call_buf, call_len, 0);
0881                 RELOCK;
0882                 
0883                 if (EXSUCCEED!=ret)
0884                 {
0885                     NDRX_LOG(log_error, "Failed to enqueue to errorq [%s]: %s", 
0886                             qconf.errorq, tpstrerror(tperrno));
0887                     userlog("Failed to enqueue to errorq [%s]: %s", 
0888                             qconf.errorq, tpstrerror(tperrno));
0889                     
0890                     /* let timeout/tmsrv to housekeep... */
0891                     EXFAIL_OUT(ret);
0892                 }
0893             }
0894         }
0895         else
0896         {
0897             /* schedule next run of the msg */
0898             msg->qctl.flags |= TPQTIME_ABS;
0899             if ( 0 == msg->trycounter )
0900             {
0901                 msg->qctl.deq_time = time(NULL) + qconf.waitinit;
0902             }
0903             else 
0904             {
0905                 int retry_inc = qconf.waitretry * msg->trycounter;
0906                 if ( retry_inc > qconf.waitretrymax )
0907                 {
0908                     retry_inc = qconf.waitretrymax;
0909                 }
0910                 msg->qctl.deq_time = time(NULL) + retry_inc;
0911             }
0912 
0913             /* We need to update the message */
0914             UPD_MSG((&cmd_block.upd), msg);
0915         
0916             cmd_block.hdr.command_code = TMQ_STORCMD_UPD;
0917             
0918             if (EXSUCCEED!=tmq_storage_write_cmd_block((char *)&cmd_block, 
0919                     "Update message command", NULL, NULL))
0920             {
0921                 NDRX_LOG(log_error, "Failed to issue update command to xa for msgid_str [%s]", 
0922                         msgid_str);
0923                 userlog("Failed to issue update command to xa for msgid_str [%s]", 
0924                         msgid_str);
0925                 
0926                 /* unlock the msg, as adding to log is last step, 
0927                  * thus not in log and we are in control
0928                  */
0929                 tmq_unlock_msg_by_msgid(msg->hdr.msgid, 0);
0930                 EXFAIL_OUT(ret);
0931             }
0932             
0933             /* dynamic mode will promote the tmxid */
0934             GET_TL;
0935             
0936         }
0937     }
0938 
0939 out:    
0940         
0941     /* release the lock so that commit can complete... */        
0942     if (NULL!=p_tl)
0943     {
0944         tmq_log_unlock(p_tl);
0945     }
0946 
0947     /* NOTE! Cannot touch msg here anymore !?
0948      * what happens to msg object, if timeout rolls-back?
0949      */
0950     if (tpgetlev())
0951     {
0952         if (EXSUCCEED==ret)
0953         {
0954             if (EXSUCCEED!=tpcommit(0L))
0955             {
0956                 NDRX_LOG(log_error, "Failed to commit => aborting + force sleep");
0957                 userlog("Failed to commit => aborting + force sleep");
0958                 M_force_sleep=EXTRUE;
0959                 tpabort(0L);
0960             }
0961         }
0962         else
0963         {
0964             NDRX_LOG(log_error, "System failure during msg processing => aborting + force sleep");
0965             userlog("System failure during msg processing => aborting + force sleep");
0966             tpabort(0L);
0967             M_force_sleep=EXTRUE;
0968         }
0969     }
0970     else if (EXSUCCEED!=ret)
0971     {
0972         NDRX_LOG(log_error, "System failure => force sleep");
0973         M_force_sleep=EXTRUE;
0974     }
0975 
0976     /* let next msg to process... */
0977     if (fwd->sync && !msg_released)
0978     {
0979         tmq_fwd_sync_notify(fwd);
0980         NDRX_LOG(log_debug, "Sync notified (tpcommit) sync_seq=%lu", fwd->seq);
0981     }
0982     
0983     if (NULL!=call_buf)
0984     {
0985         tpfree(call_buf);
0986     }
0987 
0988     if (NULL!=rply_buf)
0989     {
0990         tpfree(rply_buf);
0991     }
0992     
0993     /* release stats counter... */
0994     tmq_fwd_busy_dec(fwd->stats);
0995     
0996     NDRX_FPFREE(fwd);
0997     
0998     /* disable timeouts */
0999     assert(EXSUCCEED==tpsblktime(0, TPBLK_ALL));
1000     
1001     return;
1002 }
1003 
1004 /**
1005  * Continues transaction background loop..
1006  * Try to complete the transactions.
1007  * @return  SUCCEED/FAIL
1008  */
1009 expublic int forward_loop(void)
1010 {
1011     int ret = EXSUCCEED;
1012     int normal_sleep;
1013     fwd_msg_t * fwd_msg;
1014     /*
1015      * We need to get the list of queues to monitor.
1016      * Note that list can be dynamic. So at some periods we need to refresh
1017      * the lists we monitor.
1018      */
1019     while(!G_forward_req_shutdown)
1020     {
1021         fwd_msg = NULL;
1022         
1023         /* wait for one slot to become free.. */
1024         ndrx_thpool_wait_one(G_tmqueue_cfg.fwdthpool);
1025         
1026         normal_sleep=EXFALSE;
1027         if (!M_force_sleep)
1028         {
1029             /* 2. get the message from Q */
1030             fwd_msg = get_next_msg();
1031         
1032             /* 3. run off the thread */
1033             if (NULL!=fwd_msg)
1034             {
1035                 /* Submit the job to thread */
1036                 tmq_fwd_busy_inc(fwd_msg->stats);
1037                 ndrx_thpool_add_work(G_tmqueue_cfg.fwdthpool, (void*)thread_process_forward, (void *)fwd_msg);            
1038             }
1039             else
1040             {
1041                 normal_sleep=EXTRUE;
1042             }
1043         }
1044 
1045         /* go sleep if no msgs, or forced */
1046         if (normal_sleep || M_force_sleep)
1047         {
1048             /* sleep only when did not have a message 
1049              * So that if we have batch, we try to use all resources...
1050              */
1051             NDRX_LOG(log_debug, "background - sleep %d forced=%d", 
1052                     G_tmqueue_cfg.scan_time, M_force_sleep);
1053             
1054             if (!M_force_sleep)
1055             {
1056                 ndrx_G_fwd_into_sleep=EXTRUE;
1057             }
1058             
1059             thread_sleep(G_tmqueue_cfg.scan_time);
1060             
1061             /* in case of error, forced sleep */
1062             M_force_sleep=EXFALSE;
1063             ndrx_G_fwd_into_sleep=EXFALSE;
1064             /* reset this one */
1065             ndrx_G_fwd_force_wake=EXFALSE;
1066         }
1067     }
1068     
1069     /* ask the shutodwn */
1070     ndrx_G_forward_req_shutdown_ack = EXTRUE;
1071     
1072     /* remove any allocated memory... */
1073     fwd_q_list_rm();
1074     
1075 out:
1076     return ret;
1077 }
1078 
1079 /**
1080  * Background processing of the transactions (Complete them).
1081  * @return 
1082  */
1083 expublic void * forward_process(void *arg)
1084 {
1085     NDRX_LOG(log_error, "***********BACKGROUND PROCESS START ********");
1086     
1087     tmq_thread_init();
1088     forward_loop();
1089     tmq_thread_uninit();
1090     
1091     NDRX_LOG(log_error, "***********BACKGROUND PROCESS END **********");
1092     
1093     return NULL;
1094 }
1095 
1096 /**
1097  * Initialize background process
1098  * @return EXSUCCEED/EXFAIL
1099  */
1100 expublic int forward_process_init(void)
1101 {
1102     int ret = EXSUCCEED;
1103     
1104     pthread_attr_t pthread_custom_attr;
1105     pthread_attr_init(&pthread_custom_attr);
1106     
1107     /* set some small stacks size, 1M should be fine! */
1108     ndrx_platf_stack_set(&pthread_custom_attr);
1109     if (EXSUCCEED!=pthread_create(&G_forward_thread, &pthread_custom_attr, 
1110             forward_process, NULL))
1111     {
1112         NDRX_PLATF_DIAG(NDRX_DIAG_PTHREAD_CREATE, errno, "forward_process_init");
1113         EXFAIL_OUT(ret);
1114     }
1115 out:
1116     return ret;
1117 }
1118 /* vim: set ts=4 sw=4 et smartindent: */