Back to home page

Enduro/X

 
 

    


0001 /**
0002  * @brief Benchmark tool client
0003  *
0004  * @file exbenchcl.c
0005  */
0006 /* -----------------------------------------------------------------------------
0007  * Enduro/X Middleware Platform for Distributed Transaction Processing
0008  * Copyright (C) 2009-2016, ATR Baltic, Ltd. All Rights Reserved.
0009  * Copyright (C) 2017-2023, Mavimax, Ltd. All Rights Reserved.
0010  * This software is released under one of the following licenses:
0011  * AGPL (with Java and Go exceptions) or Mavimax's license for commercial use.
0012  * See LICENSE file for full text.
0013  * -----------------------------------------------------------------------------
0014  * AGPL license:
0015  *
0016  * This program is free software; you can redistribute it and/or modify it under
0017  * the terms of the GNU Affero General Public License, version 3 as published
0018  * by the Free Software Foundation;
0019  *
0020  * This program is distributed in the hope that it will be useful, but WITHOUT ANY
0021  * WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR A
0022  * PARTICULAR PURPOSE. See the GNU Affero General Public License, version 3
0023  * for more details.
0024  *
0025  * You should have received a copy of the GNU Affero General Public License along 
0026  * with this program; if not, write to the Free Software Foundation, Inc.,
0027  * 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
0028  *
0029  * -----------------------------------------------------------------------------
0030  * A commercial use license is available from Mavimax, Ltd
0031  * contact@mavimax.com
0032  * -----------------------------------------------------------------------------
0033  */
0034 #include <stdio.h>
0035 #include <stdlib.h>
0036 #include <string.h>
0037 #include <errno.h>
0038 #include <signal.h>
0039 
0040 #include <ndebug.h>
0041 #include <atmi.h>
0042 #include <exthpool.h>
0043 
0044 #include "atmi_int.h"
0045 #include "expr.h"
0046 #include <typed_buf.h>
0047 #include <atmi_int.h>   /*  ndrx_tpchkunsol() */
0048 #include <atmi_tls.h>   /* my_id */
0049 #include <Exfields.h>
0050 /*---------------------------Externs------------------------------------*/
0051 /*---------------------------Macros-------------------------------------*/
0052 
0053 /**
0054  * Read end of pipe: index into the descriptor pair filled by pipe()
0055  */
0056 #define NDRX_READ 0
0057 
0058 
0059 /**
0060  * Write end of pipe: index into the descriptor pair filled by pipe()
0061  */
0062 #define NDRX_WRITE 1
0063 
0064 /*---------------------------Enums--------------------------------------*/
0065 /*---------------------------Typedefs-----------------------------------*/
0066 /*---------------------------Globals------------------------------------*/
0067 /*---------------------------Statics------------------------------------*/
0068 exprivate int M_nr_threads=1;           /**< number of worker threads or forked processes (-n) */
0069 
0070 /* Base service (or event) name; when -N is set, a 3-digit suffix derived from the thread number is appended */
0071 exprivate char M_svcnm[XATMI_EVENT_MAX+1]="EXBENCH";
0072 exprivate int M_runtime=60;             /**< run for 60 sec default          */
0073 exprivate char *M_sample_data=NULL;     /**< data to send                    */
0074 exprivate BFLDID M_fld=BBADFLDID;       /**< field used for random data fill */
0075 exprivate int M_rndsize=1024;           /**< test data size                  */
0076 exprivate int M_doplot=EXFALSE;         /**< Do plot the benchmark results   */
0077 exprivate int M_prio=NDRX_MSGPRIO_DEFAULT;  /**< Call priority               */
0078 exprivate typed_buffer_descr_t * M_buftype = NULL; /**< buffer type descriptor, resolved in main() */
0079 exprivate threadpool M_threads; /**< thread pool */
0080 exprivate MUTEX_LOCKDECL(M_wait_mutex); /**< start gate for worker threads, also guards M_msg_sent */
0081 exprivate int M_do_run = EXTRUE;    /**< cleared by main() to stop thread-mode workers */
0082 exprivate long M_msg_sent=0;    /**< Messages sent */
0083 exprivate char *M_master_buf=NULL;  /**< This is master sample buffer       */
0084 exprivate int M_msgsize = 0;        /**< effective message size */
0085 exprivate int M_fork = EXFALSE;     /**< Use forking */
0086 exprivate int M_fd[2]={EXFAIL, EXFAIL}; /**< pipe channel for forking clients to report stats */
0087 exprivate int M_svcnum = 0;        /**< number of services (-N), thread number is taken modulo this; 0 = single service */
0088 /** persistent queue mode, queue space */
0089 exprivate char M_qspace[XATMI_SERVICE_NAME_LENGTH+1] = {EXEOS};
0090 exprivate int M_autoq = EXFALSE;   /**< Use autoq testing                   */
0091 exprivate int M_enqonly = EXFALSE;   /**< Persistent q, enqueue only        */
0092 exprivate long M_numreq = EXFALSE;   /**< Number of requests per worker; a false (zero) value means no limit */
0093 
0094 exprivate int M_tran = EXFALSE; /**< distributed transaction timeout passed to tpbegin(); false = no transactions */
0095 exprivate int M_notify_mode = EXFALSE;  /**< shall notification be expected */
0096 exprivate __thread long M_sent=0;               /**< messages sent so far by this thread */
0097 exprivate __thread long M_notif_sent_acq = EXFAIL;  /**< ACQ position          */
0098 exprivate int M_event_mode=EXFALSE; /**< is event mode enabled?                */
0099 /*---------------------------Prototypes---------------------------------*/
0100 
0101 /**
0102  * Process the notification callback
0103  */
0104 exprivate void notification_callback (char *data, long len, long flags)
0105 {
0106     M_notif_sent_acq = M_sent;
0107 }
0108 
0109 /* need to synchronize function for starting the sending... */
0110 expublic void thread_process(void *ptr, int *p_finish_off)
0111 {
0112     /*  thread number */
0113     long thnum = (long)ptr;
0114     char svcnm[XATMI_SERVICE_NAME_LENGTH+1];
0115     char *buf = tpalloc(M_buftype->type, NULL, M_rndsize*2);
0116     char *rcv_buf;
0117     long rcvlen;
0118     
0119     TPQCTL qc;
0120     ndrx_stopwatch_t w;
0121     
0122     if (EXSUCCEED!=tpinit(NULL))
0123     {
0124         NDRX_LOG(log_error, "Failed to init: %s", tpstrerror(tperrno));
0125         userlog("Failed to init: %s", tpstrerror(tperrno));
0126         exit(-1);
0127     }
0128     
0129     
0130     if (M_tran && EXSUCCEED!=tpopen())
0131     {
0132         NDRX_LOG(log_error, "Failed to tpopen(): %s", 
0133                 tpstrerror(tperrno));
0134         exit(-1);
0135     }
0136 
0137     if (NULL==buf)
0138     {
0139         NDRX_LOG(log_error, "Failed to alloc send buf: %s", 
0140                 tpstrerror(tperrno));
0141         exit(-1);
0142     }
0143     
0144     /* In case if notify Q -> append the buffer with myid */
0145     memcpy(buf, M_master_buf, M_rndsize*2);
0146     
0147     if (M_notify_mode)
0148     {
0149         /* load client id so that server may provide notification */
0150         if (EXSUCCEED!=Bchg((UBFH *)buf, EX_CLTID, 0, G_atmi_tls->G_atmi_conf.my_id, 0))
0151         {
0152             NDRX_LOG(log_error, "Failed to set EX_CLTID: %s", Bstrerror(Berror));
0153             exit(-1);
0154         }
0155         
0156         /* set callback... */
0157         tpsetunsol (notification_callback);
0158         
0159     }
0160     
0161     /* Service by thread */
0162     
0163     if (M_svcnum > 0)
0164     {
0165         snprintf(svcnm, sizeof(svcnm), "%s%03d", M_svcnm, (int)thnum % M_svcnum);
0166     }
0167     else
0168     {
0169         /* just use name directly.. */
0170         NDRX_STRCPY_SAFE(svcnm, M_svcnm);
0171     }
0172     
0173     /* re-lock.. */
0174     MUTEX_LOCK_V(M_wait_mutex);
0175     MUTEX_UNLOCK_V(M_wait_mutex);
0176 
0177     ndrx_stopwatch_reset(&w);
0178     
0179     while ((!M_fork && M_do_run) || (M_fork && ndrx_stopwatch_get_delta_sec(&w) < M_runtime))
0180     {
0181         if (M_prio!=NDRX_MSGPRIO_DEFAULT)
0182         {
0183             tpsprio(M_prio, TPABSOLUTE);
0184         }
0185                 
0186         /* start tran, if M_tran */
0187         if (M_tran && EXSUCCEED!=tpbegin(M_tran, 0))
0188         {
0189             NDRX_LOG(log_error, "tpbegin() failed: %s",
0190                 tpstrerror(tperrno));
0191             exit(-1);
0192         }
0193 
0194         if (M_qspace[0])
0195         {
0196             /* Run enq to SVCNM */
0197             memset(&qc, 0, sizeof(qc));
0198             if (EXSUCCEED!=tpenqueue(M_qspace, svcnm, &qc, buf, 0, 0))
0199             {
0200                 NDRX_LOG(log_error, "tpenqueue() failed %s diag: %d:%s", 
0201                         tpstrerror(tperrno), qc.diagnostic, qc.diagmsg);
0202                 exit(-1);
0203             }
0204 
0205             if (!M_autoq && !M_enqonly)
0206             {
0207 
0208                 /* restart the tran if doing deq */
0209                 if (M_tran && EXSUCCEED!=tpcommit(0))
0210                 {
0211                     NDRX_LOG(log_error, "tpcommit() failed: %s",
0212                         tpstrerror(tperrno));
0213                     exit(-1);
0214                 }
0215 
0216                 if (M_tran && EXSUCCEED!=tpbegin(M_tran, 0))
0217                 {
0218                     NDRX_LOG(log_error, "tpbegin() failed: %s",
0219                         tpstrerror(tperrno));
0220                     exit(-1);
0221                 }
0222 
0223                 /* Run deq from SVCNM */
0224                 memset(&qc, 0, sizeof(qc));
0225                 rcv_buf=NULL;
0226                 if (EXSUCCEED!=tpdequeue(M_qspace, svcnm, &qc, (char **)&rcv_buf, &rcvlen, 0))
0227                 {
0228                     NDRX_LOG(log_error, "tpdequeue() failed %s diag: %d:%s", 
0229                             tpstrerror(tperrno), qc.diagnostic, qc.diagmsg);
0230                     exit(-1);
0231                 }
0232             }
0233 
0234         }
0235         else
0236         {
0237             rcv_buf=NULL;
0238             if (M_event_mode)
0239             {
0240                 if (tppost(svcnm, buf, 0, 0)<=0)
0241                 {
0242                     NDRX_LOG(log_error, "Failed to post event [%s]: %s", 
0243                             svcnm, tpstrerror(tperrno));
0244                     exit(-1);
0245                 }
0246             }
0247             else if (EXFAIL==tpcall(svcnm, buf, 0, &rcv_buf, &rcvlen, 0))
0248             {
0249                 NDRX_LOG(log_error, "Failed to call [%s]: %s", 
0250                         svcnm, tpstrerror(tperrno));
0251                 exit(-1);
0252             }
0253         }
0254 
0255         if (M_tran && EXSUCCEED!=tpcommit(0))
0256         {
0257             NDRX_LOG(log_error, "tpcommit() 2 failed: %s",
0258                 tpstrerror(tperrno));
0259             exit(-1);
0260         }
0261         
0262         if (NULL!=rcv_buf)
0263         {
0264             tpfree(rcv_buf);
0265         }
0266         
0267         /* if set, wait for notification back! 
0268          * either we got the callback already during the call...
0269          * Or we wait for callback here
0270          * We need some sequencing.
0271          * Will use sent as seq...
0272          */
0273         if (M_notify_mode)
0274         {
0275             while (M_notif_sent_acq<M_sent)
0276             {
0277                 if (EXFAIL==ndrx_tpchkunsol(0))
0278                 {
0279                     if (tperrno!=TPETIME)
0280                     {
0281                         NDRX_LOG(log_error, "Failed to wait for notif: %s", 
0282                                 tpstrerror(tperrno));
0283                         exit(-1);
0284                     }
0285                 }
0286             }
0287         }
0288         
0289         M_sent++;
0290         /* enqueue number of messages only... */
0291         if (M_numreq && M_sent >= M_numreq)
0292         {
0293             break;
0294         }
0295     }
0296     
0297     /* publish results... */
0298     MUTEX_LOCK_V(M_wait_mutex);
0299     M_msg_sent+=M_sent;
0300     MUTEX_UNLOCK_V(M_wait_mutex);
0301 
0302     /* Wait on queue to finish ... 
0303      * Additionally, if we have several servers running
0304      * then @TMQ servers running, then we shall peek
0305      * them all for the stats, if all queues are empty,
0306      * only then terminate
0307      */
0308     if (M_qspace[0] && M_autoq)
0309     {
0310         int done = EXFALSE;
0311         do
0312         {
0313             memset(&qc, 0, sizeof(qc));
0314             qc.flags|=TPQPEEK;
0315             rcv_buf=NULL;
0316             if (EXSUCCEED!=tpdequeue(M_qspace, svcnm, &qc, (char **)&rcv_buf, &rcvlen, 0))
0317             {
0318                 if (tperrno==TPEDIAGNOSTIC && QMENOMSG==qc.diagnostic)
0319                 {
0320                     done=EXTRUE;
0321                 }
0322                 else
0323                 {
0324                     NDRX_LOG(log_error, "tpdequeue() failed %s diag: %d:%s", 
0325                             tpstrerror(tperrno), qc.diagnostic, qc.diagmsg);
0326                     exit(-1);
0327                 }
0328             }
0329             else
0330             {
0331                 if (NULL!=rcv_buf)
0332                 {
0333                     tpfree(rcv_buf);
0334                 }
0335                 sleep(1);
0336             }
0337             
0338         } while (!done);
0339     }
0340     
0341 out:
0342     
0343     *p_finish_off=EXTRUE;
0344 
0345     /* free up ... */
0346     if (NULL!=buf)
0347     {
0348         tpfree(buf);
0349     }
0350     
0351     if (M_tran)
0352     {
0353         tpclose();
0354     }    
0355 
0356     /* release resources */
0357     tpterm();
0358 
0359     return;
0360     
0361 }
0362 
0363 /**
0364  * Print usage
0365  * @param bin binary name
0366  */
0367 expublic void usage(char *bin)
0368 {
0369     fprintf(stderr, "Usage: %s [options] -B buffer_type \n", bin);
0370     fprintf(stderr, "Options:\n");
0371     fprintf(stderr, "  -p <prio>        Call priority\n");
0372     fprintf(stderr, "  -P               Plot results, default false. Needs NDRX_BENCH_FILE and NDRX_BENCH_CONFIGNAME\n");
0373     fprintf(stderr, "  -n <threadsnr>   Number of threads, default 1\n");
0374     fprintf(stderr, "  -s <svcnm>       Service to call\n");
0375     fprintf(stderr, "  -t <time>        Number of seconds to run\n");
0376     fprintf(stderr, "  -f <fldnm>       Ubf field name to fill with random data\n");
0377     fprintf(stderr, "  -b <data>        Initial UBF data. In tpjsontoubf() format.\n");
0378     fprintf(stderr, "  -S <size>        Random data size, default 1024\n");
0379     fprintf(stderr, "  -F               Use forking instead of threading\n");
0380     fprintf(stderr, "  -N <svcnum>      Number of services\n");
0381     fprintf(stderr, "  -Q <qspname>     Persistent queue mode. Queue space name (thread enq+deq)\n");
0382     fprintf(stderr, "  -A               Auto queue testing (forwarding)\n");
0383     fprintf(stderr, "  -E               Persist only\n");
0384     fprintf(stderr, "  -R <msgnum>      Number of requests (time or nr first to stop)\n");
0385     fprintf(stderr, "  -T <tout_sec>    Initiate global transaction for XATMI calls\n");
0386     fprintf(stderr, "  -I               Wait for client notification\n");
0387     fprintf(stderr, "  -e               Post the event, -s serves as event\n");
0388 }
0389 
0390 /**
0391  * Benchmark tool entry
0392  */
0393 expublic int main( int argc, char** argv )
0394 {
0395     int ret = EXSUCCEED;
0396     int c;
0397     long i;
0398     char *rnd_block = NULL;
0399     double spent;
0400     double tps;
0401     ndrx_stopwatch_t w;
0402     char run_ver[512];
0403     int parent=EXTRUE;
0404     /* parse args: 
0405      * -n <number_of_threads> 
0406      * -s <service_to_call> 
0407      * -t <time_in_sec> 
0408      * -b <UBF buffer base> 
0409      * -f <random_data_field for UBF> 
0410      * -S <random_data_size>
0411      * -P - do plot
0412      * -p <call_priority>
0413      * -F - use forking mode
0414      * -N <number_of_services_modulus>
0415      */
0416     
0417     /* Currently Only UBF is supported
0418      * TODO: in future releases use tpimport() to teterminte the buffer format
0419      */
0420     M_buftype = ndrx_get_buffer_descr("UBF", NULL);
0421 
0422     while ((c = getopt (argc, argv, "n:s:t:b:S:p:Pf:FN:Q:AER:T:Ie")) != -1)
0423     {
0424         switch (c)
0425         {
0426             case 'e':
0427                 M_event_mode=EXTRUE;
0428                 break;
0429             case 'A':
0430                 M_autoq = EXTRUE;
0431                 break;
0432             case 'R':
0433                 M_numreq = atol(optarg);
0434                 break;
0435             case 'E':
0436                 M_enqonly = EXTRUE;
0437                 break;
0438             case 'Q':
0439                 NDRX_STRCPY_SAFE(M_qspace, optarg);
0440                 break;
0441             case 'N':
0442                 M_svcnum = atoi(optarg);
0443                 break;
0444             case 'F':
0445                 M_fork = EXTRUE;
0446                 break;
0447             case 'p':
0448                 M_prio = atoi(optarg);
0449                 break;
0450             case 'P':
0451                 M_doplot = EXTRUE;
0452                 break;
0453             case 'n':
0454                 /* this will allocate thread pool... */
0455                 M_nr_threads=atoi(optarg);
0456                 break;
0457             case 's':
0458                 /* service to call */
0459                 NDRX_STRCPY_SAFE(M_svcnm, optarg);
0460                 break;
0461             case 't':
0462                 M_runtime = atoi(optarg);
0463                 break;
0464             case 'b':
0465                 
0466                 M_sample_data = NDRX_STRDUP(optarg);
0467                 
0468                 if (NULL==M_sample_data)
0469                 {
0470                     NDRX_LOG(log_error, "Failed to copy data buffer: %s", strerror(errno));
0471                     EXFAIL_OUT(ret);
0472                 }
0473                 
0474                 break;
0475             case 'f':
0476                 
0477                 M_fld = Bfldid(optarg);
0478                 
0479                 if (BBADFLDID==M_fld)
0480                 {
0481                     NDRX_LOG(log_error, "Failed to resolve field id [%s]: %s", 
0482                             optarg, Bstrerror(Berror));
0483                     EXFAIL_OUT(ret);
0484                 }
0485                 break;
0486             case 'S':
0487                 M_rndsize = atoi(optarg);
0488                 break;
0489             case 'T':
0490                 M_tran = atoi(optarg);
0491                 break;
0492             case 'I':
0493                 M_notify_mode = EXTRUE;
0494                 break;
0495             default:
0496                 NDRX_LOG(log_error, "Unknown option %c", c);
0497                 usage(argv[0]);
0498                 EXFAIL_OUT(ret);
0499                 break;
0500         }
0501     }
0502     
0503     
0504     /*** print details: ****/
0505     
0506     NDRX_LOG(log_info, "M_svcnm=[%s]", M_svcnm);
0507     NDRX_LOG(log_info, "M_runtime=%d", M_runtime);
0508     NDRX_LOG(log_info, "M_sample_data=[%s]", (M_sample_data?M_sample_data:"NULL"));
0509     NDRX_LOG(log_info, "M_fld=%ld", M_fld);
0510     NDRX_LOG(log_info, "M_rndsize=%d", M_rndsize);
0511     NDRX_LOG(log_info, "M_doplot=%d", M_doplot);
0512     NDRX_LOG(log_info, "M_prio=%d", M_prio);
0513     NDRX_LOG(log_info, "M_buftype=%p", M_buftype);
0514     NDRX_LOG(log_info, "M_nr_threads=%d", M_nr_threads);
0515     NDRX_LOG(log_info, "M_fork=%d", M_fork);
0516     NDRX_LOG(log_info, "M_svcnum=%d", M_svcnum);
0517     NDRX_LOG(log_info, "M_qspace=[%s]", M_qspace);
0518     NDRX_LOG(log_info, "M_autoq=[%d]", M_autoq);
0519     NDRX_LOG(log_info, "M_enqonly=[%d]", M_autoq);
0520     NDRX_LOG(log_info, "M_numreq=[%ld]", M_numreq);
0521     NDRX_LOG(log_info, "M_tran=[%d]", M_tran);
0522     NDRX_LOG(log_info, "M_notify_mode=[%d]", M_notify_mode);
0523     NDRX_LOG(log_info, "M_event_mode=[%d]", M_event_mode);
0524     
0525     /* allocate the buffer & fill with random data */
0526     
0527     if (NULL==M_buftype)
0528     {
0529         NDRX_LOG(log_error, "Invalid buffer specified (or not specified)");
0530         usage(argv[0]);
0531         EXFAIL_OUT(ret);
0532     }
0533     
0534     M_master_buf = tpalloc(M_buftype->type, NULL, 1024 + M_rndsize*2);
0535     
0536     if (NULL==M_master_buf)
0537     {
0538         NDRX_LOG(log_error, "Failed to allocate send buffer: %s", tpstrerror(tperrno));
0539         EXFAIL_OUT(ret);
0540     }
0541     
0542     /* parse data in ... */
0543     
0544     if (0==strcmp(M_buftype->type, "UBF"))
0545     {
0546         if (NULL!=M_sample_data)
0547         {
0548             if (EXSUCCEED!=tpjsontoubf((UBFH *)M_master_buf, M_sample_data))
0549             {
0550                 NDRX_LOG(log_error, "Failed to parse call data: %s", 
0551                         tpstrerror(tperrno));
0552                 EXFAIL_OUT(ret);
0553             }
0554         }
0555         
0556         /* load random data */
0557         rnd_block = NDRX_MALLOC(M_rndsize);
0558         
0559         if (NULL==rnd_block)
0560         {
0561             NDRX_LOG(log_error, "Failed to malloc random block: %s", 
0562                     strerror(errno));
0563             EXFAIL_OUT(ret);
0564         }
0565        
0566         /* carray block */
0567         if (BBADFLDID!=M_fld)
0568         {
0569             NDRX_LOG(log_debug, "Adding random block to %s of %d", Bfname(M_fld), M_rndsize);
0570             if (EXSUCCEED!=Bchg((UBFH *)M_master_buf, M_fld, 0, rnd_block, M_rndsize))
0571             {
0572                 NDRX_LOG(log_error, "Failed to add random block: %s", 
0573                         Bstrerror(Berror));
0574                 EXFAIL_OUT(ret);
0575             }
0576         }
0577         
0578         M_msgsize=Bused((UBFH *)M_master_buf);
0579     }
0580     else if (M_notify_mode)
0581     {
0582         NDRX_LOG(log_error, "Notify (-I) mode only supported on UBF buffers!");
0583         EXFAIL_OUT(ret);
0584     }
0585     
0586     if (!getenv("NDRX_BENCH_FILE"))
0587     {
0588         setenv("NDRX_BENCH_FILE", "test.out", EXTRUE);
0589     }
0590     
0591     if (!getenv("NDRX_BENCH_CONFIGNAME"))
0592     {
0593         snprintf(run_ver, sizeof(run_ver), "test");
0594         setenv("NDRX_BENCH_CONFIGNAME", run_ver, EXTRUE);
0595     }
0596     
0597     /* just in anty case... */
0598     tpterm();
0599 
0600     if (M_fork)
0601     {
0602         signal(SIGCHLD, SIG_IGN); /* ignore childs, we will wait on pipe */
0603         
0604         NDRX_LOG(log_debug, "Fork mode");
0605 
0606         if (EXSUCCEED!=pipe ( M_fd ))
0607         {
0608             NDRX_LOG(log_error, "Failed to pipe: %s", strerror(errno));
0609             EXFAIL_OUT(ret);
0610         }
0611         
0612         for (i=0; i<M_nr_threads; i++)
0613         {
0614             if ( ndrx_fork () == 0 ) /* Child Writer */
0615             {
0616                 int finish=EXFALSE;
0617                 parent=EXFALSE;
0618                 
0619                 sleep(1);
0620                 
0621                 ndrx_stopwatch_reset(&w);
0622                 
0623                 /* mark the run-time... */
0624                 thread_process((void *)i, &finish);
0625                 
0626                 spent=ndrx_stopwatch_get_delta(&w);
0627 
0628                 tps = ((double)M_msg_sent / (spent / 1000));
0629                 NDRX_LOG(log_debug, "Spent: %ld ms msgs: %ld tps: %lf", 
0630                     spent, M_msg_sent, tps);
0631                 
0632                 if (sizeof(tps)!=write (M_fd[NDRX_WRITE], (char *)&tps, sizeof(tps)))
0633                 {
0634                     NDRX_LOG(log_error, "Failed to write to pipe: %s", 
0635                             strerror(errno));
0636                     userlog("Failed to write to pipe: %s", 
0637                             strerror(errno));
0638                     EXFAIL_OUT(ret);
0639                 }
0640                 
0641                 /* proc is terminating.. */
0642                 goto out;
0643                 
0644             }
0645         }
0646        
0647         /* read the results */
0648         for (i=0; i<M_nr_threads; i++)
0649         {
0650             double tpsproc=0;
0651             if (sizeof(tpsproc)!=read ( M_fd[NDRX_READ], (char *)&tpsproc, sizeof(tpsproc)))
0652             {
0653                 NDRX_LOG(log_error, "Failed to read result %d: %s", strerror(errno));
0654                 EXFAIL_OUT(ret);
0655             }
0656             /* update totals */
0657             tps+=tpsproc;
0658         }
0659     }
0660     else
0661     {
0662         
0663         NDRX_LOG(log_debug, "Thread mode");
0664         M_threads = ndrx_thpool_init(M_nr_threads,  &ret, NULL, NULL, 0, NULL);
0665 
0666         if (EXSUCCEED!=ret)
0667         {
0668             NDRX_LOG(log_error, "Thread pool init failure");
0669             EXFAIL_OUT(ret);
0670         }
0671 
0672         /* sync to master */
0673         MUTEX_LOCK_V(M_wait_mutex);
0674 
0675         for (i=0; i<M_nr_threads; i++)
0676         {
0677             /* thread nr is set as ptr */
0678             ndrx_thpool_add_work(M_threads, (void*)thread_process, (void *)i);
0679         }
0680 
0681         /* let threads to prepare */
0682         sleep(2);
0683 
0684         MUTEX_UNLOCK_V(M_wait_mutex);
0685         ndrx_stopwatch_reset(&w);
0686 
0687         /* let it run... */
0688         while (ndrx_stopwatch_get_delta_sec(&w) < M_runtime)
0689         {
0690             /* If all threads have made exit, assume it was
0691              * request based run
0692              */
0693             sleep(1);
0694             
0695             if (M_nr_threads==ndrx_thpool_nr_not_working(M_threads))
0696             {
0697                 NDRX_LOG(log_debug, "All threads did exit.");
0698                 break;
0699             }
0700         }
0701         M_do_run=EXFALSE;
0702 
0703         /* wait.. */
0704         ndrx_thpool_wait(M_threads);
0705         spent=ndrx_stopwatch_get_delta(&w);
0706 
0707         tps = ((double)M_msg_sent / (spent / 1000));
0708 
0709         NDRX_LOG(log_debug, "Spent: %ld ms msgs: %ld tps: %lf", 
0710                 spent, M_msg_sent, tps);
0711 
0712         ndrx_thpool_destroy(M_threads);
0713     }
0714     
0715     /* write the stats... */
0716     if (M_doplot)
0717     {
0718         ndrx_bench_write_stats(M_msgsize, tps);
0719     }
0720     
0721 out:
0722     if (EXFAIL!=M_fd[NDRX_READ])
0723     {
0724         close(M_fd[NDRX_READ]);
0725     }
0726 
0727     if (EXFAIL!=M_fd[NDRX_WRITE])
0728     {
0729         close(M_fd[NDRX_WRITE]);
0730     }
0731 
0732     if (NULL!=M_master_buf)
0733     {
0734         tpfree((char *)M_master_buf);
0735     }
0736 
0737     if (NULL!=M_sample_data)
0738     {
0739         NDRX_FREE(M_sample_data);
0740     }
0741 
0742     if (NULL!=rnd_block)
0743     {
0744         NDRX_FREE(rnd_block);
0745     }
0746 
0747     return ret;
0748     
0749 }
0750 
0751 /* vim: set ts=4 sw=4 et smartindent: */