0001
0002
0003
0004
0005
0006
0007
0008
0009
0010
0011
0012
0013
0014
0015
0016
0017
0018
0019
0020
0021
0022
0023
0024
0025
0026
0027
0028
0029
0030
0031
0032
0033
0034 #include <stdio.h>
0035 #include <stdlib.h>
0036 #include <string.h>
0037 #include <errno.h>
0038 #include <regex.h>
0039 #include <utlist.h>
0040 #include <poll.h>
0041
0042 #include <ndebug.h>
0043 #include <atmi.h>
0044 #include <atmi_int.h>
0045 #include <typed_buf.h>
0046 #include <ndrstandard.h>
0047 #include <ubf.h>
0048 #include <Exfields.h>
0049 #include <gencall.h>
0050
0051 #include <exnet.h>
0052 #include <ndrxdcmn.h>
0053
0054 #include "bridge.h"
0055 #include "../libatmisrv/srv_int.h"
0056 #include "exsha1.h"
0057 #include <ndrxdiag.h>
0058
0059
0060
0061
0062
0063
0064
/**
 * Unlink the current message from its destination queue and release it.
 *
 * The macro operates on a variable named `el' that must exist in the
 * calling scope and point to the message being removed. It takes and
 * releases M_in_q_lock by itself, so it must be invoked with that lock
 * NOT held. Both the message buffer and the message node are freed, thus
 * `el' (and any alias of el->buffer) is dangling afterwards.
 *
 * When the removed message was the last one of the queue, the hash entry
 * is removed from M_qstr_hash and freed too - QHASH must not be
 * dereferenced after the macro in that case.
 *
 * QHASH is evaluated several times; pass a plain pointer variable.
 *
 * @param QHASH hash entry (in_msg_hash_t *) which holds the message `el'
 */
#define RM_MSG(QHASH) do {\
    MUTEX_LOCK_V(M_in_q_lock);\
    DL_DELETE((QHASH)->msgs, el);\
    NDRX_FPFREE(el->buffer);\
    NDRX_FPFREE(el);\
    M_msgs_in_q--;\
    (QHASH)->nrmsg--;\
    if (0==(QHASH)->nrmsg)\
    {\
        EXHASH_DEL(M_qstr_hash, (QHASH));\
        NDRX_FPFREE((QHASH));\
    }\
    MUTEX_UNLOCK_V(M_in_q_lock);\
} while (0)
0077
/**
 * Write an error-level log record describing a service call which is
 * about to be discarded.
 *
 * Requires two variables in the calling scope:
 *  - `call'     - pointer to the tp_command_call_t being dropped;
 *  - `call_age' - long value, logged as the age of the call in seconds.
 */
#define DISCARD_CALL_LOG \
    do \
    { \
        NDRX_LOG(log_error, \
            "Discarding svc call! call age = %ld s, client timeout = %d "\
            "cd: %d timestamp: %d (id: %d%d) callseq: %u, "\
            "svc: %s, flags: %ld, call age: %ld, data_len: %ld, caller: %s "\
            " reply_to: %s, call_stack: %s",\
            call_age, \
            call->clttout, \
            call->cd, \
            call->timestamp, \
            call->cd, call->timestamp, \
            call->callseq, \
            call->name, \
            call->flags, \
            call_age, \
            call->data_len, \
            call->my_id, \
            call->reply_to, \
            call->callstack); \
    } while (0)
