0001
0002
0003
0004
0005
0006
0007
0008
0009
0010
0011
0012
0013
0014
0015
0016
0017
0018
0019
0020
0021
0022
0023
0024
0025
0026
0027
0028
0029
0030
0031
0032
0033
0034
0035
0036
0037
0038
0039
0040
0041
0042
0043
0044
0045 #include <ndrx_config.h>
0046 #include <sys/socket.h>
0047 #include <sys/time.h>
0048 #include <sys/types.h>
0049 #include <arpa/inet.h>
0050 #include <netinet/in.h>
0051 #include <netinet/tcp.h>
0052 #include <errno.h>
0053 #include <fcntl.h>
0054 #include <netdb.h>
0055 #include <string.h>
0056 #include <unistd.h>
0057
0058 #include <ndrstandard.h>
0059
0060 #if defined(EX_USE_EPOLL)
0061
0062 #include <sys/epoll.h>
0063
0064 #elif defined(EX_USE_KQUEUE)
0065
0066 #include <sys/event.h>
0067
0068 #endif
0069
0070 #include <poll.h>
0071
0072 #include <atmi.h>
0073 #include <stdio.h>
0074 #include <stdlib.h>
0075 #include <exnet.h>
0076 #include <ndebug.h>
0077 #include <utlist.h>
0078
0079 #include <atmi_int.h>
0080 #include <ndrx_config.h>
0081
0082
0083
/**
 * Room left in the download buffer.
 * NOTE: expands to an expression that reads `net->dl`, so a variable named
 * `net` must be in scope wherever the macro is used.
 */
#define DBUF_SZ (NDRX_MSGSIZEMAX - net->dl)

/**
 * Event mask/filter passed to tpext_addpollerfd() when a socket is registered.
 * The value depends on which poller backend the build selected.
 */
