0001 
0002 
0003 
0004 
0005 
0006 
0007 
0008 
0009 
0010 
0011 
0012 
0013 
0014 
0015 
0016 
0017 
0018 
0019 
0020 
0021 
0022 
0023 
0024 
0025 
0026 
0027 
0028 
0029 
0030 
0031 
0032 
0033 
0034 
0035 
0036 
0037 
0038 
0039 
0040 
0041 
0042 
0043 
0044 
0045 #include <ndrx_config.h>
0046 #include <sys/socket.h>
0047 #include <sys/time.h>
0048 #include <sys/types.h>
0049 #include <arpa/inet.h>
0050 #include <netinet/in.h>
0051 #include <netinet/tcp.h>
0052 #include <errno.h>
0053 #include <fcntl.h>
0054 #include <netdb.h>
0055 #include <string.h>
0056 #include <unistd.h>
0057 
0058 #include <ndrstandard.h>
0059 
0060 #if defined(EX_USE_EPOLL)
0061 
0062 #include <sys/epoll.h>
0063 
0064 #elif defined(EX_USE_KQUEUE)
0065 
0066 #include <sys/event.h>
0067 
0068 #endif
0069 
0070 #include <poll.h>
0071 
0072 #include <atmi.h>
0073 #include <stdio.h>
0074 #include <stdlib.h>
0075 #include <exnet.h>
0076 #include <ndebug.h>
0077 #include <utlist.h>
0078 
0079 #include <atmi_int.h>
0080 #include <ndrx_config.h>
0081 
0082 
0083 
/*
 * Space left for download: NDRX_MSGSIZEMAX minus the bytes already buffered.
 * NOTE: the expansion references a variable named `net`, so the macro can only
 * be used where a connection pointer with that exact name is in scope.
 */
#define DBUF_SZ (NDRX_MSGSIZEMAX - net->dl) 

/*
 * Event flags passed to tpext_addpollerfd() when a socket is registered.
 * The value depends on the poller backend selected at build time.
 */
#if defined(EX_USE_EPOLL)

#define POLL_FLAGS (EPOLLIN | EPOLLHUP)

#elif defined(EX_USE_KQUEUE)

#define POLL_FLAGS EVFILT_READ

#else

#define POLL_FLAGS (POLLIN)

#endif

/*
 * File-scope locks.
 * NOTE(review): neither is referenced in the visible part of this file (the
 * send/receive paths use the per-connection net->sendlock / net->rcvlock);
 * confirm whether they are still needed.
 */
exprivate MUTEX_LOCKDECL(M_send_lock);
exprivate MUTEX_LOCKDECL(M_recv_lock); 

/* Forward declarations of file-local helpers defined further below */
exprivate int close_socket(exnetcon_t *net);
exprivate int open_socket(exnetcon_t *net);
exprivate ssize_t recv_wrap (exnetcon_t *net, void *__buf, size_t __n, 
        int flags, int appflags);

