0001
0002
0003
0004
0005
0006
0007
0008
0009
0010
0011
0012
0013
0014
0015
0016
0017
0018
0019
0020
0021
0022
0023
0024
0025
0026
0027
0028
0029
0030
0031
0032
0033
0034 #include <stdio.h>
0035 #include <stdlib.h>
0036 #include <string.h>
0037 #include <errno.h>
0038 #include <signal.h>
0039
0040 #include <ndebug.h>
0041 #include <atmi.h>
0042 #include <exthpool.h>
0043
0044 #include "atmi_int.h"
0045 #include "expr.h"
0046 #include <typed_buf.h>
0047 #include <atmi_int.h> /* ndrx_tpchkunsol() */
0048 #include <atmi_tls.h> /* my_id */
0049 #include <Exfields.h>
0050
0051
0052
0053
0054
0055
0056 #define NDRX_READ 0
0057
0058
0059
0060
0061
0062 #define NDRX_WRITE 1
0063
0064
0065
0066
0067
0068 exprivate int M_nr_threads=1;
0069
0070
0071 exprivate char M_svcnm[XATMI_EVENT_MAX+1]="EXBENCH";
0072 exprivate int M_runtime=60;
0073 exprivate char *M_sample_data=NULL;
0074 exprivate BFLDID M_fld=BBADFLDID;
0075 exprivate int M_rndsize=1024;
0076 exprivate int M_doplot=EXFALSE;
0077 exprivate int M_prio=NDRX_MSGPRIO_DEFAULT;
0078 exprivate typed_buffer_descr_t * M_buftype = NULL;
0079 exprivate threadpool M_threads;
0080 exprivate MUTEX_LOCKDECL(M_wait_mutex);
0081 exprivate int M_do_run = EXTRUE;
0082 exprivate long M_msg_sent=0;
0083 exprivate char *M_master_buf=NULL;
0084 exprivate int M_msgsize = 0;
0085 exprivate int M_fork = EXFALSE;
0086 exprivate int M_fd[2]={EXFAIL, EXFAIL};
0087 exprivate int M_svcnum = 0;
0088
0089 exprivate char M_qspace[XATMI_SERVICE_NAME_LENGTH+1] = {EXEOS};
0090 exprivate int M_autoq = EXFALSE;
0091 exprivate int M_enqonly = EXFALSE;
0092 exprivate long M_numreq = EXFALSE;
0093
0094 exprivate int M_tran = EXFALSE;
0095 exprivate int M_notify_mode = EXFALSE;
0096 exprivate __thread long M_sent=0;
0097 exprivate __thread long M_notif_sent_acq = EXFAIL;
0098 exprivate int M_event_mode=EXFALSE;
0099
0100
0101
0102
0103
0104 exprivate void notification_callback (char *data, long len, long flags)
0105 {
0106 M_notif_sent_acq = M_sent;
0107 }
0108
0109
0110 expublic void thread_process(void *ptr, int *p_finish_off)
0111 {
0112
0113 long thnum = (long)ptr;
0114 char svcnm[XATMI_SERVICE_NAME_LENGTH+1];
0115 char *buf = tpalloc(M_buftype->type, NULL, M_rndsize*2);
0116 char *rcv_buf;
0117 long rcvlen;
0118
0119 TPQCTL qc;
0120 ndrx_stopwatch_t w;
0121
0122 if (EXSUCCEED!=tpinit(NULL))
0123 {
0124 NDRX_LOG(log_error, "Failed to init: %s", tpstrerror(tperrno));
0125 userlog("Failed to init: %s", tpstrerror(tperrno));
0126 exit(-1);
0127 }
0128
0129
0130 if (M_tran && EXSUCCEED!=tpopen())
0131 {
0132 NDRX_LOG(log_error, "Failed to tpopen(): %s",
0133 tpstrerror(tperrno));
0134 exit(-1);
0135 }
0136
0137 if (NULL==buf)
0138 {
0139 NDRX_LOG(log_error, "Failed to alloc send buf: %s",
0140 tpstrerror(tperrno));
0141 exit(-1);
0142 }
0143
0144
0145 memcpy(buf, M_master_buf, M_rndsize*2);
0146
0147 if (M_notify_mode)
0148 {
0149
0150 if (EXSUCCEED!=Bchg((UBFH *)buf, EX_CLTID, 0, G_atmi_tls->G_atmi_conf.my_id, 0))
0151 {
0152 NDRX_LOG(log_error, "Failed to set EX_CLTID: %s", Bstrerror(Berror));
0153 exit(-1);
0154 }
0155
0156
0157 tpsetunsol (notification_callback);
0158
0159 }
0160
0161
0162
0163 if (M_svcnum > 0)
0164 {
0165 snprintf(svcnm, sizeof(svcnm), "%s%03d", M_svcnm, (int)thnum % M_svcnum);
0166 }
0167 else
0168 {
0169
0170 NDRX_STRCPY_SAFE(svcnm, M_svcnm);
0171 }
0172
0173
0174 MUTEX_LOCK_V(M_wait_mutex);
0175 MUTEX_UNLOCK_V(M_wait_mutex);
0176
0177 ndrx_stopwatch_reset(&w);
0178
0179 while ((!M_fork && M_do_run) || (M_fork && ndrx_stopwatch_get_delta_sec(&w) < M_runtime))
0180 {
0181 if (M_prio!=NDRX_MSGPRIO_DEFAULT)
0182 {
0183 tpsprio(M_prio, TPABSOLUTE);
0184 }
0185
0186
0187 if (M_tran && EXSUCCEED!=tpbegin(M_tran, 0))
0188 {
0189 NDRX_LOG(log_error, "tpbegin() failed: %s",
0190 tpstrerror(tperrno));
0191 exit(-1);
0192 }
0193
0194 if (M_qspace[0])
0195 {
0196
0197 memset(&qc, 0, sizeof(qc));
0198 if (EXSUCCEED!=tpenqueue(M_qspace, svcnm, &qc, buf, 0, 0))
0199 {
0200 NDRX_LOG(log_error, "tpenqueue() failed %s diag: %d:%s",
0201 tpstrerror(tperrno), qc.diagnostic, qc.diagmsg);
0202 exit(-1);
0203 }
0204
0205 if (!M_autoq && !M_enqonly)
0206 {
0207
0208
0209 if (M_tran && EXSUCCEED!=tpcommit(0))
0210 {
0211 NDRX_LOG(log_error, "tpcommit() failed: %s",
0212 tpstrerror(tperrno));
0213 exit(-1);
0214 }
0215
0216 if (M_tran && EXSUCCEED!=tpbegin(M_tran, 0))
0217 {
0218 NDRX_LOG(log_error, "tpbegin() failed: %s",
0219 tpstrerror(tperrno));
0220 exit(-1);
0221 }
0222
0223
0224 memset(&qc, 0, sizeof(qc));
0225 rcv_buf=NULL;
0226 if (EXSUCCEED!=tpdequeue(M_qspace, svcnm, &qc, (char **)&rcv_buf, &rcvlen, 0))
0227 {
0228 NDRX_LOG(log_error, "tpdequeue() failed %s diag: %d:%s",
0229 tpstrerror(tperrno), qc.diagnostic, qc.diagmsg);
0230 exit(-1);
0231 }
0232 }
0233
0234 }
0235 else
0236 {
0237 rcv_buf=NULL;
0238 if (M_event_mode)
0239 {
0240 if (tppost(svcnm, buf, 0, 0)<=0)
0241 {
0242 NDRX_LOG(log_error, "Failed to post event [%s]: %s",
0243 svcnm, tpstrerror(tperrno));
0244 exit(-1);
0245 }
0246 }
0247 else if (EXFAIL==tpcall(svcnm, buf, 0, &rcv_buf, &rcvlen, 0))
0248 {
0249 NDRX_LOG(log_error, "Failed to call [%s]: %s",
0250 svcnm, tpstrerror(tperrno));
0251 exit(-1);
0252 }
0253 }
0254
0255 if (M_tran && EXSUCCEED!=tpcommit(0))
0256 {
0257 NDRX_LOG(log_error, "tpcommit() 2 failed: %s",
0258 tpstrerror(tperrno));
0259 exit(-1);
0260 }
0261
0262 if (NULL!=rcv_buf)
0263 {
0264 tpfree(rcv_buf);
0265 }
0266
0267
0268
0269
0270
0271
0272
0273 if (M_notify_mode)
0274 {
0275 while (M_notif_sent_acq<M_sent)
0276 {
0277 if (EXFAIL==ndrx_tpchkunsol(0))
0278 {
0279 if (tperrno!=TPETIME)
0280 {
0281 NDRX_LOG(log_error, "Failed to wait for notif: %s",
0282 tpstrerror(tperrno));
0283 exit(-1);
0284 }
0285 }
0286 }
0287 }
0288
0289 M_sent++;
0290
0291 if (M_numreq && M_sent >= M_numreq)
0292 {
0293 break;
0294 }
0295 }
0296
0297
0298 MUTEX_LOCK_V(M_wait_mutex);
0299 M_msg_sent+=M_sent;
0300 MUTEX_UNLOCK_V(M_wait_mutex);
0301
0302
0303
0304
0305
0306
0307
0308 if (M_qspace[0] && M_autoq)
0309 {
0310 int done = EXFALSE;
0311 do
0312 {
0313 memset(&qc, 0, sizeof(qc));
0314 qc.flags|=TPQPEEK;
0315 rcv_buf=NULL;
0316 if (EXSUCCEED!=tpdequeue(M_qspace, svcnm, &qc, (char **)&rcv_buf, &rcvlen, 0))
0317 {
0318 if (tperrno==TPEDIAGNOSTIC && QMENOMSG==qc.diagnostic)
0319 {
0320 done=EXTRUE;
0321 }
0322 else
0323 {
0324 NDRX_LOG(log_error, "tpdequeue() failed %s diag: %d:%s",
0325 tpstrerror(tperrno), qc.diagnostic, qc.diagmsg);
0326 exit(-1);
0327 }
0328 }
0329 else
0330 {
0331 if (NULL!=rcv_buf)
0332 {
0333 tpfree(rcv_buf);
0334 }
0335 sleep(1);
0336 }
0337
0338 } while (!done);
0339 }
0340
0341 out:
0342
0343 *p_finish_off=EXTRUE;
0344
0345
0346 if (NULL!=buf)
0347 {
0348 tpfree(buf);
0349 }
0350
0351 if (M_tran)
0352 {
0353 tpclose();
0354 }
0355
0356
0357 tpterm();
0358
0359 return;
0360
0361 }
0362
0363
0364
0365
0366
0367 expublic void usage(char *bin)
0368 {
0369 fprintf(stderr, "Usage: %s [options] -B buffer_type \n", bin);
0370 fprintf(stderr, "Options:\n");
0371 fprintf(stderr, " -p <prio> Call priority\n");
0372 fprintf(stderr, " -P Plot results, default false. Needs NDRX_BENCH_FILE and NDRX_BENCH_CONFIGNAME\n");
0373 fprintf(stderr, " -n <threadsnr> Number of threads, default 1\n");
0374 fprintf(stderr, " -s <svcnm> Service to call\n");
0375 fprintf(stderr, " -t <time> Number of seconds to run\n");
0376 fprintf(stderr, " -f <fldnm> Ubf field name to fill with random data\n");
0377 fprintf(stderr, " -b <data> Initial UBF data. In tpjsontoubf() format.\n");
0378 fprintf(stderr, " -S <size> Random data size, default 1024\n");
0379 fprintf(stderr, " -F Use forking instead of threading\n");
0380 fprintf(stderr, " -N <svcnum> Number of services\n");
0381 fprintf(stderr, " -Q <qspname> Persistent queue mode. Queue space name (thread enq+deq)\n");
0382 fprintf(stderr, " -A Auto queue testing (forwarding)\n");
0383 fprintf(stderr, " -E Persist only\n");
0384 fprintf(stderr, " -R <msgnum> Number of requests (time or nr first to stop)\n");
0385 fprintf(stderr, " -T <tout_sec> Initiate global transaction for XATMI calls\n");
0386 fprintf(stderr, " -I Wait for client notification\n");
0387 fprintf(stderr, " -e Post the event, -s serves as event\n");
0388 }
0389
0390
0391
0392
0393 expublic int main( int argc, char** argv )
0394 {
0395 int ret = EXSUCCEED;
0396 int c;
0397 long i;
0398 char *rnd_block = NULL;
0399 double spent;
0400 double tps;
0401 ndrx_stopwatch_t w;
0402 char run_ver[512];
0403 int parent=EXTRUE;
0404
0405
0406
0407
0408
0409
0410
0411
0412
0413
0414
0415
0416
0417
0418
0419
0420 M_buftype = ndrx_get_buffer_descr("UBF", NULL);
0421
0422 while ((c = getopt (argc, argv, "n:s:t:b:S:p:Pf:FN:Q:AER:T:Ie")) != -1)
0423 {
0424 switch (c)
0425 {
0426 case 'e':
0427 M_event_mode=EXTRUE;
0428 break;
0429 case 'A':
0430 M_autoq = EXTRUE;
0431 break;
0432 case 'R':
0433 M_numreq = atol(optarg);
0434 break;
0435 case 'E':
0436 M_enqonly = EXTRUE;
0437 break;
0438 case 'Q':
0439 NDRX_STRCPY_SAFE(M_qspace, optarg);
0440 break;
0441 case 'N':
0442 M_svcnum = atoi(optarg);
0443 break;
0444 case 'F':
0445 M_fork = EXTRUE;
0446 break;
0447 case 'p':
0448 M_prio = atoi(optarg);
0449 break;
0450 case 'P':
0451 M_doplot = EXTRUE;
0452 break;
0453 case 'n':
0454
0455 M_nr_threads=atoi(optarg);
0456 break;
0457 case 's':
0458
0459 NDRX_STRCPY_SAFE(M_svcnm, optarg);
0460 break;
0461 case 't':
0462 M_runtime = atoi(optarg);
0463 break;
0464 case 'b':
0465
0466 M_sample_data = NDRX_STRDUP(optarg);
0467
0468 if (NULL==M_sample_data)
0469 {
0470 NDRX_LOG(log_error, "Failed to copy data buffer: %s", strerror(errno));
0471 EXFAIL_OUT(ret);
0472 }
0473
0474 break;
0475 case 'f':
0476
0477 M_fld = Bfldid(optarg);
0478
0479 if (BBADFLDID==M_fld)
0480 {
0481 NDRX_LOG(log_error, "Failed to resolve field id [%s]: %s",
0482 optarg, Bstrerror(Berror));
0483 EXFAIL_OUT(ret);
0484 }
0485 break;
0486 case 'S':
0487 M_rndsize = atoi(optarg);
0488 break;
0489 case 'T':
0490 M_tran = atoi(optarg);
0491 break;
0492 case 'I':
0493 M_notify_mode = EXTRUE;
0494 break;
0495 default:
0496 NDRX_LOG(log_error, "Unknown option %c", c);
0497 usage(argv[0]);
0498 EXFAIL_OUT(ret);
0499 break;
0500 }
0501 }
0502
0503
0504
0505
0506 NDRX_LOG(log_info, "M_svcnm=[%s]", M_svcnm);
0507 NDRX_LOG(log_info, "M_runtime=%d", M_runtime);
0508 NDRX_LOG(log_info, "M_sample_data=[%s]", (M_sample_data?M_sample_data:"NULL"));
0509 NDRX_LOG(log_info, "M_fld=%ld", M_fld);
0510 NDRX_LOG(log_info, "M_rndsize=%d", M_rndsize);
0511 NDRX_LOG(log_info, "M_doplot=%d", M_doplot);
0512 NDRX_LOG(log_info, "M_prio=%d", M_prio);
0513 NDRX_LOG(log_info, "M_buftype=%p", M_buftype);
0514 NDRX_LOG(log_info, "M_nr_threads=%d", M_nr_threads);
0515 NDRX_LOG(log_info, "M_fork=%d", M_fork);
0516 NDRX_LOG(log_info, "M_svcnum=%d", M_svcnum);
0517 NDRX_LOG(log_info, "M_qspace=[%s]", M_qspace);
0518 NDRX_LOG(log_info, "M_autoq=[%d]", M_autoq);
0519 NDRX_LOG(log_info, "M_enqonly=[%d]", M_autoq);
0520 NDRX_LOG(log_info, "M_numreq=[%ld]", M_numreq);
0521 NDRX_LOG(log_info, "M_tran=[%d]", M_tran);
0522 NDRX_LOG(log_info, "M_notify_mode=[%d]", M_notify_mode);
0523 NDRX_LOG(log_info, "M_event_mode=[%d]", M_event_mode);
0524
0525
0526
0527 if (NULL==M_buftype)
0528 {
0529 NDRX_LOG(log_error, "Invalid buffer specified (or not specified)");
0530 usage(argv[0]);
0531 EXFAIL_OUT(ret);
0532 }
0533
0534 M_master_buf = tpalloc(M_buftype->type, NULL, 1024 + M_rndsize*2);
0535
0536 if (NULL==M_master_buf)
0537 {
0538 NDRX_LOG(log_error, "Failed to allocate send buffer: %s", tpstrerror(tperrno));
0539 EXFAIL_OUT(ret);
0540 }
0541
0542
0543
0544 if (0==strcmp(M_buftype->type, "UBF"))
0545 {
0546 if (NULL!=M_sample_data)
0547 {
0548 if (EXSUCCEED!=tpjsontoubf((UBFH *)M_master_buf, M_sample_data))
0549 {
0550 NDRX_LOG(log_error, "Failed to parse call data: %s",
0551 tpstrerror(tperrno));
0552 EXFAIL_OUT(ret);
0553 }
0554 }
0555
0556
0557 rnd_block = NDRX_MALLOC(M_rndsize);
0558
0559 if (NULL==rnd_block)
0560 {
0561 NDRX_LOG(log_error, "Failed to malloc random block: %s",
0562 strerror(errno));
0563 EXFAIL_OUT(ret);
0564 }
0565
0566
0567 if (BBADFLDID!=M_fld)
0568 {
0569 NDRX_LOG(log_debug, "Adding random block to %s of %d", Bfname(M_fld), M_rndsize);
0570 if (EXSUCCEED!=Bchg((UBFH *)M_master_buf, M_fld, 0, rnd_block, M_rndsize))
0571 {
0572 NDRX_LOG(log_error, "Failed to add random block: %s",
0573 Bstrerror(Berror));
0574 EXFAIL_OUT(ret);
0575 }
0576 }
0577
0578 M_msgsize=Bused((UBFH *)M_master_buf);
0579 }
0580 else if (M_notify_mode)
0581 {
0582 NDRX_LOG(log_error, "Notify (-I) mode only supported on UBF buffers!");
0583 EXFAIL_OUT(ret);
0584 }
0585
0586 if (!getenv("NDRX_BENCH_FILE"))
0587 {
0588 setenv("NDRX_BENCH_FILE", "test.out", EXTRUE);
0589 }
0590
0591 if (!getenv("NDRX_BENCH_CONFIGNAME"))
0592 {
0593 snprintf(run_ver, sizeof(run_ver), "test");
0594 setenv("NDRX_BENCH_CONFIGNAME", run_ver, EXTRUE);
0595 }
0596
0597
0598 tpterm();
0599
0600 if (M_fork)
0601 {
0602 signal(SIGCHLD, SIG_IGN);
0603
0604 NDRX_LOG(log_debug, "Fork mode");
0605
0606 if (EXSUCCEED!=pipe ( M_fd ))
0607 {
0608 NDRX_LOG(log_error, "Failed to pipe: %s", strerror(errno));
0609 EXFAIL_OUT(ret);
0610 }
0611
0612 for (i=0; i<M_nr_threads; i++)
0613 {
0614 if ( ndrx_fork () == 0 )
0615 {
0616 int finish=EXFALSE;
0617 parent=EXFALSE;
0618
0619 sleep(1);
0620
0621 ndrx_stopwatch_reset(&w);
0622
0623
0624 thread_process((void *)i, &finish);
0625
0626 spent=ndrx_stopwatch_get_delta(&w);
0627
0628 tps = ((double)M_msg_sent / (spent / 1000));
0629 NDRX_LOG(log_debug, "Spent: %ld ms msgs: %ld tps: %lf",
0630 spent, M_msg_sent, tps);
0631
0632 if (sizeof(tps)!=write (M_fd[NDRX_WRITE], (char *)&tps, sizeof(tps)))
0633 {
0634 NDRX_LOG(log_error, "Failed to write to pipe: %s",
0635 strerror(errno));
0636 userlog("Failed to write to pipe: %s",
0637 strerror(errno));
0638 EXFAIL_OUT(ret);
0639 }
0640
0641
0642 goto out;
0643
0644 }
0645 }
0646
0647
0648 for (i=0; i<M_nr_threads; i++)
0649 {
0650 double tpsproc=0;
0651 if (sizeof(tpsproc)!=read ( M_fd[NDRX_READ], (char *)&tpsproc, sizeof(tpsproc)))
0652 {
0653 NDRX_LOG(log_error, "Failed to read result %d: %s", strerror(errno));
0654 EXFAIL_OUT(ret);
0655 }
0656
0657 tps+=tpsproc;
0658 }
0659 }
0660 else
0661 {
0662
0663 NDRX_LOG(log_debug, "Thread mode");
0664 M_threads = ndrx_thpool_init(M_nr_threads, &ret, NULL, NULL, 0, NULL);
0665
0666 if (EXSUCCEED!=ret)
0667 {
0668 NDRX_LOG(log_error, "Thread pool init failure");
0669 EXFAIL_OUT(ret);
0670 }
0671
0672
0673 MUTEX_LOCK_V(M_wait_mutex);
0674
0675 for (i=0; i<M_nr_threads; i++)
0676 {
0677
0678 ndrx_thpool_add_work(M_threads, (void*)thread_process, (void *)i);
0679 }
0680
0681
0682 sleep(2);
0683
0684 MUTEX_UNLOCK_V(M_wait_mutex);
0685 ndrx_stopwatch_reset(&w);
0686
0687
0688 while (ndrx_stopwatch_get_delta_sec(&w) < M_runtime)
0689 {
0690
0691
0692
0693 sleep(1);
0694
0695 if (M_nr_threads==ndrx_thpool_nr_not_working(M_threads))
0696 {
0697 NDRX_LOG(log_debug, "All threads did exit.");
0698 break;
0699 }
0700 }
0701 M_do_run=EXFALSE;
0702
0703
0704 ndrx_thpool_wait(M_threads);
0705 spent=ndrx_stopwatch_get_delta(&w);
0706
0707 tps = ((double)M_msg_sent / (spent / 1000));
0708
0709 NDRX_LOG(log_debug, "Spent: %ld ms msgs: %ld tps: %lf",
0710 spent, M_msg_sent, tps);
0711
0712 ndrx_thpool_destroy(M_threads);
0713 }
0714
0715
0716 if (M_doplot)
0717 {
0718 ndrx_bench_write_stats(M_msgsize, tps);
0719 }
0720
0721 out:
0722 if (EXFAIL!=M_fd[NDRX_READ])
0723 {
0724 close(M_fd[NDRX_READ]);
0725 }
0726
0727 if (EXFAIL!=M_fd[NDRX_WRITE])
0728 {
0729 close(M_fd[NDRX_WRITE]);
0730 }
0731
0732 if (NULL!=M_master_buf)
0733 {
0734 tpfree((char *)M_master_buf);
0735 }
0736
0737 if (NULL!=M_sample_data)
0738 {
0739 NDRX_FREE(M_sample_data);
0740 }
0741
0742 if (NULL!=rnd_block)
0743 {
0744 NDRX_FREE(rnd_block);
0745 }
0746
0747 return ret;
0748
0749 }
0750
0751