0001
0002
0003
0004
0005
0006
0007
0008
0009
0010
0011
0012
0013
0014
0015
0016
0017
0018
0019
0020
0021
0022
0023
0024
0025
0026
0027
0028
0029
0030
0031
0032
0033
0034
0035
0036
0037 #include <ndrx_config.h>
0038 #include <string.h>
0039 #include <sys/time.h>
0040 #include <stdio.h>
0041 #include <stdlib.h>
0042 #include <memory.h>
0043 #include <sys_mqueue.h>
0044 #include <errno.h>
0045 #include <sys/param.h>
0046 #include <unistd.h>
0047 #include <fcntl.h>
0048
0049 #include <sys_unix.h>
0050 #include <atmi.h>
0051 #include <ndrstandard.h>
0052 #include <ndebug.h>
0053 #include <atmi_int.h>
0054 #include <ndrxdcmn.h>
0055 #include <utlist.h>
0056 #include <atmi_shm.h>
0057 #include <tperror.h>
0058
0059 #include "gencall.h"
0060 #include "userlog.h"
0061 #include "atmi_tls.h"
0062
0063
/**
 * Pause applied by ndrx_mq_fix_mass_send() once the send counter gets close
 * to the queue limit. The value is handed to usleep(), i.e. it is expressed
 * in microseconds (170 ms).
 */
#define SLEEP_ON_FULL_Q             (170 * 1000)
0065
0066
0067
/**
 * Resolve the effective timeout (seconds) into the caller's `tout_act`.
 *
 * The expanding scope must provide `tout`, `tout_restart` and `tout_act`.
 * Sources, highest precedence first:
 *  1. `tout`, when > 0;
 *  2. `tout_restart`, when > 0 (value resolved by an earlier expansion);
 *  3. G_atmi_tls->tout_next_eff, when the TLS pointer is set and value > 0;
 *  4. G_atmi_tls->tout, when the TLS pointer is set and value > 0;
 *  5. G_atmi_env.time_out.
 *
 * The resolved value is stored back to `tout_restart`, so that a later
 * expansion in the same scope re-uses it when it is positive.
 *
 * The body is written lowest precedence first: every later assignment
 * overrides the earlier ones.
 */
#define TOUT_SOURCE do\
{\
    tout_act=G_atmi_env.time_out;\
    if (G_atmi_tls && G_atmi_tls->tout > 0)\
    {\
        tout_act=G_atmi_tls->tout;\
    }\
    if (G_atmi_tls && G_atmi_tls->tout_next_eff > 0)\
    {\
        tout_act=G_atmi_tls->tout_next_eff;\
    }\
    if (tout_restart > 0)\
    {\
        tout_act=tout_restart;\
    }\
    if (tout > 0)\
    {\
        tout_act=tout;\
    }\
    tout_restart = tout_act;\
} while (0)
0092
0093
/**
 * Arm the absolute deadline used by the timed queue calls.
 *
 * The expanding scope must provide `use_tout`, `abs_timeout` (struct timespec)
 * and the locals required by TOUT_SOURCE. When `use_tout` is zero nothing
 * happens. Otherwise `use_tout` is normalised to 1, `tout_act` is resolved
 * via TOUT_SOURCE and `abs_timeout` is set to the current gettimeofday()
 * time plus `tout_act` seconds.
 */
#define SET_TOUT_VALUE do\
{\
    if (use_tout)\
    {\
        struct timeval tout_now;\
        gettimeofday (&tout_now, NULL);\
        use_tout=1;\
        TOUT_SOURCE;\
        abs_timeout.tv_nsec = tout_now.tv_usec*1000;\
        abs_timeout.tv_sec = tout_now.tv_sec+tout_act;\
    }\
} while (0)
0106
0107
/**
 * Decide whether the timed variants of the queue calls shall be used.
 *
 * The expanding scope must provide `flags` and `use_tout`. `use_tout`
 * becomes 0 when the configured G_atmi_env.time_out is 0 or when the caller
 * passed TPNOTIME or TPNOBLOCK; in all other cases it becomes 1.
 */
#define SET_TOUT_CONF do\
{\
    use_tout = (0==G_atmi_env.time_out \
            || (flags & TPNOTIME) \
            || (flags & TPNOBLOCK)) ? 0 : 1;\
} while (0)
0119
0120
/**
 * Log (at error level) the attributes of queue descriptor X.
 *
 * The attribute structure is zeroed first and the result of
 * ndrx_mq_getattr() is not checked, so zeros are logged if the attributes
 * cannot be read.
 *
 * Wrapped in do { } while (0) so that `PRINT_Q_INFO(q);` is a single
 * statement and stays valid in front of an `else`. The local uses a
 * non-reserved name (identifiers starting with `__` belong to the
 * implementation).
 */
