0001
0002
0003
0004
0005
0006
0007
0008
0009
0010
0011
0012
0013
0014
0015
0016
0017
0018
0019
0020
0021
0022
0023
0024
0025
0026
0027
0028
0029
0030
0031
0032
0033
0034
0035
0036 #include <ndrx_config.h>
0037 #include <stdio.h>
0038 #include <stdlib.h>
0039 #include <errno.h>
0040 #include <sys/stat.h>
0041 #include <unistd.h>
0042 #include <fcntl.h>
0043
0044 #include <ndrstandard.h>
0045 #include <ndebug.h>
0046 #include <utlist.h>
0047 #include <string.h>
0048 #include "srv_int.h"
0049 #include "tperror.h"
0050 #include <atmi_int.h>
0051 #include <atmi_shm.h>
0052 #include <sys_unix.h>
0053 #include <ndrx_ddr.h>
0054
0055
0056 #define READVERTISE_SLEEP_SRV 2
0057
0058
0059
0060
0061
0062
0063
0064
0065
0066
0067
0068
0069
0070
0071
0072
0073
0074 expublic int dynamic_readvertise(char *svcname)
0075 {
0076 int ret=EXSUCCEED;
0077 svc_entry_fn_t *entry=NULL;
0078 char *fn="dynamic_readvertise";
0079 int found = EXFALSE;
0080 int mod;
0081
0082 NDRX_LOG(log_warn, "%s: enter, svcname = [%s]", fn, svcname);
0083
0084 if ( (entry = (svc_entry_fn_t*)NDRX_MALLOC(sizeof(svc_entry_fn_t))) == NULL)
0085 {
0086 NDRX_LOG(log_error, "Failed to allocate %d bytes while parsing -s",
0087 sizeof(svc_entry_fn_t));
0088 ret=EXFAIL;
0089 goto out;
0090 }
0091
0092 memset(entry, 0, sizeof(*entry));
0093
0094
0095 if (EXSUCCEED!=dynamic_unadvertise(svcname, &found, entry) || !found)
0096 {
0097 NDRX_LOG(log_error, "Failed to unadvertise: [%s]", svcname);
0098 ret=EXFAIL;
0099 goto out;
0100 }
0101
0102
0103
0104
0105 mod = ndrx_rand() % 4;
0106
0107 NDRX_LOG(log_warn, "Sleeping %d seconds for re-advertise!",
0108 READVERTISE_SLEEP_SRV+mod);
0109
0110 sleep(READVERTISE_SLEEP_SRV+mod);
0111
0112
0113 entry->q_descr = (mqd_t)EXFAIL;
0114 if (EXSUCCEED!=dynamic_advertise(entry, svcname, entry->p_func, entry->fn_nm))
0115 {
0116 NDRX_LOG(log_error, "Failed to advertise: [%s]", svcname);
0117 ret=EXFAIL;
0118 goto out;
0119 }
0120
0121 out:
0122
0123 if (EXSUCCEED!=ret && NULL!=entry)
0124 NDRX_FREE(entry);
0125
0126 NDRX_LOG(log_warn, "%s: return, ret = %d", fn, ret);
0127 return ret;
0128 }
0129
0130
0131
0132
0133
0134
0135 expublic int dynamic_unadvertise(char *svcname, int *found, svc_entry_fn_t *copy)
0136 {
0137 int ret=EXSUCCEED;
0138 int pos;
0139 svc_entry_fn_t *ent=NULL;
0140 int service;
0141 int len;
0142
0143 char *thisfn="_dynamic_unadvertise";
0144
0145 for (pos=0; pos<G_server_conf.adv_service_count; pos++)
0146 {
0147 if (0==strcmp(svcname, G_server_conf.service_array[pos]->svc_nm))
0148 {
0149 ent = G_server_conf.service_array[pos];
0150 NDRX_LOG(log_warn, "Service [%s] found in array at %d", svcname, pos);
0151 break;
0152 }
0153 }
0154
0155 if (ent)
0156 {
0157
0158 if (NULL!=copy)
0159 {
0160 memcpy(copy, ent, sizeof(svc_entry_fn_t));
0161 }
0162
0163 if (NULL!=found)
0164 {
0165 *found = EXTRUE;
0166 }
0167
0168 NDRX_LOG(log_error, "Q File descriptor: %d - removing from polling struct",
0169 ent->q_descr);
0170
0171 if (EXFAIL==ndrx_epoll_ctl_mq(G_server_conf.epollfd, EX_EPOLL_CTL_DEL,
0172 ent->q_descr, NULL))
0173 {
0174 ndrx_TPset_error_fmt(TPEOS, "ndrx_epoll_ctl failed to remove fd %d from epollfd: %s",
0175 ent->q_descr, ndrx_poll_strerror(ndrx_epoll_errno()));
0176 ret=EXFAIL;
0177 goto out;
0178 }
0179
0180
0181 if (ndrx_epoll_shallopenq(pos) &&
0182 EXSUCCEED!=ndrx_mq_close(ent->q_descr))
0183 {
0184 ndrx_TPset_error_fmt(TPEOS, "ndrx_mq_close failed to close fd %d: %s",
0185 ent->q_descr, strerror(errno));
0186 ret=EXFAIL;
0187 goto out;
0188 }
0189
0190 len = G_server_conf.adv_service_count;
0191
0192 if (EXSUCCEED!=atmisrv_array_remove_element((void *)(G_server_conf.service_array), pos,
0193 len, sizeof(svc_entry_fn_t *)))
0194 {
0195 NDRX_LOG(log_error, "Failed to shift memory for "
0196 "G_server_conf.service_array!");
0197 ret=EXFAIL;
0198 goto out;
0199 }
0200
0201 G_server_conf.service_array=NDRX_REALLOC(G_server_conf.service_array,
0202 (sizeof(svc_entry_fn_t *)*len-1));
0203
0204 if (NULL==G_server_conf.service_array)
0205 {
0206 ndrx_TPset_error_fmt(TPEOS, "realloc failed: %s", strerror(errno));
0207 ret=EXFAIL;
0208 goto out;
0209 }
0210
0211
0212 NDRX_FREE(ent);
0213 ent=NULL;
0214
0215 service = pos - ATMI_SRV_Q_ADJUST;
0216 if (EXSUCCEED!=atmisrv_array_remove_element((void *)G_shm_srv->svc_fail, service,
0217 MAX_SVC_PER_SVR, sizeof(*(G_shm_srv->svc_fail))))
0218 {
0219 NDRX_LOG(log_error, "Failed to shift memory for G_shm_srv->svc_succeed!");
0220 ret=EXFAIL;
0221 goto out;
0222 }
0223
0224 if (EXSUCCEED!=unadvertse_to_ndrxd(svcname))
0225 {
0226 ret=EXFAIL;
0227 goto out;
0228 }
0229
0230 G_server_conf.adv_service_count--;
0231
0232 if (G_shm_srv)
0233 {
0234
0235 if (EXSUCCEED!=atmisrv_array_remove_element((void *)(G_shm_srv->svc_succeed), service,
0236 MAX_SVC_PER_SVR, sizeof(*(G_shm_srv->svc_succeed))))
0237 {
0238 NDRX_LOG(log_error, "Failed to shift memory for G_shm_srv->svc_succeed!");
0239 ret=EXFAIL;
0240 goto out;
0241 }
0242
0243 if (EXSUCCEED!=atmisrv_array_remove_element((void *)&(G_shm_srv->min_rsp_msec),
0244 service, MAX_SVC_PER_SVR, sizeof(*(G_shm_srv->min_rsp_msec))))
0245 {
0246 NDRX_LOG(log_error, "Failed to shift memory for "
0247 "G_shm_srv->min_rsp_msec!");
0248 ret=EXFAIL;
0249 goto out;
0250 }
0251
0252 if (EXSUCCEED!=atmisrv_array_remove_element((void *)(G_shm_srv->max_rsp_msec),
0253 service, MAX_SVC_PER_SVR, sizeof(*(G_shm_srv->max_rsp_msec))))
0254 {
0255 NDRX_LOG(log_error, "Failed to shift memory for "
0256 "G_shm_srv->max_rsp_msec!");
0257 ret=EXFAIL;
0258 goto out;
0259 }
0260
0261 if (EXSUCCEED!=atmisrv_array_remove_element((void *)(G_shm_srv->last_rsp_msec),
0262 service, MAX_SVC_PER_SVR, sizeof(*(G_shm_srv->last_rsp_msec))))
0263 {
0264 NDRX_LOG(log_error, "Failed to shift memory for 1"
0265 "G_shm_srv->last_rsp_msec!");
0266 ret=EXFAIL;
0267 goto out;
0268 }
0269
0270 if (EXSUCCEED!=atmisrv_array_remove_element((void *)&(G_shm_srv->svc_status),
0271 service, MAX_SVC_PER_SVR, sizeof(*(G_shm_srv->svc_status))))
0272 {
0273 NDRX_LOG(log_error, "Failed to shift memory for "
0274 "G_shm_srv->svc_status!");
0275 ret=EXFAIL;
0276 goto out;
0277 }
0278 }
0279 }
0280 else
0281 {
0282 ndrx_TPset_error_fmt(TPENOENT, "%s: service [%s] not advertised", thisfn, svcname);
0283 ret=EXFAIL;
0284 goto out;
0285 }
0286
0287 out:
0288 return ret;
0289 }
0290
0291
0292
0293
0294
0295
0296 expublic int dynamic_advertise(svc_entry_fn_t *entry_new,
0297 char *svc_nm, void (*p_func)(TPSVCINFO *), char *fn_nm)
0298 {
0299 int ret=EXSUCCEED;
0300 int pos, service;
0301 svc_entry_fn_t *entry_chk=NULL;
0302 struct ndrx_epoll_event ev;
0303 int sz;
0304
0305 int autotran=0;
0306 unsigned long trantime=NDRX_DDR_TRANTIMEDFLT;
0307
0308 for (pos=0; pos<G_server_conf.adv_service_count; pos++)
0309 {
0310 if (0==strcmp(svc_nm, G_server_conf.service_array[pos]->svc_nm))
0311 {
0312 entry_chk = G_server_conf.service_array[pos];
0313 break;
0314 }
0315 }
0316
0317
0318 if (NULL!=entry_chk)
0319 {
0320 NDRX_LOG(log_info, "Service [%s] found in array at %d",
0321 svc_nm, pos);
0322
0323 if (entry_chk->p_func == p_func)
0324 {
0325 NDRX_LOG(log_info, "Advertised function ptr "
0326 "the same - return OK!");
0327 goto out;
0328 }
0329 else
0330 {
0331 ndrx_TPset_error_fmt(TPEMATCH, "Service [%s] already advertised by func. "
0332 "ptr. %p, but now requesting advertise by func. ptr. %p!",
0333 svc_nm, entry_chk->p_func, p_func);
0334 ret=EXFAIL;
0335 goto out;
0336 }
0337 }
0338
0339
0340 if (G_server_conf.adv_service_count+1>MAX_SVC_PER_SVR)
0341 {
0342 ndrx_TPset_error_fmt(TPELIMIT, "Service limit per process %d reached!",
0343 MAX_SVC_PER_SVR-ATMI_SRV_Q_ADJUST);
0344 EXFAIL_OUT(ret);
0345 }
0346
0347
0348
0349
0350 service = G_server_conf.adv_service_count - ATMI_SRV_Q_ADJUST;
0351
0352 #ifdef EX_USE_POLL
0353 snprintf(entry_new->listen_q, sizeof(entry_new->listen_q), NDRX_SVC_QFMT_SRVID,
0354 G_server_conf.q_prefix, entry_new->svc_nm, (short)G_server_conf.srv_id);
0355 #else
0356 snprintf(entry_new->listen_q, sizeof(entry_new->listen_q), NDRX_SVC_QFMT,
0357 G_server_conf.q_prefix, entry_new->svc_nm);
0358 #endif
0359
0360
0361 if (EXTRUE==(ret=ndrx_ddr_service_get(svc_nm, &autotran, &trantime)))
0362 {
0363 NDRX_LOG(log_debug, "Service [%s] found in <services> section autotran: %d trantime: %lu",
0364 svc_nm, autotran, trantime);
0365 entry_new->autotran = autotran;
0366 entry_new->trantime = trantime;
0367 ret=EXSUCCEED;
0368 }
0369
0370 if (EXFAIL==ret)
0371 {
0372
0373 EXFAIL_OUT(ret);
0374 }
0375
0376
0377
0378 NDRX_LOG(log_debug, "About to listen on: %s", entry_new->listen_q);
0379
0380
0381
0382 if (G_shm_srv && EXSUCCEED!=ndrx_lock_svc_op(__func__))
0383 {
0384 NDRX_LOG(log_error, "Failed to lock sempahore");
0385 ret=EXFAIL;
0386 goto out;
0387 }
0388
0389
0390
0391
0392 if (ndrx_epoll_shallopenq(ATMI_SRV_Q_ADJUST+G_server_conf.adv_service_count))
0393 {
0394 entry_new->q_descr = ndrx_mq_open_at (entry_new->listen_q,
0395 O_RDWR | O_CREAT | O_NONBLOCK, S_IWUSR | S_IRUSR, NULL);
0396
0397
0398
0399
0400 if ((mqd_t)EXFAIL==entry_new->q_descr)
0401 {
0402
0403 if (G_shm_srv) ndrx_unlock_svc_op(__func__);
0404
0405 ndrx_TPset_error_fmt(TPEOS, "Failed to open queue: %s: %s",
0406 entry_new->listen_q, strerror(errno));
0407 EXFAIL_OUT(ret);
0408 }
0409 }
0410 else
0411 {
0412
0413
0414
0415
0416 entry_new->q_descr = (mqd_t)EXFAIL;
0417 }
0418
0419
0420 entry_new->q_descr=ndrx_epoll_service_add(entry_new->svc_nm,
0421 G_server_conf.adv_service_count, entry_new->q_descr);
0422
0423 if ((mqd_t)EXFAIL==entry_new->q_descr)
0424 {
0425
0426 if (G_shm_srv) ndrx_unlock_svc_op(__func__);
0427
0428 ndrx_TPset_error_fmt(TPEOS, "Failed to register poller "
0429 "svc at idx %d: %s: %s",
0430 G_server_conf.adv_service_count, entry_new->listen_q, strerror(errno));
0431 EXFAIL_OUT(ret);
0432 }
0433
0434
0435 if (G_shm_srv)
0436 {
0437 #ifdef EX_USE_SYSVQ
0438 ret=ndrx_shm_install_svc(entry_new->svc_nm, 0, ndrx_epoll_resid_get());
0439 #else
0440 ret=ndrx_shm_install_svc(entry_new->svc_nm, 0, G_server_conf.srv_id);
0441 #endif
0442 }
0443
0444
0445 if (G_shm_srv) ndrx_unlock_svc_op(__func__);
0446
0447
0448
0449 if (EXSUCCEED!=ret)
0450 {
0451 ndrx_TPset_error_fmt(TPELIMIT, "Service shared memory is full. "
0452 "Try to increase NDRX_SVCMAX");
0453 EXFAIL_OUT(ret);
0454 }
0455
0456
0457 ndrx_stopwatch_reset(&entry_new->qopen_time);
0458
0459 NDRX_LOG(log_debug, "Got file descriptor: %d, adding to e-poll!",
0460 entry_new->q_descr);
0461
0462
0463 sz = sizeof(*G_server_conf.service_array)*(G_server_conf.adv_service_count+1);
0464
0465 G_server_conf.service_array = NDRX_REALLOC(G_server_conf.service_array, sz);
0466
0467 if (NULL==G_server_conf.service_array)
0468 {
0469 ndrx_TPset_error_fmt(TPEOS, "Failed to reallocate memory to %d bytes!", sz);
0470 ret=EXFAIL;
0471 goto out;
0472 }
0473
0474
0475 G_server_conf.service_array[G_server_conf.adv_service_count] = entry_new;
0476
0477 G_server_conf.adv_service_count++;
0478
0479 memset(&ev, 0, sizeof(ev));
0480 ev.events = EX_EPOLL_FLAGS;
0481 #ifdef EX_USE_EPOLL
0482 ev.data.fd = entry_new->q_descr;
0483 #else
0484 ev.data.mqd = entry_new->q_descr;
0485 #endif
0486
0487 if (EXFAIL==ndrx_epoll_ctl_mq(G_server_conf.epollfd, EX_EPOLL_CTL_ADD,
0488 entry_new->q_descr, &ev))
0489 {
0490 G_server_conf.adv_service_count--;
0491 ndrx_TPset_error_fmt(TPEOS, "ndrx_epoll_ctl failed: %s",
0492 ndrx_poll_strerror(ndrx_epoll_errno()));
0493 ret=EXFAIL;
0494 goto out;
0495 }
0496
0497
0498 G_shm_srv->svc_status[service] = NDRXD_SVC_STATUS_AVAIL;
0499
0500
0501 if (EXSUCCEED!=advertse_to_ndrxd(entry_new))
0502 {
0503 NDRX_LOG(log_error, "Failed to send advertise message to NDRXD!");
0504 ret=EXFAIL;
0505 goto out;
0506 }
0507
0508 out:
0509
0510 if (EXSUCCEED!=ret && (mqd_t)EXFAIL!=entry_new->q_descr)
0511 {
0512
0513
0514 ndrx_epoll_ctl_mq(G_server_conf.epollfd, EX_EPOLL_CTL_DEL,
0515 entry_new->q_descr, NULL);
0516
0517
0518 if (ndrx_epoll_shallopenq(ATMI_SRV_Q_ADJUST+G_server_conf.adv_service_count))
0519 {
0520 ndrx_mq_close(entry_new->q_descr);
0521 }
0522
0523 }
0524 return ret;
0525 }
0526