0001
0002
0003
0004
0005
0006
0007
0008
0009
0010
0011
0012
0013
0014
0015
0016
0017
0018
0019
0020
0021
0022
0023
0024
0025
0026
0027
0028
0029
0030
0031
0032
0033
0034
0035
0036 #include <stdio.h>
0037 #include <stdarg.h>
0038 #include <memory.h>
0039 #include <stdlib.h>
0040 #include <errno.h>
0041 #include <sys_mqueue.h>
0042 #include <fcntl.h>
0043
0044 #include <atmi.h>
0045 #include <ndebug.h>
0046 #include <tperror.h>
0047 #include <typed_buf.h>
0048 #include <atmi_int.h>
0049
0050 #include "../libatmisrv/srv_int.h"
0051 #include "userlog.h"
0052 #include "exregex.h"
0053 #include <thlock.h>
0054 #include <xa_cmn.h>
0055 #include <atmi_shm.h>
0056 #include <atmi_tls.h>
0057 #include <utlist.h>
0058
0059
0060
0061
0062
0063
0064
0065
0066
0067
0068
0069
0070
0071
0072
0073
0074
0075
0076
0077
0078
0079
0080
0081
0082
0083 expublic int ndrx_tpnotify(CLIENTID *clientid, TPMYID *p_clientid_myid,
0084 char *cltq,
0085 char *data, long len, long flags,
0086 int dest_node, char *nodeid, char *usrname, char *cltname,
0087 int ex_flags)
0088 {
0089 int ret=EXSUCCEED;
0090 char *buf=NULL;
0091 size_t buf_len;
0092 tp_notif_call_t *call;
0093 buffer_obj_t *buffer_info;
0094 long data_len = MAX_CALL_DATA_SIZE;
0095 char send_q[NDRX_MAX_Q_SIZE+1];
0096 time_t timestamp;
0097 int is_bridge;
0098 int tpcall_cd;
0099 long local_node = tpgetnodeid();
0100 ATMI_TLS_ENTRY;
0101
0102 NDRX_LOG(log_debug, "%s enter", __func__);
0103
0104 NDRX_SYSBUF_MALLOC_WERR_OUT(buf, buf_len, ret);
0105 call = (tp_notif_call_t *)buf;
0106
0107
0108 memset(call, 0, sizeof(tp_notif_call_t));
0109
0110
0111 if (NULL!=clientid)
0112 {
0113 NDRX_STRCPY_SAFE(call->destclient, clientid->clientdata);
0114 }
0115
0116 call->destnodeid = 0;
0117
0118
0119 if (ex_flags & TPCALL_BRCALL)
0120 {
0121 call->destnodeid = dest_node;
0122
0123 #if defined(EX_USE_POLL) || defined(EX_USE_SYSVQ)
0124 {
0125 int tmp_is_bridge;
0126 char tmpsvc[MAXTIDENT+1];
0127
0128 snprintf(tmpsvc, sizeof(tmpsvc), NDRX_SVC_BRIDGE, dest_node);
0129
0130 if (EXSUCCEED!=ndrx_shm_get_svc(tmpsvc, send_q, &tmp_is_bridge,
0131 NULL))
0132 {
0133 NDRX_LOG(log_error, "Failed to get bridge svc: [%s]",
0134 tmpsvc);
0135 EXFAIL_OUT(ret);
0136 }
0137 }
0138 #else
0139 snprintf(send_q, sizeof(send_q), NDRX_SVC_QBRDIGE,
0140 G_atmi_tls->G_atmi_conf.q_prefix, dest_node);
0141 #endif
0142
0143 is_bridge=EXTRUE;
0144 }
0145 else if (NULL!=cltq)
0146 {
0147
0148 NDRX_STRCPY_SAFE(send_q, cltq);
0149 }
0150 else
0151 {
0152
0153
0154
0155
0156
0157 if (EXSUCCEED!=ndrx_myid_convert_to_q(p_clientid_myid, send_q,
0158 sizeof(send_q)))
0159 {
0160 ndrx_TPset_error_fmt(TPEINVAL, "Failed to translate client data [%s] to Q",
0161 clientid->clientdata);
0162 EXFAIL_OUT(ret);
0163 }
0164
0165 if (p_clientid_myid->nodeid!=local_node)
0166 {
0167 NDRX_LOG(log_info, "The client [%s] resists on another node [%ld], "
0168 "thus send in cluster", clientid->clientdata,
0169 (long)p_clientid_myid->nodeid);
0170
0171
0172
0173
0174 ret = ndrx_tpnotify(clientid, p_clientid_myid,
0175 cltq,
0176 data, len, flags,
0177 p_clientid_myid->nodeid,
0178 nodeid, usrname, cltname,ex_flags | TPCALL_BRCALL);
0179
0180 goto out;
0181
0182 }
0183 }
0184
0185 if (NULL!=data)
0186 {
0187 if (NULL==(buffer_info = ndrx_find_buffer(data)))
0188 {
0189 ndrx_TPset_error_fmt(TPEINVAL, "Buffer %p not known to system!", __func__);
0190 EXFAIL_OUT(ret);
0191 }
0192 }
0193
0194
0195 if (EXSUCCEED!=ndrx_mbuf_prepare_outgoing(data, len, call->data,
0196 &data_len, flags, 0L))
0197 {
0198
0199 EXFAIL_OUT(ret);
0200 }
0201
0202
0203 call->data_len = data_len;
0204
0205 data_len+=sizeof(tp_notif_call_t);
0206
0207 NDRX_STRCPY_SAFE(call->reply_to, G_atmi_tls->G_atmi_conf.reply_q_str);
0208
0209
0210 if (ex_flags & TPCALL_BROADCAST)
0211 {
0212 call->command_id = ATMI_COMMAND_BROADCAST;
0213 }
0214 else
0215 {
0216 call->command_id = ATMI_COMMAND_TPNOTIFY;
0217 }
0218
0219 call->flags = flags;
0220 timestamp = time(NULL);
0221
0222
0223 if (flags & TPACK)
0224 {
0225 NDRX_LOG(log_warn, "TPACK set but not supported. Ignoring...");
0226 tpcall_cd = 0;
0227 }
0228 else
0229 {
0230 NDRX_LOG(log_debug, "TPACK not set, => cd=0 - no reply needed");
0231 tpcall_cd = 0;
0232 }
0233
0234 call->cd = tpcall_cd;
0235 call->timestamp = timestamp;
0236
0237 if (NULL!=usrname)
0238 {
0239 NDRX_STRCPY_SAFE(call->usrname, usrname);
0240 }
0241 else
0242 {
0243 call->usrname[0] = EXEOS;
0244 call->usrname_isnull = EXTRUE;
0245 }
0246
0247 if (NULL!=cltname)
0248 {
0249 NDRX_STRCPY_SAFE(call->cltname, cltname);
0250 }
0251 else
0252 {
0253 call->cltname[0] = EXEOS;
0254 call->cltname_isnull = EXTRUE;
0255 }
0256
0257 if (NULL!=nodeid)
0258 {
0259 NDRX_STRCPY_SAFE(call->nodeid, nodeid);
0260 }
0261 else
0262 {
0263 call->nodeid[0] = EXEOS;
0264 call->nodeid_isnull = EXTRUE;
0265 }
0266
0267
0268 ndrx_stopwatch_reset(&call->timer);
0269
0270 NDRX_STRCPY_SAFE(call->my_id, G_atmi_tls->G_atmi_conf.my_id);
0271
0272 NDRX_LOG(log_debug, "Sending notification request to: [%s] my_id=[%s] "
0273 "reply_to=[%s] cd=%d callseq=%u",
0274 send_q, call->my_id, call->reply_to, tpcall_cd, call->callseq);
0275
0276 NDRX_DUMP(log_dump, "Sending away...", (char *)call, data_len);
0277
0278 if (EXSUCCEED!=(ret=ndrx_generic_q_send(send_q, (char *)call, data_len, flags,
0279 NDRX_MSGPRIO_NOTIFY)))
0280 {
0281 int err;
0282
0283 if (ENOENT==ret)
0284 {
0285 err=TPENOENT;
0286 }
0287 else
0288 {
0289 CONV_ERROR_CODE(ret, err);
0290 }
0291 ndrx_TPset_error_fmt(err, "%s: Failed to send, os err: %s", __func__, strerror(ret));
0292 EXFAIL_OUT(ret);
0293 }
0294
0295 out:
0296
0297 if (NULL!=buf)
0298 {
0299 NDRX_SYSBUF_FREE(buf);
0300 }
0301
0302 NDRX_LOG(log_debug, "%s return %d", __func__, ret);
0303 return ret;
0304 }
0305
0306
0307
0308
0309
0310
0311 expublic void ndrx_process_notif(char *buf, ssize_t len)
0312 {
0313 int ret = EXSUCCEED;
0314 char *odata = NULL;
0315 long olen = 0;
0316 expublic buffer_obj_t * typed_buf = NULL;
0317 tp_notif_call_t *notif = (tp_notif_call_t *) buf;
0318
0319 NDRX_LOG(log_debug, "%s: Got notification from: [%s], data len: %ld: ",
0320 __func__, notif->my_id, notif->data_len);
0321
0322 if (NULL==G_atmi_tls->p_unsol_handler)
0323 {
0324 NDRX_LOG(log_warn, "Unsol handler not set - dropping message");
0325 goto out;
0326 }
0327
0328 if (G_atmi_tls->client_init_data.flags & TPU_IGN)
0329 {
0330 NDRX_LOG(log_warn, "TPU_IGN have been set - dropping message");
0331 goto out;
0332 }
0333
0334
0335 NDRX_LOG(log_debug, "%s: data received", __func__);
0336
0337 if (EXSUCCEED==(ndrx_mbuf_prepare_incoming(notif->data,
0338 notif->data_len,
0339 &odata,
0340 &olen,
0341 0L, 0L)))
0342 {
0343 typed_buf = ndrx_find_buffer(odata);
0344 }
0345 else
0346 {
0347 NDRX_LOG(log_error, "Failed to prepare incoming unsolicited notification");
0348 EXFAIL_OUT(ret);
0349 }
0350
0351 NDRX_LOG(log_debug, "Unsol handler set to %p - invoking (buffer: %p)",
0352 G_atmi_tls->p_unsol_handler, odata);
0353
0354
0355
0356
0357 G_atmi_tls->p_unsol_handler(odata, olen, 0);
0358
0359 out:
0360
0361 if (NULL!=typed_buf)
0362 {
0363 NDRX_LOG(log_debug, "About to free buffer %p", typed_buf->buf);
0364 tpfree(typed_buf->buf);
0365 }
0366
0367 return;
0368 }
0369
0370
0371
0372
0373
0374
0375
0376
0377 expublic int ndrx_tpchkunsol(long flags)
0378 {
0379 int ret = EXSUCCEED;
0380 char *pbuf = NULL;
0381 size_t pbuf_len;
0382 ssize_t rply_len;
0383 int num_applied = 0;
0384 unsigned prio;
0385 tp_notif_call_t *notif;
0386
0387
0388 NDRX_LOG(log_debug, "Into %s", __func__);
0389 do
0390 {
0391 if (NULL==pbuf)
0392 {
0393 NDRX_SYSBUF_MALLOC_OUT(pbuf, pbuf_len, ret);
0394 }
0395
0396
0397
0398 rply_len = ndrx_generic_q_receive(G_atmi_tls->G_atmi_conf.reply_q,
0399 G_atmi_tls->G_atmi_conf.reply_q_str,
0400 &(G_atmi_tls->G_atmi_conf.reply_q_attr),
0401 pbuf, pbuf_len, &prio, flags);
0402
0403 NDRX_LOG(log_debug, "%s: %zd", __func__, (long)rply_len);
0404
0405 if (rply_len<=0)
0406 {
0407 NDRX_LOG(log_warn, "%s: No message (%zd)", __func__, rply_len);
0408 goto out;
0409 }
0410
0411 notif=(tp_notif_call_t *) pbuf;
0412
0413
0414 NDRX_LOG(log_info, "%s: got message, len: %zd, command id: %d",
0415 __func__, rply_len, notif->command_id);
0416
0417 if (ATMI_COMMAND_TPNOTIFY == notif->command_id ||
0418 ATMI_COMMAND_BROADCAST == notif->command_id)
0419 {
0420 num_applied++;
0421 NDRX_LOG(log_info, "Got unsol command");
0422 ndrx_process_notif(pbuf, rply_len);
0423
0424
0425
0426
0427 }
0428 else
0429 {
0430 NDRX_LOG(log_info, "got non unsol command - enqueue");
0431
0432 if (EXSUCCEED!=ndrx_add_to_memq(&pbuf, pbuf_len, rply_len))
0433 {
0434 EXFAIL_OUT(ret);
0435 }
0436
0437 }
0438
0439
0440
0441
0442 if (num_applied && ! (flags & TPNOBLOCK) )
0443 {
0444 break;
0445 }
0446
0447 } while (1);
0448 out:
0449
0450 if (NULL!=pbuf)
0451 {
0452 NDRX_SYSBUF_FREE(pbuf);
0453 }
0454
0455 NDRX_LOG(log_debug, "%s returns %d (applied msgs: %d)", __func__,
0456 ret, num_applied);
0457
0458 if (EXSUCCEED==ret)
0459 {
0460 return num_applied;
0461 }
0462 else
0463 {
0464 return ret;
0465 }
0466 }
0467
0468
0469
0470
0471
0472
0473
0474
0475
0476 exprivate int match_nodeid(char *nodeid_str, char *nodeid,
0477 regex_t *regexp_nodeid, long flags)
0478 {
0479 int ret = EXFALSE;
0480
0481 if (NULL!=nodeid)
0482 {
0483 if (EXEOS==nodeid[0])
0484 {
0485 NDRX_LOG(log_info, "Nodeid %s (nodeid=EOS)", nodeid_str);
0486 ret = EXTRUE;
0487 }
0488 else if ((flags & TPREGEXMATCH ) &&
0489 EXSUCCEED==ndrx_regexec(regexp_nodeid, nodeid_str))
0490 {
0491 NDRX_LOG(log_info, "Nodeid %s matched local node by regexp",
0492 nodeid_str);
0493 ret = EXTRUE;
0494 }
0495 else if (0==strcmp(nodeid, nodeid_str))
0496 {
0497 NDRX_LOG(log_info, "Nodeid %s matched by nodeid str param",
0498 nodeid_str);
0499 ret = EXTRUE;
0500 }
0501 else
0502 {
0503 NDRX_LOG(log_info, "Nodeid %s did not match nodeid param [%s] => "
0504 "skip node for broadcast",
0505 nodeid_str, nodeid);
0506 }
0507 }
0508 else
0509 {
0510 NDRX_LOG(log_info, "nodeid param NULL, local node %s matched for broadcast",
0511 nodeid_str);
0512 ret = EXTRUE;
0513 }
0514
0515 return ret;
0516 }
0517
0518
0519
0520
0521
0522
0523
0524
0525
0526
0527 expublic int ndrx_tpbroadcast_local(char *nodeid, char *usrname, char *cltname,
0528 char *data, long len, long flags, int dispatch_local)
0529 {
0530 int ret = EXSUCCEED;
0531 string_list_t* qlist = NULL;
0532 string_list_t* elt = NULL;
0533 int typ;
0534 TPMYID myid;
0535 ndrx_qdet_t qdet;
0536 CLIENTID cltid;
0537 regex_t regexp_nodeid;
0538 int regexp_nodeid_comp = EXFALSE;
0539
0540 regex_t regexp_usrname;
0541 int regexp_usrname_comp = EXFALSE;
0542
0543 regex_t regexp_cltname;
0544 int regexp_cltname_comp = EXFALSE;
0545 char nodeid_str[16];
0546
0547 int local_node_ok = EXFALSE;
0548 int cltname_ok;
0549
0550 long local_nodeid = tpgetnodeid();
0551
0552 char connected_nodes[CONF_NDRX_NODEID_COUNT+1] = {EXEOS};
0553
0554
0555 if (flags & TPREGEXMATCH)
0556 {
0557
0558
0559 if (NULL!=nodeid && EXEOS!=nodeid[0])
0560 {
0561 if (EXSUCCEED!=ndrx_regcomp(®exp_nodeid, nodeid))
0562 {
0563 ndrx_TPset_error_fmt(TPEINVAL, "Failed to compile nodeid=[%s] regexp",
0564 __func__, nodeid);
0565 NDRX_LOG(log_error, "Failed to compile nodeid=[%s]", nodeid);
0566 EXFAIL_OUT(ret);
0567 }
0568 else
0569 {
0570 regexp_nodeid_comp = EXTRUE;
0571 }
0572 }
0573
0574 if (NULL!=usrname && EXEOS!=usrname[0])
0575 {
0576 if (EXSUCCEED!=ndrx_regcomp(®exp_usrname, usrname))
0577 {
0578 ndrx_TPset_error_fmt(TPEINVAL, "Failed to compile usrname=[%s] regexp",
0579 __func__, nodeid);
0580 NDRX_LOG(log_error, "Failed to compile usrname=[%s]", usrname);
0581 EXFAIL_OUT(ret);
0582 }
0583 else
0584 {
0585 regexp_usrname_comp = EXTRUE;
0586 }
0587 }
0588
0589 if (NULL!=cltname && EXEOS!=cltname[0])
0590 {
0591 if (EXSUCCEED!=ndrx_regcomp(®exp_cltname, cltname))
0592 {
0593 ndrx_TPset_error_fmt(TPEINVAL, "Failed to compile cltname=[%s] regexp",
0594 __func__, cltname);
0595 NDRX_LOG(log_error, "Failed to compile cltname=[%s]", cltname);
0596 EXFAIL_OUT(ret);
0597 }
0598 else
0599 {
0600 regexp_cltname_comp = EXTRUE;
0601 }
0602 }
0603 }
0604
0605
0606
0607
0608
0609
0610
0611
0612 snprintf(nodeid_str, sizeof(nodeid_str), "%ld", local_nodeid);
0613
0614 local_node_ok=match_nodeid(nodeid_str, nodeid, ®exp_nodeid, flags);
0615
0616 if (local_node_ok)
0617 {
0618
0619 qlist = ndrx_sys_mqueue_list_make(G_atmi_env.qpath, &ret);
0620
0621 if (EXSUCCEED!=ret)
0622 {
0623 NDRX_LOG(log_error, "posix queue listing failed... continue...!");
0624 ret = EXSUCCEED;
0625 qlist = NULL;
0626 }
0627
0628 LL_FOREACH(qlist,elt)
0629 {
0630
0631 if (0!=strncmp(elt->qname,
0632 G_atmi_env.qprefix_match, G_atmi_env.qprefix_match_len))
0633 {
0634 continue;
0635 }
0636
0637
0638
0639
0640
0641
0642 typ = ndrx_q_type_get(elt->qname);
0643
0644 if (NDRX_QTYPE_CLTRPLY==typ)
0645 {
0646
0647
0648
0649 NDRX_LOG(log_debug, "Got client Q: [%s] - extract CLIENTID",
0650 elt->qname);
0651
0652
0653 if (EXSUCCEED!=ndrx_qdet_parse_cltqstr(&qdet, elt->qname))
0654 {
0655 NDRX_LOG(log_error, "Failed to parse Q details!");
0656 EXFAIL_OUT(ret);
0657 }
0658
0659
0660 cltname_ok = EXFALSE;
0661
0662 if (NULL!=cltname)
0663 {
0664 if (EXEOS==cltname[0])
0665 {
0666 NDRX_LOG(log_info, "Process matched broadcast [%s] "
0667 "(cltname=EOS)",
0668 elt->qname);
0669 cltname_ok = EXTRUE;
0670 }
0671 else if ((flags & TPREGEXMATCH )
0672 && EXSUCCEED==ndrx_regexec(®exp_cltname, qdet.binary_name))
0673 {
0674 NDRX_LOG(log_info, "Process [%s]/[%s] matched broadcast "
0675 "by regexp",
0676 elt->qname, qdet.binary_name);
0677 cltname_ok = EXTRUE;
0678 }
0679 else if (0==strcmp(cltname, qdet.binary_name))
0680 {
0681 NDRX_LOG(log_info, "Process [%s]/[%s] matched by "
0682 "cltname str param [%s]",
0683 elt->qname, qdet.binary_name, cltname);
0684 cltname_ok = EXTRUE;
0685 }
0686 else
0687 {
0688 NDRX_LOG(log_info, "Process [%s]/[%s] did not match "
0689 "cltname param [%s] => "
0690 "skip process for broadcast",
0691 elt->qname, qdet.binary_name, cltname);
0692 }
0693 }
0694 else
0695 {
0696 NDRX_LOG(log_info, "cltname param NULL, process [%s]/[%s] "
0697 "matched for broadcast",
0698 elt->qname, qdet.binary_name);
0699 cltname_ok = EXTRUE;
0700 }
0701
0702 if (cltname_ok)
0703 {
0704
0705 if (EXSUCCEED!=ndrx_myid_convert_from_qdet(&myid, &qdet, local_nodeid))
0706 {
0707 NDRX_LOG(log_error, "Failed to build MYID from QDET!");
0708 EXFAIL_OUT(ret);
0709 }
0710
0711
0712 ndrx_myid_to_my_id_str(&myid, cltid.clientdata);
0713
0714 NDRX_LOG(log_info, "Build client id string: [%s]",
0715 cltid.clientdata);
0716
0717
0718 if (EXSUCCEED!=ndrx_tpnotify(&cltid, &myid, elt->qname,
0719 data, len, flags, 0, nodeid, usrname, cltname, 0))
0720 {
0721 NDRX_LOG(log_debug, "Failed to notify [%s] with buffer len: %d",
0722 cltid.clientdata, len);
0723 userlog("Failed to notify [%s] with buffer len: %d",
0724 cltid.clientdata, len);
0725 }
0726 }
0727 }
0728 }
0729 }
0730
0731
0732
0733
0734
0735
0736
0737 if (!dispatch_local)
0738 {
0739 NDRX_LOG(log_info, "Dispatching over any connected nodes");
0740 if (EXSUCCEED==ndrx_shm_birdge_getnodesconnected(connected_nodes))
0741 {
0742 int i;
0743 int len = strlen(connected_nodes);
0744 for (i=0; i<len; i++)
0745 {
0746
0747 snprintf(nodeid_str, sizeof(nodeid_str), "%d",
0748 (int)connected_nodes[i]);
0749
0750 if (match_nodeid(nodeid_str, nodeid, ®exp_nodeid, flags))
0751 {
0752 NDRX_LOG(log_debug, "Node id %d accepted for broadcast",
0753 (int)connected_nodes[i]);
0754
0755 if (EXSUCCEED!=ndrx_tpnotify(NULL, NULL, NULL,
0756 data, len, flags,
0757 (long)connected_nodes[i], nodeid, usrname, cltname,
0758 (TPCALL_BRCALL | TPCALL_BROADCAST)))
0759 {
0760 NDRX_LOG(log_debug, "Failed to notify [%s] with buffer len: %d",
0761 cltid.clientdata, len);
0762 userlog("Failed to notify [%s] with buffer len: %d",
0763 cltid.clientdata, len);
0764 }
0765 }
0766 }
0767 }
0768 }
0769 else
0770 {
0771 NDRX_LOG(log_info, "Skip the cluster, local dispatch only...");
0772 }
0773
0774 out:
0775
0776 ndrx_string_list_free(qlist);
0777
0778 if (regexp_nodeid_comp)
0779 {
0780 ndrx_regfree(®exp_nodeid);
0781 }
0782
0783 if (regexp_usrname_comp)
0784 {
0785 ndrx_regfree(®exp_usrname);
0786 }
0787
0788 if (regexp_cltname_comp)
0789 {
0790 ndrx_regfree(®exp_cltname);
0791 }
0792
0793 NDRX_LOG(log_debug, "%s returns %d", __func__, ret);
0794
0795 return ret;
0796 }
0797
0798