exprivate int exnet_schedule_run(exnetcon_t *net);
0115 
0116 
0117 
0118 
0119 
0120 
0121 
0122 
0123 expublic long exnet_stopwatch_get_delta_sec(exnetcon_t *net, ndrx_stopwatch_t *w)
0124 {
0125     long ret;
0126     
0127     MUTEX_LOCK_V(net->flagslock);
0128     
0129     ret = ndrx_stopwatch_get_delta_sec(w);
0130     
0131     MUTEX_UNLOCK_V(net->flagslock);
0132     
0133     return ret;
0134 }
0135 
0136 
0137 
0138 
0139 
0140 
0141 expublic void exnet_stopwatch_reset(exnetcon_t *net, ndrx_stopwatch_t *w)
0142 {
0143     MUTEX_LOCK_V(net->flagslock);
0144     
0145     ndrx_stopwatch_reset(w);
0146     
0147     MUTEX_UNLOCK_V(net->flagslock);
0148 }   
0149 
0150 
0151 
0152 
0153 
0154 
0155 
0156 expublic int exnet_send_sync(exnetcon_t *net, char *hdr_buf, int hdr_len, 
0157         char *buf, int len, int flags, int appflags)
0158 {
0159     int ret=EXSUCCEED;
0160     int allow_size = DATA_BUF_MAX;
0161     int sent = 0;
0162     
0163     char d[NET_LEN_PFX_LEN+128];    
0164     int size_to_send, len_whdr, hdr_snd_size = 0;
0165     int tmp_s;
0166     int err;
0167     int retry;
0168     ndrx_stopwatch_t w;
0169 
0170     
0171     len_whdr = hdr_len + len;
0172     if (len_whdr>allow_size)
0173     {
0174         NDRX_LOG(log_error, "Buffer too large for sending! "
0175                         "requested: %d, allowed: %d", len_whdr, allow_size);
0176         EXFAIL_OUT(ret);
0177     }
0178 
0179     
0180 
0181 
0182 
0183 
0184 
0185 
0186     
0187     if (4==net->len_pfx)
0188     {
0189         
0190         d[0] = (len_whdr >> 24) & 0xff;
0191         d[1] = (len_whdr >> 16) & 0xff;
0192         d[2] = (len_whdr >> 8) & 0xff;
0193         d[3] = (len_whdr) & 0xff;
0194         hdr_snd_size=net->len_pfx;
0195     }
0196     
0197     if (NULL!=hdr_buf)
0198     {
0199         
0200         memcpy(d+net->len_pfx, hdr_buf, hdr_len);
0201         hdr_snd_size+=hdr_len;
0202     }
0203 
0204     size_to_send = len+hdr_snd_size;
0205 
0206     
0207     MUTEX_LOCK_V(net->sendlock);
0208     do
0209     {
0210         err = 0;
0211         NDRX_LOG(log_debug, "Sending, len: %d, total msg: %d", 
0212                 size_to_send-sent, size_to_send);
0213         
0214         if (!(appflags & APPFLAGS_MASK))
0215         {
0216             if (sent < hdr_snd_size)
0217             {
0218                 NDRX_DUMP(log_debug, "Sending, msg (msg len pfx)", 
0219                         d+sent, hdr_snd_size-sent);
0220             }
0221             else
0222             {
0223                 NDRX_DUMP(log_debug, "Sending, msg ", buf+sent-hdr_snd_size, 
0224                             size_to_send-sent);
0225             }
0226         }
0227         else
0228         {
0229             NDRX_LOG(log_debug, "*** MSG DUMP IS MASKED ***");
0230         }
0231         
0232         ndrx_stopwatch_reset(&w);
0233         
0234         do
0235         {
0236             err = 0;
0237             retry = EXFALSE;
0238             
0239             if (sent<hdr_snd_size)
0240             {
0241                 tmp_s = send(net->sock, d+sent, hdr_snd_size-sent, flags);
0242             }
0243             else
0244             {
0245                 
0246                 tmp_s = send(net->sock, buf+sent-hdr_snd_size, 
0247                         size_to_send-sent, flags);
0248             }
0249             
0250             if (EXFAIL==tmp_s)
0251             {
0252                 err = errno;
0253             }
0254             else
0255             {
0256                 
0257                 exnet_stopwatch_reset(net, &net->last_snd);
0258             }
0259             
0260             if (EAGAIN==err || EWOULDBLOCK==err)
0261             {
0262                 int spent = ndrx_stopwatch_get_delta_sec(&w);
0263         int rcvtim = net->rcvtimeout - spent;
0264         struct pollfd ufd;
0265 
0266         memset(&ufd, 0, sizeof ufd);
0267 
0268                 NDRX_LOG(log_warn, "Socket full: %s - retry, "
0269                         "time spent: %d, max: %d - POLLOUT (rcvtim=%d) sent: %d tot: %d",
0270                         strerror(err), spent, net->rcvtimeout, rcvtim, sent, size_to_send);
0271 
0272         ufd.fd = net->sock; 
0273         ufd.events = POLLOUT;
0274 
0275         if (rcvtim < 1 || poll(&ufd, 1, rcvtim * 1000) < 0 || ufd.revents & POLLERR)
0276         {
0277                     NDRX_LOG(log_error, "ERROR! Failed to send, socket full: %s "
0278                             "time spent: %d, max: %d short: %hd rcvtim: %d (POLLERR: %d)", 
0279                         strerror(err), spent, net->rcvtimeout, ufd.revents, rcvtim,
0280                             (ufd.revents & POLLERR));
0281                     
0282                     userlog("ERROR! Failed to send, socket full: %s "
0283                             "time spent: %d, max: %d short: %hd rcvtim: %d (POLLERR: %d)",
0284                             strerror(err), spent, net->rcvtimeout, ufd.revents, rcvtim,
0285                             (ufd.revents & POLLERR));
0286                     
0287                     net->schedule_close = EXTRUE;
0288                     ret=EXFAIL;
0289                     goto out_unlock;
0290                 }
0291                 
0292                 retry = EXTRUE;
0293                 
0294             }
0295         }
0296         while (retry);
0297 
0298         if (EXFAIL==tmp_s)
0299         {
0300             NDRX_LOG(log_error, "send failure: %s",
0301                             strerror(err));
0302             
0303             
0304             NDRX_LOG(log_error, "Scheduling connection close...");
0305             net->schedule_close = EXTRUE;
0306             ret=EXFAIL;
0307             break;
0308         }
0309         else
0310         {
0311             NDRX_LOG(log_debug, "Sent %d bytes", tmp_s);
0312                         
0313             sent+=tmp_s;
0314         }
0315 
0316         if (sent < size_to_send)
0317         {
0318             NDRX_LOG(log_debug, "partial submission: total: %d, sent: %d, left "
0319                     "for sending: %d - continue", size_to_send, sent, 
0320                     size_to_send - sent);
0321         }
0322         
0323     } 
0324     while (EXSUCCEED==ret && sent < size_to_send);
0325 
0326 out_unlock:
0327     MUTEX_UNLOCK_V(net->sendlock);
0328 
0329 out:
0330     
0331     return ret;
0332 }
0333 
0334 
0335 
0336 
0337 
0338 
0339 exprivate  ssize_t recv_wrap (exnetcon_t *net, void *__buf, size_t __n, int flags, int appflags)
0340 {
0341     ssize_t ret;
0342 
0343     ret = recv (net->sock, __buf, __n, flags);
0344 
0345     if (0==ret)
0346     {
0347         NDRX_LOG(log_error, "Disconnect received - schedule close!");
0348         
0349         net->schedule_close = EXTRUE;
0350         ret=EXFAIL;
0351         goto out;
0352     }
0353     else if (EXFAIL==ret)
0354     {
0355         if (EAGAIN==errno || EWOULDBLOCK==errno)
0356         {
0357             NDRX_LOG(log_info, "Still no data (waiting...)");
0358         }
0359         else
0360         {
0361             NDRX_LOG(log_error, "recv failure: %s - schedule close", 
0362                     strerror(errno));
0363             
0364             net->schedule_close = EXTRUE;
0365         }
0366 
0367         ret=EXFAIL;
0368         goto out;
0369     }
0370     else
0371     {
0372         
0373         exnet_stopwatch_reset(net, &net->last_rcv);
0374     }
0375     
0376 out:
0377     return ret;
0378 }
0379 
0380 
0381 
0382 
0383 
0384 exprivate int get_full_len(exnetcon_t *net)
0385 {
0386     int  pfx_len, msg_len;
0387 
0388     
0389     pfx_len = ( 
0390                 (net->d[0] & 0xff) << 24
0391               | (net->d[1] & 0xff) << 16
0392               | (net->d[2] & 0xff) << 8
0393               | (net->d[3] & 0xff)
0394             );
0395 
0396     msg_len = pfx_len+net->len_pfx;
0397     NDRX_LOG(log_debug, "pfx_len=%d msg_len=%d", pfx_len, msg_len);
0398     
0399     
0400 
0401     return msg_len;
0402 }
0403 
0404 
0405 
0406 
0407 
0408 
0409 
0410 
0411 
0412 
0413 
0414 
0415 expublic int exnet_recv_sync(exnetcon_t *net, char **buf, int *len, int flags, int appflags)
0416 {
0417     int ret=EXSUCCEED;
0418     int got_len;
0419     int full_msg;
0420     int download_size;
0421     
0422     
0423     MUTEX_LOCK_V(net->rcvlock);
0424     
0425     if (0==net->dl)
0426     {
0427         
0428         ndrx_stopwatch_reset(&net->rcv_timer);
0429     }
0430     
0431     if (NULL==net->dlsysbuf)
0432     {
0433         size_t tmp_buf_len;
0434         
0435         NDRX_SYSBUF_MALLOC_OUT(net->dlsysbuf, tmp_buf_len, ret);
0436     }
0437     
0438     while (EXSUCCEED==ret)
0439     {
0440         
0441         if (net->dl>=net->len_pfx)
0442         {
0443             full_msg = get_full_len(net);
0444             
0445             if (full_msg < 0)
0446             {
0447                 NDRX_LOG(log_error, "ERROR ! received len %d < 0 - closing socket!", 
0448                         full_msg);
0449                 userlog("ERROR ! received len %d < 0! - schedule close socket!", full_msg);
0450                 net->schedule_close = EXTRUE;
0451                 ret=EXFAIL;
0452                 break;
0453             }
0454             else if (full_msg > DATA_BUF_MAX)
0455             {
0456                 NDRX_LOG(log_error, "ERROR ! received len %d > max buf %ld! - "
0457                         "closing socket!", full_msg, DATA_BUF_MAX);
0458                 userlog("ERROR ! received len %d > max buf %ld! - "
0459                         "closing socket!", full_msg, DATA_BUF_MAX);
0460                 net->schedule_close = EXTRUE;
0461                 ret=EXFAIL;
0462                 break;
0463             }
0464 
0465             NDRX_LOG(log_debug, "Data buffered - "
0466                     "buf: [%d], full: %d",
0467                     net->dl, full_msg);
0468             if (net->dl >= full_msg)
0469             {
0470                 
0471                 
0472 
0473 
0474 
0475                 
0476                 *len = full_msg-net->len_pfx;
0477                 net->dl -= full_msg;
0478                 *buf=net->dlsysbuf;
0479                 net->dlsysbuf=NULL;
0480                 
0481                 
0482                 if (0!=net->dl)
0483                 {
0484                     NDRX_LOG(log_error, "Expected left over 0 but got %d", net->dl);
0485                     userlog("Expected left over 0 but got %d", net->dl);
0486                     abort();
0487                 }
0488                 
0489                 if (0==*len)
0490                 {
0491                     NDRX_LOG(log_debug, "zero length message - ignore!");
0492                     ret=EXFAIL;
0493                 }
0494                 else if (!(appflags & APPFLAGS_MASK))
0495                 {
0496                     NDRX_DUMP(log_debug, "Got message: ", *buf, *len);
0497                 }
0498                 
0499                 MUTEX_UNLOCK_V(net->rcvlock);
0500                 return ret;
0501             }
0502         }
0503 
0504         NDRX_LOG(log_debug, "Data needs to be received, dl=%d", net->dl);
0505         
0506         
0507 
0508 
0509 
0510         
0511         if (net->dl < net->len_pfx)
0512         {
0513             
0514             download_size = net->len_pfx - net->dl;
0515         }
0516         else
0517         {
0518             
0519             download_size = full_msg - net->dl;
0520         }
0521         
0522         
0523 
0524 
0525 
0526         
0527         if (download_size < 0)
0528         {
0529             NDRX_LOG(log_error, "ERROR ! Expected download size < 0 (%d)", download_size);
0530             userlog("ERROR ! Expected download size < 0 (%d)", download_size);
0531             net->schedule_close = EXTRUE;
0532             ret=EXFAIL;
0533             break;
0534         }
0535         else if (download_size > DBUF_SZ)
0536         {
0537             NDRX_LOG(log_error, "ERROR ! Expected download size bigger "
0538                     "than buffer left: %d > %d", download_size, (DBUF_SZ));
0539             userlog("ERROR ! Expected download size bigger "
0540                     "than buffer left: %d > %d", download_size, (DBUF_SZ));
0541             
0542             
0543 
0544 
0545 
0546             net->schedule_close = EXTRUE;
0547             ret=EXFAIL;
0548             break;
0549         }
0550         
0551         
0552 
0553 
0554 
0555         if (net->dl < net->len_pfx)
0556         {
0557             if (EXFAIL==(got_len=recv_wrap(net, net->d+net->dl, download_size, 
0558                     flags, appflags)))
0559             {
0560                 
0561                 ret=EXFAIL;
0562             }
0563             else
0564             {
0565                 if (!(appflags&APPFLAGS_MASK))
0566                 {
0567                     NDRX_DUMP(log_debug, "Got packet: ",
0568                             net->d+net->dl, got_len);
0569                 }
0570                 net->dl+=got_len;
0571             }
0572         }
0573         else
0574         {
0575             if (EXFAIL==(got_len=recv_wrap(net, net->dlsysbuf+net->dl-net->len_pfx, 
0576                     download_size, flags, appflags)))
0577             {
0578                 
0579                 ret=EXFAIL;
0580             }
0581             else
0582             {
0583                 if (!(appflags&APPFLAGS_MASK))
0584                 {
0585                     NDRX_DUMP(log_debug, "Got packet: ",
0586                             net->dlsysbuf+net->dl-net->len_pfx, got_len);
0587                 }
0588                 net->dl+=got_len;
0589             }
0590         }
0591         
0592     }
0593     
0594     
0595 
0596 
0597     
0598     if (!net->schedule_close && 
0599             ndrx_stopwatch_get_delta_sec(&net->rcv_timer) >=net->rcvtimeout )
0600     {
0601         NDRX_LOG(log_error, "This is time-out => schedule close socket !");
0602         net->schedule_close = EXTRUE;
0603     }
0604 
0605 out:    
0606     MUTEX_UNLOCK_V(net->rcvlock);
0607     
0608     
0609     ret=EXFAIL;
0610 
0611     return ret;
0612 }
0613 
0614 
0615 
0616 
0617 
0618 expublic int exnet_b4_poll_cb(void)
0619 {
0620     int ret=EXSUCCEED;
0621     exnetcon_t *head = extnet_get_con_head();
0622     exnetcon_t *net, *tmp;
0623     
0624     DL_FOREACH_SAFE(head, net, tmp)
0625     {
0626         if (exnet_schedule_run(net))
0627         {
0628             continue;
0629         }
0630     }
0631 
0632 out:
0633     return ret;
0634 }
0635 
0636 
0637 
0638 
0639 expublic int exnet_poll_cb(int fd, uint32_t events, void *ptr1)
0640 {
0641     int ret;
0642     int so_error=0;
0643     socklen_t len = sizeof so_error;
0644     exnetcon_t *net = (exnetcon_t *)ptr1;   
0645     
0646     int buflen = 0;
0647     char *buf = NULL;
0648     
0649     
0650     if (exnet_schedule_run(net))
0651     {
0652         goto out;
0653     }
0654     
0655     
0656 
0657 
0658 
0659     
0660     if (EXSUCCEED!=getsockopt(net->sock, SOL_SOCKET, SO_ERROR, &so_error, &len))
0661     {
0662         NDRX_LOG(log_error, "Failed go get getsockopt: %s",
0663                         strerror(errno));
0664         ret=EXFAIL;
0665         goto out;
0666     }
0667 
0668     if (0==so_error && !net->is_connected
0669 
0670 #ifndef  EX_USE_KQUEUE
0671     && events 
0672 #endif
0673     )
0674     {
0675 
0676         
0677         
0678         exnet_rwlock_mainth_write(net);
0679         EXNET_CONNECTED(net);
0680         exnet_rwlock_mainth_read(net);
0681         
0682         NDRX_LOG(log_warn, "Connection is now open!");
0683 
0684         
0685         if (NULL!=net->p_connected && EXSUCCEED!=net->p_connected(net))
0686         {
0687             NDRX_LOG(log_error, "Connected notification "
0688                             "callback failed!");
0689             ret=EXFAIL;
0690             goto out;
0691         }
0692 
0693     }
0694 
0695     if (0==so_error && !net->is_connected && 
0696             ndrx_stopwatch_get_delta_sec(&net->connect_time) > net->rcvtimeout)
0697     {
0698         NDRX_LOG(log_error, "Cannot establish connection to server in "
0699                 "time: %ld secs", ndrx_stopwatch_get_delta_sec(&net->connect_time));
0700         
0701         net->schedule_close = EXTRUE;
0702         
0703         goto out;
0704     }
0705     else if (0!=so_error)
0706     {
0707         if (!net->is_connected)
0708         {
0709             NDRX_LOG(log_error, "Failed to connect to server: %s",
0710                             strerror(so_error));
0711         }
0712         else
0713         {
0714             NDRX_LOG(log_error, "Socket client failed: %s",
0715                             strerror(so_error));
0716         }
0717 
0718         if (EINPROGRESS!=errno)
0719         {
0720             net->schedule_close = EXTRUE;
0721             
0722             
0723             goto out;
0724         }
0725     }
0726     else if (net->is_connected)
0727     {
0728         long rcvt;
0729         
0730         
0731         
0732 
0733 
0734 
0735         
0736         
0737 
0738 
0739 
0740 
0741 
0742         if (net->periodic_zero && 
0743                 exnet_stopwatch_get_delta_sec(net, &net->last_snd) > net->periodic_zero)
0744         {
0745             NDRX_LOG(log_info, "About to issue zero length "
0746                     "message on fd %d", net->sock);
0747 
0748             
0749 
0750 
0751             net->p_snd_zero_len(net);
0752         }
0753         
0754         if (net->p_snd_clock_sync && net->periodic_clock_time && 
0755                 ndrx_stopwatch_get_delta_sec(&net->periodic_stopwatch) > net->periodic_clock_time)
0756         {
0757             NDRX_LOG(log_info, "About to issue clock sync "
0758                     "message on fd %d", net->sock);
0759             net->p_snd_clock_sync(net);
0760             
0761             
0762             ndrx_stopwatch_reset(&net->periodic_stopwatch);
0763         }
0764         
0765         if (net->recv_activity_timeout && 
0766                 (rcvt=exnet_stopwatch_get_delta_sec(net, &net->last_rcv)) > net->recv_activity_timeout)
0767         {
0768             NDRX_LOG(log_error, "No data received in %ld sec (max with out data: %d) "
0769                     "reset soc/fd=%d", rcvt, net->recv_activity_timeout, net->sock);
0770             userlog("No data received in %ld sec (max with out data: %d) "
0771                     "reset soc/fd=%d", rcvt, net->recv_activity_timeout, net->sock);
0772             net->schedule_close = EXTRUE;
0773         }
0774     }
0775 
0776     
0777 #if defined(EX_USE_EPOLL)
0778     
0779     if (events & EPOLLIN)
0780         
0781 #elif defined(EX_USE_KQUEUE)
0782         
0783     if (1) 
0784         
0785 #else
0786     if (events & POLLIN)
0787 #endif
0788     {
0789         
0790 
0791         if(EXSUCCEED == exnet_recv_sync(net, &buf, &buflen, 0, 0))
0792         {
0793             
0794             net->p_process_msg(net, &buf, buflen);
0795         }
0796     }
0797 
0798 out:
0799 
0800     
0801     if (NULL!=buf)
0802     {
0803         NDRX_SYSBUF_FREE(buf);
0804     }
0805 
0806     return EXSUCCEED;
0807 }
0808 
0809 
0810 
0811 
0812 
0813 exprivate int close_socket(exnetcon_t *net)
0814 {
0815     int ret=EXSUCCEED;
0816 
0817     NDRX_LOG(log_warn, "Closing socket %d...", net->sock);
0818     net->dl = 0; 
0819     
0820     net->is_connected=EXFALSE; 
0821     
0822     if (EXFAIL!=net->sock)
0823     {
0824         
0825         if (EXSUCCEED!=tpext_delpollerfd(net->sock))
0826         {
0827             NDRX_LOG(log_error, "Failed to remove polling extension: %s",
0828                                 tpstrerror(tperrno));
0829         }
0830 
0831         
0832         if (EXSUCCEED!=shutdown(net->sock, SHUT_RDWR))
0833         {
0834             NDRX_LOG(log_error, "Failed to shutdown socket: %s",
0835                                     strerror(errno));
0836         }
0837 
0838         if (EXSUCCEED!=close(net->sock))
0839         {
0840             NDRX_LOG(log_error, "Failed to close socket: %s",
0841                                     strerror(errno));
0842             ret=EXFAIL;
0843             goto out;
0844         }
0845     }
0846 
0847 out:
0848     net->sock=EXFAIL;
0849     net->schedule_close = EXFALSE;
0850     
0851     if (NULL!=net->p_disconnected && EXSUCCEED!=net->p_disconnected(net))
0852     {
0853         NDRX_LOG(log_error, "Disconnected notification "
0854                         "callback failed!");
0855         ret=EXFAIL;
0856     }
0857 #if 0
0858     
0859     if (net->is_incoming)
0860     {
0861         exnet_del_con(net);
0862         
0863         exnet_remove_incoming(net);
0864     }
0865 #endif
0866     
0867     if (net->is_incoming)
0868     {
0869         exnet_remove_incoming(net);
0870     }
0871     
0872     return ret;
0873 }
0874 
0875 
0876 
0877 
0878 
0879 
0880 expublic int exnet_configure_setopts(exnetcon_t *net)
0881 {
0882     int ret=EXSUCCEED;
0883     struct timeval tv;
0884     int flag = 1;
0885     int result;
0886     struct linger l;
0887     int enable = EXTRUE;
0888 
0889     
0890     if (EXFAIL==fcntl(net->sock, F_SETFL, O_NONBLOCK))
0891     {
0892         NDRX_LOG(log_error, "Failed set socket non blocking!: %s",
0893                                 strerror(errno));
0894         EXFAIL_OUT(ret);
0895     }
0896     
0897     if (setsockopt(net->sock, SOL_SOCKET, SO_REUSEADDR, &enable, sizeof(int)) < 0)
0898     {
0899         NDRX_LOG(log_error, "Failed to set SO_REUSEADDR: %s", strerror(errno));
0900         EXFAIL_OUT(ret);
0901     }
0902     
0903 
0904     if (EXFAIL==(result = setsockopt(net->sock,            
0905                             IPPROTO_TCP,     
0906                             TCP_NODELAY,     
0907                             (char *) &flag,  
0908 
0909                             sizeof(int))))    
0910     {
0911 
0912         NDRX_LOG(log_error, "Failed set socket non blocking!: %s",
0913                                 strerror(errno));
0914         EXFAIL_OUT(ret);
0915     }
0916     
0917     
0918     l.l_onoff=0;
0919     l.l_linger=0;
0920     if(setsockopt(net->sock, SOL_SOCKET, SO_LINGER, &l, sizeof(l)) < 0)
0921     {
0922         NDRX_LOG(log_error, "Failed to set SO_LINGER: %s", strerror(errno));
0923         EXFAIL_OUT(ret);
0924     }
0925     
0926 
0927 
0928 #if EX_OS_SUNOS && EX_LSB_RELEASE_VER_MAJOR == 5 && EX_LSB_RELEASE_VER_MINOR == 10
0929     NDRX_LOG(log_warn, "Solaris 10 - SO_RCVTIMEO not suppported.");
0930 #else
0931     memset(&tv, 0, sizeof(tv));
0932     tv.tv_sec = net->rcvtimeout;
0933     NDRX_LOG(log_debug, "Setting SO_RCVTIMEO=%d", tv.tv_sec);
0934     if (EXSUCCEED!=setsockopt(net->sock, SOL_SOCKET, SO_RCVTIMEO, &tv, sizeof(struct timeval)))
0935     {
0936         NDRX_LOG(log_error, "setsockopt() failed for fd=%d: %s",
0937                         net->sock, strerror(errno));
0938         ret=EXFAIL;
0939         goto out;
0940     }
0941 #endif
0942     
0943 out:
0944     return ret;
0945 }
0946 
0947 
0948 
0949 
0950 
0951 
0952 expublic in_port_t exnet_get_port(struct sockaddr *sa)
0953 {
0954     if (AF_INET == sa->sa_family)
0955     {
0956         return ntohs(((struct sockaddr_in*)sa)->sin_port);
0957     }
0958     else
0959     {
0960         return ntohs(((struct sockaddr_in6*)sa)->sin6_port);
0961     }
0962 }
0963 
0964 
0965 
0966 
0967 exprivate int open_socket(exnetcon_t *net)
0968 {
0969     int ret=EXSUCCEED;
0970     int err;
0971     char ip[(INET6_ADDRSTRLEN)*2];
0972     
0973     
0974     net->is_connected=EXFALSE;
0975 
0976     net->sock = socket(net->addr_cur->ai_family, 
0977             SOCK_STREAM, net->addr_cur->ai_protocol);
0978 
0979     
0980     if (EXFAIL==net->sock)
0981     {
0982         NDRX_LOG(log_error, "Failed to create socket: %s",
0983                                 strerror(errno));
0984         ret=EXFAIL;
0985         goto out;
0986     }
0987     
0988     
0989     if (EXSUCCEED!=exnet_configure_setopts(net))
0990     {
0991         EXFAIL_OUT(ret);
0992     }
0993 
0994     if (NULL!=inet_ntop (net->addr_cur->ai_family, 
0995             &((struct sockaddr_in *)net->addr_cur->ai_addr)->sin_addr, ip, sizeof(ip)))
0996     {
0997         NDRX_LOG(log_info,"Trying to connect to IPv%d address: %s port: %d", 
0998                 net->addr_cur->ai_family == PF_INET6 ? 6 : 4, ip, 
0999                 (int)exnet_get_port(net->addr_cur->ai_addr));
1000     }
1001     else
1002     {
1003         NDRX_LOG(log_error, "Failed to extract address info: %s", strerror(errno));
1004     }
1005 
1006     if (EXSUCCEED!=connect(net->sock, net->addr_cur->ai_addr, 
1007             net->addr_cur->ai_addrlen))
1008     {
1009         err=errno;
1010         NDRX_LOG(log_error, "connect() failed for fd=%d: %d/%s",
1011                         net->sock, err, strerror(err));
1012         
1013         if (ENETUNREACH==err || ECONNREFUSED==err)
1014         {
1015             NDRX_LOG(log_error, "Try later to connect -> next ip");
1016             close(net->sock);
1017             net->sock=EXFAIL;
1018             goto out;
1019         }
1020         if (EINPROGRESS!=err)
1021         {
1022             ret=EXFAIL;
1023             goto out;
1024         }
1025     }
1026     
1027     ndrx_stopwatch_reset(&net->connect_time);
1028 
1029     
1030     if (EXSUCCEED!=tpext_addpollerfd(net->sock,
1031             POLL_FLAGS,
1032             (void *)net, exnet_poll_cb))
1033     {
1034         NDRX_LOG(log_error, "tpext_addpollerfd failed!");
1035         ret=EXFAIL;
1036         goto out;
1037     }
1038     
1039 out:
1040     return ret;
1041 }
1042 
1043 
1044 
1045 
1046 
1047 
1048 exprivate int exnet_schedule_run(exnetcon_t *net)
1049 {
1050     int is_incoming;
1051     
1052     if (net->schedule_close)
1053     {
1054         NDRX_LOG(log_warn, "Connection close is scheduled - "
1055                 "closing fd %d is_incoming %d", 
1056                 net->sock, net->is_incoming);
1057         is_incoming=net->is_incoming;
1058         
1059         exnet_rwlock_mainth_write(net);
1060         close_socket(net);
1061         exnet_rwlock_mainth_read(net);
1062 
1063         
1064         if (is_incoming)
1065         {
1066             
1067 
1068 
1069             return EXTRUE;
1070         }
1071     }
1072     
1073     return EXFALSE;
1074 }
1075 
1076 
1077 
1078 
1079 
1080 
1081 expublic int exnet_periodic(void)
1082 {
1083     int ret=EXSUCCEED;
1084     exnetcon_t *head = extnet_get_con_head();
1085     exnetcon_t *net, *tmp;
1086    
1087     DL_FOREACH_SAFE(head, net, tmp)
1088     {
1089         
1090         if (exnet_schedule_run(net))
1091         {
1092             continue;
1093         }
1094         
1095         
1096         if (EXFAIL==net->sock)
1097         {
1098             if (net->is_server)
1099             {
1100                 
1101                 if (EXSUCCEED!=exnet_addr_next(net))
1102                 {
1103                     NDRX_LOG(log_error, "Failed to resolve next binding address!");
1104                     EXFAIL_OUT(ret);
1105                 }
1106                 
1107                 
1108                 ret = exnet_bind(net);
1109                 
1110             }
1111             else if (!net->is_incoming)
1112             {
1113                 if (EXSUCCEED!=exnet_addr_next(net))
1114                 {
1115                     NDRX_LOG(log_error, "Failed to resolve next connect address!");
1116                     EXFAIL_OUT(ret);
1117                 }
1118                 
1119                 ret = open_socket(net);
1120             }
1121         }
1122         else if (!net->is_server)
1123         {
1124             
1125             ret=exnet_poll_cb(net->sock, 0, (void *)net);
1126         }
1127     }
1128 
1129 out:
1130     return ret;
1131 }
1132 
1133 
1134 
1135 
1136 
1137 
1138 
1139 
1140 
1141 
1142 
1143 expublic int exnet_install_cb(exnetcon_t *net, int (*p_process_msg)(exnetcon_t *net, char **buf, int len),
1144         int (*p_connected)(exnetcon_t *net), int (*p_disconnected)(exnetcon_t *net),
1145                 int (*p_snd_zero_len)(exnetcon_t *net), int (*p_snd_clock_sync)(exnetcon_t *net))
1146 {
1147     int ret=EXSUCCEED;
1148 
1149     net->p_process_msg = p_process_msg;
1150     net->p_connected = p_connected;
1151     net->p_disconnected = p_disconnected;
1152     net->p_snd_zero_len = p_snd_zero_len;
1153     net->p_snd_clock_sync = p_snd_clock_sync;
1154 
1155 out:
1156     return ret;
1157 }
1158 
1159 
1160 
1161 
1162 
1163 expublic void exnet_unconfigure(exnetcon_t *net)
1164 {
1165     if (NULL!=net->addrinfos)
1166     {
1167         freeaddrinfo(net->addrinfos);
1168         net->addrinfos=NULL;
1169     }
1170     
1171     net->addr_cur=NULL;
1172 }
1173 
1174 
1175 
1176 
1177 
1178 
1179 expublic int exnet_addr_get(exnetcon_t *net)
1180 {
1181     int ret=EXSUCCEED;
1182     struct addrinfo hints;
1183     struct addrinfo *iter;
1184     char ip[(INET6_ADDRSTRLEN)*2];
1185     
1186     
1187     exnet_unconfigure(net);
1188     
1189     if (!net->is_server)
1190     {
1191         NDRX_LOG(log_error, "EXNET: client for: %s:%s", net->addr, net->port);
1192     }
1193     else
1194     {
1195         NDRX_LOG(log_error, "EXNET: server for: %s:%s", net->addr, net->port);
1196     }
1197     
1198     
1199     memset(&hints, 0, sizeof(struct addrinfo));
1200     
1201     if (net->is_ipv6)
1202     {
1203         hints.ai_family = AF_INET6;    
1204     }
1205     else
1206     {
1207         hints.ai_family = AF_INET;    
1208     }
1209     
1210     hints.ai_socktype = SOCK_STREAM; 
1211     
1212     if (net->is_server)
1213     {
1214         hints.ai_flags = AI_PASSIVE;
1215     }
1216     else
1217     {
1218         hints.ai_flags = 0;
1219     }
1220     
1221     
1222     if (net->is_numeric)
1223     {
1224         hints.ai_flags|=AI_NUMERICHOST;
1225     }
1226     
1227     hints.ai_protocol = 0; 
1228 
1229     ret = getaddrinfo(net->addr, net->port, &hints, &net->addrinfos);
1230     
1231     if (ret != EXSUCCEED)
1232     {
1233         NDRX_LOG(log_error, "Failed to resolve -i addr: getaddrinfo(): %s", 
1234                 gai_strerror(ret));
1235         EXFAIL_OUT(ret);
1236     }
1237     
1238     
1239     for(iter=net->addrinfos; iter != NULL;iter=iter->ai_next)
1240     {
1241         if (NULL!=inet_ntop (iter->ai_family, 
1242             &((struct sockaddr_in *)iter->ai_addr)->sin_addr, ip, sizeof(ip)))
1243         {
1244             NDRX_LOG(log_info,"Resolved: IPv%d address: %s port: %d", 
1245                     iter->ai_family == PF_INET6 ? 6 : 4, ip, 
1246                     (int)exnet_get_port(iter->ai_addr));
1247         }
1248         else
1249         {
1250             NDRX_LOG(log_error, "Failed to get addr info: %s", strerror(errno));
1251         }
1252     }
1253     
1254 out:
1255     return ret;
1256 }
1257 
1258 
1259 
1260 
1261 
1262 
1263 
1264 
1265 
1266 expublic int exnet_addr_next(exnetcon_t *net)
1267 {
1268     int ret = EXSUCCEED;
1269     
1270     if (NULL==net->addr_cur)
1271     {
1272         net->addr_cur=net->addrinfos;
1273     }
1274     else
1275     {
1276         net->addr_cur=net->addr_cur->ai_next;
1277     }
1278     
1279     if (NULL==net->addr_cur)
1280     {
1281         NDRX_LOG(log_warn, "Reload addresses");
1282         
1283         ret=exnet_addr_get(net);
1284         
1285         if (EXSUCCEED!=ret)
1286         {
1287             NDRX_LOG(log_error, "Failed to resolve bind/connect addresses!");
1288             EXFAIL_OUT(ret);
1289         }
1290         
1291         net->addr_cur=net->addrinfos;
1292     }
1293     
1294     if (NULL==net->addr_cur)
1295     {
1296         NDRX_LOG(log_error, "NULL Address found");
1297         EXFAIL_OUT(ret);
1298     }
1299     
1300 out:
1301     NDRX_LOG(log_error, "exnet_addr_next returns %d", ret);
1302 
1303     return ret;
1304 }
1305 
1306 
1307 
1308 
1309 
1310 
1311 expublic int exnet_configure(exnetcon_t *net)
1312 {
1313     int ret = EXSUCCEED;
1314     
1315     if (EXSUCCEED!=exnet_addr_get(net))
1316     {
1317         EXFAIL_OUT(ret);
1318     }
1319     
1320     
1321     exnet_add_con(net);
1322     
1323 out:
1324     return ret;
1325 }
1326 
1327 
1328 
1329 
1330 
1331 expublic void exnet_rwlock_read(exnetcon_t *net)
1332 {
1333     if (EXSUCCEED!=pthread_rwlock_rdlock(&(net->rwlock)))
1334     {
1335         int err = errno;
1336         
1337         NDRX_LOG(log_error, "Failed to read lock: %s - exiting", strerror(err));
1338         userlog("Failed to read lock: %s  - exiting", strerror(err));
1339         exit(EXFAIL);
1340     }
1341 }
1342 
1343 
1344 
1345 
1346 
1347 expublic void exnet_rwlock_write(exnetcon_t *net)
1348 {
1349     if (EXSUCCEED!=pthread_rwlock_rdlock(&net->rwlock))
1350     {
1351         int err = errno;
1352         
1353         NDRX_LOG(log_error, "Failed to write lock: %s - exiting", strerror(err));
1354         userlog("Failed to write lock: %s  - exiting", strerror(err));
1355         exit(EXFAIL);
1356     }
1357 }
1358 
1359 
1360 
1361 
1362 
1363 expublic void exnet_rwlock_unlock(exnetcon_t *net)
1364 {
1365     if (EXSUCCEED!=pthread_rwlock_unlock(&net->rwlock))
1366     {
1367         int err = errno;
1368         
1369         NDRX_LOG(log_error, "Failed to unlock rwlock: %s - exiting", strerror(err));
1370         userlog("Failed to unlock rwlock: %s  - exiting", strerror(err));
1371         exit(EXFAIL);
1372     }
1373 }
1374 
1375 
1376 
1377 
1378 
1379 
1380 expublic void exnet_rwlock_mainth_write(exnetcon_t *net)
1381 {
1382     exnet_rwlock_unlock(net);
1383     exnet_rwlock_write(net);
1384 }
1385 
1386 
1387 
1388 
1389 
1390 expublic void exnet_rwlock_mainth_read(exnetcon_t *net)
1391 {
1392     exnet_rwlock_unlock(net);
1393     exnet_rwlock_read(net); 
1394 }
1395 
1396 
1397 
1398 
1399 expublic int exnet_is_connected(exnetcon_t *net)
1400 {
1401     return net->is_connected;
1402 }
1403 
1404 
1405 
1406 
1407 expublic int exnet_close_shut(exnetcon_t *net)
1408 {
1409     if (net->lock_init)
1410     {
1411         exnet_rwlock_mainth_write(net);
1412         close_socket(net);
1413         exnet_rwlock_mainth_read(net);
1414     }
1415 
1416     return EXSUCCEED;
1417 }
1418 
1419 
1420 
1421 
1422 
1423 expublic int exnet_set_timeout(exnetcon_t *net, int timeout)
1424 {
1425     int ret=EXSUCCEED;
1426     struct timeval tv;
1427 
1428     memset(&tv, 0, sizeof(tv));
1429     tv.tv_sec = timeout;
1430 
1431     if (EXSUCCEED!=setsockopt(net->sock, SOL_SOCKET, SO_RCVTIMEO, &tv, sizeof(struct timeval)))
1432     {
1433         NDRX_LOG(log_error, "setsockopt() failed for fd=%d: %s",
1434                         net->sock, strerror(errno));
1435         EXFAIL_OUT(ret);
1436     }
1437 
1438 out:
1439     return ret;
1440 }
1441 
1442 
1443 
1444 
1445 
1446 expublic void exnet_reset_struct(exnetcon_t *net)
1447 {
1448     memset(net, 0, sizeof(*net));
1449     net->sock = EXFAIL;                 
1450     net->rcvtimeout = EXFAIL;       
1451     net->len_pfx = EXFAIL;              
1452     net->is_server = EXFAIL;        
1453     net->rcvtimeout = ndrx_get_G_atmi_env()->time_out;
1454 }
1455 
1456 
1457 
1458 
1459 
1460 
1461 
1462 expublic int exnet_net_init(exnetcon_t *net)
1463 {
1464     int ret = EXSUCCEED;
1465     int err;
1466     
1467     memset(&(net->rwlock), 0, sizeof(net->rwlock));
1468 
1469     if (EXSUCCEED!=(err=pthread_rwlock_init(&(net->rwlock), NULL)))
1470     {
1471         NDRX_LOG(log_error, "Failed to init rwlock: %s", strerror(err));
1472         userlog("Failed to init rwlock: %s", strerror(err));
1473         EXFAIL_OUT(ret);
1474     }
1475     
1476     MUTEX_VAR_INIT(net->sendlock);
1477     MUTEX_VAR_INIT(net->rcvlock);
1478     MUTEX_VAR_INIT(net->flagslock);
1479     
1480     
1481     ndrx_stopwatch_reset(&net->periodic_stopwatch);
1482     
1483     
1484     if (EXSUCCEED!=(err=pthread_rwlock_rdlock(&(net->rwlock))))
1485     {
1486         userlog("Failed to acquire read lock: %s", strerror(err));
1487         NDRX_LOG(log_error, "Failed to acquire read lock - exiting...: %s",
1488                                                    strerror(err));
1489         exit(EXFAIL);
1490     }
1491     net->lock_init=EXTRUE;
1492             
1493 out:
1494     return ret;
1495 }
1496