#define PRINT_Q_INFO(X) do\
{\
    struct mq_attr q_info_attr_;\
    memset(&q_info_attr_, 0, sizeof(q_info_attr_));\
    ndrx_mq_getattr((X), &q_info_attr_);\
    NDRX_LOG(log_error, "mq_flags=%ld mq_maxmsg=%ld mq_msgsize=%ld mq_curmsgs=%ld",\
        q_info_attr_.mq_flags, q_info_attr_.mq_maxmsg, \
        q_info_attr_.mq_msgsize, q_info_attr_.mq_curmsgs);\
} while (0)
0126
0127
0128
0129
0130
/**
 * Scale a priority from the 0..NDRX_MSGPRIO_MAX range down to the range
 * supported by the queue implementation (0..MQ_PRIO_MAX-1). The scaling is
 * compiled in only when MQ_PRIO_MAX is defined and MQ_PRIO_MAX-1 is smaller
 * than NDRX_MSGPRIO_MAX; otherwise the macro leaves PRIO untouched.
 *
 * PRIO must be a modifiable lvalue; it is evaluated twice by the scaling
 * variant. All variants are a single do { } while (0) statement so that
 * `NDRX_PRIO_DOWNSCALE(x);` behaves the same (also in front of `else`) no
 * matter which variant was selected.
 */
#if defined(MQ_PRIO_MAX) && (MQ_PRIO_MAX-1 < NDRX_MSGPRIO_MAX)

#define NDRX_PRIO_DOWNSCALE(PRIO) do\
{\
    (PRIO)=( ((float)(PRIO)) * (float)(MQ_PRIO_MAX-1) / (float)NDRX_MSGPRIO_MAX );\
} while (0)

#else

/* queue priorities are wide enough (or the limit is unknown): no scaling */
#define NDRX_PRIO_DOWNSCALE(PRIO) do {} while (0)

