Back to home page

Enduro/X

 
 

    


0001 /**
0002  * @brief Clock related routines.
0003  *
0004  * @file clock.c
0005  */
0006 /* -----------------------------------------------------------------------------
0007  * Enduro/X Middleware Platform for Distributed Transaction Processing
0008  * Copyright (C) 2009-2016, ATR Baltic, Ltd. All Rights Reserved.
0009  * Copyright (C) 2017-2023, Mavimax, Ltd. All Rights Reserved.
0010  * This software is released under one of the following licenses:
0011  * AGPL (with Java and Go exceptions) or Mavimax's license for commercial use.
0012  * See LICENSE file for full text.
0013  * -----------------------------------------------------------------------------
0014  * AGPL license:
0015  *
0016  * This program is free software; you can redistribute it and/or modify it under
0017  * the terms of the GNU Affero General Public License, version 3 as published
0018  * by the Free Software Foundation;
0019  *
0020  * This program is distributed in the hope that it will be useful, but WITHOUT ANY
0021  * WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR A
0022  * PARTICULAR PURPOSE. See the GNU Affero General Public License, version 3
0023  * for more details.
0024  *
0025  * You should have received a copy of the GNU Affero General Public License along 
0026  * with this program; if not, write to the Free Software Foundation, Inc.,
0027  * 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
0028  *
0029  * -----------------------------------------------------------------------------
0030  * A commercial use license is available from Mavimax, Ltd
0031  * contact@mavimax.com
0032  * -----------------------------------------------------------------------------
0033  */
0034 #include <stdio.h>
0035 #include <stdlib.h>
0036 #include <string.h>
0037 #include <errno.h>
0038 #include <regex.h>
0039 #include <utlist.h>
0040 
0041 #include <ndebug.h>
0042 #include <atmi.h>
0043 #include <atmi_int.h>
0044 #include <typed_buf.h>
0045 #include <ndrstandard.h>
0046 #include <ubf.h>
0047 #include <Exfields.h>
0048 
0049 #include <exnet.h>
0050 #include <ndrxdcmn.h>
0051 
0052 #include "bridge.h"
0053 #include "../libatmisrv/srv_int.h"
0054 /*---------------------------Externs------------------------------------*/
0055 /*---------------------------Macros-------------------------------------*/
0056 #define CLOCK_DEBUG     1
0057 /*---------------------------Enums--------------------------------------*/
0058 /*---------------------------Typedefs-----------------------------------*/
0059 /*---------------------------Globals------------------------------------*/
0060 /*---------------------------Statics------------------------------------*/
0061 /*---------------------------Prototypes---------------------------------*/
0062 
0063 MUTEX_LOCKDECL(M_timediff_lock);
0064 exprivate ndrx_stopwatch_t M_timediff_sent; /**< for roundtrip calc                     */
0065 exprivate time_t M_timediff_tstamp;      /**< UTC tstamp when msg was sent  for matching*/
0066 exprivate unsigned long M_seq=0;         /**< Last sequence sent                        */
0067 
0068 /**
0069  * Connection infos
0070  * bridge internals
0071  * @param call message from queue
0072  * @return EXSUCCEED/EXFAIL
0073  */
0074 expublic int br_coninfo(command_call_t *call)
0075 {
0076     int ret=EXSUCCEED;
0077     long long diff;
0078     command_reply_brconinfo_t infos; /* currently one ... */
0079     ndrx_stopwatch_t our_time;
0080     
0081     memset(&infos, 0, sizeof(infos)); /* not mission critical, so can set mem */
0082     
0083     
0084     ndrx_stopwatch_reset(&our_time);
0085     /* form up the reply */
0086     infos.rply.magic = NDRX_MAGIC;
0087     infos.rply.command = call->command+1; /* Make reponse */
0088     NDRX_LOG(log_debug, "Reply command: %d", infos.rply.command);
0089     infos.rply.msg_type = NDRXD_CALL_TYPE_BRCONINFO;
0090     infos.rply.msg_src = NDRXD_SRC_BRIDGE; /* from NDRXD */
0091     
0092     /* Response flags, echo back request flags too... */
0093     infos.rply.flags = call->flags;
0094     infos.rply.error_code = 0;
0095     
0096     infos.time = our_time.t.tv_sec;
0097     infos.timems = (long)(our_time.t.tv_nsec / 1000000);
0098     
0099     /* have some consistency */
0100     MUTEX_LOCK_V(M_timediff_lock);
0101     
0102     infos.lastsync = ndrx_stopwatch_get_delta_sec(&G_bridge_cfg.timediff_ourt);
0103     infos.locnodeid = tpgetnodeid();
0104     infos.remnodeid = G_bridge_cfg.nodeid;
0105     infos.srvid = tpgetsrvid();
0106     
0107     if (G_bridge_cfg.is_server)
0108     {
0109         infos.mode = NDRX_CONMODE_PASSIVE;
0110     }
0111     else
0112     {
0113         infos.mode = NDRX_CONMODE_ACTIVE;
0114     }
0115     
0116     if (NULL!=G_bridge_cfg.con)
0117     {
0118         infos.fd = G_bridge_cfg.con->sock;
0119     }
0120     else
0121     {
0122         infos.fd=EXFAIL;
0123     }
0124 
0125     /* read in fast way */
0126     NDRX_SPIN_LOCK_V(G_bridge_cfg.timediff_lock);
0127     diff = G_bridge_cfg.timediff;
0128     NDRX_SPIN_UNLOCK_V(G_bridge_cfg.timediff_lock);
0129         
0130     /* convert to seconds*/
0131     infos.timediffs = (long)(diff/1000);
0132     infos.timediffms = (long)(diff%1000);
0133     infos.roundtrip = G_bridge_cfg.timediff_roundtrip;
0134     
0135     MUTEX_UNLOCK_V(M_timediff_lock);
0136     
0137     ret = ndrx_generic_q_send_2(call->reply_queue, 
0138             (char *)&infos, sizeof(infos), 0, BR_ADMININFO_TOUT, 0);
0139     
0140 out:
0141     return ret;
0142 }
0143 /**
0144  * Initialize clock diff.
0145  * @param call
0146  * @return 
0147  */
0148 expublic int br_calc_clock_diff(command_call_t *call)
0149 {
0150     int ret=EXSUCCEED;
0151     ndrx_stopwatch_t our_time;
0152     cmd_br_time_sync_t *their_time = (cmd_br_time_sync_t *)call;
0153     long long diff=EXFAIL;
0154     long rountrip=0;
0155     int load_time=EXFALSE;
0156     
0157     /* if got request, just send reply */
0158     if (NDRX_BRCLOCK_MODE_REQ==their_time->mode)
0159     {
0160         /* just send data back */
0161         NDRX_LOG(log_debug, "Reply with timestamp");
0162         return br_send_clock(NDRX_BRCLOCK_MODE_RSP, their_time);
0163     }
0164     
0165     /* update the timestamps... stored locally -> the last sync 
0166      * once we move to multi-connections, all clock data needs to be associated
0167      * with connections.s
0168      */
0169     MUTEX_LOCK_V(M_timediff_lock);
0170         
0171     /* So we have their time let timer lib, to get diff */
0172     ndrx_stopwatch_reset(&our_time);
0173     
0174     if (NDRX_BRCLOCK_MODE_ASYNC==their_time->mode)
0175     {
0176         load_time=EXTRUE;
0177     }
0178     else if (NDRX_BRCLOCK_MODE_RSP==their_time->mode)
0179     {
0180 
0181         /* check the vars... */
0182        
0183         rountrip = (long)ndrx_stopwatch_diff(&our_time, &M_timediff_sent);
0184         
0185         if (their_time->orig_seq==M_seq
0186                 && rountrip <= G_bridge_cfg.max_roundtrip
0187                 && their_time->orig_nodeid == tpgetnodeid()
0188                 && their_time->orig_timestamp == M_timediff_tstamp
0189                 )
0190         {
0191             load_time=EXTRUE;
0192         }
0193         else
0194         {
0195             NDRX_LOG(log_error, "DROP time sync echo data: "
0196                     "seq_rcv: %lu vs seq_our: %lu, "
0197                     "rountrip: %ld vs max allow: %ld, "
0198                     "nodeid_rcv: %d vs nodeid_our: %d, "
0199                     "tstamp_rcv: %lld vs tstamp_our: %lld",
0200                     their_time->orig_seq, M_seq,
0201                     rountrip, G_bridge_cfg.max_roundtrip,
0202                     their_time->orig_nodeid, tpgetnodeid(),
0203                     their_time->orig_timestamp, M_timediff_tstamp
0204                     );
0205         }    
0206     }
0207     
0208     if (load_time)
0209     {
0210         diff=ndrx_stopwatch_diff(&our_time, &their_time->time);
0211     
0212         NDRX_LOG(log_warn, "Monotonic clock time correction between us "
0213                 "and node %d is: %lld msec (%d), roundtrip: %ld ms, seq: %ld, data mode: %d", 
0214                 call->caller_nodeid, diff, sizeof(diff), rountrip, M_seq, their_time->mode);
0215         
0216         
0217         /* so if admin tool reads the stuff needs to have spin + to get all readings... */
0218         NDRX_SPIN_LOCK_V(G_bridge_cfg.timediff_lock);
0219         G_bridge_cfg.timediff=diff;
0220         NDRX_SPIN_UNLOCK_V(G_bridge_cfg.timediff_lock);
0221         
0222         /* normally there shall be now time updates in the row
0223          * and the bellow infos are use only for admin tool
0224          * thus do not keep the spinlock for al long...
0225          */
0226         G_bridge_cfg.timediff_ourt = our_time;
0227         G_bridge_cfg.timediff_roundtrip = rountrip;
0228         
0229     }
0230     MUTEX_UNLOCK_V(M_timediff_lock);
0231     
0232 out:
0233     return ret;
0234 }
0235 
0236 
0237 /**
0238  * Send or clock to server.
0239  * @param mode see NDRX_BRCLOCK_MODE_* consts
0240  * @param in case of NDRX_BRCLOCK_MODE_RSP this holds the original request
0241  * @return EXSUCCEED/EXFAIL 
0242  */
0243 expublic int br_send_clock(int mode, cmd_br_time_sync_t *rcv)
0244 {
0245     char *fn = "br_send_clock";
0246     cmd_br_time_sync_t ourtime;
0247     int ret=EXSUCCEED;
0248     
0249     NDRX_LOG(log_debug, "%s - enter", fn);
0250     
0251     memset(&ourtime, 0, sizeof(ourtime));
0252     
0253     cmd_generic_init(NDRXD_COM_BRCLOCK_RQ, NDRXD_SRC_BRIDGE, NDRXD_CALL_TYPE_BRBCLOCK,
0254                             (command_call_t *)&ourtime, ndrx_get_G_atmi_conf()->reply_q_str);
0255     ndrx_stopwatch_reset(&ourtime.time);
0256     
0257     if (NDRX_BRCLOCK_MODE_RSP==mode)
0258     {
0259         /* reply with original data  */
0260         ourtime.orig_nodeid = rcv->orig_nodeid;
0261         ourtime.orig_timestamp = rcv->orig_timestamp;
0262         ourtime.orig_seq = rcv->orig_seq;
0263     }
0264     else
0265     {
0266         /* make new request async at startup or request */
0267         ourtime.orig_nodeid = tpgetnodeid();
0268         
0269         MUTEX_LOCK_V(M_timediff_lock);
0270         /* have tstamp for correlation.. */
0271         ourtime.orig_timestamp = time(NULL);
0272         M_timediff_tstamp = ourtime.orig_timestamp;
0273         ndrx_stopwatch_reset(&M_timediff_sent);
0274         /* let if overflow, no problem... */
0275         M_seq++;
0276         ourtime.orig_seq = M_seq;
0277         MUTEX_UNLOCK_V(M_timediff_lock);
0278     }
0279     
0280     ourtime.mode=mode;
0281     
0282     ret=br_send_to_net((char*)&ourtime, sizeof(ourtime), BR_NET_CALL_MSG_TYPE_NDRXD, 
0283             ourtime.call.command);
0284     
0285 out:
0286 
0287     NDRX_LOG(log_debug, "%s return %d", fn, ret);
0288     return ret;
0289     
0290 }
0291 
0292 /**
0293  * Adjust clock in packet.
0294  * @return 
0295  */
0296 expublic void br_clock_adj(tp_command_call_t *call, int is_out)
0297 {
0298     long long diff;
0299     
0300     NDRX_SPIN_LOCK_V(G_bridge_cfg.timediff_lock);
0301     diff = G_bridge_cfg.timediff;
0302     NDRX_SPIN_UNLOCK_V(G_bridge_cfg.timediff_lock);
0303     
0304     N_TIMER_DUMP(log_info, "Call timer: ", call->timer);    
0305 #if CLOCK_DEBUG
0306     ndrx_stopwatch_t our_time;
0307     ndrx_stopwatch_reset(&our_time);
0308     NDRX_LOG(log_debug, "Original call age: %lld ms", 
0309             ndrx_stopwatch_diff(&call->timer, &our_time));
0310 #endif
0311     if (is_out)
0312     {
0313         ndrx_stopwatch_minus(&call->timer, diff);
0314     }
0315     else
0316     {
0317         ndrx_stopwatch_plus(&call->timer, diff);
0318     }
0319         
0320     NDRX_LOG(log_debug, "Clock diff: %lld ms", diff);
0321     N_TIMER_DUMP(log_info, "Adjusted call timer: ", call->timer);
0322     
0323 #if CLOCK_DEBUG
0324     NDRX_LOG(log_debug, "New call age: %lld ms", 
0325             ndrx_stopwatch_diff(&call->timer, &our_time));
0326     NDRX_LOG(log_debug, "Clock based call age (according to tstamp): %d", 
0327             time(NULL) - call->timestamp);
0328 #endif
0329         
0330 }
0331 /* vim: set ts=4 sw=4 et smartindent: */