#ifdef EX_USE_EPOLL
#   define POLL_FLAGS (EPOLLIN | EPOLLHUP)
#elif defined(EX_USE_KQUEUE)
#   define POLL_FLAGS EVFILT_READ
#else
#   define POLL_FLAGS (POLLIN)
#endif
0099
0100
0101
0102
0103
0104
0105 exprivate MUTEX_LOCKDECL(M_send_lock);
0106 exprivate MUTEX_LOCKDECL(M_recv_lock);
0107
0108
0109 exprivate int close_socket(exnetcon_t *net);
0110 exprivate int open_socket(exnetcon_t *net);
0111 exprivate ssize_t recv_wrap (exnetcon_t *net, void *__buf, size_t __n,
0112 int flags, int appflags);
0113
0114 exprivate int exnet_schedule_run(exnetcon_t *net);
0115
0116
0117
0118
0119
0120
0121
0122
0123 expublic long exnet_stopwatch_get_delta_sec(exnetcon_t *net, ndrx_stopwatch_t *w)
0124 {
0125 long ret;
0126
0127 MUTEX_LOCK_V(net->flagslock);
0128
0129 ret = ndrx_stopwatch_get_delta_sec(w);
0130
0131 MUTEX_UNLOCK_V(net->flagslock);
0132
0133 return ret;
0134 }
0135
0136
0137
0138
0139
0140
0141 expublic void exnet_stopwatch_reset(exnetcon_t *net, ndrx_stopwatch_t *w)
0142 {
0143 MUTEX_LOCK_V(net->flagslock);
0144
0145 ndrx_stopwatch_reset(w);
0146
0147 MUTEX_UNLOCK_V(net->flagslock);
0148 }
0149
0150
0151
0152
0153
0154
0155
0156 expublic int exnet_send_sync(exnetcon_t *net, char *hdr_buf, int hdr_len,
0157 char *buf, int len, int flags, int appflags)
0158 {
0159 int ret=EXSUCCEED;
0160 int allow_size = DATA_BUF_MAX;
0161 int sent = 0;
0162
0163 char d[NET_LEN_PFX_LEN+128];
0164 int size_to_send, len_whdr, hdr_snd_size = 0;
0165 int tmp_s;
0166 int err;
0167 int retry;
0168 ndrx_stopwatch_t w;
0169
0170
0171 len_whdr = hdr_len + len;
0172 if (len_whdr>allow_size)
0173 {
0174 NDRX_LOG(log_error, "Buffer too large for sending! "
0175 "requested: %d, allowed: %d", len_whdr, allow_size);
0176 EXFAIL_OUT(ret);
0177 }
0178
0179
0180
0181
0182
0183
0184
0185
0186
0187 if (4==net->len_pfx)
0188 {
0189
0190 d[0] = (len_whdr >> 24) & 0xff;
0191 d[1] = (len_whdr >> 16) & 0xff;
0192 d[2] = (len_whdr >> 8) & 0xff;
0193 d[3] = (len_whdr) & 0xff;
0194 hdr_snd_size=net->len_pfx;
0195 }
0196
0197 if (NULL!=hdr_buf)
0198 {
0199
0200 memcpy(d+net->len_pfx, hdr_buf, hdr_len);
0201 hdr_snd_size+=hdr_len;
0202 }
0203
0204 size_to_send = len+hdr_snd_size;
0205
0206
0207 MUTEX_LOCK_V(net->sendlock);
0208 do
0209 {
0210 err = 0;
0211 NDRX_LOG(log_debug, "Sending, len: %d, total msg: %d",
0212 size_to_send-sent, size_to_send);
0213
0214 if (!(appflags & APPFLAGS_MASK))
0215 {
0216 if (sent < hdr_snd_size)
0217 {
0218 NDRX_DUMP(log_debug, "Sending, msg (msg len pfx)",
0219 d+sent, hdr_snd_size-sent);
0220 }
0221 else
0222 {
0223 NDRX_DUMP(log_debug, "Sending, msg ", buf+sent-hdr_snd_size,
0224 size_to_send-sent);
0225 }
0226 }
0227 else
0228 {
0229 NDRX_LOG(log_debug, "*** MSG DUMP IS MASKED ***");
0230 }
0231
0232 ndrx_stopwatch_reset(&w);
0233
0234 do
0235 {
0236 err = 0;
0237 retry = EXFALSE;
0238
0239 if (sent<hdr_snd_size)
0240 {
0241 tmp_s = send(net->sock, d+sent, hdr_snd_size-sent, flags);
0242 }
0243 else
0244 {
0245
0246 tmp_s = send(net->sock, buf+sent-hdr_snd_size,
0247 size_to_send-sent, flags);
0248 }
0249
0250 if (EXFAIL==tmp_s)
0251 {
0252 err = errno;
0253 }
0254 else
0255 {
0256
0257 exnet_stopwatch_reset(net, &net->last_snd);
0258 }
0259
0260 if (EAGAIN==err || EWOULDBLOCK==err)
0261 {
0262 int spent = ndrx_stopwatch_get_delta_sec(&w);
0263 int rcvtim = net->rcvtimeout - spent;
0264 struct pollfd ufd;
0265
0266 memset(&ufd, 0, sizeof ufd);
0267
0268 NDRX_LOG(log_warn, "Socket full: %s - retry, "
0269 "time spent: %d, max: %d - POLLOUT (rcvtim=%d) sent: %d tot: %d",
0270 strerror(err), spent, net->rcvtimeout, rcvtim, sent, size_to_send);
0271
0272 ufd.fd = net->sock;
0273 ufd.events = POLLOUT;
0274
0275 if (rcvtim < 1 || poll(&ufd, 1, rcvtim * 1000) < 0 || ufd.revents & POLLERR)
0276 {
0277 NDRX_LOG(log_error, "ERROR! Failed to send, socket full: %s "
0278 "time spent: %d, max: %d short: %hd rcvtim: %d (POLLERR: %d)",
0279 strerror(err), spent, net->rcvtimeout, ufd.revents, rcvtim,
0280 (ufd.revents & POLLERR));
0281
0282 userlog("ERROR! Failed to send, socket full: %s "
0283 "time spent: %d, max: %d short: %hd rcvtim: %d (POLLERR: %d)",
0284 strerror(err), spent, net->rcvtimeout, ufd.revents, rcvtim,
0285 (ufd.revents & POLLERR));
0286
0287 net->schedule_close = EXTRUE;
0288 ret=EXFAIL;
0289 goto out_unlock;
0290 }
0291
0292 retry = EXTRUE;
0293
0294 }
0295 }
0296 while (retry);
0297
0298 if (EXFAIL==tmp_s)
0299 {
0300 NDRX_LOG(log_error, "send failure: %s",
0301 strerror(err));
0302
0303
0304 NDRX_LOG(log_error, "Scheduling connection close...");
0305 net->schedule_close = EXTRUE;
0306 ret=EXFAIL;
0307 break;
0308 }
0309 else
0310 {
0311 NDRX_LOG(log_debug, "Sent %d bytes", tmp_s);
0312
0313 sent+=tmp_s;
0314 }
0315
0316 if (sent < size_to_send)
0317 {
0318 NDRX_LOG(log_debug, "partial submission: total: %d, sent: %d, left "
0319 "for sending: %d - continue", size_to_send, sent,
0320 size_to_send - sent);
0321 }
0322
0323 }
0324 while (EXSUCCEED==ret && sent < size_to_send);
0325
0326 out_unlock:
0327 MUTEX_UNLOCK_V(net->sendlock);
0328
0329 out:
0330
0331 return ret;
0332 }
0333
0334
0335
0336
0337
0338
0339 exprivate ssize_t recv_wrap (exnetcon_t *net, void *__buf, size_t __n, int flags, int appflags)
0340 {
0341 ssize_t ret;
0342
0343 ret = recv (net->sock, __buf, __n, flags);
0344
0345 if (0==ret)
0346 {
0347 NDRX_LOG(log_error, "Disconnect received - schedule close!");
0348
0349 net->schedule_close = EXTRUE;
0350 ret=EXFAIL;
0351 goto out;
0352 }
0353 else if (EXFAIL==ret)
0354 {
0355 if (EAGAIN==errno || EWOULDBLOCK==errno)
0356 {
0357 NDRX_LOG(log_info, "Still no data (waiting...)");
0358 }
0359 else
0360 {
0361 NDRX_LOG(log_error, "recv failure: %s - schedule close",
0362 strerror(errno));
0363
0364 net->schedule_close = EXTRUE;
0365 }
0366
0367 ret=EXFAIL;
0368 goto out;
0369 }
0370 else
0371 {
0372
0373 exnet_stopwatch_reset(net, &net->last_rcv);
0374 }
0375
0376 out:
0377 return ret;
0378 }
0379
0380
0381
0382
0383
0384 exprivate int get_full_len(exnetcon_t *net)
0385 {
0386 int pfx_len, msg_len;
0387
0388
0389 pfx_len = (
0390 (net->d[0] & 0xff) << 24
0391 | (net->d[1] & 0xff) << 16
0392 | (net->d[2] & 0xff) << 8
0393 | (net->d[3] & 0xff)
0394 );
0395
0396 msg_len = pfx_len+net->len_pfx;
0397 NDRX_LOG(log_debug, "pfx_len=%d msg_len=%d", pfx_len, msg_len);
0398
0399
0400
0401 return msg_len;
0402 }
0403
0404
0405
0406
0407
0408
0409
0410
0411
0412
0413
0414
0415 expublic int exnet_recv_sync(exnetcon_t *net, char **buf, int *len, int flags, int appflags)
0416 {
0417 int ret=EXSUCCEED;
0418 int got_len;
0419 int full_msg;
0420 int download_size;
0421
0422
0423 MUTEX_LOCK_V(net->rcvlock);
0424
0425 if (0==net->dl)
0426 {
0427
0428 ndrx_stopwatch_reset(&net->rcv_timer);
0429 }
0430
0431 if (NULL==net->dlsysbuf)
0432 {
0433 size_t tmp_buf_len;
0434
0435 NDRX_SYSBUF_MALLOC_OUT(net->dlsysbuf, tmp_buf_len, ret);
0436 }
0437
0438 while (EXSUCCEED==ret)
0439 {
0440
0441 if (net->dl>=net->len_pfx)
0442 {
0443 full_msg = get_full_len(net);
0444
0445 if (full_msg < 0)
0446 {
0447 NDRX_LOG(log_error, "ERROR ! received len %d < 0 - closing socket!",
0448 full_msg);
0449 userlog("ERROR ! received len %d < 0! - schedule close socket!", full_msg);
0450 net->schedule_close = EXTRUE;
0451 ret=EXFAIL;
0452 break;
0453 }
0454 else if (full_msg > DATA_BUF_MAX)
0455 {
0456 NDRX_LOG(log_error, "ERROR ! received len %d > max buf %ld! - "
0457 "closing socket!", full_msg, DATA_BUF_MAX);
0458 userlog("ERROR ! received len %d > max buf %ld! - "
0459 "closing socket!", full_msg, DATA_BUF_MAX);
0460 net->schedule_close = EXTRUE;
0461 ret=EXFAIL;
0462 break;
0463 }
0464
0465 NDRX_LOG(log_debug, "Data buffered - "
0466 "buf: [%d], full: %d",
0467 net->dl, full_msg);
0468 if (net->dl >= full_msg)
0469 {
0470
0471
0472
0473
0474
0475
0476 *len = full_msg-net->len_pfx;
0477 net->dl -= full_msg;
0478 *buf=net->dlsysbuf;
0479 net->dlsysbuf=NULL;
0480
0481
0482 if (0!=net->dl)
0483 {
0484 NDRX_LOG(log_error, "Expected left over 0 but got %d", net->dl);
0485 userlog("Expected left over 0 but got %d", net->dl);
0486 abort();
0487 }
0488
0489 if (0==*len)
0490 {
0491 NDRX_LOG(log_debug, "zero length message - ignore!");
0492 ret=EXFAIL;
0493 }
0494 else if (!(appflags & APPFLAGS_MASK))
0495 {
0496 NDRX_DUMP(log_debug, "Got message: ", *buf, *len);
0497 }
0498
0499 MUTEX_UNLOCK_V(net->rcvlock);
0500 return ret;
0501 }
0502 }
0503
0504 NDRX_LOG(log_debug, "Data needs to be received, dl=%d", net->dl);
0505
0506
0507
0508
0509
0510
0511 if (net->dl < net->len_pfx)
0512 {
0513
0514 download_size = net->len_pfx - net->dl;
0515 }
0516 else
0517 {
0518
0519 download_size = full_msg - net->dl;
0520 }
0521
0522
0523
0524
0525
0526
0527 if (download_size < 0)
0528 {
0529 NDRX_LOG(log_error, "ERROR ! Expected download size < 0 (%d)", download_size);
0530 userlog("ERROR ! Expected download size < 0 (%d)", download_size);
0531 net->schedule_close = EXTRUE;
0532 ret=EXFAIL;
0533 break;
0534 }
0535 else if (download_size > DBUF_SZ)
0536 {
0537 NDRX_LOG(log_error, "ERROR ! Expected download size bigger "
0538 "than buffer left: %d > %d", download_size, (DBUF_SZ));
0539 userlog("ERROR ! Expected download size bigger "
0540 "than buffer left: %d > %d", download_size, (DBUF_SZ));
0541
0542
0543
0544
0545
0546 net->schedule_close = EXTRUE;
0547 ret=EXFAIL;
0548 break;
0549 }
0550
0551
0552
0553
0554
0555 if (net->dl < net->len_pfx)
0556 {
0557 if (EXFAIL==(got_len=recv_wrap(net, net->d+net->dl, download_size,
0558 flags, appflags)))
0559 {
0560
0561 ret=EXFAIL;
0562 }
0563 else
0564 {
0565 if (!(appflags&APPFLAGS_MASK))
0566 {
0567 NDRX_DUMP(log_debug, "Got packet: ",
0568 net->d+net->dl, got_len);
0569 }
0570 net->dl+=got_len;
0571 }
0572 }
0573 else
0574 {
0575 if (EXFAIL==(got_len=recv_wrap(net, net->dlsysbuf+net->dl-net->len_pfx,
0576 download_size, flags, appflags)))
0577 {
0578
0579 ret=EXFAIL;
0580 }
0581 else
0582 {
0583 if (!(appflags&APPFLAGS_MASK))
0584 {
0585 NDRX_DUMP(log_debug, "Got packet: ",
0586 net->dlsysbuf+net->dl-net->len_pfx, got_len);
0587 }
0588 net->dl+=got_len;
0589 }
0590 }
0591
0592 }
0593
0594
0595
0596
0597
0598 if (!net->schedule_close &&
0599 ndrx_stopwatch_get_delta_sec(&net->rcv_timer) >=net->rcvtimeout )
0600 {
0601 NDRX_LOG(log_error, "This is time-out => schedule close socket !");
0602 net->schedule_close = EXTRUE;
0603 }
0604
0605 out:
0606 MUTEX_UNLOCK_V(net->rcvlock);
0607
0608
0609 ret=EXFAIL;
0610
0611 return ret;
0612 }
0613
0614
0615
0616
0617
0618 expublic int exnet_b4_poll_cb(void)
0619 {
0620 int ret=EXSUCCEED;
0621 exnetcon_t *head = extnet_get_con_head();
0622 exnetcon_t *net, *tmp;
0623
0624 DL_FOREACH_SAFE(head, net, tmp)
0625 {
0626 if (exnet_schedule_run(net))
0627 {
0628 continue;
0629 }
0630 }
0631
0632 out:
0633 return ret;
0634 }
0635
0636
0637
0638
0639 expublic int exnet_poll_cb(int fd, uint32_t events, void *ptr1)
0640 {
0641 int ret;
0642 int so_error=0;
0643 socklen_t len = sizeof so_error;
0644 exnetcon_t *net = (exnetcon_t *)ptr1;
0645
0646 int buflen = 0;
0647 char *buf = NULL;
0648
0649
0650 if (exnet_schedule_run(net))
0651 {
0652 goto out;
0653 }
0654
0655
0656
0657
0658
0659
0660 if (EXSUCCEED!=getsockopt(net->sock, SOL_SOCKET, SO_ERROR, &so_error, &len))
0661 {
0662 NDRX_LOG(log_error, "Failed go get getsockopt: %s",
0663 strerror(errno));
0664 ret=EXFAIL;
0665 goto out;
0666 }
0667
0668 if (0==so_error && !net->is_connected
0669
0670 #ifndef EX_USE_KQUEUE
0671 && events
0672 #endif
0673 )
0674 {
0675
0676
0677
0678 exnet_rwlock_mainth_write(net);
0679 EXNET_CONNECTED(net);
0680 exnet_rwlock_mainth_read(net);
0681
0682 NDRX_LOG(log_warn, "Connection is now open!");
0683
0684
0685 if (NULL!=net->p_connected && EXSUCCEED!=net->p_connected(net))
0686 {
0687 NDRX_LOG(log_error, "Connected notification "
0688 "callback failed!");
0689 ret=EXFAIL;
0690 goto out;
0691 }
0692
0693 }
0694
0695 if (0==so_error && !net->is_connected &&
0696 ndrx_stopwatch_get_delta_sec(&net->connect_time) > net->rcvtimeout)
0697 {
0698 NDRX_LOG(log_error, "Cannot establish connection to server in "
0699 "time: %ld secs", ndrx_stopwatch_get_delta_sec(&net->connect_time));
0700
0701 net->schedule_close = EXTRUE;
0702
0703 goto out;
0704 }
0705 else if (0!=so_error)
0706 {
0707 if (!net->is_connected)
0708 {
0709 NDRX_LOG(log_error, "Failed to connect to server: %s",
0710 strerror(so_error));
0711 }
0712 else
0713 {
0714 NDRX_LOG(log_error, "Socket client failed: %s",
0715 strerror(so_error));
0716 }
0717
0718 if (EINPROGRESS!=errno)
0719 {
0720 net->schedule_close = EXTRUE;
0721
0722
0723 goto out;
0724 }
0725 }
0726 else if (net->is_connected)
0727 {
0728 long rcvt;
0729
0730
0731
0732
0733
0734
0735
0736
0737
0738
0739
0740
0741
0742 if (net->periodic_zero &&
0743 exnet_stopwatch_get_delta_sec(net, &net->last_snd) > net->periodic_zero)
0744 {
0745 NDRX_LOG(log_info, "About to issue zero length "
0746 "message on fd %d", net->sock);
0747
0748
0749
0750
0751 net->p_snd_zero_len(net);
0752 }
0753
0754 if (net->p_snd_clock_sync && net->periodic_clock_time &&
0755 ndrx_stopwatch_get_delta_sec(&net->periodic_stopwatch) > net->periodic_clock_time)
0756 {
0757 NDRX_LOG(log_info, "About to issue clock sync "
0758 "message on fd %d", net->sock);
0759 net->p_snd_clock_sync(net);
0760
0761
0762 ndrx_stopwatch_reset(&net->periodic_stopwatch);
0763 }
0764
0765 if (net->recv_activity_timeout &&
0766 (rcvt=exnet_stopwatch_get_delta_sec(net, &net->last_rcv)) > net->recv_activity_timeout)
0767 {
0768 NDRX_LOG(log_error, "No data received in %ld sec (max with out data: %d) "
0769 "reset soc/fd=%d", rcvt, net->recv_activity_timeout, net->sock);
0770 userlog("No data received in %ld sec (max with out data: %d) "
0771 "reset soc/fd=%d", rcvt, net->recv_activity_timeout, net->sock);
0772 net->schedule_close = EXTRUE;
0773 }
0774 }
0775
0776
0777 #if defined(EX_USE_EPOLL)
0778
0779 if (events & EPOLLIN)
0780
0781 #elif defined(EX_USE_KQUEUE)
0782
0783 if (1)
0784
0785 #else
0786 if (events & POLLIN)
0787 #endif
0788 {
0789
0790
0791 if(EXSUCCEED == exnet_recv_sync(net, &buf, &buflen, 0, 0))
0792 {
0793
0794 net->p_process_msg(net, &buf, buflen);
0795 }
0796 }
0797
0798 out:
0799
0800
0801 if (NULL!=buf)
0802 {
0803 NDRX_SYSBUF_FREE(buf);
0804 }
0805
0806 return EXSUCCEED;
0807 }
0808
0809
0810
0811
0812
0813 exprivate int close_socket(exnetcon_t *net)
0814 {
0815 int ret=EXSUCCEED;
0816
0817 NDRX_LOG(log_warn, "Closing socket %d...", net->sock);
0818 net->dl = 0;
0819
0820 net->is_connected=EXFALSE;
0821
0822 if (EXFAIL!=net->sock)
0823 {
0824
0825 if (EXSUCCEED!=tpext_delpollerfd(net->sock))
0826 {
0827 NDRX_LOG(log_error, "Failed to remove polling extension: %s",
0828 tpstrerror(tperrno));
0829 }
0830
0831
0832 if (EXSUCCEED!=shutdown(net->sock, SHUT_RDWR))
0833 {
0834 NDRX_LOG(log_error, "Failed to shutdown socket: %s",
0835 strerror(errno));
0836 }
0837
0838 if (EXSUCCEED!=close(net->sock))
0839 {
0840 NDRX_LOG(log_error, "Failed to close socket: %s",
0841 strerror(errno));
0842 ret=EXFAIL;
0843 goto out;
0844 }
0845 }
0846
0847 out:
0848 net->sock=EXFAIL;
0849 net->schedule_close = EXFALSE;
0850
0851 if (NULL!=net->p_disconnected && EXSUCCEED!=net->p_disconnected(net))
0852 {
0853 NDRX_LOG(log_error, "Disconnected notification "
0854 "callback failed!");
0855 ret=EXFAIL;
0856 }
0857 #if 0
0858
0859 if (net->is_incoming)
0860 {
0861 exnet_del_con(net);
0862
0863 exnet_remove_incoming(net);
0864 }
0865 #endif
0866
0867 if (net->is_incoming)
0868 {
0869 exnet_remove_incoming(net);
0870 }
0871
0872 return ret;
0873 }
0874
0875
0876
0877
0878
0879
0880 expublic int exnet_configure_setopts(exnetcon_t *net)
0881 {
0882 int ret=EXSUCCEED;
0883 struct timeval tv;
0884 int flag = 1;
0885 int result;
0886 struct linger l;
0887 int enable = EXTRUE;
0888
0889
0890 if (EXFAIL==fcntl(net->sock, F_SETFL, O_NONBLOCK))
0891 {
0892 NDRX_LOG(log_error, "Failed set socket non blocking!: %s",
0893 strerror(errno));
0894 EXFAIL_OUT(ret);
0895 }
0896
0897 if (setsockopt(net->sock, SOL_SOCKET, SO_REUSEADDR, &enable, sizeof(int)) < 0)
0898 {
0899 NDRX_LOG(log_error, "Failed to set SO_REUSEADDR: %s", strerror(errno));
0900 EXFAIL_OUT(ret);
0901 }
0902
0903
0904 if (EXFAIL==(result = setsockopt(net->sock,
0905 IPPROTO_TCP,
0906 TCP_NODELAY,
0907 (char *) &flag,
0908
0909 sizeof(int))))
0910 {
0911
0912 NDRX_LOG(log_error, "Failed set socket non blocking!: %s",
0913 strerror(errno));
0914 EXFAIL_OUT(ret);
0915 }
0916
0917
0918 l.l_onoff=0;
0919 l.l_linger=0;
0920 if(setsockopt(net->sock, SOL_SOCKET, SO_LINGER, &l, sizeof(l)) < 0)
0921 {
0922 NDRX_LOG(log_error, "Failed to set SO_LINGER: %s", strerror(errno));
0923 EXFAIL_OUT(ret);
0924 }
0925
0926
0927
0928 #if EX_OS_SUNOS && EX_LSB_RELEASE_VER_MAJOR == 5 && EX_LSB_RELEASE_VER_MINOR == 10
0929 NDRX_LOG(log_warn, "Solaris 10 - SO_RCVTIMEO not suppported.");
0930 #else
0931 memset(&tv, 0, sizeof(tv));
0932 tv.tv_sec = net->rcvtimeout;
0933 NDRX_LOG(log_debug, "Setting SO_RCVTIMEO=%d", tv.tv_sec);
0934 if (EXSUCCEED!=setsockopt(net->sock, SOL_SOCKET, SO_RCVTIMEO, &tv, sizeof(struct timeval)))
0935 {
0936 NDRX_LOG(log_error, "setsockopt() failed for fd=%d: %s",
0937 net->sock, strerror(errno));
0938 ret=EXFAIL;
0939 goto out;
0940 }
0941 #endif
0942
0943 out:
0944 return ret;
0945 }
0946
0947
0948
0949
0950
0951
/**
 * Extract the port number, in host byte order, from a socket address.
 * AF_INET addresses are read as sockaddr_in; every other family is read as
 * sockaddr_in6.
 * @param sa socket address
 * @return port in host byte order
 */