0087
0088
0089
0090
0091
0092
0093 exprivate MUTEX_LOCKDECL(M_in_q_lock);
0094 exprivate int M_msgs_in_q = 0;
0095 exprivate in_msg_hash_t *M_qstr_hash = NULL;
0096 exprivate int M_stopped = EXFALSE;
0097 MUTEX_LOCKDECL(ndrx_G_global_br_lock);
0098
0099
0100 exprivate pthread_cond_t M_wakup_queue_runner;
0101
0102
0103
0104
0105
0106
0107
0108 expublic void br_tempq_init(void)
0109 {
0110 pthread_cond_init(&M_wakup_queue_runner, NULL);
0111 }
0112
0113
0114
0115
0116
0117
0118
0119
0120
0121 exprivate int br_generate_error_to_net(char *buf, int len, int pack_type, long rcode)
0122 {
0123 int ret=EXSUCCEED;
0124
0125 switch(pack_type)
0126 {
0127 case PACK_TYPE_TONDRXD:
0128
0129 break;
0130 case PACK_TYPE_TOSVC:
0131
0132 {
0133 tp_command_call_t *call = (tp_command_call_t *)buf;
0134
0135 if (!(call->flags & TPNOREPLY))
0136 {
0137 NDRX_LOG(log_warn, "Sending back error reply");
0138 reply_with_failure(TPNOBLOCK, call, NULL, NULL, rcode);
0139 }
0140 }
0141 break;
0142 case PACK_TYPE_TORPLYQ:
0143
0144 break;
0145 default:
0146 NDRX_LOG(log_warn, "Nothing to do for pack_type=%d",
0147 pack_type);
0148 break;
0149 }
0150
0151 out:
0152 return ret;
0153 }
0154
0155
0156
0157
0158
0159
0160
0161 expublic int br_process_error(char *buf, int len, int err,
0162 in_msg_t* el, int pack_type, char *destqstr, in_msg_hash_t * qhash)
0163 {
0164 long rcode = TPESVCERR;
0165
0166 if (err==ENOENT)
0167 {
0168 rcode = TPENOENT;
0169 }
0170
0171
0172 if (NULL==el && NULL!=destqstr)
0173 {
0174
0175 if (EAGAIN==err || EINTR==err)
0176 {
0177
0178 br_add_to_q(buf, len, pack_type, destqstr);
0179 }
0180 else
0181 {
0182
0183 br_generate_error_to_net(buf, len, pack_type, rcode);
0184 }
0185 }
0186 else
0187 {
0188 NDRX_LOG(log_debug, "Got error processing from Q");
0189
0190
0191
0192
0193
0194 br_generate_error_to_net(buf, len, pack_type, rcode);
0195
0196 if (PACK_TYPE_TOSVC==pack_type)
0197 {
0198 tp_command_call_t *call = (tp_command_call_t *)buf;
0199 long call_age = ndrx_stopwatch_get_delta_sec(&call->timer);
0200
0201
0202 DISCARD_CALL_LOG;
0203 }
0204
0205 NDRX_DUMP(log_warn, "Discarding message", buf, len);
0206
0207 userlog("Discarding message: %p/%d dest q: target q: [%s]", buf, len,
0208 destqstr?destqstr:"(null)");
0209
0210
0211
0212
0213 if (NULL!=el && NULL!=qhash)
0214 {
0215 RM_MSG(qhash);
0216 }
0217 }
0218
0219 return EXSUCCEED;
0220 }
0221
0222
0223
0224
0225
0226 expublic int br_chk_limit(void)
0227 {
0228 #ifdef EX_OS_DARWIN
0229
0230 MUTEX_LOCK_V(ndrx_G_global_br_lock);
0231 MUTEX_UNLOCK_V(ndrx_G_global_br_lock);
0232 return EXSUCCEED;
0233
0234 #else
0235 int ret;
0236
0237 struct timespec wait_time;
0238 struct timeval now;
0239
0240 gettimeofday(&now,NULL);
0241
0242 wait_time.tv_sec = now.tv_sec+1;
0243 wait_time.tv_nsec = now.tv_usec;
0244
0245 ret=pthread_mutex_timedlock(&ndrx_G_global_br_lock, &wait_time);
0246
0247 if (EXSUCCEED==ret)
0248 {
0249 MUTEX_UNLOCK_V(ndrx_G_global_br_lock);
0250 }
0251 else
0252 {
0253 NDRX_LOG(log_error, "Global lock timed out: %s", strerror(ret));
0254 }
0255 return ret;
0256 #endif
0257 }
0258
0259
0260
0261
0262
0263
0264
0265
0266
0267
0268
0269
0270 exprivate int br_run_q_th(void *ptr, int *p_finish_off)
0271 {
0272 int ret = EXSUCCEED;
0273 in_msg_t *el, *elt;
0274 long call_age;
0275 long time_in_q, spent_from_last_upd;
0276 long sleep_time;
0277 in_msg_hash_t *qhash, *qhashtmp;
0278 int msg_deleted;
0279 int cur_was_ok;
0280
0281
0282
0283
0284 int cret;
0285 struct timespec wait_time;
0286 struct timeval now;
0287
0288 #define NEVER_SLEEP (G_bridge_cfg.qmaxsleep+1)
0289
0290
0291
0292
0293
0294
0295
0296
0297
0298
0299
0300
0301 sleep_time=NEVER_SLEEP;
0302
0303 NDRX_LOG(log_debug, "br_run_q_th enter");
0304
0305 MUTEX_LOCK_V(M_in_q_lock);
0306 EXHASH_ITER(hh, M_qstr_hash, qhash, qhashtmp)
0307 {
0308 NDRX_LOG(log_debug, "Checking queue: [%s]", qhash->qstr);
0309 cur_was_ok=EXFALSE;
0310
0311 DL_FOREACH_SAFE(qhash->msgs, el, elt)
0312 {
0313 MUTEX_UNLOCK_V(M_in_q_lock);
0314
0315
0316 spent_from_last_upd = ndrx_stopwatch_get_delta(&(el->updatetime));
0317
0318
0319
0320
0321
0322 if (spent_from_last_upd<el->next_try_ms && !cur_was_ok)
0323 {
0324 long time_left = el->next_try_ms-spent_from_last_upd;
0325
0326 NDRX_LOG(log_debug, "Time left for %s is: %ld",
0327 el->destqstr, time_left);
0328
0329 if (time_left < sleep_time)
0330 {
0331 sleep_time = time_left;
0332 }
0333
0334
0335 MUTEX_LOCK_V(M_in_q_lock);
0336 break;
0337 }
0338
0339 msg_deleted=EXFALSE;
0340 time_in_q = ndrx_stopwatch_get_delta(&(el->addedtime));
0341 el->tries++;
0342 NDRX_LOG(log_warn, "Processing late delivery of %p/%d [%s] "
0343 "try %d/%d nrmsg: %d time_in_q %ld ms (ttl: %d) next_try_ms %ld ms",
0344 el->buffer, el->len, el->destqstr, el->tries,
0345 G_bridge_cfg.qretries, qhash->nrmsg, time_in_q, G_bridge_cfg.qttl,
0346 el->next_try_ms);
0347
0348
0349 if (el->tries <= G_bridge_cfg.qretries && time_in_q<=G_bridge_cfg.qttl)
0350 {
0351 if (EXSUCCEED!=(ret=ndrx_generic_q_send(el->destqstr, (char *)el->buffer,
0352 el->len, TPNOBLOCK, 0)))
0353 {
0354
0355 NDRX_LOG(log_error, "Failed to send message to [%s]: %s",
0356 el->destqstr, tpstrerror(ret));
0357
0358
0359
0360
0361
0362
0363 if (EAGAIN!=ret && EINTR!=ret)
0364 {
0365 NDRX_LOG(log_error, "Dest queue is broken");
0366
0367 br_process_error((char *)el->buffer, el->len, EXFAIL,
0368 el, el->pack_type, el->destqstr, qhash);
0369 msg_deleted=EXTRUE;
0370 }
0371
0372 else if (PACK_TYPE_TOSVC==el->pack_type)
0373 {
0374 tp_command_call_t *call = (tp_command_call_t *)el->buffer;
0375 call_age = ndrx_stopwatch_get_delta_sec(&call->timer);
0376
0377
0378
0379
0380 if ((ATMI_COMMAND_TPCALL==call->command_id ||
0381 ATMI_COMMAND_CONNECT==call->command_id) &&
0382 call->clttout > 0 && call_age >= call->clttout &&
0383 !(call->flags & TPNOTIME))
0384 {
0385
0386 NDRX_LOG(log_error, "Message expired - remove / no reply");
0387 DISCARD_CALL_LOG;
0388 RM_MSG(qhash);
0389 msg_deleted=EXTRUE;
0390 }
0391 }
0392
0393 }
0394 else
0395 {
0396
0397 RM_MSG(qhash);
0398 msg_deleted=EXTRUE;
0399 }
0400 }
0401 else
0402 {
0403 br_process_error((char *)el->buffer,
0404 el->len, EXFAIL, el, el->pack_type, el->destqstr, qhash);
0405 msg_deleted=EXTRUE;
0406 }
0407
0408
0409
0410
0411
0412 if (!msg_deleted)
0413 {
0414
0415 el->next_try_ms*=2;
0416
0417 if (el->next_try_ms>G_bridge_cfg.qmaxsleep)
0418 {
0419 el->next_try_ms=G_bridge_cfg.qmaxsleep;
0420 }
0421 else if (el->next_try_ms<G_bridge_cfg.qminsleep)
0422 {
0423 el->next_try_ms=G_bridge_cfg.qminsleep;
0424 }
0425
0426 if (el->next_try_ms < sleep_time)
0427 {
0428 sleep_time = el->next_try_ms;
0429 }
0430
0431
0432 ndrx_stopwatch_reset(&el->updatetime);
0433 MUTEX_LOCK_V(M_in_q_lock);
0434
0435
0436 break;
0437 }
0438 else
0439 {
0440 cur_was_ok=EXTRUE;
0441 }
0442
0443 MUTEX_LOCK_V(M_in_q_lock);
0444 }
0445
0446
0447 }
0448
0449
0450
0451 if (M_msgs_in_q > G_bridge_cfg.qsize && !M_stopped)
0452 {
0453 NDRX_LOG(log_error, "Max number of msgs queued in bridge: %d "
0454 "currently: %d - stop online traffic",
0455 G_bridge_cfg.qsize, M_msgs_in_q);
0456
0457
0458 MUTEX_LOCK_V(ndrx_G_global_br_lock);
0459 M_stopped=EXTRUE;
0460
0461 }
0462 else if (M_msgs_in_q < G_bridge_cfg.qsize && M_stopped)
0463 {
0464 NDRX_LOG(log_error, "Max number of msgs queued in bridge: %d "
0465 "currently: %d - resume online traffic",
0466 G_bridge_cfg.qsize, M_msgs_in_q);
0467
0468
0469 MUTEX_UNLOCK_V(ndrx_G_global_br_lock);
0470 M_stopped=EXFALSE;
0471 }
0472
0473 MUTEX_UNLOCK_V(M_in_q_lock);
0474
0475
0476 if (M_msgs_in_q > 0)
0477 {
0478 if (sleep_time>0)
0479 {
0480
0481
0482
0483
0484 if (sleep_time > G_bridge_cfg.qmaxsleep)
0485 {
0486 sleep_time=G_bridge_cfg.qminsleep;
0487 }
0488
0489 NDRX_LOG(log_info, "Sleep time: %ld ms M_msgs_in_q: %d",
0490 sleep_time, M_msgs_in_q);
0491
0492
0493
0494
0495 MUTEX_LOCK_V(M_in_q_lock);
0496
0497 gettimeofday(&now, NULL);
0498
0499 wait_time.tv_sec = now.tv_sec;
0500
0501 wait_time.tv_nsec = now.tv_usec*1000;
0502
0503 ndrx_timespec_plus(&wait_time, sleep_time);
0504
0505
0506 cret=pthread_cond_timedwait(&M_wakup_queue_runner, &M_in_q_lock, &wait_time);
0507
0508 NDRX_LOG(log_debug, "pthread_cond_timedwait returns %d: %s",
0509 cret, strerror(cret));
0510
0511 MUTEX_UNLOCK_V(M_in_q_lock);
0512 }
0513
0514
0515 if (EXSUCCEED!=ndrx_thpool_add_work2(G_bridge_cfg.thpool_queue, (void *)br_run_q_th,
0516 NULL, NDRX_THPOOL_ONEJOB, 0))
0517 {
0518 NDRX_LOG(log_debug, "Already run queued...");
0519 }
0520 }
0521
0522 out:
0523 NDRX_LOG(log_info, "Current queue stats: M_msgs_in_q=%d", M_msgs_in_q);
0524 return ret;
0525 }
0526
0527
0528
0529
0530
0531
0532 exprivate in_msg_hash_t* get_qstr_hash(char *qstr)
0533 {
0534 in_msg_hash_t *ret;
0535 int err;
0536
0537 EXHASH_FIND_STR(M_qstr_hash, qstr, ret);
0538
0539 if (NULL==ret)
0540 {
0541
0542 if (NULL==(ret=NDRX_FPMALLOC(sizeof(in_msg_hash_t), 0)))
0543 {
0544 err = errno;
0545 NDRX_LOG(log_error, "Failed to malloc %d bytes: %s",
0546 sizeof(in_msg_hash_t), strerror(err));
0547 userlog("Failed to malloc %d bytes: %s", sizeof(in_msg_hash_t),
0548 strerror(err));
0549 goto out;
0550 }
0551
0552 ret->nrmsg=0;
0553 ret->msgs=NULL;
0554 NDRX_STRCPY_SAFE(ret->qstr, qstr);
0555 EXHASH_ADD_STR( M_qstr_hash, qstr, ret );
0556
0557 NDRX_LOG(log_error, "New temporary queue [%s]", qstr);
0558 }
0559 out:
0560 return ret;
0561 }
0562
0563
0564
0565
0566
0567 expublic int br_add_to_q(char *buf, int len, int pack_type, char *destq)
0568 {
0569 int ret=EXSUCCEED;
0570 in_msg_t *msg;
0571 in_msg_hash_t *qhash;
0572 int dropmsg = EXFALSE;
0573 if (NULL==(msg=NDRX_FPMALLOC(sizeof(in_msg_t), 0)))
0574 {
0575 NDRX_ERR_MALLOC(sizeof(in_msg_t));
0576 EXFAIL_OUT(ret);
0577 }
0578
0579 if (NULL==(msg->buffer=NDRX_FPMALLOC(len, 0)))
0580 {
0581 NDRX_ERR_MALLOC(len);
0582 EXFAIL_OUT(ret);
0583 }
0584
0585
0586 msg->pack_type = pack_type;
0587 msg->len = len;
0588 msg->tries=1;
0589
0590 msg->next_try_ms=G_bridge_cfg.qminsleep;
0591 NDRX_STRCPY_SAFE(msg->destqstr, destq);
0592 memcpy(msg->buffer, buf, len);
0593
0594 ndrx_stopwatch_reset(&msg->addedtime);
0595 ndrx_stopwatch_reset(&msg->updatetime);
0596
0597
0598 NDRX_LOG(log_warn, "About to add %p/%d [%s] to in-mem queue "
0599 "for late delivery...", msg->buffer, msg->len, msg->destqstr);
0600
0601
0602 MUTEX_LOCK_V(M_in_q_lock);
0603
0604
0605 if (NULL==(qhash = get_qstr_hash(destq)))
0606 {
0607 MUTEX_UNLOCK_V(M_in_q_lock);
0608 EXFAIL_OUT(ret);
0609 }
0610
0611 if (G_bridge_cfg.qfullaction == QUEUE_ACTION_DROP && M_msgs_in_q+1 > G_bridge_cfg.qsize)
0612 {
0613 NDRX_LOG(log_error, "Temporary queue full (max: %d, new size: %d) "
0614 "and action is to drop",
0615 G_bridge_cfg.qsize, M_msgs_in_q+1);
0616 dropmsg=EXTRUE;
0617 }
0618 else if (G_bridge_cfg.qfullactionsvc == QUEUE_ACTION_DROP && qhash->nrmsg+1 > G_bridge_cfg.qsizesvc)
0619 {
0620 NDRX_LOG(log_error, "Temporary service queue is full (max: %d, new size: %d) "
0621 "and action is to drop",
0622 G_bridge_cfg.qsizesvc, qhash->nrmsg+1);
0623 dropmsg=EXTRUE;
0624 }
0625 else
0626 {
0627 M_msgs_in_q++;
0628 qhash->nrmsg++;
0629
0630 DL_APPEND(qhash->msgs, msg);
0631
0632
0633
0634
0635 if (qhash->nrmsg==1)
0636 {
0637
0638 pthread_cond_signal(&M_wakup_queue_runner);
0639 }
0640 }
0641
0642 MUTEX_UNLOCK_V(M_in_q_lock);
0643
0644 if (dropmsg)
0645 {
0646 br_process_error(buf, len, EXFAIL,
0647 msg, pack_type, destq, NULL);
0648 NDRX_FPFREE(msg->buffer);
0649 NDRX_FPFREE(msg);
0650 }
0651 else
0652 {
0653
0654 ndrx_thpool_add_work2(G_bridge_cfg.thpool_queue, (void*)br_run_q_th,
0655 NULL, NDRX_THPOOL_ONEJOB, 0);
0656 }
0657
0658 out:
0659
0660
0661 if (EXSUCCEED!=ret)
0662 {
0663 if (NULL!=msg->buffer)
0664 {
0665 NDRX_FPFREE(msg->buffer);
0666 }
0667
0668 if (NULL!=msg)
0669 {
0670 NDRX_FPFREE(msg);
0671 }
0672 }
0673
0674 return ret;
0675 }
0676
0677