Back to home page

Enduro/X

 
 

    


0001 /**
0002  * @brief Enduro/X server net/socket client lib
0003  *   Network object needs to be synchronize otherwise unexpected core dumps might
0004  *   Locking:
0005  *   - main thread always have read lock
0006  *   - sender thread read lock only when doing send
0007  *   - when main thread wants some changes in net object, it waits for write lock
0008  *   - when scheduling the connection close, there is not need to write lock
0009  *     because that read thread sets only EXTRUE (close). The actual close and
0010  *     new socket open will be handled by write lock, thus it shall prevent from
0011  *     accidental setting back to EXTRUE.
0012  *   - the connection object once created will always live in DL list even with
0013  *   with disconnected status. Once new conn arrives, it will re-use the conn obj.
0014  *
0015  * @file exnet.c
0016  */
0017 /* -----------------------------------------------------------------------------
0018  * Enduro/X Middleware Platform for Distributed Transaction Processing
0019  * Copyright (C) 2009-2016, ATR Baltic, Ltd. All Rights Reserved.
0020  * Copyright (C) 2017-2023, Mavimax, Ltd. All Rights Reserved.
0021  * This software is released under one of the following licenses:
0022  * AGPL (with Java and Go exceptions) or Mavimax's license for commercial use.
0023  * See LICENSE file for full text.
0024  * -----------------------------------------------------------------------------
0025  * AGPL license:
0026  *
0027  * This program is free software; you can redistribute it and/or modify it under
0028  * the terms of the GNU Affero General Public License, version 3 as published
0029  * by the Free Software Foundation;
0030  *
0031  * This program is distributed in the hope that it will be useful, but WITHOUT ANY
0032  * WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR A
0033  * PARTICULAR PURPOSE. See the GNU Affero General Public License, version 3
0034  * for more details.
0035  *
0036  * You should have received a copy of the GNU Affero General Public License along 
0037  * with this program; if not, write to the Free Software Foundation, Inc.,
0038  * 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
0039  *
0040  * -----------------------------------------------------------------------------
0041  * A commercial use license is available from Mavimax, Ltd
0042  * contact@mavimax.com
0043  * -----------------------------------------------------------------------------
0044  */
0045 #include <ndrx_config.h>
0046 #include <sys/socket.h>
0047 #include <sys/time.h>
0048 #include <sys/types.h>
0049 #include <arpa/inet.h>
0050 #include <netinet/in.h>
0051 #include <netinet/tcp.h>
0052 #include <errno.h>
0053 #include <fcntl.h>
0054 #include <netdb.h>
0055 #include <string.h>
0056 #include <unistd.h>
0057 
0058 #include <ndrstandard.h>
0059 
0060 #if defined(EX_USE_EPOLL)
0061 
0062 #include <sys/epoll.h>
0063 
0064 #elif defined(EX_USE_KQUEUE)
0065 
0066 #include <sys/event.h>
0067 
0068 #endif
0069 
0070 #include <poll.h>
0071 
0072 #include <atmi.h>
0073 #include <stdio.h>
0074 #include <stdlib.h>
0075 #include <exnet.h>
0076 #include <ndebug.h>
0077 #include <utlist.h>
0078 
0079 #include <atmi_int.h>
0080 #include <ndrx_config.h>
0081 
0082 /*---------------------------Externs------------------------------------*/
0083 /*---------------------------Macros-------------------------------------*/
0084 #define DBUF_SZ (NDRX_MSGSIZEMAX - net->dl) /**< Buffer size to recv in     */
0085 
0086 #if defined(EX_USE_EPOLL)
0087 
0088 #define POLL_FLAGS (EPOLLIN | EPOLLHUP)
0089 
0090 #elif defined(EX_USE_KQUEUE)
0091 
0092 #define POLL_FLAGS EVFILT_READ
0093 
0094 #else
0095 
0096 #define POLL_FLAGS (POLLIN)
0097 
0098 #endif
0099 
0100 /*---------------------------Enums--------------------------------------*/
0101 /*---------------------------Typedefs-----------------------------------*/
0102 /*---------------------------Globals------------------------------------*/
0103 /*---------------------------Statics------------------------------------*/
0104 
0105 exprivate MUTEX_LOCKDECL(M_send_lock);
0106 exprivate MUTEX_LOCKDECL(M_recv_lock); 
0107 
0108 /*---------------------------Prototypes---------------------------------*/
0109 exprivate int close_socket(exnetcon_t *net);
0110 exprivate int open_socket(exnetcon_t *net);
0111 exprivate ssize_t recv_wrap (exnetcon_t *net, void *__buf, size_t __n, 
0112         int flags, int appflags);
0113 
0114 exprivate int exnet_schedule_run(exnetcon_t *net);
0115 
0116 
0117 /**
0118  * Get delta time when stopwatch was set
0119  * @param net network connection, used for locking
0120  * @param w stop watch to extract value from
0121  * @return stop watch reading
0122  */
0123 expublic long exnet_stopwatch_get_delta_sec(exnetcon_t *net, ndrx_stopwatch_t *w)
0124 {
0125     long ret;
0126     
0127     MUTEX_LOCK_V(net->flagslock);
0128     
0129     ret = ndrx_stopwatch_get_delta_sec(w);
0130     
0131     MUTEX_UNLOCK_V(net->flagslock);
0132     
0133     return ret;
0134 }
0135 
0136 /**
0137  * Reset the stopwatch in locked way
0138  * @param net network which contains the flags lock
0139  * @param w stopwatch to reset
0140  */
0141 expublic void exnet_stopwatch_reset(exnetcon_t *net, ndrx_stopwatch_t *w)
0142 {
0143     MUTEX_LOCK_V(net->flagslock);
0144     
0145     ndrx_stopwatch_reset(w);
0146     
0147     MUTEX_UNLOCK_V(net->flagslock);
0148 }   
0149 
0150 /**
0151  * Send single message, put length in front
0152  * We will send all stuff required, do that in loop!
0153  * @param hdr_buf pre-send some header, to avoid extra mem opy
0154  * @param hdr_len pre-send header lenght
0155  */
0156 expublic int exnet_send_sync(exnetcon_t *net, char *hdr_buf, int hdr_len, 
0157         char *buf, int len, int flags, int appflags)
0158 {
0159     int ret=EXSUCCEED;
0160     int allow_size = DATA_BUF_MAX;
0161     int sent = 0;
0162     /* let to have 128 header size */
0163     char d[NET_LEN_PFX_LEN+128];    /* Data buffer, len             */
0164     int size_to_send, len_whdr, hdr_snd_size = 0;
0165     int tmp_s;
0166     int err;
0167     int retry;
0168     ndrx_stopwatch_t w;
0169 
0170     /* check the sizes are that supported? */
0171     len_whdr = hdr_len + len;
0172     if (len_whdr>allow_size)
0173     {
0174         NDRX_LOG(log_error, "Buffer too large for sending! "
0175                         "requested: %d, allowed: %d", len_whdr, allow_size);
0176         EXFAIL_OUT(ret);
0177     }
0178 
0179     /* !!! Prepare the buffer 
0180      * do we really need a copy???!
0181      * maybe just send 4 bytes and the other bytes...   
0182      
0183     memcpy(d+net->len_pfx, buf, len);
0184     * - moved to two step sending...
0185     */
0186     
0187     if (4==net->len_pfx)
0188     {
0189         /* Install the length prefix. */
0190         d[0] = (len_whdr >> 24) & 0xff;
0191         d[1] = (len_whdr >> 16) & 0xff;
0192         d[2] = (len_whdr >> 8) & 0xff;
0193         d[3] = (len_whdr) & 0xff;
0194         hdr_snd_size=net->len_pfx;
0195     }
0196     
0197     if (NULL!=hdr_buf)
0198     {
0199         /* add hdr to msg */
0200         memcpy(d+net->len_pfx, hdr_buf, hdr_len);
0201         hdr_snd_size+=hdr_len;
0202     }
0203 
0204     size_to_send = len+hdr_snd_size;
0205 
0206     /* Do sending in loop... */
0207     MUTEX_LOCK_V(net->sendlock);
0208     do
0209     {
0210         err = 0;
0211         NDRX_LOG(log_debug, "Sending, len: %d, total msg: %d", 
0212                 size_to_send-sent, size_to_send);
0213         
0214         if (!(appflags & APPFLAGS_MASK))
0215         {
0216             if (sent < hdr_snd_size)
0217             {
0218                 NDRX_DUMP(log_debug, "Sending, msg (msg len pfx)", 
0219                         d+sent, hdr_snd_size-sent);
0220             }
0221             else
0222             {
0223                 NDRX_DUMP(log_debug, "Sending, msg ", buf+sent-hdr_snd_size, 
0224                             size_to_send-sent);
0225             }
0226         }
0227         else
0228         {
0229             NDRX_LOG(log_debug, "*** MSG DUMP IS MASKED ***");
0230         }
0231         
0232         ndrx_stopwatch_reset(&w);
0233         
0234         do
0235         {
0236             err = 0;
0237             retry = EXFALSE;
0238             
0239             if (sent<hdr_snd_size)
0240             {
0241                 tmp_s = send(net->sock, d+sent, hdr_snd_size-sent, flags);
0242             }
0243             else
0244             {
0245                 /* WARNING ! THIS MIGHT GENERATE SIGPIPE */
0246                 tmp_s = send(net->sock, buf+sent-hdr_snd_size, 
0247                         size_to_send-sent, flags);
0248             }
0249             
0250             if (EXFAIL==tmp_s)
0251             {
0252                 err = errno;
0253             }
0254             else
0255             {
0256                 /* something is sent..., thus reset sent stopwatch */
0257                 exnet_stopwatch_reset(net, &net->last_snd);
0258             }
0259             
0260             if (EAGAIN==err || EWOULDBLOCK==err)
0261             {
0262                 int spent = ndrx_stopwatch_get_delta_sec(&w);
0263         int rcvtim = net->rcvtimeout - spent;
0264         struct pollfd ufd;
0265 
0266         memset(&ufd, 0, sizeof ufd);
0267 
0268                 NDRX_LOG(log_warn, "Socket full: %s - retry, "
0269                         "time spent: %d, max: %d - POLLOUT (rcvtim=%d) sent: %d tot: %d",
0270                         strerror(err), spent, net->rcvtimeout, rcvtim, sent, size_to_send);
0271 
0272         ufd.fd = net->sock; /* poll the read fd after the write fd is closed */
0273         ufd.events = POLLOUT;
0274 
0275         if (rcvtim < 1 || poll(&ufd, 1, rcvtim * 1000) < 0 || ufd.revents & POLLERR)
0276         {
0277                     NDRX_LOG(log_error, "ERROR! Failed to send, socket full: %s "
0278                             "time spent: %d, max: %d short: %hd rcvtim: %d (POLLERR: %d)", 
0279                         strerror(err), spent, net->rcvtimeout, ufd.revents, rcvtim,
0280                             (ufd.revents & POLLERR));
0281                     
0282                     userlog("ERROR! Failed to send, socket full: %s "
0283                             "time spent: %d, max: %d short: %hd rcvtim: %d (POLLERR: %d)",
0284                             strerror(err), spent, net->rcvtimeout, ufd.revents, rcvtim,
0285                             (ufd.revents & POLLERR));
0286                     
0287                     net->schedule_close = EXTRUE;
0288                     ret=EXFAIL;
0289                     goto out_unlock;
0290                 }
0291                 
0292                 retry = EXTRUE;
0293                 
0294             }
0295         }
0296         while (retry);
0297 
0298         if (EXFAIL==tmp_s)
0299         {
0300             NDRX_LOG(log_error, "send failure: %s",
0301                             strerror(err));
0302             /* close_socket(net); */
0303             
0304             NDRX_LOG(log_error, "Scheduling connection close...");
0305             net->schedule_close = EXTRUE;
0306             ret=EXFAIL;
0307             break;
0308         }
0309         else
0310         {
0311             NDRX_LOG(log_debug, "Sent %d bytes", tmp_s);
0312                         /* We should have sent something */
0313             sent+=tmp_s;
0314         }
0315 
0316         if (sent < size_to_send)
0317         {
0318             NDRX_LOG(log_debug, "partial submission: total: %d, sent: %d, left "
0319                     "for sending: %d - continue", size_to_send, sent, 
0320                     size_to_send - sent);
0321         }
0322         
0323     } 
0324     while (EXSUCCEED==ret && sent < size_to_send);
0325 
0326 out_unlock:
0327     MUTEX_UNLOCK_V(net->sendlock);
0328 
0329 out:
0330     
0331     return ret;
0332 }
0333 
0334 /**
0335  * Internal version of receive.
0336  * On error, it will do disconnect!
0337  * Receive is done by main thread only. Thus no we can close connection here directly.
0338  */
0339 exprivate  ssize_t recv_wrap (exnetcon_t *net, void *__buf, size_t __n, int flags, int appflags)
0340 {
0341     ssize_t ret;
0342 
0343     ret = recv (net->sock, __buf, __n, flags);
0344 
0345     if (0==ret)
0346     {
0347         NDRX_LOG(log_error, "Disconnect received - schedule close!");
0348         
0349         net->schedule_close = EXTRUE;
0350         ret=EXFAIL;
0351         goto out;
0352     }
0353     else if (EXFAIL==ret)
0354     {
0355         if (EAGAIN==errno || EWOULDBLOCK==errno)
0356         {
0357             NDRX_LOG(log_info, "Still no data (waiting...)");
0358         }
0359         else
0360         {
0361             NDRX_LOG(log_error, "recv failure: %s - schedule close", 
0362                     strerror(errno));
0363             
0364             net->schedule_close = EXTRUE;
0365         }
0366 
0367         ret=EXFAIL;
0368         goto out;
0369     }
0370     else
0371     {
0372         /* msg received */
0373         exnet_stopwatch_reset(net, &net->last_rcv);
0374     }
0375     
0376 out:
0377     return ret;
0378 }
0379 
0380 /**
0381  * Get the length of message currently buffered.
0382  * We operate it little endian mode
0383  */
0384 exprivate int get_full_len(exnetcon_t *net)
0385 {
0386     int  pfx_len, msg_len;
0387 
0388     /* 4 bytes... */
0389     pfx_len = ( 
0390                 (net->d[0] & 0xff) << 24
0391               | (net->d[1] & 0xff) << 16
0392               | (net->d[2] & 0xff) << 8
0393               | (net->d[3] & 0xff)
0394             );
0395 
0396     msg_len = pfx_len+net->len_pfx;
0397     NDRX_LOG(log_debug, "pfx_len=%d msg_len=%d", pfx_len, msg_len);
0398     
0399     /* TODO: if we got invalid len - close connection */
0400 
0401     return msg_len;
0402 }
0403 
0404 /**
0405  * Receive single message with prefixed length.
0406  * We will check peek the length bytes, and then
0407  * will request to receive full message
0408  * If we do poll on incoming messages, we shall receive from the same thread
0409  * because, otherwise it will alert for messages all the time or we need to
0410  * check some advanced flags..
0411  * On error we can close connection directly, because receive is done by main 
0412  * thread.
0413  * @return EXSUCCEED full msg received, EXFAIL no full msg 
0414  */
0415 expublic int exnet_recv_sync(exnetcon_t *net, char **buf, int *len, int flags, int appflags)
0416 {
0417     int ret=EXSUCCEED;
0418     int got_len;
0419     int full_msg;
0420     int download_size;
0421     
0422     /* Lock the stuff... */
0423     MUTEX_LOCK_V(net->rcvlock);
0424     
0425     if (0==net->dl)
0426     {
0427         /* This is new message */
0428         ndrx_stopwatch_reset(&net->rcv_timer);
0429     }
0430     
0431     if (NULL==net->dlsysbuf)
0432     {
0433         size_t tmp_buf_len;
0434         /* alloc the buffer... */
0435         NDRX_SYSBUF_MALLOC_OUT(net->dlsysbuf, tmp_buf_len, ret);
0436     }
0437     
0438     while (EXSUCCEED==ret)
0439     {
0440         /* Either we will timeout, or return by cut_out_msg! */
0441         if (net->dl>=net->len_pfx)
0442         {
0443             full_msg = get_full_len(net);
0444             
0445             if (full_msg < 0)
0446             {
0447                 NDRX_LOG(log_error, "ERROR ! received len %d < 0 - closing socket!", 
0448                         full_msg);
0449                 userlog("ERROR ! received len %d < 0! - schedule close socket!", full_msg);
0450                 net->schedule_close = EXTRUE;
0451                 ret=EXFAIL;
0452                 break;
0453             }
0454             else if (full_msg > DATA_BUF_MAX)
0455             {
0456                 NDRX_LOG(log_error, "ERROR ! received len %d > max buf %ld! - "
0457                         "closing socket!", full_msg, DATA_BUF_MAX);
0458                 userlog("ERROR ! received len %d > max buf %ld! - "
0459                         "closing socket!", full_msg, DATA_BUF_MAX);
0460                 net->schedule_close = EXTRUE;
0461                 ret=EXFAIL;
0462                 break;
0463             }
0464 
0465             NDRX_LOG(log_debug, "Data buffered - "
0466                     "buf: [%d], full: %d",
0467                     net->dl, full_msg);
0468             if (net->dl >= full_msg)
0469             {
0470                 /* Copy msg out there & cut the buffer */
0471                 /*
0472                 ret=cut_out_msg(net, full_msg, buf, len, appflags);
0473                  * 
0474                  */
0475                 /* return the msg */
0476                 *len = full_msg-net->len_pfx;
0477                 net->dl -= full_msg;
0478                 *buf=net->dlsysbuf;
0479                 net->dlsysbuf=NULL;
0480                 /* should be 0! */
0481                 
0482                 if (0!=net->dl)
0483                 {
0484                     NDRX_LOG(log_error, "Expected left over 0 but got %d", net->dl);
0485                     userlog("Expected left over 0 but got %d", net->dl);
0486                     abort();
0487                 }
0488                 
0489                 if (0==*len)
0490                 {
0491                     NDRX_LOG(log_debug, "zero length message - ignore!");
0492                     ret=EXFAIL;
0493                 }
0494                 else if (!(appflags & APPFLAGS_MASK))
0495                 {
0496                     NDRX_DUMP(log_debug, "Got message: ", *buf, *len);
0497                 }
0498                 
0499                 MUTEX_UNLOCK_V(net->rcvlock);
0500                 return ret;
0501             }
0502         }
0503 
0504         NDRX_LOG(log_debug, "Data needs to be received, dl=%d", net->dl);
0505         
0506         /* well, lets receive only the message bytes,
0507          * and try to not receive any buffered stuff,
0508          * that would cause lots of memmove for large downloaded messages...
0509          */
0510         
0511         if (net->dl < net->len_pfx)
0512         {
0513             /* read some length + msg bytes... */
0514             download_size = net->len_pfx - net->dl;
0515         }
0516         else
0517         {
0518             /* OK we have full_msg, thus calculate the download size... */
0519             download_size = full_msg - net->dl;
0520         }
0521         
0522         /* we shall not get the  download_size < 0, this means data is buffered
0523          * but for some reason we are not processing them...
0524          * also download_size shall bigger than DBUF_SZ
0525          */
0526         
0527         if (download_size < 0)
0528         {
0529             NDRX_LOG(log_error, "ERROR ! Expected download size < 0 (%d)", download_size);
0530             userlog("ERROR ! Expected download size < 0 (%d)", download_size);
0531             net->schedule_close = EXTRUE;
0532             ret=EXFAIL;
0533             break;
0534         }
0535         else if (download_size > DBUF_SZ)
0536         {
0537             NDRX_LOG(log_error, "ERROR ! Expected download size bigger "
0538                     "than buffer left: %d > %d", download_size, (DBUF_SZ));
0539             userlog("ERROR ! Expected download size bigger "
0540                     "than buffer left: %d > %d", download_size, (DBUF_SZ));
0541             
0542             /* write lock? maybe not, as schedule_close will be set to EXFALSE
0543              * only after connection rest. Thus some race condition on setting
0544              * to EXTRUE, wont cause a problem.
0545              */
0546             net->schedule_close = EXTRUE;
0547             ret=EXFAIL;
0548             break;
0549         }
0550         
0551         /* 
0552          * select target buffer, either prefix receiving buf
0553          * our data receiving buf from the caller - to avoid extra memcopy....
0554          */
0555         if (net->dl < net->len_pfx)
0556         {
0557             if (EXFAIL==(got_len=recv_wrap(net, net->d+net->dl, download_size, 
0558                     flags, appflags)))
0559             {
0560                 /* NDRX_LOG(log_error, "Failed to get data");*/
0561                 ret=EXFAIL;
0562             }
0563             else
0564             {
0565                 if (!(appflags&APPFLAGS_MASK))
0566                 {
0567                     NDRX_DUMP(log_debug, "Got packet: ",
0568                             net->d+net->dl, got_len);
0569                 }
0570                 net->dl+=got_len;
0571             }
0572         }
0573         else
0574         {
0575             if (EXFAIL==(got_len=recv_wrap(net, net->dlsysbuf+net->dl-net->len_pfx, 
0576                     download_size, flags, appflags)))
0577             {
0578                 /* NDRX_LOG(log_error, "Failed to get data");*/
0579                 ret=EXFAIL;
0580             }
0581             else
0582             {
0583                 if (!(appflags&APPFLAGS_MASK))
0584                 {
0585                     NDRX_DUMP(log_debug, "Got packet: ",
0586                             net->dlsysbuf+net->dl-net->len_pfx, got_len);
0587                 }
0588                 net->dl+=got_len;
0589             }
0590         }
0591         
0592     }
0593     
0594     /* If message is not complete & there is timeout condition, 
0595      * then close conn & fail
0596      */
0597     /* well shouldn't this be write locked? */
0598     if (!net->schedule_close && 
0599             ndrx_stopwatch_get_delta_sec(&net->rcv_timer) >=net->rcvtimeout )
0600     {
0601         NDRX_LOG(log_error, "This is time-out => schedule close socket !");
0602         net->schedule_close = EXTRUE;
0603     }
0604 
0605 out:    
0606     MUTEX_UNLOCK_V(net->rcvlock);
0607     
0608     /* We should fail anyway, because no message received, yet! */
0609     ret=EXFAIL;
0610 
0611     return ret;
0612 }
0613 
0614 /**
0615  * Run stuff before going to poll.
0616  * We might have a buffered message to process!
0617  */
0618 expublic int exnet_b4_poll_cb(void)
0619 {
0620     int ret=EXSUCCEED;
0621     exnetcon_t *head = extnet_get_con_head();
0622     exnetcon_t *net, *tmp;
0623     
0624     DL_FOREACH_SAFE(head, net, tmp)
0625     {
0626         if (exnet_schedule_run(net))
0627         {
0628             continue;
0629         }
0630     }
0631 
0632 out:
0633     return ret;
0634 }
0635 
0636 /**
0637  * Poll have detected activity on FD
0638  */
0639 expublic int exnet_poll_cb(int fd, uint32_t events, void *ptr1)
0640 {
0641     int ret;
0642     int so_error=0;
0643     socklen_t len = sizeof so_error;
0644     exnetcon_t *net = (exnetcon_t *)ptr1;   /* Get the connection ptr... */
0645     /* char buf[DATA_BUF_MAX]; */
0646     int buflen = 0;
0647     char *buf = NULL;
0648     
0649     /* check schedule... */
0650     if (exnet_schedule_run(net))
0651     {
0652         goto out;
0653     }
0654     
0655     /* sysbuf alloc  - moved to recv...
0656     NDRX_SYSBUF_MALLOC_OUT(buf, NULL, ret);
0657      * */
0658 
0659     /* Receive the event of the socket */
0660     if (EXSUCCEED!=getsockopt(net->sock, SOL_SOCKET, SO_ERROR, &so_error, &len))
0661     {
0662         NDRX_LOG(log_error, "Failed go get getsockopt: %s",
0663                         strerror(errno));
0664         ret=EXFAIL;
0665         goto out;
0666     }
0667 
0668     if (0==so_error && !net->is_connected
0669 
0670 #ifndef  EX_USE_KQUEUE
0671     && events /* for Kqueue looks like no event is ok! */
0672 #endif
0673     )
0674     {
0675 
0676         /* RW Lock! */
0677         
0678         exnet_rwlock_mainth_write(net);
0679         EXNET_CONNECTED(net);
0680         exnet_rwlock_mainth_read(net);
0681         
0682         NDRX_LOG(log_warn, "Connection is now open!");
0683 
0684         /* Call custom callback, if there is such */
0685         if (NULL!=net->p_connected && EXSUCCEED!=net->p_connected(net))
0686         {
0687             NDRX_LOG(log_error, "Connected notification "
0688                             "callback failed!");
0689             ret=EXFAIL;
0690             goto out;
0691         }
0692 
0693     }
0694 
0695     if (0==so_error && !net->is_connected && 
0696             ndrx_stopwatch_get_delta_sec(&net->connect_time) > net->rcvtimeout)
0697     {
0698         NDRX_LOG(log_error, "Cannot establish connection to server in "
0699                 "time: %ld secs", ndrx_stopwatch_get_delta_sec(&net->connect_time));
0700         
0701         net->schedule_close = EXTRUE;
0702         
0703         goto out;
0704     }
0705     else if (0!=so_error)
0706     {
0707         if (!net->is_connected)
0708         {
0709             NDRX_LOG(log_error, "Failed to connect to server: %s",
0710                             strerror(so_error));
0711         }
0712         else
0713         {
0714             NDRX_LOG(log_error, "Socket client failed: %s",
0715                             strerror(so_error));
0716         }
0717 
0718         if (EINPROGRESS!=errno)
0719         {
0720             net->schedule_close = EXTRUE;
0721             
0722             /* Do not send fail to NDRX */
0723             goto out;
0724         }
0725     }
0726     else if (net->is_connected)
0727     {
0728         long rcvt;
0729         
0730         
0731         /* TODO: This shall be moved to main thread
0732          * as sending is not part of the receiving...
0733          * and shall be periodic
0734          */
0735         
0736         /* We are connected, send zero length message, ok 
0737          * Firstly: sending shall be done by worker thread
0738          * Secondly: send only in case if there was no data sent over the socket
0739          * - We need timer for incoming data, in case if data not received
0740          *  for twice of "periodic_zero" period, the connection shall be reset.
0741          */
0742         if (net->periodic_zero && 
0743                 exnet_stopwatch_get_delta_sec(net, &net->last_snd) > net->periodic_zero)
0744         {
0745             NDRX_LOG(log_info, "About to issue zero length "
0746                     "message on fd %d", net->sock);
0747 
0748             /* Submit the job for sending to sender thread,
0749              * so that we do not lock-up the dispatching thread..
0750              */
0751             net->p_snd_zero_len(net);
0752         }
0753         
0754         if (net->p_snd_clock_sync && net->periodic_clock_time && 
0755                 ndrx_stopwatch_get_delta_sec(&net->periodic_stopwatch) > net->periodic_clock_time)
0756         {
0757             NDRX_LOG(log_info, "About to issue clock sync "
0758                     "message on fd %d", net->sock);
0759             net->p_snd_clock_sync(net);
0760             
0761             /* reset the stopwatch... */
0762             ndrx_stopwatch_reset(&net->periodic_stopwatch);
0763         }
0764         
0765         if (net->recv_activity_timeout && 
0766                 (rcvt=exnet_stopwatch_get_delta_sec(net, &net->last_rcv)) > net->recv_activity_timeout)
0767         {
0768             NDRX_LOG(log_error, "No data received in %ld sec (max with out data: %d) "
0769                     "reset soc/fd=%d", rcvt, net->recv_activity_timeout, net->sock);
0770             userlog("No data received in %ld sec (max with out data: %d) "
0771                     "reset soc/fd=%d", rcvt, net->recv_activity_timeout, net->sock);
0772             net->schedule_close = EXTRUE;
0773         }
0774     }
0775 
0776     /* Hmm try to receive something? */
0777 #if defined(EX_USE_EPOLL)
0778     
0779     if (events & EPOLLIN)
0780         
0781 #elif defined(EX_USE_KQUEUE)
0782         
0783     if (1) /* Process all events... of Kqueue */
0784         
0785 #else
0786     if (events & POLLIN)
0787 #endif
0788     {
0789         /* NDRX_LOG(6, "events & EPOLLIN => call exnet_recv_sync()"); */
0790 /*        while(EXSUCCEED == exnet_recv_sync(net, buf, &buflen, 0, 0))*/
0791         if(EXSUCCEED == exnet_recv_sync(net, &buf, &buflen, 0, 0))
0792         {
0793             /* We got the message - do the callback op */
0794             net->p_process_msg(net, &buf, buflen);
0795         }
0796     }
0797 
0798 out:
0799 
0800     /* remove the buffer if haven't already zapped by threads */
0801     if (NULL!=buf)
0802     {
0803         NDRX_SYSBUF_FREE(buf);
0804     }
0805 
0806     return EXSUCCEED;
0807 }
0808 
0809 /**
0810  * Close socket
0811  * This shall be called in write lock mode!
0812  */
0813 exprivate int close_socket(exnetcon_t *net)
0814 {
0815     int ret=EXSUCCEED;
0816 
0817     NDRX_LOG(log_warn, "Closing socket %d...", net->sock);
0818     net->dl = 0; /* Reset buffered bytes */
0819     
0820     net->is_connected=EXFALSE; /* mark disconnected. */
0821     
0822     if (EXFAIL!=net->sock)
0823     {
0824         /* Remove from polling structures */
0825         if (EXSUCCEED!=tpext_delpollerfd(net->sock))
0826         {
0827             NDRX_LOG(log_error, "Failed to remove polling extension: %s",
0828                                 tpstrerror(tperrno));
0829         }
0830 
0831         /* shutdown & Close it */
0832         if (EXSUCCEED!=shutdown(net->sock, SHUT_RDWR))
0833         {
0834             NDRX_LOG(log_error, "Failed to shutdown socket: %s",
0835                                     strerror(errno));
0836         }
0837 
0838         if (EXSUCCEED!=close(net->sock))
0839         {
0840             NDRX_LOG(log_error, "Failed to close socket: %s",
0841                                     strerror(errno));
0842             ret=EXFAIL;
0843             goto out;
0844         }
0845     }
0846 
0847 out:
0848     net->sock=EXFAIL;
0849     net->schedule_close = EXFALSE;
0850     
0851     if (NULL!=net->p_disconnected && EXSUCCEED!=net->p_disconnected(net))
0852     {
0853         NDRX_LOG(log_error, "Disconnected notification "
0854                         "callback failed!");
0855         ret=EXFAIL;
0856     }
0857 #if 0
0858     /* Remove it from linked list, if it is incoming connection. */
0859     if (net->is_incoming)
0860     {
0861         exnet_del_con(net);
0862         /* If this was incoming, then do some server side work + do free as it did malloc! */
0863         exnet_remove_incoming(net);
0864     }
0865 #endif
0866     
0867     if (net->is_incoming)
0868     {
0869         exnet_remove_incoming(net);
0870     }
0871     
0872     return ret;
0873 }
0874 
0875 /**
0876  * Configure socket / set standard options
0877  * @param net client/server handler (including listener)
0878  * @return EXSUCCEED/EXFAIL
0879  */
0880 expublic int exnet_configure_setopts(exnetcon_t *net)
0881 {
0882     int ret=EXSUCCEED;
0883     struct timeval tv;
0884     int flag = 1;
0885     int result;
0886     struct linger l;
0887     int enable = EXTRUE;
0888 
0889     /* We want to poll the stuff */
0890     if (EXFAIL==fcntl(net->sock, F_SETFL, O_NONBLOCK))
0891     {
0892         NDRX_LOG(log_error, "Failed set socket non blocking!: %s",
0893                                 strerror(errno));
0894         EXFAIL_OUT(ret);
0895     }
0896     
0897     if (setsockopt(net->sock, SOL_SOCKET, SO_REUSEADDR, &enable, sizeof(int)) < 0)
0898     {
0899         NDRX_LOG(log_error, "Failed to set SO_REUSEADDR: %s", strerror(errno));
0900         EXFAIL_OUT(ret);
0901     }
0902     
0903 
0904     if (EXFAIL==(result = setsockopt(net->sock,            /* socket affected */
0905                             IPPROTO_TCP,     /* set option at TCP level */
0906                             TCP_NODELAY,     /* name of option */
0907                             (char *) &flag,  /* the cast is historical
0908                                                     cruft */
0909                             sizeof(int))))    /* length of option value */
0910     {
0911 
0912         NDRX_LOG(log_error, "Failed set socket non blocking!: %s",
0913                                 strerror(errno));
0914         EXFAIL_OUT(ret);
0915     }
0916     
0917     /* set no linger */
0918     l.l_onoff=0;
0919     l.l_linger=0;
0920     if(setsockopt(net->sock, SOL_SOCKET, SO_LINGER, &l, sizeof(l)) < 0)
0921     {
0922         NDRX_LOG(log_error, "Failed to set SO_LINGER: %s", strerror(errno));
0923         EXFAIL_OUT(ret);
0924     }
0925     
0926 
0927 /* this is not workin on solaris 10... */
0928 #if EX_OS_SUNOS && EX_LSB_RELEASE_VER_MAJOR == 5 && EX_LSB_RELEASE_VER_MINOR == 10
0929     NDRX_LOG(log_warn, "Solaris 10 - SO_RCVTIMEO not suppported.");
0930 #else
0931     memset(&tv, 0, sizeof(tv));
0932     tv.tv_sec = net->rcvtimeout;
0933     NDRX_LOG(log_debug, "Setting SO_RCVTIMEO=%d", tv.tv_sec);
0934     if (EXSUCCEED!=setsockopt(net->sock, SOL_SOCKET, SO_RCVTIMEO, &tv, sizeof(struct timeval)))
0935     {
0936         NDRX_LOG(log_error, "setsockopt() failed for fd=%d: %s",
0937                         net->sock, strerror(errno));
0938         ret=EXFAIL;
0939         goto out;
0940     }
0941 #endif
0942     
0943 out:
0944     return ret;
0945 }
0946 
0947 /**
0948  * Extract port number
0949  * @param sa socket address 
0950  * @return port number
0951  */
0952 expublic in_port_t exnet_get_port(struct sockaddr *sa)
0953 {
0954     if (AF_INET == sa->sa_family)
0955     {
0956         return ntohs(((struct sockaddr_in*)sa)->sin_port);
0957     }
0958     else
0959     {
0960         return ntohs(((struct sockaddr_in6*)sa)->sin6_port);
0961     }
0962 }
0963 
0964 /**
0965  * Open socket
0966  */
0967 exprivate int open_socket(exnetcon_t *net)
0968 {
0969     int ret=EXSUCCEED;
0970     int err;
0971     char ip[(INET6_ADDRSTRLEN)*2];
0972     
0973     /* Try to connect! */
0974     net->is_connected=EXFALSE;
0975 
0976     net->sock = socket(net->addr_cur->ai_family, 
0977             SOCK_STREAM, net->addr_cur->ai_protocol);
0978 
0979     /* Create socket for listening */
0980     if (EXFAIL==net->sock)
0981     {
0982         NDRX_LOG(log_error, "Failed to create socket: %s",
0983                                 strerror(errno));
0984         ret=EXFAIL;
0985         goto out;
0986     }
0987     
0988     /* Configure socket */
0989     if (EXSUCCEED!=exnet_configure_setopts(net))
0990     {
0991         EXFAIL_OUT(ret);
0992     }
0993 
0994     if (NULL!=inet_ntop (net->addr_cur->ai_family, 
0995             &((struct sockaddr_in *)net->addr_cur->ai_addr)->sin_addr, ip, sizeof(ip)))
0996     {
0997         NDRX_LOG(log_info,"Trying to connect to IPv%d address: %s port: %d", 
0998                 net->addr_cur->ai_family == PF_INET6 ? 6 : 4, ip, 
0999                 (int)exnet_get_port(net->addr_cur->ai_addr));
1000     }
1001     else
1002     {
1003         NDRX_LOG(log_error, "Failed to extract address info: %s", strerror(errno));
1004     }
1005 
1006     if (EXSUCCEED!=connect(net->sock, net->addr_cur->ai_addr, 
1007             net->addr_cur->ai_addrlen))
1008     {
1009         err=errno;
1010         NDRX_LOG(log_error, "connect() failed for fd=%d: %d/%s",
1011                         net->sock, err, strerror(err));
1012         
1013         if (ENETUNREACH==err || ECONNREFUSED==err)
1014         {
1015             NDRX_LOG(log_error, "Try later to connect -> next ip");
1016             close(net->sock);
1017             net->sock=EXFAIL;
1018             goto out;
1019         }
1020         if (EINPROGRESS!=err)
1021         {
1022             ret=EXFAIL;
1023             goto out;
1024         }
1025     }
1026     /* Take the time on what we try to connect. */
1027     ndrx_stopwatch_reset(&net->connect_time);
1028 
1029     /* Add stuff for polling */
1030     if (EXSUCCEED!=tpext_addpollerfd(net->sock,
1031             POLL_FLAGS,
1032             (void *)net, exnet_poll_cb))
1033     {
1034         NDRX_LOG(log_error, "tpext_addpollerfd failed!");
1035         ret=EXFAIL;
1036         goto out;
1037     }
1038     
1039 out:
1040     return ret;
1041 }
1042 
1043 /**
1044  * Close connection
1045  * @param net network object
1046  * @return EXTRUE -> Connection removed, EXFALSE -> connection not removed
1047  */
1048 exprivate int exnet_schedule_run(exnetcon_t *net)
1049 {
1050     int is_incoming;
1051     
1052     if (net->schedule_close)
1053     {
1054         NDRX_LOG(log_warn, "Connection close is scheduled - "
1055                 "closing fd %d is_incoming %d", 
1056                 net->sock, net->is_incoming);
1057         is_incoming=net->is_incoming;
1058         
1059         exnet_rwlock_mainth_write(net);
1060         close_socket(net);
1061         exnet_rwlock_mainth_read(net);
1062 
1063         /* if incoming, continue.. */
1064         if (is_incoming)
1065         {
1066             /* remove connection 
1067             DL_DELETE(M_netlist, net);
1068             NDRX_FREE(net);*/
1069             return EXTRUE;
1070         }
1071     }
1072     
1073     return EXFALSE;
1074 }
1075 
1076 /**
1077  * Open socket is one is closed/FAIL.
1078  * This is periodic callback to the library from
1079  * NDRX polling extension
1080  */
1081 expublic int exnet_periodic(void)
1082 {
1083     int ret=EXSUCCEED;
1084     exnetcon_t *head = extnet_get_con_head();
1085     exnetcon_t *net, *tmp;
1086    
1087     DL_FOREACH_SAFE(head, net, tmp)
1088     {
1089         /* Check if close is scheduled... */
1090         if (exnet_schedule_run(net))
1091         {
1092             continue;
1093         }
1094         
1095         /* Only on connections... */
1096         if (EXFAIL==net->sock)
1097         {
1098             if (net->is_server)
1099             {
1100                 /* if bind failed / try next address... */
1101                 if (EXSUCCEED!=exnet_addr_next(net))
1102                 {
1103                     NDRX_LOG(log_error, "Failed to resolve next binding address!");
1104                     EXFAIL_OUT(ret);
1105                 }
1106                 
1107                 /* Server should bind at this point */
1108                 ret = exnet_bind(net);
1109                 
1110             }
1111             else if (!net->is_incoming)
1112             {
1113                 if (EXSUCCEED!=exnet_addr_next(net))
1114                 {
1115                     NDRX_LOG(log_error, "Failed to resolve next connect address!");
1116                     EXFAIL_OUT(ret);
1117                 }
1118                 /* Client should open socket at this point. */
1119                 ret = open_socket(net);
1120             }
1121         }
1122         else if (!net->is_server)
1123         {
1124             /* Check connection.... */
1125             ret=exnet_poll_cb(net->sock, 0, (void *)net);
1126         }
1127     }
1128 
1129 out:
1130     return ret;
1131 }
1132 
1133 /**
1134  * Set incoming message processing callback
1135  * @param p_process_msg     callback on message received.
1136  * @param p_connected   called when we are connected
1137  * @param p_disconnected    called when we are disconnected
1138  * @param p_send_zero_len callback func for zero length msg sending
1139  *  (normally via thread pool)
1140  * @param p_snd_clock_sync periodic clock sync msg callback
1141  * @return EXSUCCEED 
1142 */
1143 expublic int exnet_install_cb(exnetcon_t *net, int (*p_process_msg)(exnetcon_t *net, char **buf, int len),
1144         int (*p_connected)(exnetcon_t *net), int (*p_disconnected)(exnetcon_t *net),
1145                 int (*p_snd_zero_len)(exnetcon_t *net), int (*p_snd_clock_sync)(exnetcon_t *net))
1146 {
1147     int ret=EXSUCCEED;
1148 
1149     net->p_process_msg = p_process_msg;
1150     net->p_connected = p_connected;
1151     net->p_disconnected = p_disconnected;
1152     net->p_snd_zero_len = p_snd_zero_len;
1153     net->p_snd_clock_sync = p_snd_clock_sync;
1154 
1155 out:
1156     return ret;
1157 }
1158 /**
1159  * Destroy connection handler (free up resources)
1160  * This assumes that connection is closed
1161  * @param net handler to destroy/free up
1162  */
1163 expublic void exnet_unconfigure(exnetcon_t *net)
1164 {
1165     if (NULL!=net->addrinfos)
1166     {
1167         freeaddrinfo(net->addrinfos);
1168         net->addrinfos=NULL;
1169     }
1170     
1171     net->addr_cur=NULL;
1172 }
1173 
1174 /**
1175  * Get list of addresses...
1176  * @param net network config
1177  * @return EXSUCCEED/EXFAIL
1178  */
1179 expublic int exnet_addr_get(exnetcon_t *net)
1180 {
1181     int ret=EXSUCCEED;
1182     struct addrinfo hints;
1183     struct addrinfo *iter;
1184     char ip[(INET6_ADDRSTRLEN)*2];
1185     
1186     /* delete dynamic data... */
1187     exnet_unconfigure(net);
1188     
1189     if (!net->is_server)
1190     {
1191         NDRX_LOG(log_error, "EXNET: client for: %s:%s", net->addr, net->port);
1192     }
1193     else
1194     {
1195         NDRX_LOG(log_error, "EXNET: server for: %s:%s", net->addr, net->port);
1196     }
1197     
1198     /* Resolve address... */
1199     memset(&hints, 0, sizeof(struct addrinfo));
1200     
1201     if (net->is_ipv6)
1202     {
1203         hints.ai_family = AF_INET6;    /* Default Allow IPv4 */
1204     }
1205     else
1206     {
1207         hints.ai_family = AF_INET;    /* Default Allow IPv4 */
1208     }
1209     
1210     hints.ai_socktype = SOCK_STREAM; /* TCP Socket socket */
1211     
1212     if (net->is_server)
1213     {
1214         hints.ai_flags = AI_PASSIVE;
1215     }
1216     else
1217     {
1218         hints.ai_flags = 0;
1219     }
1220     
1221     /* allow numeric IP */
1222     if (net->is_numeric)
1223     {
1224         hints.ai_flags|=AI_NUMERICHOST;
1225     }
1226     
1227     hints.ai_protocol = 0; /* Any protocol */
1228 
1229     ret = getaddrinfo(net->addr, net->port, &hints, &net->addrinfos);
1230     
1231     if (ret != EXSUCCEED)
1232     {
1233         NDRX_LOG(log_error, "Failed to resolve -i addr: getaddrinfo(): %s", 
1234                 gai_strerror(ret));
1235         EXFAIL_OUT(ret);
1236     }
1237     
1238     /* print all resolved addresses... */
1239     for(iter=net->addrinfos; iter != NULL;iter=iter->ai_next)
1240     {
1241         if (NULL!=inet_ntop (iter->ai_family, 
1242             &((struct sockaddr_in *)iter->ai_addr)->sin_addr, ip, sizeof(ip)))
1243         {
1244             NDRX_LOG(log_info,"Resolved: IPv%d address: %s port: %d", 
1245                     iter->ai_family == PF_INET6 ? 6 : 4, ip, 
1246                     (int)exnet_get_port(iter->ai_addr));
1247         }
1248         else
1249         {
1250             NDRX_LOG(log_error, "Failed to get addr info: %s", strerror(errno));
1251         }
1252     }
1253     
1254 out:
1255     return ret;
1256 }
1257 
1258 /**
1259  * Select next address..
1260  * If we are at the end of address selection,
1261  * query addresses again...
1262  * 
1263  * @param net exnet_configure()
1264  * @return 
1265  */
1266 expublic int exnet_addr_next(exnetcon_t *net)
1267 {
1268     int ret = EXSUCCEED;
1269     
1270     if (NULL==net->addr_cur)
1271     {
1272         net->addr_cur=net->addrinfos;
1273     }
1274     else
1275     {
1276         net->addr_cur=net->addr_cur->ai_next;
1277     }
1278     
1279     if (NULL==net->addr_cur)
1280     {
1281         NDRX_LOG(log_warn, "Reload addresses");
1282         
1283         ret=exnet_addr_get(net);
1284         
1285         if (EXSUCCEED!=ret)
1286         {
1287             NDRX_LOG(log_error, "Failed to resolve bind/connect addresses!");
1288             EXFAIL_OUT(ret);
1289         }
1290         
1291         net->addr_cur=net->addrinfos;
1292     }
1293     
1294     if (NULL==net->addr_cur)
1295     {
1296         NDRX_LOG(log_error, "NULL Address found");
1297         EXFAIL_OUT(ret);
1298     }
1299     
1300 out:
1301     NDRX_LOG(log_error, "exnet_addr_next returns %d", ret);
1302 
1303     return ret;
1304 }
1305 
1306 /**
1307  * Configure the library
1308  * @param net values for the socket must be pre-loaded from caller.
1309  * @return EXSUCCEED/EXFAIL
1310  */
1311 expublic int exnet_configure(exnetcon_t *net)
1312 {
1313     int ret = EXSUCCEED;
1314     
1315     if (EXSUCCEED!=exnet_addr_get(net))
1316     {
1317         EXFAIL_OUT(ret);
1318     }
1319     
1320     /* Add stuff to the list of connections */
1321     exnet_add_con(net);
1322     
1323 out:
1324     return ret;
1325 }
1326 
1327 /**
1328  * Make read lock
1329  * @param net network object
1330  */
1331 expublic void exnet_rwlock_read(exnetcon_t *net)
1332 {
1333     if (EXSUCCEED!=pthread_rwlock_rdlock(&(net->rwlock)))
1334     {
1335         int err = errno;
1336         
1337         NDRX_LOG(log_error, "Failed to read lock: %s - exiting", strerror(err));
1338         userlog("Failed to read lock: %s  - exiting", strerror(err));
1339         exit(EXFAIL);
1340     }
1341 }
1342 
1343 /**
1344  * Make write lock
1345  * @param net network object
1346  */
1347 expublic void exnet_rwlock_write(exnetcon_t *net)
1348 {
1349     if (EXSUCCEED!=pthread_rwlock_rdlock(&net->rwlock))
1350     {
1351         int err = errno;
1352         
1353         NDRX_LOG(log_error, "Failed to write lock: %s - exiting", strerror(err));
1354         userlog("Failed to write lock: %s  - exiting", strerror(err));
1355         exit(EXFAIL);
1356     }
1357 }
1358 
1359 /**
1360  * Unlock the network object
1361  * @param net
1362  */
1363 expublic void exnet_rwlock_unlock(exnetcon_t *net)
1364 {
1365     if (EXSUCCEED!=pthread_rwlock_unlock(&net->rwlock))
1366     {
1367         int err = errno;
1368         
1369         NDRX_LOG(log_error, "Failed to unlock rwlock: %s - exiting", strerror(err));
1370         userlog("Failed to unlock rwlock: %s  - exiting", strerror(err));
1371         exit(EXFAIL);
1372     }
1373 }
1374 
1375 /**
1376  * Main thread about to write to net
1377  * This assumes that main thread already have read lock
1378  * @param net
1379  */
1380 expublic void exnet_rwlock_mainth_write(exnetcon_t *net)
1381 {
1382     exnet_rwlock_unlock(net);
1383     exnet_rwlock_write(net);
1384 }
1385 
1386 /**
1387  * Main thread switching back to read mode
1388  * @param net
1389  */
1390 expublic void exnet_rwlock_mainth_read(exnetcon_t *net)
1391 {
1392     exnet_rwlock_unlock(net);
1393     exnet_rwlock_read(net); /* lock back to read */
1394 }
1395 
1396 /**
1397  * Check are we connected?
1398  */
1399 expublic int exnet_is_connected(exnetcon_t *net)
1400 {
1401     return net->is_connected;
1402 }
1403 
1404 /**
1405  * Gently close the connection
1406  */
1407 expublic int exnet_close_shut(exnetcon_t *net)
1408 {
1409     if (net->lock_init)
1410     {
1411         exnet_rwlock_mainth_write(net);
1412         close_socket(net);
1413         exnet_rwlock_mainth_read(net);
1414     }
1415 
1416     return EXSUCCEED;
1417 }
1418 
1419 /**
1420  * Set custom timeout
1421  * @timeout timeout in seconds
1422  */
1423 expublic int exnet_set_timeout(exnetcon_t *net, int timeout)
1424 {
1425     int ret=EXSUCCEED;
1426     struct timeval tv;
1427 
1428     memset(&tv, 0, sizeof(tv));
1429     tv.tv_sec = timeout;
1430 
1431     if (EXSUCCEED!=setsockopt(net->sock, SOL_SOCKET, SO_RCVTIMEO, &tv, sizeof(struct timeval)))
1432     {
1433         NDRX_LOG(log_error, "setsockopt() failed for fd=%d: %s",
1434                         net->sock, strerror(errno));
1435         EXFAIL_OUT(ret);
1436     }
1437 
1438 out:
1439     return ret;
1440 }
1441 
1442 /**
1443  * Reset network structure
1444  * @return 
1445  */
1446 expublic void exnet_reset_struct(exnetcon_t *net)
1447 {
1448     memset(net, 0, sizeof(*net));
1449     net->sock = EXFAIL;             /**< file descriptor for the network socket */    
1450     net->rcvtimeout = EXFAIL;       /**< Receive timeout                        */
1451     net->len_pfx = EXFAIL;          /**< Length prefix                          */    
1452     net->is_server = EXFAIL;        /**< server mode                            */
1453     net->rcvtimeout = ndrx_get_G_atmi_env()->time_out;
1454 }
1455 
1456 /**
1457  * Initialize network struct
1458  * Main thread acquires read lock by default
1459  * @param net networks struct to init
1460  * @return EXSUCCEED/EXFAIL
1461  */
1462 expublic int exnet_net_init(exnetcon_t *net)
1463 {
1464     int ret = EXSUCCEED;
1465     int err;
1466     
1467     memset(&(net->rwlock), 0, sizeof(net->rwlock));
1468 
1469     if (EXSUCCEED!=(err=pthread_rwlock_init(&(net->rwlock), NULL)))
1470     {
1471         NDRX_LOG(log_error, "Failed to init rwlock: %s", strerror(err));
1472         userlog("Failed to init rwlock: %s", strerror(err));
1473         EXFAIL_OUT(ret);
1474     }
1475     
1476     MUTEX_VAR_INIT(net->sendlock);
1477     MUTEX_VAR_INIT(net->rcvlock);
1478     MUTEX_VAR_INIT(net->flagslock);
1479     
1480     
1481     ndrx_stopwatch_reset(&net->periodic_stopwatch);
1482     
1483     /* acquire read lock */
1484     if (EXSUCCEED!=(err=pthread_rwlock_rdlock(&(net->rwlock))))
1485     {
1486         userlog("Failed to acquire read lock: %s", strerror(err));
1487         NDRX_LOG(log_error, "Failed to acquire read lock - exiting...: %s",
1488                                                    strerror(err));
1489         exit(EXFAIL);
1490     }
1491     net->lock_init=EXTRUE;
1492             
1493 out:
1494     return ret;
1495 }
1496 /* vim: set ts=4 sw=4 et smartindent: */