expublic in_port_t exnet_get_port(struct sockaddr *sa)
{
    in_port_t net_port;

    if (AF_INET != sa->sa_family)
    {
        net_port = ((struct sockaddr_in6*)sa)->sin6_port;
    }
    else
    {
        net_port = ((struct sockaddr_in*)sa)->sin_port;
    }

    return ntohs(net_port);
}
0963
0964
0965
0966
0967 exprivate int open_socket(exnetcon_t *net)
0968 {
0969 int ret=EXSUCCEED;
0970 int err;
0971 char ip[(INET6_ADDRSTRLEN)*2];
0972
0973
0974 net->is_connected=EXFALSE;
0975
0976 net->sock = socket(net->addr_cur->ai_family,
0977 SOCK_STREAM, net->addr_cur->ai_protocol);
0978
0979
0980 if (EXFAIL==net->sock)
0981 {
0982 NDRX_LOG(log_error, "Failed to create socket: %s",
0983 strerror(errno));
0984 ret=EXFAIL;
0985 goto out;
0986 }
0987
0988
0989 if (EXSUCCEED!=exnet_configure_setopts(net))
0990 {
0991 EXFAIL_OUT(ret);
0992 }
0993
0994 if (NULL!=inet_ntop (net->addr_cur->ai_family,
0995 &((struct sockaddr_in *)net->addr_cur->ai_addr)->sin_addr, ip, sizeof(ip)))
0996 {
0997 NDRX_LOG(log_info,"Trying to connect to IPv%d address: %s port: %d",
0998 net->addr_cur->ai_family == PF_INET6 ? 6 : 4, ip,
0999 (int)exnet_get_port(net->addr_cur->ai_addr));
1000 }
1001 else
1002 {
1003 NDRX_LOG(log_error, "Failed to extract address info: %s", strerror(errno));
1004 }
1005
1006 if (EXSUCCEED!=connect(net->sock, net->addr_cur->ai_addr,
1007 net->addr_cur->ai_addrlen))
1008 {
1009 err=errno;
1010 NDRX_LOG(log_error, "connect() failed for fd=%d: %d/%s",
1011 net->sock, err, strerror(err));
1012
1013 if (ENETUNREACH==err || ECONNREFUSED==err)
1014 {
1015 NDRX_LOG(log_error, "Try later to connect -> next ip");
1016 close(net->sock);
1017 net->sock=EXFAIL;
1018 goto out;
1019 }
1020 if (EINPROGRESS!=err)
1021 {
1022 ret=EXFAIL;
1023 goto out;
1024 }
1025 }
1026
1027 ndrx_stopwatch_reset(&net->connect_time);
1028
1029
1030 if (EXSUCCEED!=tpext_addpollerfd(net->sock,
1031 POLL_FLAGS,
1032 (void *)net, exnet_poll_cb))
1033 {
1034 NDRX_LOG(log_error, "tpext_addpollerfd failed!");
1035 ret=EXFAIL;
1036 goto out;
1037 }
1038
1039 out:
1040 return ret;
1041 }
1042
1043
1044
1045
1046
1047
1048 exprivate int exnet_schedule_run(exnetcon_t *net)
1049 {
1050 int is_incoming;
1051
1052 if (net->schedule_close)
1053 {
1054 NDRX_LOG(log_warn, "Connection close is scheduled - "
1055 "closing fd %d is_incoming %d",
1056 net->sock, net->is_incoming);
1057 is_incoming=net->is_incoming;
1058
1059 exnet_rwlock_mainth_write(net);
1060 close_socket(net);
1061 exnet_rwlock_mainth_read(net);
1062
1063
1064 if (is_incoming)
1065 {
1066
1067
1068
1069 return EXTRUE;
1070 }
1071 }
1072
1073 return EXFALSE;
1074 }
1075
1076
1077
1078
1079
1080
1081 expublic int exnet_periodic(void)
1082 {
1083 int ret=EXSUCCEED;
1084 exnetcon_t *head = extnet_get_con_head();
1085 exnetcon_t *net, *tmp;
1086
1087 DL_FOREACH_SAFE(head, net, tmp)
1088 {
1089
1090 if (exnet_schedule_run(net))
1091 {
1092 continue;
1093 }
1094
1095
1096 if (EXFAIL==net->sock)
1097 {
1098 if (net->is_server)
1099 {
1100
1101 if (EXSUCCEED!=exnet_addr_next(net))
1102 {
1103 NDRX_LOG(log_error, "Failed to resolve next binding address!");
1104 EXFAIL_OUT(ret);
1105 }
1106
1107
1108 ret = exnet_bind(net);
1109
1110 }
1111 else if (!net->is_incoming)
1112 {
1113 if (EXSUCCEED!=exnet_addr_next(net))
1114 {
1115 NDRX_LOG(log_error, "Failed to resolve next connect address!");
1116 EXFAIL_OUT(ret);
1117 }
1118
1119 ret = open_socket(net);
1120 }
1121 }
1122 else if (!net->is_server)
1123 {
1124
1125 ret=exnet_poll_cb(net->sock, 0, (void *)net);
1126 }
1127 }
1128
1129 out:
1130 return ret;
1131 }
1132
1133
1134
1135
1136
1137
1138
1139
1140
1141
1142
1143 expublic int exnet_install_cb(exnetcon_t *net, int (*p_process_msg)(exnetcon_t *net, char **buf, int len),
1144 int (*p_connected)(exnetcon_t *net), int (*p_disconnected)(exnetcon_t *net),
1145 int (*p_snd_zero_len)(exnetcon_t *net), int (*p_snd_clock_sync)(exnetcon_t *net))
1146 {
1147 int ret=EXSUCCEED;
1148
1149 net->p_process_msg = p_process_msg;
1150 net->p_connected = p_connected;
1151 net->p_disconnected = p_disconnected;
1152 net->p_snd_zero_len = p_snd_zero_len;
1153 net->p_snd_clock_sync = p_snd_clock_sync;
1154
1155 out:
1156 return ret;
1157 }
1158
1159
1160
1161
1162
1163 expublic void exnet_unconfigure(exnetcon_t *net)
1164 {
1165 if (NULL!=net->addrinfos)
1166 {
1167 freeaddrinfo(net->addrinfos);
1168 net->addrinfos=NULL;
1169 }
1170
1171 net->addr_cur=NULL;
1172 }
1173
1174
1175
1176
1177
1178
1179 expublic int exnet_addr_get(exnetcon_t *net)
1180 {
1181 int ret=EXSUCCEED;
1182 struct addrinfo hints;
1183 struct addrinfo *iter;
1184 char ip[(INET6_ADDRSTRLEN)*2];
1185
1186
1187 exnet_unconfigure(net);
1188
1189 if (!net->is_server)
1190 {
1191 NDRX_LOG(log_error, "EXNET: client for: %s:%s", net->addr, net->port);
1192 }
1193 else
1194 {
1195 NDRX_LOG(log_error, "EXNET: server for: %s:%s", net->addr, net->port);
1196 }
1197
1198
1199 memset(&hints, 0, sizeof(struct addrinfo));
1200
1201 if (net->is_ipv6)
1202 {
1203 hints.ai_family = AF_INET6;
1204 }
1205 else
1206 {
1207 hints.ai_family = AF_INET;
1208 }
1209
1210 hints.ai_socktype = SOCK_STREAM;
1211
1212 if (net->is_server)
1213 {
1214 hints.ai_flags = AI_PASSIVE;
1215 }
1216 else
1217 {
1218 hints.ai_flags = 0;
1219 }
1220
1221
1222 if (net->is_numeric)
1223 {
1224 hints.ai_flags|=AI_NUMERICHOST;
1225 }
1226
1227 hints.ai_protocol = 0;
1228
1229 ret = getaddrinfo(net->addr, net->port, &hints, &net->addrinfos);
1230
1231 if (ret != EXSUCCEED)
1232 {
1233 NDRX_LOG(log_error, "Failed to resolve -i addr: getaddrinfo(): %s",
1234 gai_strerror(ret));
1235 EXFAIL_OUT(ret);
1236 }
1237
1238
1239 for(iter=net->addrinfos; iter != NULL;iter=iter->ai_next)
1240 {
1241 if (NULL!=inet_ntop (iter->ai_family,
1242 &((struct sockaddr_in *)iter->ai_addr)->sin_addr, ip, sizeof(ip)))
1243 {
1244 NDRX_LOG(log_info,"Resolved: IPv%d address: %s port: %d",
1245 iter->ai_family == PF_INET6 ? 6 : 4, ip,
1246 (int)exnet_get_port(iter->ai_addr));
1247 }
1248 else
1249 {
1250 NDRX_LOG(log_error, "Failed to get addr info: %s", strerror(errno));
1251 }
1252 }
1253
1254 out:
1255 return ret;
1256 }
1257
1258
1259
1260
1261
1262
1263
1264
1265
1266 expublic int exnet_addr_next(exnetcon_t *net)
1267 {
1268 int ret = EXSUCCEED;
1269
1270 if (NULL==net->addr_cur)
1271 {
1272 net->addr_cur=net->addrinfos;
1273 }
1274 else
1275 {
1276 net->addr_cur=net->addr_cur->ai_next;
1277 }
1278
1279 if (NULL==net->addr_cur)
1280 {
1281 NDRX_LOG(log_warn, "Reload addresses");
1282
1283 ret=exnet_addr_get(net);
1284
1285 if (EXSUCCEED!=ret)
1286 {
1287 NDRX_LOG(log_error, "Failed to resolve bind/connect addresses!");
1288 EXFAIL_OUT(ret);
1289 }
1290
1291 net->addr_cur=net->addrinfos;
1292 }
1293
1294 if (NULL==net->addr_cur)
1295 {
1296 NDRX_LOG(log_error, "NULL Address found");
1297 EXFAIL_OUT(ret);
1298 }
1299
1300 out:
1301 NDRX_LOG(log_error, "exnet_addr_next returns %d", ret);
1302
1303 return ret;
1304 }
1305
1306
1307
1308
1309
1310
1311 expublic int exnet_configure(exnetcon_t *net)
1312 {
1313 int ret = EXSUCCEED;
1314
1315 if (EXSUCCEED!=exnet_addr_get(net))
1316 {
1317 EXFAIL_OUT(ret);
1318 }
1319
1320
1321 exnet_add_con(net);
1322
1323 out:
1324 return ret;
1325 }
1326
1327
1328
1329
1330
1331 expublic void exnet_rwlock_read(exnetcon_t *net)
1332 {
1333 if (EXSUCCEED!=pthread_rwlock_rdlock(&(net->rwlock)))
1334 {
1335 int err = errno;
1336
1337 NDRX_LOG(log_error, "Failed to read lock: %s - exiting", strerror(err));
1338 userlog("Failed to read lock: %s - exiting", strerror(err));
1339 exit(EXFAIL);
1340 }
1341 }
1342
1343
1344
1345
1346
1347 expublic void exnet_rwlock_write(exnetcon_t *net)
1348 {
1349 if (EXSUCCEED!=pthread_rwlock_rdlock(&net->rwlock))
1350 {
1351 int err = errno;
1352
1353 NDRX_LOG(log_error, "Failed to write lock: %s - exiting", strerror(err));
1354 userlog("Failed to write lock: %s - exiting", strerror(err));
1355 exit(EXFAIL);
1356 }
1357 }
1358
1359
1360
1361
1362
1363 expublic void exnet_rwlock_unlock(exnetcon_t *net)
1364 {
1365 if (EXSUCCEED!=pthread_rwlock_unlock(&net->rwlock))
1366 {
1367 int err = errno;
1368
1369 NDRX_LOG(log_error, "Failed to unlock rwlock: %s - exiting", strerror(err));
1370 userlog("Failed to unlock rwlock: %s - exiting", strerror(err));
1371 exit(EXFAIL);
1372 }
1373 }
1374
1375
1376
1377
1378
1379
1380 expublic void exnet_rwlock_mainth_write(exnetcon_t *net)
1381 {
1382 exnet_rwlock_unlock(net);
1383 exnet_rwlock_write(net);
1384 }
1385
1386
1387
1388
1389
1390 expublic void exnet_rwlock_mainth_read(exnetcon_t *net)
1391 {
1392 exnet_rwlock_unlock(net);
1393 exnet_rwlock_read(net);
1394 }
1395
1396
1397
1398
1399 expublic int exnet_is_connected(exnetcon_t *net)
1400 {
1401 return net->is_connected;
1402 }
1403
1404
1405
1406
1407 expublic int exnet_close_shut(exnetcon_t *net)
1408 {
1409 if (net->lock_init)
1410 {
1411 exnet_rwlock_mainth_write(net);
1412 close_socket(net);
1413 exnet_rwlock_mainth_read(net);
1414 }
1415
1416 return EXSUCCEED;
1417 }
1418
1419
1420
1421
1422
1423 expublic int exnet_set_timeout(exnetcon_t *net, int timeout)
1424 {
1425 int ret=EXSUCCEED;
1426 struct timeval tv;
1427
1428 memset(&tv, 0, sizeof(tv));
1429 tv.tv_sec = timeout;
1430
1431 if (EXSUCCEED!=setsockopt(net->sock, SOL_SOCKET, SO_RCVTIMEO, &tv, sizeof(struct timeval)))
1432 {
1433 NDRX_LOG(log_error, "setsockopt() failed for fd=%d: %s",
1434 net->sock, strerror(errno));
1435 EXFAIL_OUT(ret);
1436 }
1437
1438 out:
1439 return ret;
1440 }
1441
1442
1443
1444
1445
1446 expublic void exnet_reset_struct(exnetcon_t *net)
1447 {
1448 memset(net, 0, sizeof(*net));
1449 net->sock = EXFAIL;
1450 net->rcvtimeout = EXFAIL;
1451 net->len_pfx = EXFAIL;
1452 net->is_server = EXFAIL;
1453 net->rcvtimeout = ndrx_get_G_atmi_env()->time_out;
1454 }
1455
1456
1457
1458
1459
1460
1461
1462 expublic int exnet_net_init(exnetcon_t *net)
1463 {
1464 int ret = EXSUCCEED;
1465 int err;
1466
1467 memset(&(net->rwlock), 0, sizeof(net->rwlock));
1468
1469 if (EXSUCCEED!=(err=pthread_rwlock_init(&(net->rwlock), NULL)))
1470 {
1471 NDRX_LOG(log_error, "Failed to init rwlock: %s", strerror(err));
1472 userlog("Failed to init rwlock: %s", strerror(err));
1473 EXFAIL_OUT(ret);
1474 }
1475
1476 MUTEX_VAR_INIT(net->sendlock);
1477 MUTEX_VAR_INIT(net->rcvlock);
1478 MUTEX_VAR_INIT(net->flagslock);
1479
1480
1481 ndrx_stopwatch_reset(&net->periodic_stopwatch);
1482
1483
1484 if (EXSUCCEED!=(err=pthread_rwlock_rdlock(&(net->rwlock))))
1485 {
1486 userlog("Failed to acquire read lock: %s", strerror(err));
1487 NDRX_LOG(log_error, "Failed to acquire read lock - exiting...: %s",
1488 strerror(err));
1489 exit(EXFAIL);
1490 }
1491 net->lock_init=EXTRUE;
1492
1493 out:
1494 return ret;
1495 }
1496