Back to home page

Enduro/X

 
 

    


0001 /**
0002  * @brief Temporary queue runner
0003  *
0004  * @file temq.c
0005  */
0006 /* -----------------------------------------------------------------------------
0007  * Enduro/X Middleware Platform for Distributed Transaction Processing
0008  * Copyright (C) 2009-2016, ATR Baltic, Ltd. All Rights Reserved.
0009  * Copyright (C) 2017-2023, Mavimax, Ltd. All Rights Reserved.
0010  * This software is released under one of the following licenses:
0011  * AGPL (with Java and Go exceptions) or Mavimax's license for commercial use.
0012  * See LICENSE file for full text.
0013  * -----------------------------------------------------------------------------
0014  * AGPL license:
0015  *
0016  * This program is free software; you can redistribute it and/or modify it under
0017  * the terms of the GNU Affero General Public License, version 3 as published
0018  * by the Free Software Foundation;
0019  *
0020  * This program is distributed in the hope that it will be useful, but WITHOUT ANY
0021  * WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR A
0022  * PARTICULAR PURPOSE. See the GNU Affero General Public License, version 3
0023  * for more details.
0024  *
0025  * You should have received a copy of the GNU Affero General Public License along 
0026  * with this program; if not, write to the Free Software Foundation, Inc.,
0027  * 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
0028  *
0029  * -----------------------------------------------------------------------------
0030  * A commercial use license is available from Mavimax, Ltd
0031  * contact@mavimax.com
0032  * -----------------------------------------------------------------------------
0033  */
0034 #include <stdio.h>
0035 #include <stdlib.h>
0036 #include <string.h>
0037 #include <errno.h>
0038 #include <regex.h>
0039 #include <utlist.h>
0040 #include <poll.h>
0041 
0042 #include <ndebug.h>
0043 #include <atmi.h>
0044 #include <atmi_int.h>
0045 #include <typed_buf.h>
0046 #include <ndrstandard.h>
0047 #include <ubf.h>
0048 #include <Exfields.h>
0049 #include <gencall.h>
0050 
0051 #include <exnet.h>
0052 #include <ndrxdcmn.h>
0053 
0054 #include "bridge.h"
0055 #include "../libatmisrv/srv_int.h"
0056 #include "exsha1.h"
0057 #include <ndrxdiag.h>
0058 /*---------------------------Externs------------------------------------*/
0059 /*---------------------------Macros-------------------------------------*/
0060 
0061 /**
0062  * Remove message from hash queue
0063  * and delete queue hash if there arn't any msgs in
0064  */
0065 #define RM_MSG(QHASH)  do {\
0066                     /* locking here needed.. */\
0067                     MUTEX_LOCK_V(M_in_q_lock);\
0068                     /* remove from Q - ok */\
0069                     DL_DELETE(QHASH->msgs, el);\
0070                     NDRX_FPFREE(el->buffer);\
0071                     NDRX_FPFREE(el);\
0072                     M_msgs_in_q--;\
0073                     QHASH->nrmsg--;\
0074                     if (0==QHASH->nrmsg) {EXHASH_DEL(M_qstr_hash, QHASH); NDRX_FPFREE(QHASH);}\
0075                     MUTEX_UNLOCK_V(M_in_q_lock);\
0076                 } while (0);
0077                 
0078 #define DISCARD_CALL_LOG    do {NDRX_LOG(log_error, \
0079                     "Discarding svc call! call age = %ld s, client timeout = %d "\
0080                     "cd: %d timestamp: %d (id: %d%d) callseq: %u, "\
0081                     "svc: %s, flags: %ld, call age: %ld, data_len: %ld, caller: %s "\
0082                     " reply_to: %s, call_stack: %s",\
0083                     call_age, call->clttout,\
0084                     call->cd, call->timestamp, call->cd, call->timestamp, call->callseq, \
0085                     call->name, call->flags, call_age, call->data_len,\
0086                     call->my_id, call->reply_to, call->callstack);} while (0)
0087 
0088 /*---------------------------Enums--------------------------------------*/
0089 /*---------------------------Typedefs-----------------------------------*/
0090 /*---------------------------Globals------------------------------------*/
0091 /*---------------------------Statics------------------------------------*/
0092 
0093 exprivate MUTEX_LOCKDECL(M_in_q_lock);  /**< Queue lock the queue cache       */
0094 exprivate int M_msgs_in_q = 0;          /**< Number of messages enqueued      */
0095 exprivate in_msg_hash_t *M_qstr_hash = NULL; /**< Hash of temp queues         */
0096 exprivate int M_stopped = EXFALSE;      /**< Is incoming stopped              */
0097 MUTEX_LOCKDECL(ndrx_G_global_br_lock);  /**< Global bridge lock               */
0098 
0099 /**< Some job is enqueued, check the counts & incoming lockups                */
0100 exprivate pthread_cond_t  M_wakup_queue_runner; 
0101 
0102 
0103 /*---------------------------Prototypes---------------------------------*/
0104 
0105 /**
0106  * init the temp queue runner.
0107  */
0108 expublic void br_tempq_init(void)
0109 {
0110     pthread_cond_init(&M_wakup_queue_runner, NULL);
0111 }
0112 
0113 /**
0114  * Generate error to network if required.
0115  * We detect the packet type here.
0116  * @param buf
0117  * @param len
0118  * @param pack_type
0119  * @return 
0120  */
0121 exprivate int br_generate_error_to_net(char *buf, int len, int pack_type, long rcode)
0122 {
0123     int ret=EXSUCCEED;
0124     
0125     switch(pack_type)
0126     {
0127         case PACK_TYPE_TONDRXD:
0128             /* Will not generate any error here*/
0129             break;
0130         case PACK_TYPE_TOSVC:
0131             /* So This was TPCALL, we might want to send error reply back. */
0132         {
0133             tp_command_call_t *call = (tp_command_call_t *)buf;
0134             
0135             if (!(call->flags & TPNOREPLY))
0136             {
0137                 NDRX_LOG(log_warn, "Sending back error reply");
0138                 reply_with_failure(TPNOBLOCK, call, NULL, NULL, rcode);
0139             }
0140         }
0141             break;
0142         case PACK_TYPE_TORPLYQ:
0143             /* Will not generate any error here*/
0144             break;
0145         default:
0146             NDRX_LOG(log_warn, "Nothing to do for pack_type=%d", 
0147                     pack_type);
0148             break;
0149     }
0150     
0151 out:
0152     return ret;
0153 }
0154 
0155 /**
0156  * So we got q send error.
0157  * @param destqstr dest queue where message sent attempt was made
0158  * @param qhash for queue runner hash of each destination
0159  *  in case of NULL, no enqueue required (failed to resolve service q, nothing todo)
0160  */
0161 expublic int br_process_error(char *buf, int len, int err, 
0162         in_msg_t* el, int pack_type, char *destqstr, in_msg_hash_t * qhash)
0163 {
0164     long rcode = TPESVCERR;
0165     
0166     if (err==ENOENT)
0167     {
0168         rcode = TPENOENT;
0169     }
0170     
0171     /* So this is initial call */
0172     if (NULL==el && NULL!=destqstr)
0173     {
0174         /* This will be processed by queue runner */
0175         if (EAGAIN==err || EINTR==err)
0176         {
0177             /* Put stuff in queue */
0178             br_add_to_q(buf, len, pack_type, destqstr);
0179         }
0180         else
0181         {
0182             /* TODO: Ignore the error or send error back - TPNOENT probably (if this is request) */
0183             br_generate_error_to_net(buf, len, pack_type, rcode);
0184         }
0185     }
0186     else
0187     {
0188         NDRX_LOG(log_debug, "Got error processing from Q");
0189         
0190         /* In this case we should handle the stuff!?!?
0191          * Generate reply (only if needed), Not sure TPNOTENT or svcerr?
0192          * SVCERR could be better because initially we thought that service exists.
0193          */
0194         br_generate_error_to_net(buf, len, pack_type, rcode);
0195         
0196         if (PACK_TYPE_TOSVC==pack_type)
0197         {
0198             tp_command_call_t *call = (tp_command_call_t *)buf;
0199             long call_age = ndrx_stopwatch_get_delta_sec(&call->timer);
0200             /* print some add info */
0201             
0202             DISCARD_CALL_LOG;
0203         }
0204         
0205         NDRX_DUMP(log_warn, "Discarding message", buf, len);
0206         
0207         userlog("Discarding message: %p/%d dest q: target q: [%s]", buf, len, 
0208                 destqstr?destqstr:"(null)");
0209         
0210         /* just in case if it is first attempt and service does not exists in shm... 
0211          * discard immediately and thus el does not exist
0212          */
0213         if (NULL!=el && NULL!=qhash)
0214         {
0215             RM_MSG(qhash);
0216         }
0217     }
0218     
0219     return EXSUCCEED;
0220 }
0221 
0222 /**
0223  * Check the global processing limit
0224  * @return EXSUCCEED (ok), EXFAIL (lock time limit)
0225  */
0226 expublic int br_chk_limit(void)
0227 {
0228 #ifdef EX_OS_DARWIN
0229     
0230     MUTEX_LOCK_V(ndrx_G_global_br_lock);
0231     MUTEX_UNLOCK_V(ndrx_G_global_br_lock);
0232     return EXSUCCEED;
0233     
0234 #else
0235     int ret;
0236     
0237     struct timespec wait_time;
0238     struct timeval now;
0239 
0240     gettimeofday(&now,NULL);
0241 
0242     wait_time.tv_sec = now.tv_sec+1;
0243     wait_time.tv_nsec = now.tv_usec;
0244 
0245     ret=pthread_mutex_timedlock(&ndrx_G_global_br_lock, &wait_time);
0246     
0247     if (EXSUCCEED==ret)
0248     {
0249         MUTEX_UNLOCK_V(ndrx_G_global_br_lock);
0250     }
0251     else
0252     {
0253         NDRX_LOG(log_error, "Global lock timed out: %s", strerror(ret));
0254     }
0255     return ret;
0256 #endif
0257 }
0258 
0259 /**
0260  * Run queue from thread.
0261  * This is started from main thread periodic runner.
0262  * The special flag is used to indicate if run job was in queue
0263  * If incoming limit of queue reached, we shall lock the mutex and do not
0264  * allow any incoming traffic.
0265  * 
0266  * @param ptr not used
0267  * @param p_finish_off not used
0268  * @return EXSUCCEED;
0269  */
0270 exprivate int br_run_q_th(void *ptr, int *p_finish_off)
0271 {
0272     int ret = EXSUCCEED;
0273     in_msg_t *el, *elt;
0274     long call_age;
0275     long time_in_q, spent_from_last_upd;
0276     long sleep_time;
0277     in_msg_hash_t *qhash, *qhashtmp;
0278     int msg_deleted;
0279     int cur_was_ok;
0280 
0281     /* wait for conditional... so that we get quick wakeups
0282     * in case if sleeping for long and some msgs is being added...
0283     */
0284     int cret;
0285     struct timespec wait_time;
0286     struct timeval now;
0287     
0288 #define NEVER_SLEEP (G_bridge_cfg.qmaxsleep+1)
0289     /**
0290      * Possible dead lock if service puts back in queue/ 
0291      * do the unlock in the middle to allow adding msg?
0292      * 
0293      * If delete is allowed only from this thread, then we should sync only
0294      * on adding..
0295      * 
0296      */
0297     
0298     /* Master loop of queues... */
0299     
0300     /* loop runs in locked mode.. */
0301     sleep_time=NEVER_SLEEP;
0302         
0303     NDRX_LOG(log_debug, "br_run_q_th enter");
0304     
0305     MUTEX_LOCK_V(M_in_q_lock);
0306     EXHASH_ITER(hh, M_qstr_hash, qhash, qhashtmp)
0307     {
0308         NDRX_LOG(log_debug, "Checking queue: [%s]", qhash->qstr);
0309         cur_was_ok=EXFALSE;
0310         /* process until we get the error... */
0311         DL_FOREACH_SAFE(qhash->msgs, el, elt)
0312         {
0313             MUTEX_UNLOCK_V(M_in_q_lock);
0314             
0315             /* check is the queue turn right now? */
0316             spent_from_last_upd = ndrx_stopwatch_get_delta(&(el->updatetime));
0317             
0318             /* if time spent is lesser then scheduled next run, then try next 
0319              * anyway calc new sleep time..
0320              * Run only if this is first msg... if was attempt OK 
0321              */
0322             if (spent_from_last_upd<el->next_try_ms && !cur_was_ok)
0323             {
0324                 long time_left = el->next_try_ms-spent_from_last_upd;
0325                 
0326                 NDRX_LOG(log_debug, "Time left for %s is: %ld", 
0327                         el->destqstr, time_left);
0328                         
0329                 if (time_left < sleep_time)
0330                 {
0331                     sleep_time = time_left;
0332                 }
0333                 
0334                 /* WARNING ! break to next queue... */
0335                 MUTEX_LOCK_V(M_in_q_lock);
0336                 break;
0337             }
0338 
0339             msg_deleted=EXFALSE;
0340             time_in_q = ndrx_stopwatch_get_delta(&(el->addedtime));
0341             el->tries++;
0342             NDRX_LOG(log_warn, "Processing late delivery of %p/%d [%s] "
0343                     "try %d/%d nrmsg: %d time_in_q %ld ms (ttl: %d) next_try_ms %ld ms", 
0344                     el->buffer, el->len, el->destqstr, el->tries, 
0345                     G_bridge_cfg.qretries, qhash->nrmsg, time_in_q, G_bridge_cfg.qttl,
0346                     el->next_try_ms);
0347 
0348             /* if ttl is used, then check time-out too */
0349             if (el->tries <= G_bridge_cfg.qretries && time_in_q<=G_bridge_cfg.qttl)
0350             {
0351                 if (EXSUCCEED!=(ret=ndrx_generic_q_send(el->destqstr, (char *)el->buffer, 
0352                         el->len, TPNOBLOCK, 0)))
0353                 {
0354 
0355                     NDRX_LOG(log_error, "Failed to send message to [%s]: %s",
0356                             el->destqstr, tpstrerror(ret));
0357 
0358                     /* analyze the error - if queue is missing
0359                      * then drop the message -> set the max tries...
0360                      * or do the retry if queue is full, otherwise we drop the msg.
0361                      */
0362 
0363                     if (EAGAIN!=ret && EINTR!=ret)
0364                     {
0365                        NDRX_LOG(log_error, "Dest queue is broken");
0366 
0367                        br_process_error((char *)el->buffer, el->len, EXFAIL, 
0368                                 el, el->pack_type, el->destqstr, qhash);
0369                        msg_deleted=EXTRUE;
0370                     }
0371                     /* if it is svc reply, check the timeout flag too.. */
0372                     else if (PACK_TYPE_TOSVC==el->pack_type)
0373                     {
0374                         tp_command_call_t *call = (tp_command_call_t *)el->buffer;
0375                         call_age = ndrx_stopwatch_get_delta_sec(&call->timer);
0376                         /**
0377                          * If possible check the expiry of the call,
0378                          * if so, drop the msg
0379                          */
0380                         if ((ATMI_COMMAND_TPCALL==call->command_id || 
0381                             ATMI_COMMAND_CONNECT==call->command_id) &&
0382                                 call->clttout > 0 && call_age >= call->clttout && 
0383                                 !(call->flags & TPNOTIME))
0384                         {
0385                             /* no need to reply? */
0386                             NDRX_LOG(log_error, "Message expired - remove / no reply");
0387                             DISCARD_CALL_LOG;
0388                             RM_MSG(qhash);
0389                             msg_deleted=EXTRUE;
0390                         }
0391                     }
0392 
0393                 }
0394                 else
0395                 {
0396                     /* sent ok */
0397                     RM_MSG(qhash);
0398                     msg_deleted=EXTRUE;
0399                 }
0400             }
0401             else
0402             {
0403                 br_process_error((char *)el->buffer, 
0404                         el->len, EXFAIL, el, el->pack_type, el->destqstr, qhash);
0405                 msg_deleted=EXTRUE;
0406             }
0407 
0408             /* if last msg, was not removed, then schedule new run 
0409              * and try next queue...
0410              */
0411             
0412             if (!msg_deleted)
0413             {
0414                 /* multiple sleep time by 2 */
0415                 el->next_try_ms*=2;
0416                 
0417                 if (el->next_try_ms>G_bridge_cfg.qmaxsleep)
0418                 {
0419                     el->next_try_ms=G_bridge_cfg.qmaxsleep;
0420                 }
0421                 else if (el->next_try_ms<G_bridge_cfg.qminsleep)
0422                 {
0423                     el->next_try_ms=G_bridge_cfg.qminsleep;
0424                 }
0425                 
0426                 if (el->next_try_ms < sleep_time)
0427                 {
0428                     sleep_time = el->next_try_ms;
0429                 }
0430                 
0431                 /* set the life cycle */
0432                 ndrx_stopwatch_reset(&el->updatetime);
0433                 MUTEX_LOCK_V(M_in_q_lock);
0434                 
0435                 /* process next queue... */
0436                 break;
0437             }
0438             else
0439             {
0440                 cur_was_ok=EXTRUE;
0441             }
0442             
0443             MUTEX_LOCK_V(M_in_q_lock);
0444         }
0445        
0446         /* here we are locked... */
0447     }
0448     
0449     
0450     /* Stop the incoming traffic if total queue is full: */
0451     if (M_msgs_in_q > G_bridge_cfg.qsize && !M_stopped)
0452     {
0453         NDRX_LOG(log_error, "Max number of msgs queued in bridge: %d "
0454                 "currently: %d - stop online traffic", 
0455                 G_bridge_cfg.qsize, M_msgs_in_q);
0456 
0457         /* set global lock */
0458         MUTEX_LOCK_V(ndrx_G_global_br_lock);
0459         M_stopped=EXTRUE;
0460 
0461     }
0462     else if (M_msgs_in_q < G_bridge_cfg.qsize && M_stopped)
0463     {
0464         NDRX_LOG(log_error, "Max number of msgs queued in bridge: %d "
0465                 "currently: %d - resume online traffic", 
0466                 G_bridge_cfg.qsize, M_msgs_in_q);
0467 
0468         /* unset global lock */
0469         MUTEX_UNLOCK_V(ndrx_G_global_br_lock);
0470         M_stopped=EXFALSE;
0471     }
0472     
0473     MUTEX_UNLOCK_V(M_in_q_lock);
0474     
0475     /* Do some sleep if required */
0476     if (M_msgs_in_q > 0)
0477     {
0478         if (sleep_time>0)
0479         {
0480 
0481             /* while the M_in_q_lock was unlocked (loop) finished, somene has added msg
0482              * thus use min sleep
0483              */
0484             if (sleep_time > G_bridge_cfg.qmaxsleep)
0485             {
0486                 sleep_time=G_bridge_cfg.qminsleep;
0487             }
0488 
0489             NDRX_LOG(log_info, "Sleep time: %ld ms M_msgs_in_q: %d", 
0490                     sleep_time, M_msgs_in_q);
0491 
0492             /* wouldn't it be better to wait for conditional?
0493              * so that if new msg is enqueued, checks can be performed?
0494              */
0495             MUTEX_LOCK_V(M_in_q_lock);
0496 
0497             gettimeofday(&now, NULL);
0498 
0499             wait_time.tv_sec = now.tv_sec;
0500             /* convert to ms: */
0501             wait_time.tv_nsec = now.tv_usec*1000;
0502 
0503             ndrx_timespec_plus(&wait_time, sleep_time);
0504 
0505             /* sleep or wait event.. */
0506             cret=pthread_cond_timedwait(&M_wakup_queue_runner, &M_in_q_lock, &wait_time);
0507 
0508             NDRX_LOG(log_debug, "pthread_cond_timedwait returns %d: %s", 
0509                     cret, strerror(cret));
0510             
0511             MUTEX_UNLOCK_V(M_in_q_lock);
0512         }
0513 
0514         /* run loop again */
0515         if (EXSUCCEED!=ndrx_thpool_add_work2(G_bridge_cfg.thpool_queue, (void *)br_run_q_th, 
0516                 NULL, NDRX_THPOOL_ONEJOB, 0))
0517         {
0518             NDRX_LOG(log_debug, "Already run queued...");
0519         }
0520     }
0521     
0522 out:
0523     NDRX_LOG(log_info, "Current queue stats: M_msgs_in_q=%d", M_msgs_in_q);
0524     return ret;
0525 }
0526 
0527 /**
0528  * Alloc or find the queue
0529  * @param qstr queue name
0530  * @return NULL in case of failure
0531  */
0532 exprivate in_msg_hash_t* get_qstr_hash(char *qstr)
0533 {
0534     in_msg_hash_t *ret;
0535     int err;
0536     
0537     EXHASH_FIND_STR(M_qstr_hash, qstr, ret);
0538     
0539     if (NULL==ret)
0540     {
0541         /* alloc new... */        
0542         if (NULL==(ret=NDRX_FPMALLOC(sizeof(in_msg_hash_t), 0)))
0543         {
0544             err = errno;
0545             NDRX_LOG(log_error, "Failed to malloc %d bytes: %s", 
0546                     sizeof(in_msg_hash_t), strerror(err));
0547             userlog("Failed to malloc %d bytes: %s", sizeof(in_msg_hash_t),
0548                     strerror(err));
0549             goto out;
0550         }
0551         
0552         ret->nrmsg=0;
0553         ret->msgs=NULL;
0554         NDRX_STRCPY_SAFE(ret->qstr, qstr);
0555         EXHASH_ADD_STR( M_qstr_hash, qstr, ret );
0556         
0557         NDRX_LOG(log_error, "New temporary queue [%s]", qstr);
0558     }
0559 out:
0560     return ret;    
0561 }
0562 
0563 /**
0564  * Enqueue the message for delayed send.
0565  * @return 
0566  */
0567 expublic int br_add_to_q(char *buf, int len, int pack_type, char *destq)
0568 {
0569     int ret=EXSUCCEED;
0570     in_msg_t *msg;
0571     in_msg_hash_t *qhash;
0572     int dropmsg = EXFALSE;
0573     if (NULL==(msg=NDRX_FPMALLOC(sizeof(in_msg_t), 0)))
0574     {
0575         NDRX_ERR_MALLOC(sizeof(in_msg_t));
0576         EXFAIL_OUT(ret);
0577     }
0578     
0579     if (NULL==(msg->buffer=NDRX_FPMALLOC(len, 0)))
0580     {
0581         NDRX_ERR_MALLOC(len);
0582         EXFAIL_OUT(ret);
0583     }
0584     
0585     /*fill in the details*/
0586     msg->pack_type = pack_type;
0587     msg->len = len;
0588     msg->tries=1;
0589     /* have minimum sleep... */
0590     msg->next_try_ms=G_bridge_cfg.qminsleep;
0591     NDRX_STRCPY_SAFE(msg->destqstr, destq);
0592     memcpy(msg->buffer, buf, len);
0593     
0594     ndrx_stopwatch_reset(&msg->addedtime);
0595     ndrx_stopwatch_reset(&msg->updatetime);
0596     
0597     /* Bug #465 moved  after the adding to Q */
0598     NDRX_LOG(log_warn, "About to add %p/%d [%s] to in-mem queue "
0599             "for late delivery...", msg->buffer, msg->len, msg->destqstr);
0600     
0601     /* search for Q def */
0602     MUTEX_LOCK_V(M_in_q_lock);
0603     
0604     /* we added msgs... */
0605     if (NULL==(qhash = get_qstr_hash(destq)))
0606     {
0607         MUTEX_UNLOCK_V(M_in_q_lock);
0608         EXFAIL_OUT(ret);
0609     }
0610     
0611     if (G_bridge_cfg.qfullaction == QUEUE_ACTION_DROP && M_msgs_in_q+1 > G_bridge_cfg.qsize)
0612     {
0613         NDRX_LOG(log_error, "Temporary queue full (max: %d, new size: %d) "
0614                 "and action is to drop",
0615                 G_bridge_cfg.qsize, M_msgs_in_q+1);
0616         dropmsg=EXTRUE;
0617     }
0618     else if (G_bridge_cfg.qfullactionsvc == QUEUE_ACTION_DROP && qhash->nrmsg+1 > G_bridge_cfg.qsizesvc)
0619     {
0620         NDRX_LOG(log_error, "Temporary service queue is full (max: %d, new size: %d) "
0621                 "and action is to drop",
0622                 G_bridge_cfg.qsizesvc, qhash->nrmsg+1);
0623         dropmsg=EXTRUE;
0624     }
0625     else
0626     {
0627         M_msgs_in_q++;
0628         qhash->nrmsg++;
0629 
0630         DL_APPEND(qhash->msgs, msg);
0631         
0632         /* wake up if current queue has just added...
0633          * otherwise the time for hash run is already scheduled
0634          */
0635         if (qhash->nrmsg==1)
0636         {
0637             /* notify that new message has arrived... */
0638             pthread_cond_signal(&M_wakup_queue_runner);
0639         }
0640     }
0641     
0642     MUTEX_UNLOCK_V(M_in_q_lock);
0643     
0644     if (dropmsg)
0645     {
0646         br_process_error(buf, len, EXFAIL, 
0647                                 msg, pack_type, destq, NULL);
0648         NDRX_FPFREE(msg->buffer);
0649         NDRX_FPFREE(msg);
0650     }
0651     else
0652     {
0653         /* issue the queue runner job */
0654         ndrx_thpool_add_work2(G_bridge_cfg.thpool_queue, (void*)br_run_q_th, 
0655                 NULL, NDRX_THPOOL_ONEJOB, 0);
0656     }
0657     
0658 out:
0659 
0660     /* if not checking for ret, the queue sender may be already processed the msg... */
0661     if (EXSUCCEED!=ret)
0662     {
0663         if (NULL!=msg->buffer)
0664         {
0665             NDRX_FPFREE(msg->buffer);
0666         }
0667         
0668         if (NULL!=msg)
0669         {
0670             NDRX_FPFREE(msg);
0671         }
0672     }
0673 
0674     return ret;
0675 }
0676 
0677 /* vim: set ts=4 sw=4 et smartindent: */