#endif
0151
0152
0153
0154
0155
0156
0157
0158
0159
0160
0161
0162
0163 expublic void ndrx_tptoutset(int tout)
0164 {
0165 NDRX_LOG(log_info, "%s: NDRX_TOUT override from %d to %d seconds",
0166 __func__, G_atmi_env.time_out, tout);
0167 G_atmi_env.time_out = tout;
0168 }
0169
0170
0171
0172
0173
0174 expublic int ndrx_tptoutget(void)
0175 {
0176 return G_atmi_env.time_out;
0177 }
0178
0179
0180
0181
0182
0183 expublic int ndrx_tptoutget_eff(void)
0184 {
0185 int use_tout=1, tout_act=0, tout_restart=EXFAIL, tout=0;
0186
0187 TOUT_SOURCE;
0188
0189 return tout_act;
0190 }
0191
0192
0193
0194
0195
0196
0197
0198
0199
0200
0201
0202
0203
0204 expublic void ndrx_mq_fix_mass_send(int *cntr)
0205 {
0206 *cntr = *cntr + 1;
0207
0208 if (*cntr >= G_atmi_env.msg_max - Q_SEND_GRACE)
0209 {
0210 NDRX_LOG(log_error, "About to sleep! %d %d", *cntr, G_atmi_env.msg_max);
0211 usleep(SLEEP_ON_FULL_Q);
0212 *cntr = 0;
0213 }
0214 }
0215
0216
0217
0218
0219
0220
0221 expublic int ndrx_q_setblock(mqd_t q_descr, int blocked)
0222 {
0223 int ret=EXSUCCEED;
0224 struct mq_attr new;
0225 struct mq_attr old;
0226 int change = EXFALSE;
0227
0228 if (EXSUCCEED!= ndrx_mq_getattr(q_descr, &old))
0229 {
0230 NDRX_LOG(log_warn, "Failed to get attribs of Q: %d, err: %s",
0231 q_descr, strerror(errno));
0232 ret=EXFAIL;
0233 goto out;
0234 }
0235
0236
0237 if (!blocked && !(old.mq_flags & O_NONBLOCK))
0238 {
0239 memcpy(&new, &old, sizeof(new));
0240 NDRX_LOG(log_warn, "Switching qd %d to non-blocked", q_descr);
0241 new.mq_flags |= O_NONBLOCK;
0242 change = EXTRUE;
0243 }
0244
0245 else if (blocked && old.mq_flags & O_NONBLOCK)
0246 {
0247 memcpy(&new, &old, sizeof(new));
0248 NDRX_LOG(log_warn, "Switching qd %d to blocked", q_descr);
0249 new.mq_flags &= ~O_NONBLOCK;
0250 change = EXTRUE;
0251 }
0252
0253 if (change)
0254 {
0255 if (EXFAIL==ndrx_mq_setattr(q_descr, &new,
0256 &old))
0257 {
0258 NDRX_LOG(log_error, "Failed to set attribs for qd %d: %s",
0259 q_descr, strerror(errno));
0260 ret=EXFAIL;
0261 goto out;
0262 }
0263 }
0264
0265 out:
0266 return ret;
0267 }
0268
0269
0270
0271
0272
0273
0274
0275
0276 expublic mqd_t ndrx_mq_open_at(char *name, int oflag, mode_t mode,
0277 struct mq_attr *attr)
0278 {
0279 struct mq_attr attr_int;
0280 struct mq_attr * p_at;
0281 mqd_t ret;
0282 int errno_save;
0283
0284 if (NULL==attr)
0285 {
0286 memset(&attr_int, 0, sizeof(attr_int));
0287 p_at = &attr_int;
0288 }
0289 else
0290 {
0291 p_at = attr;
0292 }
0293
0294 if (!p_at->mq_maxmsg)
0295 p_at->mq_maxmsg = G_atmi_env.msg_max;
0296
0297 if (!p_at->mq_msgsize)
0298 p_at->mq_msgsize = G_atmi_env.msgsize_max;
0299
0300 ret=ndrx_mq_open(name, oflag, mode, p_at);
0301 errno_save=errno;
0302
0303 NDRX_LOG(6, "ndrx_mq_open_at(name=%s) returns 0x%lx (mq_maxmsg: %d mq_msgsize: %d)",
0304 name, (long int) ret, p_at->mq_maxmsg, p_at->mq_msgsize);
0305 errno=errno_save;
0306 return ret;
0307 }
0308
0309
0310
0311
0312
0313
0314
0315 expublic mqd_t ndrx_mq_open_at_wrp(char *name, int oflag)
0316 {
0317
0318 return ndrx_mq_open_at(name, oflag, 0, NULL);
0319 }
0320
0321
0322
0323
0324
0325
0326
0327
0328
0329
0330
0331
0332 expublic int ndrx_generic_qfd_send(mqd_t q_descr, char *data, long len, long flags)
0333 {
0334 int ret=EXSUCCEED;
0335 int use_tout, tout_act=0, tout_restart=EXFAIL, tout=0;
0336 struct timespec abs_timeout;
0337 int snd_prio;
0338 SET_TOUT_CONF;
0339
0340 snd_prio = NDRX_MSGPRIO_DEFAULT;
0341
0342 NDRX_PRIO_DOWNSCALE(snd_prio);
0343
0344 restart:
0345
0346 SET_TOUT_VALUE;
0347
0348
0349 if ((!use_tout && EXFAIL==ndrx_mq_send(q_descr, data, len, snd_prio)) ||
0350 (use_tout && EXFAIL==ndrx_mq_timedsend(q_descr, data, len, snd_prio, &abs_timeout)))
0351 {
0352 if (EINTR==errno && flags & TPSIGRSTRT)
0353 {
0354 NDRX_LOG(log_warn, "Got signal interrupt, restarting ndrx_mq_send");
0355 goto restart;
0356 }
0357 else if (EAGAIN==errno)
0358 {
0359 PRINT_Q_INFO(q_descr);
0360 }
0361 ret=errno;
0362 NDRX_LOG(log_error, "Failed to send data to fd [%d] with error: %s",
0363 q_descr, strerror(ret));
0364 }
0365
0366 out:
0367 return ret;
0368 }
0369
0370
0371
0372
0373
0374
0375
0376
0377
0378 expublic int ndrx_generic_q_send(char *queue, char *data, long len, long flags, int msg_prio)
0379 {
0380 return ndrx_generic_q_send_2(queue, data, len, flags, EXFAIL, msg_prio);
0381 }
0382
0383
0384
0385
0386
0387
0388
0389
0390
0391
0392
0393
0394 expublic int ndrx_generic_q_send_2(char *queue, char *data, long len, long flags,
0395 long tout, int msg_prio)
0396 {
0397 int ret=EXSUCCEED;
0398 mqd_t q_descr=(mqd_t)EXFAIL;
0399 int use_tout, tout_restart=EXFAIL, tout_act=0;
0400 struct timespec abs_timeout;
0401 long add_flags = 0;
0402 int snd_prio;
0403 SET_TOUT_CONF;
0404
0405
0406 if (flags & TPNOBLOCK)
0407 {
0408 NDRX_LOG(log_debug, "Enabling NONBLOCK send");
0409 add_flags|=O_NONBLOCK;
0410 }
0411
0412
0413
0414 restart_open:
0415 q_descr = ndrx_mq_open_at_wrp (queue, O_WRONLY | add_flags);
0416
0417 if ((mqd_t)EXFAIL==q_descr && EINTR==errno && flags & TPSIGRSTRT)
0418 {
0419 NDRX_LOG(log_warn, "Got signal interrupt, restarting ndrx_mq_open");
0420 goto restart_open;
0421 }
0422
0423 if ((mqd_t)EXFAIL==q_descr)
0424 {
0425 NDRX_LOG(log_error, "Failed to open queue [%s] with error: %s",
0426 queue, strerror(errno));
0427 ret=errno;
0428 goto out;
0429 }
0430
0431
0432 restart_send:
0433
0434 SET_TOUT_VALUE;
0435
0436 if (0==msg_prio)
0437 {
0438
0439 msg_prio = NDRX_MSGPRIO_DEFAULT;
0440 }
0441
0442
0443 if (NULL!=G_atmi_tls && G_atmi_tls->prio)
0444 {
0445
0446
0447 if (G_atmi_tls->prio_flags & TPABSOLUTE)
0448 {
0449 msg_prio = G_atmi_tls->prio;
0450 }
0451 else
0452 {
0453
0454 msg_prio += G_atmi_tls->prio;
0455 }
0456 }
0457
0458 if (msg_prio<NDRX_MSGPRIO_MIN)
0459 {
0460
0461 msg_prio = NDRX_MSGPRIO_MIN;
0462 }
0463 else if (msg_prio>NDRX_MSGPRIO_MAX)
0464 {
0465
0466 msg_prio = NDRX_MSGPRIO_MAX;
0467 }
0468
0469 snd_prio=msg_prio;
0470
0471 NDRX_PRIO_DOWNSCALE(snd_prio);
0472
0473 NDRX_LOG(log_debug, "len: %d use timeout: %d config: %d prio: %d snd_prio: %d",
0474 len, use_tout, tout_act, msg_prio, snd_prio);
0475 if ((!use_tout && EXFAIL==ndrx_mq_send(q_descr, data, len, snd_prio)) ||
0476 (use_tout && EXFAIL==ndrx_mq_timedsend(q_descr, data, len, snd_prio, &abs_timeout)))
0477 {
0478 ret=errno;
0479 if (EINTR==errno && flags & TPSIGRSTRT)
0480 {
0481 NDRX_LOG(log_warn, "Got signal interrupt, restarting ndrx_mq_send");
0482 goto restart_send;
0483 }
0484 else if (EAGAIN==errno)
0485 {
0486 PRINT_Q_INFO(q_descr);
0487 }
0488 NDRX_LOG(log_error, "Failed to send data to queue [%s] with error: %d:%s",
0489 queue, ret, strerror(ret));
0490 }
0491
0492 out:
0493
0494 restart_close:
0495
0496 if ((mqd_t)EXFAIL!=q_descr && EXFAIL==ndrx_mq_close(q_descr))
0497 {
0498 if (EINTR==errno && flags & TPSIGRSTRT)
0499 {
0500 NDRX_LOG(log_warn, "Got signal interrupt, restarting ndrx_mq_close");
0501 goto restart_close;
0502 }
0503 }
0504
0505
0506 if (NULL!=G_atmi_tls)
0507 {
0508 G_atmi_tls->prio = 0;
0509 G_atmi_tls->prio_flags = 0;
0510 G_atmi_tls->prio_last = msg_prio;
0511 }
0512
0513 return ret;
0514 }
0515
0516
0517
0518
0519
0520
0521
0522
0523
0524
0525
0526 expublic ssize_t ndrx_generic_q_receive(mqd_t q_descr, char *q_str,
0527 struct mq_attr *reply_q_attr,
0528 char *buf, long buf_max,
0529 unsigned *prio, long flags)
0530 {
0531 ssize_t ret=EXSUCCEED;
0532 int use_tout, tout_restart=EXFAIL, tout_act=0, tout=0;
0533 struct timespec abs_timeout;
0534
0535 SET_TOUT_CONF;
0536
0537 if (NULL!=q_str && NULL!=reply_q_attr)
0538 {
0539 if (EXSUCCEED!=ndrx_setup_queue_attrs(reply_q_attr, q_descr, q_str, flags))
0540 {
0541 NDRX_LOG(log_error, "%s: Failed to setup queue attribs, flags=%ld", flags);
0542 EXFAIL_OUT(ret);
0543 }
0544 }
0545
0546 restart:
0547 SET_TOUT_VALUE;
0548 NDRX_LOG(6, "use timeout: %d config: %d qdescr: %lx", use_tout,
0549 tout_act, (long int)q_descr);
0550 if ((!use_tout && EXFAIL==(ret=ndrx_mq_receive (q_descr, (char *)buf, buf_max, prio))) ||
0551 (use_tout && EXFAIL==(ret=ndrx_mq_timedreceive (q_descr, (char *)buf, buf_max, prio, &abs_timeout))))
0552 {
0553 if (EINTR==errno && flags & TPSIGRSTRT)
0554 {
0555 NDRX_LOG(log_warn, "Got signal interrupt, restarting ndrx_mq_receive");
0556 goto restart;
0557 }
0558
0559 ret=errno;
0560 if (EAGAIN==ret)
0561 {
0562 NDRX_LOG(log_debug, "No messages in queue");
0563 ret= GEN_QUEUE_ERR_NO_DATA;
0564 }
0565 else
0566 {
0567 int err;
0568
0569 CONV_ERROR_CODE(ret, err);
0570
0571 ndrx_TPset_error_fmt(err, "ndrx_mq_receive failed for %lx (%zd): %s",
0572 (long int)q_descr, ret, strerror(ret));
0573 ret=EXFAIL;
0574 }
0575 }
0576
0577 out:
0578 NDRX_LOG(log_debug, "ndrx_generic_q_receive: %zd", ret);
0579 return ret;
0580 }
0581
0582
0583
0584
0585
0586 expublic void cmd_generic_init(int ndrxd_cmd, int msg_src, int msg_type,
0587 command_call_t *call, char *reply_q)
0588 {
0589 call->proto_ver[0]=0;
0590 call->proto_ver[1]=0;
0591 call->proto_ver[2]=0;
0592 call->proto_ver[3]=0;
0593
0594 call->command_id = ndrxd_cmd;
0595 call->command = ndrxd_cmd;
0596 call->magic = NDRX_MAGIC;
0597 call->msg_src = msg_src;
0598 call->msg_type = msg_type;
0599 NDRX_STRCPY_SAFE(call->reply_queue, reply_q);
0600 call->caller_nodeid = G_atmi_env.our_nodeid;
0601 }
0602
0603
0604
0605
0606
0607
0608
0609
0610
0611 expublic int cmd_generic_call_2(int ndrxd_cmd, int msg_src, int msg_type,
0612 command_call_t *call, size_t call_size,
0613 char *reply_q,
0614 mqd_t reply_queue,
0615 mqd_t admin_queue,
0616 char *admin_q_str,
0617 int argc, char **argv,
0618 int *p_have_next,
0619 int (*p_rsp_process)(command_reply_t *reply, size_t reply_len),
0620 void (*p_put_output)(char *text),
0621 int need_reply,
0622 int reply_only,
0623 char *rply_buf_out,
0624 int *rply_buf_out_len,
0625 int flags,
0626
0627
0628 int (*p_rply_request)(char **buf, long len)
0629 )
0630 {
0631 int ret=EXSUCCEED;
0632 command_reply_t *reply;
0633 unsigned prio = 0;
0634 char *msg_buffer_max= NULL;
0635 ssize_t reply_len;
0636 int attempts = 1;
0637
0638 restart:
0639 NDRX_LOG(log_debug, "gencall command: %d, reply_only=%d, need_reply=%d "
0640 "call flags=0x%x, getcall flags=%d",
0641 ndrxd_cmd, reply_only, need_reply, (NULL!=call?call->flags:0), flags);
0642
0643 if (NULL!=rply_buf_out && NULL==rply_buf_out_len)
0644 {
0645 NDRX_LOG(log_error, "User buffer address specified in params, "
0646 "but rply_buf_out_len is NULL!");
0647 EXFAIL_OUT(ret);
0648 }
0649
0650 if (!reply_only)
0651 {
0652 call->command = ndrxd_cmd;
0653 call->command_id = ndrxd_cmd;
0654 call->magic = NDRX_MAGIC;
0655 call->msg_src = msg_src;
0656 call->msg_type = msg_type;
0657 NDRX_STRCPY_SAFE(call->reply_queue, reply_q);
0658 call->caller_nodeid = G_atmi_env.our_nodeid;
0659
0660 if ((mqd_t)EXFAIL!=admin_queue)
0661 {
0662 NDRX_LOG(log_error, "Sending data to [%s], fd=%d, call flags=0x%x",
0663 admin_q_str, admin_queue, call->flags);
0664 if (EXSUCCEED!=(ret=ndrx_generic_qfd_send(admin_queue,
0665 (char *)call, call_size, flags)))
0666 {
0667 NDRX_LOG(log_error, "Failed to send msg to ndrxd!");
0668
0669 if (NULL!=p_put_output)
0670 {
0671 if (EINVAL==ret)
0672 {
0673 p_put_output(NDRX_QERR_MSG_EINVAL);
0674 }
0675 else
0676 {
0677 p_put_output(NDRX_XADMIN_ERR_FMT_PFX "Failed to send msg to ndrxd!");
0678 }
0679 }
0680
0681 goto out;
0682 }
0683 }
0684 else
0685 {
0686 NDRX_LOG(log_info, "Sending data to [%s] call flags=0x%x",
0687 admin_q_str, call->flags);
0688 if (EXSUCCEED!=(ret=ndrx_generic_q_send(admin_q_str,
0689 (char *)call, call_size, flags, 0)))
0690 {
0691 if (NULL!=p_put_output)
0692 {
0693 if (EINVAL==ret)
0694 {
0695 p_put_output(NDRX_QERR_MSG_EINVAL);
0696 }
0697 else
0698 {
0699 p_put_output(NDRX_XADMIN_ERR_FMT_PFX "Failed to send msg to ndrxd!");
0700 }
0701 }
0702
0703 ret=EXFAIL;
0704 goto out;
0705 }
0706 }
0707 }
0708 else
0709 {
0710 NDRX_LOG(log_debug, "Reply mode only");
0711 }
0712
0713 if (need_reply)
0714 {
0715 NDRX_LOG(log_debug, "Waiting for response from ndrxd on [%s]", reply_q);
0716 }
0717 else
0718 {
0719 NDRX_LOG(log_debug, "Reply not needed!");
0720 goto out;
0721 }
0722
0723 do
0724 {
0725 command_call_t *test_call;
0726
0727 if (NULL==msg_buffer_max)
0728 {
0729 NDRX_SYSBUF_MALLOC_WERR_OUT(msg_buffer_max, reply_len, ret);
0730 }
0731 else
0732 {
0733 reply_len = NDRX_MSGSIZEMAX;
0734 }
0735
0736 test_call = (command_call_t *)msg_buffer_max;
0737
0738
0739
0740
0741 if (0>(reply_len=ndrx_generic_q_receive(reply_queue, NULL, NULL,
0742 msg_buffer_max, reply_len, &prio, flags)))
0743 {
0744 NDRX_LOG(log_error, "Failed to receive reply from ndrxd!");
0745
0746 if (NULL!=p_put_output)
0747 {
0748 p_put_output("\nERROR ! Failed to receive reply from ndrxd\n(if timeout - check "
0749 "NDRX_XADMINTOUT settings)!");
0750 }
0751
0752 ret=EXFAIL;
0753 goto out;
0754 }
0755 else if (test_call->command % 2 == 0 && NULL!=p_rply_request)
0756 {
0757 if (EXSUCCEED!=p_rply_request(&msg_buffer_max, (size_t)reply_len))
0758 {
0759 NDRX_LOG(log_error, "Failed to process request!");
0760 ret=EXFAIL;
0761 goto out;
0762 }
0763 else
0764 {
0765 NDRX_LOG(log_warn, "Waiting for next rply msg...");
0766 continue;
0767 }
0768 }
0769 else if (reply_len < sizeof(command_reply_t))
0770 {
0771 NDRX_LOG(log_error, "Reply size %zd, expected atleast %zu!",
0772 reply_len, sizeof(command_reply_t));
0773
0774 if (NULL!=p_put_output)
0775 p_put_output("Invalid reply size from ndrxd!");
0776
0777 ret=EXFAIL;
0778 goto out;
0779 }
0780
0781 reply = (command_reply_t *)msg_buffer_max;
0782
0783 if (NDRX_MAGIC!=reply->magic)
0784 {
0785 NDRX_LOG(log_error, "Got invalid response from ndrxd: invalid magic "
0786 "(expected: %ld, got: %ld)!",
0787 NDRX_MAGIC, reply->magic);
0788
0789 if (NULL!=p_put_output)
0790 p_put_output("Invalid response - invalid header!");
0791
0792 ret=EXFAIL;
0793 goto out;
0794 }
0795
0796
0797 if (ndrxd_cmd+1 != reply->command)
0798 {
0799 NDRX_LOG(log_error, "Got invalid response from ndrxd: expected: %d, got %d",
0800 call->command+1, reply->command);
0801
0802 if (NULL!=p_put_output)
0803 p_put_output("Invalid response - invalid response command code!");
0804
0805 ret=EXFAIL;
0806 goto out;
0807 }
0808
0809
0810 if (EXSUCCEED==reply->status)
0811 {
0812 if (NULL!=p_rsp_process)
0813 {
0814 ret=p_rsp_process(reply, reply_len);
0815 }
0816 else
0817 {
0818 if (NULL!=p_put_output)
0819 p_put_output("OK");
0820 }
0821
0822 if (NULL!=rply_buf_out && NULL!=rply_buf_out_len)
0823 {
0824 if (reply_len>*rply_buf_out_len)
0825 {
0826 NDRX_LOG(log_warn, "Received packet size %zd longer "
0827 "than user buffer in rply_buf_len %d",
0828 reply_len, *rply_buf_out_len);
0829 EXFAIL_OUT(ret);
0830 }
0831 else
0832 {
0833 memcpy(rply_buf_out, reply, reply_len);
0834 *rply_buf_out_len = reply_len;
0835 }
0836 }
0837 }
0838 else
0839 {
0840 char buf[2048];
0841
0842 attempts++;
0843
0844 if (NDRXD_ENORMAL==reply->error_code && attempts < G_atmi_env.max_normwait)
0845 {
0846 snprintf(buf, sizeof(buf), "%s. Attempt %d/%d",
0847 reply->error_msg, attempts, G_atmi_env.max_normwait);
0848
0849 NDRX_LOG(log_warn, "%s", buf);
0850
0851 if (NULL!=p_put_output)
0852 p_put_output(buf);
0853
0854 sleep(1);
0855
0856 goto restart;
0857 }
0858 else
0859 {
0860 snprintf(buf, sizeof(buf), "fail, code: %d: %s",
0861 reply->error_code, reply->error_msg);
0862 }
0863
0864 NDRX_LOG(log_warn, "%s", buf);
0865
0866 if (NULL!=p_put_output)
0867 p_put_output(buf);
0868
0869 ret = reply->status;
0870 goto out;
0871 }
0872
0873 } while((reply->flags & NDRXD_CALL_FLAGS_RSPHAVE_MORE));
0874
0875 out:
0876
0877
0878 if (NULL!=msg_buffer_max)
0879 {
0880 NDRX_SYSBUF_FREE(msg_buffer_max);
0881 }
0882
0883 return ret;
0884 }
0885
0886
0887
0888
0889
0890
0891 expublic int cmd_generic_call(int ndrxd_cmd, int msg_src, int msg_type,
0892 command_call_t *call, size_t call_size,
0893 char *reply_q,
0894 mqd_t reply_queue,
0895 mqd_t admin_queue,
0896 char *admin_q_str,
0897 int argc, char **argv,
0898 int *p_have_next,
0899 int (*p_rsp_process)(command_reply_t *reply, size_t reply_len),
0900 void (*p_put_output)(char *text),
0901 int need_reply)
0902 {
0903 return cmd_generic_call_2(ndrxd_cmd, msg_src, msg_type,
0904 call, call_size,
0905 reply_q,
0906 reply_queue,
0907 admin_queue,
0908 admin_q_str,
0909 argc, argv,
0910 p_have_next,
0911 p_rsp_process,
0912 p_put_output,
0913 need_reply,
0914 EXFALSE,
0915 NULL, NULL, TPNOTIME, NULL);
0916 }
0917
0918
0919
0920
0921
0922 expublic int cmd_generic_callfl(int ndrxd_cmd, int msg_src, int msg_type,
0923 command_call_t *call, size_t call_size,
0924 char *reply_q,
0925 mqd_t reply_queue,
0926 mqd_t admin_queue,
0927 char *admin_q_str,
0928 int argc, char **argv,
0929 int *p_have_next,
0930 int (*p_rsp_process)(command_reply_t *reply, size_t reply_len),
0931 void (*p_put_output)(char *text),
0932 int need_reply,
0933 int flags)
0934 {
0935 return cmd_generic_call_2(ndrxd_cmd, msg_src, msg_type,
0936 call, call_size,
0937 reply_q,
0938 reply_queue,
0939 admin_queue,
0940 admin_q_str,
0941 argc, argv,
0942 p_have_next,
0943 p_rsp_process,
0944 p_put_output,
0945 need_reply,
0946 EXFALSE,
0947 NULL, NULL, flags, NULL);
0948 }
0949
0950
0951
0952
0953
0954
0955 expublic int cmd_generic_bufcall(int ndrxd_cmd, int msg_src, int msg_type,
0956 command_call_t *call, size_t call_size,
0957 char *reply_q,
0958 mqd_t reply_queue,
0959 mqd_t admin_queue,
0960 char *admin_q_str,
0961 int argc, char **argv,
0962 int *p_have_next,
0963 int (*p_rsp_process)(command_reply_t *reply, size_t reply_len),
0964 void (*p_put_output)(char *text),
0965 int need_reply,
0966 int reply_only,
0967 char *rply_buf_out,
0968 int *rply_buf_out_len,
0969 int flags,
0970 int (*p_rply_request)(char **buf, long len)
0971 )
0972 {
0973 return cmd_generic_call_2(ndrxd_cmd, msg_src, msg_type,
0974 call, call_size,
0975 reply_q,
0976 reply_queue,
0977 admin_queue,
0978 admin_q_str,
0979 argc, argv,
0980 p_have_next,
0981 p_rsp_process,
0982 p_put_output,
0983 need_reply,
0984 reply_only,
0985 rply_buf_out,
0986 rply_buf_out_len,
0987 flags,
0988 p_rply_request);
0989 }
0990
0991
0992
0993
0994
0995
0996
0997
0998
0999
1000
1001
1002
1003
1004
1005
1006
1007
1008
1009
1010
1011 extern int cmd_generic_listcall(int ndrxd_cmd, int msg_src, int msg_type,
1012 command_call_t *call, size_t call_size,
1013 char *reply_q,
1014 mqd_t reply_queue,
1015 mqd_t admin_queue,
1016 char *admin_q_str,
1017 int argc, char **argv,
1018 int *p_have_next,
1019 gencall_args_t *arglist,
1020 int reply_only,
1021 long flags)
1022 {
1023 return cmd_generic_call_2(ndrxd_cmd, msg_src, msg_type,
1024 call, call_size,
1025 reply_q,
1026 reply_queue,
1027 admin_queue,
1028 admin_q_str,
1029 argc, argv,
1030 p_have_next,
1031 arglist[ndrxd_cmd].p_rsp_process,
1032 arglist[ndrxd_cmd].p_put_output,
1033 arglist[ndrxd_cmd].need_reply,
1034 reply_only,
1035
1036 NULL, NULL, TPSIGRSTRT | flags, NULL);
1037 }
1038
1039
1040
1041
1042
1043 expublic int reply_with_failure(long flags, tp_command_call_t *last_call,
1044 char *buf, int *len, long rcode)
1045 {
1046 int ret=EXSUCCEED;
1047 char fn[] = "reply_with_failure";
1048 tp_command_call_t call_b;
1049 tp_command_call_t *call;
1050 char reply_to[NDRX_MAX_Q_SIZE+1] = {EXEOS};
1051
1052
1053 if (last_call->flags & TPNOREPLY)
1054 {
1055 NDRX_LOG(log_warn, "No reply expected ignore error delivery");
1056 goto out;
1057 }
1058
1059 if (NULL==buf)
1060 {
1061 call = &call_b;
1062 }
1063 else
1064 {
1065 call = (tp_command_call_t *)buf;
1066 }
1067
1068 memset(call, 0, sizeof(*call));
1069 call->command_id = ATMI_COMMAND_TPREPLY;
1070 call->cd = last_call->cd;
1071 call->timestamp = last_call->timestamp;
1072 call->callseq = last_call->callseq;
1073
1074 NDRX_STRCPY_SAFE(call->reply_to, last_call->reply_to);
1075 call->sysflags |=SYS_FLAG_REPLY_ERROR;
1076 call->rcode = rcode;
1077 NDRX_STRCPY_SAFE(call->callstack, last_call->callstack);
1078
1079 NDRX_LOG(log_debug, "error reply cd %d callseq %u timestamp %ld queue [%s] error %ld",
1080 call->cd, call->callseq, call->timestamp, call->reply_to, call->rcode);
1081 if (EXSUCCEED!=fill_reply_queue(call->callstack, last_call->reply_to, reply_to))
1082 {
1083 NDRX_LOG(log_error, "ATTENTION!! Failed to get reply queue");
1084 userlog("ATTENTION!! Failed to get reply queue");
1085 goto out;
1086 }
1087
1088 if (NULL==buf)
1089 {
1090 if (EXSUCCEED!=(ret=ndrx_generic_q_send(reply_to, (char *)call,
1091 sizeof(*call), flags, 0)))
1092 {
1093 NDRX_LOG(log_error, "%s: Failed to send error reply back, os err: %s",
1094 fn, strerror(ret));
1095 goto out;
1096 }
1097 }
1098 else
1099 {
1100 NDRX_LOG(log_debug, "Buffer specified not sending anywhere");
1101 }
1102
1103
1104 out:
1105 return ret;
1106 }
1107
1108
1109
1110
1111
1112
1113
1114 expublic int ndrx_get_q_attr(char *q, struct mq_attr *p_att)
1115 {
1116 int ret=EXSUCCEED;
1117 mqd_t q_descr=(mqd_t)EXFAIL;
1118
1119
1120 if ((mqd_t)EXFAIL==(q_descr = ndrx_mq_open_at_wrp(q, 0)))
1121 {
1122 NDRX_LOG(log_warn, "Failed to get attribs of Q: [%s], err: %s",
1123 q, strerror(errno));
1124 EXFAIL_OUT(ret);
1125 }
1126
1127
1128 if (EXSUCCEED!= ndrx_mq_getattr(q_descr, p_att))
1129 {
1130 NDRX_LOG(log_warn, "Failed to get attribs of Q: %d, err: %s",
1131 q_descr, strerror(errno));
1132 EXFAIL_OUT(ret);
1133 }
1134
1135 out:
1136
1137 if ((mqd_t)EXFAIL!=q_descr)
1138 {
1139 ndrx_mq_close(q_descr);
1140 }
1141
1142 return ret;
1143 }
1144
1145
1146
1147
1148
1149
1150 expublic atmi_svc_list_t* ndrx_get_svc_list(int (*p_filter)(char *svcnm))
1151 {
1152 atmi_svc_list_t *ret = NULL;
1153 atmi_svc_list_t *tmp;
1154 shm_svcinfo_t *svcinfo = (shm_svcinfo_t *) G_svcinfo.mem;
1155 int i;
1156
1157 if (NULL==svcinfo)
1158 {
1159 NDRX_LOG(log_error, "shm_svcinfo memory is NULL!");
1160 return NULL;
1161 }
1162
1163
1164 for (i=0; i<G_max_svcs; i++)
1165 {
1166
1167
1168
1169 if (EXEOS!=SHM_SVCINFO_INDEX(svcinfo, i)->service[0] &&
1170 (SHM_SVCINFO_INDEX(svcinfo, i)->srvs || SHM_SVCINFO_INDEX(svcinfo, i)->csrvs) )
1171 {
1172
1173 if (p_filter(SHM_SVCINFO_INDEX(svcinfo, i)->service))
1174 {
1175 if (NULL==(tmp = NDRX_CALLOC(1, sizeof(atmi_svc_list_t))))
1176 {
1177 NDRX_LOG(log_error, "Failed to malloc %d: %s",
1178 sizeof(atmi_svc_list_t), strerror(errno));
1179
1180 userlog("Failed to malloc %d: %s",
1181 sizeof(atmi_svc_list_t), strerror(errno));
1182
1183 goto out;
1184 }
1185 NDRX_STRCPY_SAFE(tmp->svcnm, SHM_SVCINFO_INDEX(svcinfo, i)->service);
1186 LL_APPEND(ret, tmp);
1187 }
1188 }
1189 }
1190
1191 out:
1192 return ret;
1193 }
1194
1195
1196
1197
1198
1199
1200
1201 expublic int ndrx_setup_queue_attrs(struct mq_attr *p_q_attr,
1202 mqd_t listen_q,
1203 char *listen_q_str,
1204 long flags)
1205 {
1206 int ret=EXSUCCEED;
1207 int change_flags = EXFALSE;
1208 struct mq_attr new;
1209 char fn[] = "ndrx_setup_queue_attrs";
1210
1211
1212
1213 if (flags & TPNOBLOCK && !(p_q_attr->mq_flags & O_NONBLOCK))
1214 {
1215
1216 new = *p_q_attr;
1217 new.mq_flags |= O_NONBLOCK;
1218 change_flags = EXTRUE;
1219 NDRX_LOG(log_debug, "Changing queue [%s] to non blocked",
1220 listen_q_str);
1221 }
1222 else if (!(flags & TPNOBLOCK) && (p_q_attr->mq_flags & O_NONBLOCK))
1223 {
1224
1225 new = *p_q_attr;
1226 new.mq_flags &= ~O_NONBLOCK;
1227 change_flags = EXTRUE;
1228 NDRX_LOG(log_debug, "Changing queue [%s] to blocked",
1229 listen_q_str);
1230 }
1231
1232 if (change_flags)
1233 {
1234 if (EXFAIL==ndrx_mq_setattr(listen_q, &new,
1235 NULL))
1236 {
1237 ndrx_TPset_error_fmt(TPEOS, "%s: Failed to change attributes for queue [%s] fd %d: %s",
1238 fn, listen_q_str, listen_q, strerror(errno));
1239 ret=EXFAIL;
1240 goto out;
1241 }
1242 else
1243 {
1244
1245 *p_q_attr = new;
1246 }
1247 }
1248
1249
1250
1251 out:
1252 return ret;
1253 }
1254
1255