Back to home page

Enduro/X

 
 

    


0001 /**
0002  * @brief Simple message client
0003  *
0004  * @file atmiclt0_mqcl.c
0005  */
0006 /* -----------------------------------------------------------------------------
0007  * Enduro/X Middleware Platform for Distributed Transaction Processing
0008  * Copyright (C) 2009-2016, ATR Baltic, Ltd. All Rights Reserved.
0009  * Copyright (C) 2017-2023, Mavimax, Ltd. All Rights Reserved.
0010  * This software is released under one of the following licenses:
0011  * AGPL (with Java and Go exceptions) or Mavimax's license for commercial use.
0012  * See LICENSE file for full text.
0013  * -----------------------------------------------------------------------------
0014  * AGPL license:
0015  *
0016  * This program is free software; you can redistribute it and/or modify it under
0017  * the terms of the GNU Affero General Public License, version 3 as published
0018  * by the Free Software Foundation;
0019  *
0020  * This program is distributed in the hope that it will be useful, but WITHOUT ANY
0021  * WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR A
0022  * PARTICULAR PURPOSE. See the GNU Affero General Public License, version 3
0023  * for more details.
0024  *
0025  * You should have received a copy of the GNU Affero General Public License along 
0026  * with this program; if not, write to the Free Software Foundation, Inc.,
0027  * 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
0028  *
0029  * -----------------------------------------------------------------------------
0030  * A commercial use license is available from Mavimax, Ltd
0031  * contact@mavimax.com
0032  * -----------------------------------------------------------------------------
0033  */
0034 #include <string.h>
0035 #include <stdio.h>
0036 #include <stdlib.h>
0037 #include <memory.h>
0038 #include <pthread.h>
0039 #include <unistd.h>
0040 #include <fcntl.h>
0041 #include <sys_mqueue.h>
0042 #include "test000.h"
0043 #include "sys_unix.h"
0044 #include <ndrstandard.h>
0045 #include <ndebug.h>
0046 #include <errno.h>
0047 #include <nstopwatch.h>
0048 
0049 /*---------------------------Externs------------------------------------*/
0050 /*---------------------------Macros-------------------------------------*/
/**
 * Compare two struct mq_attr values field by field.
 * Arguments and the whole expansion are parenthesized so that the macro
 * can be safely negated or combined with other operators.
 * @param X first attribute structure (lvalue, not pointer)
 * @param Y second attribute structure (lvalue, not pointer)
 * @return non zero if all four fields match, 0 otherwise
 */
#define ATTRCMP(X, Y) ((X).mq_curmsgs == (Y).mq_curmsgs && (X).mq_flags == (Y).mq_flags && \
        (X).mq_maxmsg == (Y).mq_maxmsg && (X).mq_msgsize == (Y).mq_msgsize)
0053 /*---------------------------Enums--------------------------------------*/
0054 /*---------------------------Typedefs-----------------------------------*/
0055 /*---------------------------Globals------------------------------------*/
/* Incremented by each local_test() thread that passes; main() expects 2 */
int M_ok = 0;
/* Lock taken by local_test() around the M_ok update */
MUTEX_LOCKDECL(M_ok_lock);
0058 /*---------------------------Statics------------------------------------*/
0059 /*---------------------------Prototypes---------------------------------*/
0060 
0061 /**
0062  * Test exclusive access to queue
0063  * @param pfx test prefix
0064  * @return EXSUCCEED/EXFAIL
0065  */
0066 int local_test_exlc(char *pfx)
0067 {
0068     int ret = EXSUCCEED;
0069     char qstr[128];
0070     struct mq_attr attr;
0071     int err;
0072     
0073     snprintf(qstr, sizeof(qstr), "/%s,test000,clt", pfx);
0074     
0075     attr.mq_flags = 0;
0076     attr.mq_maxmsg = 10;
0077     attr.mq_msgsize = TEST_REPLY_SIZE;
0078     attr.mq_curmsgs = 0;
0079     
0080     mqd_t mq1 = (mqd_t)EXFAIL;
0081     mqd_t mq2 = (mqd_t)EXFAIL;
0082     
0083     if ((mqd_t)EXFAIL==(mq1 = ndrx_mq_open(qstr, O_CREAT | O_EXCL, 0644, &attr)))
0084     {
0085         NDRX_LOG(log_error, "Failed to open queue: [%s]: %s", 
0086                 qstr, strerror(errno));
0087         EXFAIL_OUT(ret);
0088     }
0089     
0090     /* try open second time should fail! */
0091     
0092     if ((mqd_t)EXFAIL!=(mq2 = ndrx_mq_open(qstr, O_CREAT | O_EXCL, 0644, &attr)))
0093     {
0094         NDRX_LOG(log_error, "Second time open must fail!: [%s]: %s", 
0095                 qstr, strerror(errno));
0096         EXFAIL_OUT(ret);
0097     }
0098     err = errno;
0099     
0100     if (EEXIST!=err)
0101     {
0102         NDRX_LOG(log_error, "Unit test failed: expected error %d (EEXIST) got %d", 
0103                 EEXIST, err);
0104         EXFAIL_OUT(ret);
0105     }
0106 
0107 out:
0108     
0109     if (EXSUCCEED!=ndrx_mq_close(mq1))
0110     {
0111         NDRX_LOG(log_error, "Failed to close %p: %s", mq1, strerror(errno));
0112         ret=EXFAIL;
0113     }
0114 
0115     /* something does not work right here! */
0116     if (EXSUCCEED!=ndrx_mq_unlink(qstr))
0117     {
0118         NDRX_LOG(log_error, "Failed to unlink [%p]: %s", qstr, strerror(errno));
0119         ret=EXFAIL;
0120     }
0121 
0122     NDRX_LOG(log_error, "%s returns %d", __func__, ret);
0123     
0124     return ret;
0125 }
0126 
0127 /**
0128  * Test exclusive access to queue
0129  * @param pfx test prefix
0130  * @return EXSUCCEED/EXFAIL
0131  */
0132 int local_test_unlink(char *pfx)
0133 {
0134     int ret = EXSUCCEED;
0135     char qstr[128];
0136     struct mq_attr attr;
0137     char buffer[TEST_REPLY_SIZE];
0138     int i;
0139     mqd_t mq1 = (mqd_t)EXFAIL;
0140     
0141     snprintf(qstr, sizeof(qstr), "/%s,test000,clt,unl", pfx);
0142     
0143     attr.mq_flags = 0;
0144     attr.mq_maxmsg = 10;
0145     attr.mq_msgsize = TEST_REPLY_SIZE;
0146     attr.mq_curmsgs = 0;
0147     
0148     for (i=0; i<1000; i++)
0149     {
0150         if ((mqd_t)EXFAIL==(mq1 = ndrx_mq_open(qstr, O_CREAT | O_RDWR, 0644, &attr)))
0151         {
0152             NDRX_LOG(log_error, "Failed to open queue: [%s]: %s", 
0153                     qstr, strerror(errno));
0154             EXFAIL_OUT(ret);
0155         }
0156 
0157         /* send one block */
0158         if (EXFAIL==ndrx_mq_send(mq1, buffer, 
0159                     TEST_REPLY_SIZE, 0))
0160         {
0161             NDRX_LOG(log_error, "Failed to send message: %s", strerror(errno));
0162             EXFAIL_OUT(ret);
0163         }
0164 
0165         if (EXSUCCEED!=ndrx_mq_close(mq1))
0166         {
0167             NDRX_LOG(log_error, "Failed to close %p: %s", mq1, strerror(errno));
0168             ret=EXFAIL;
0169         }
0170 
0171         if (EXSUCCEED!=ndrx_mq_unlink(qstr))
0172         {
0173             NDRX_LOG(log_error, "Failed to unlink [%s]: %s", qstr, strerror(errno));
0174             ret=EXFAIL;
0175         }
0176     }
0177     
0178 out:
0179 
0180     NDRX_LOG(log_error, "%s returns %d", __func__, ret);
0181     
0182     return ret;
0183 }
0184 
0185 /**
0186  * Test non existing queue open with out create
0187  * @param pfx test prefix
0188  * @return EXSUCCEED/EXFAIL
0189  */
0190 int local_test_nonexists(char *pfx)
0191 {
0192     int ret = EXSUCCEED;
0193     char qstr[128];
0194     struct mq_attr attr;
0195     int i;
0196     mqd_t mq1 = (mqd_t)EXFAIL;
0197     int err;
0198     
0199     snprintf(qstr, sizeof(qstr), "/%s,test000,clt,none", pfx);
0200     
0201     attr.mq_flags = 0;
0202     attr.mq_maxmsg = 10;
0203     attr.mq_msgsize = TEST_REPLY_SIZE;
0204     attr.mq_curmsgs = 0;
0205     
0206     for (i=0; i<1000; i++)
0207     {
0208         if ((mqd_t)EXFAIL!=(mq1 = ndrx_mq_open(qstr, 0, 0644, &attr)))
0209         {
0210             NDRX_LOG(log_error, "Queue opened for some reason but shall not: [%s]", 
0211                     qstr);
0212             EXFAIL_OUT(ret);
0213         }
0214         err = errno;
0215         
0216         if (err!=ENOENT)
0217         {
0218             NDRX_LOG(log_error, "Expected error %d (ENOENT) got %d", ENOENT, err);
0219             EXFAIL_OUT(ret);
0220         }
0221     }
0222     
0223 out:
0224 
0225     NDRX_LOG(log_error, "%s returns %d", __func__, ret);
0226     
0227     return ret;
0228 }
0229 
0230 /**
0231  * Receive tests with different modes
0232  * @param pfx queue prefix
0233  * @return EXSUCCEED/EXFAIL
0234  */
0235 int local_test_receive(char *pfx)
0236 {
0237     int ret = EXSUCCEED;
0238     struct mq_attr attr, attrnew, attrold;
0239     char buffer[TEST_REPLY_SIZE];
0240     struct   timespec tm;
0241     int i;
0242     char qstr[128];
0243     mqd_t mq = (mqd_t)EXFAIL;
0244     int err;
0245     ssize_t bytes_read;
0246     ndrx_stopwatch_t t;
0247     int tim;
0248     
0249     snprintf(qstr, sizeof(qstr), "/%s,test000,clt,rcv", pfx);
0250     
0251     for (i=0; i<4; i++)
0252     {
0253         /* initialize the queue attributes */
0254         attr.mq_flags = 0;
0255         attr.mq_maxmsg = 10;
0256         attr.mq_msgsize = TEST_REPLY_SIZE;
0257         attr.mq_curmsgs = 0;
0258 
0259         /* create the message queue */
0260         if ((mqd_t)EXFAIL==(mq = ndrx_mq_open(qstr, O_CREAT, 0644, &attr)))
0261         {
0262             NDRX_LOG(log_error, "Failed to open queue: [%s]: %s", 
0263                     SV_QUEUE_NAME, strerror(errno));
0264             EXFAIL_OUT(ret);
0265         }
0266 
0267         NDRX_LOG(log_debug, ">>> receive: timed + blocked");
0268 
0269         ndrx_stopwatch_reset(&t);
0270         /* receive the message 
0271          * Maybe have some timed receive
0272          */
0273         clock_gettime(CLOCK_REALTIME, &tm);
0274         tm.tv_sec += 2;  /* Set for 20 seconds */
0275 
0276         if (EXSUCCEED==(bytes_read=ndrx_mq_timedreceive(mq, buffer, 
0277                 TEST_REPLY_SIZE, NULL, &tm)))
0278         {
0279             NDRX_LOG(log_error, "Got message at len %d but expected error!", bytes_read);
0280             EXFAIL_OUT(ret);
0281         }
0282 
0283         /* the error shall be timeout... */
0284 
0285         err = errno;
0286 
0287         if (ETIMEDOUT!=err)
0288         {
0289             NDRX_LOG(log_error, "Expected %d (ETIMEDOUT) error but got %d", ETIMEDOUT, err);
0290             EXFAIL_OUT(ret);
0291         }
0292 
0293         /* test the timeout */
0294         tim = ndrx_stopwatch_get_delta_sec(&t);
0295 
0296         if (tim<2 ||tim > 3)
0297         {
0298             NDRX_LOG(log_error, "Expected timeout 2 spent %d", tim);
0299             EXFAIL_OUT(ret);
0300         }
0301 
0302         NDRX_LOG(log_debug, ">>> receive: timed + non blocked");
0303 
0304         /* we should get EAGAIN and time shall be less than second */
0305         memcpy(&attrnew, &attr, sizeof(attr));
0306 
0307         attrnew.mq_flags = O_NONBLOCK;
0308 
0309         if (EXSUCCEED!=ndrx_mq_setattr(mq, &attrnew, &attrold))
0310         {
0311             NDRX_LOG(log_error, "Failed to set new attr: %s", strerror(errno));
0312             EXFAIL_OUT(ret);
0313         }
0314 
0315         /* compare the attribs with initial, must match */
0316 
0317         NDRX_LOG(log_debug, "oattr: mq_curmsgs=%ld mq_flags=%ld mq_maxmsg=%ld, mq_msgsize=%ld", 
0318                 attrold.mq_curmsgs, attrold.mq_flags, attrold.mq_maxmsg, attrold.mq_msgsize);
0319         
0320         NDRX_LOG(log_debug, "nattr: mq_curmsgs=%ld mq_flags=%ld mq_maxmsg=%ld, mq_msgsize=%ld", 
0321                 attr.mq_curmsgs, attr.mq_flags, attr.mq_maxmsg, attr.mq_msgsize);
0322 
0323         if (!(ATTRCMP(attr, attrold)))
0324         {
0325             NDRX_LOG(log_error, "Org attrs does not match!");
0326             EXFAIL_OUT(ret);
0327         }
0328 
0329         clock_gettime(CLOCK_REALTIME, &tm);
0330         tm.tv_sec += 2;  /* Set for 20 seconds */
0331         ndrx_stopwatch_reset(&t);
0332 
0333         if (EXSUCCEED==(bytes_read=ndrx_mq_timedreceive(mq, buffer, 
0334                 TEST_REPLY_SIZE, NULL, &tm)))
0335         {
0336             NDRX_LOG(log_error, "Got message at len %d but expected error!", 
0337                     bytes_read);
0338             EXFAIL_OUT(ret);
0339         }
0340 
0341         /* the error shall be timeout... */
0342 
0343         err = errno;
0344 
0345         if (EAGAIN!=err)
0346         {
0347             NDRX_LOG(log_error, "Expected %d (EAGAIN) error but got %d", EAGAIN, err);
0348             EXFAIL_OUT(ret);
0349         }
0350 
0351         /* test the timeout */
0352         tim = ndrx_stopwatch_get_delta_sec(&t);
0353 
0354         if (0!=tim)
0355         {
0356             NDRX_LOG(log_error, "Expected spent 0 as non blocked q, but got %d", 
0357                     tim);
0358             EXFAIL_OUT(ret);
0359         }
0360 
0361         NDRX_LOG(log_debug, ">>> receive: non timed + non blocked");
0362 
0363         if (EXSUCCEED==(bytes_read=ndrx_mq_receive(mq, buffer, 
0364                 TEST_REPLY_SIZE, NULL)))
0365         {
0366             NDRX_LOG(log_error, "Got message with len %d but expected error!", 
0367                     bytes_read);
0368             EXFAIL_OUT(ret);
0369         }
0370 
0371         /* the error shall be timeout... */
0372 
0373         err = errno;
0374 
0375         if (EAGAIN!=err)
0376         {
0377             NDRX_LOG(log_error, "Expected %d (EAGAIN) error but got %d", 
0378                     EAGAIN, err);
0379             EXFAIL_OUT(ret);
0380         }
0381 
0382         /* cleanup */
0383 
0384         if ((mqd_t)EXFAIL!=mq && EXFAIL==ndrx_mq_close(mq))
0385         {
0386             NDRX_LOG(log_error, "Failed to close queue: %s", strerror(errno));
0387             ret=EXFAIL;
0388 
0389         }
0390 
0391         if (EXSUCCEED!=ndrx_mq_unlink(qstr))
0392         {
0393             NDRX_LOG(log_error, "Failed to unlink [%p]: %s", 
0394                     qstr, strerror(errno));
0395             ret=EXFAIL;
0396         }
0397     }
0398 
0399 out:
0400     return ret;
0401 
0402 }
0403 
0404 /**
0405  * Send tests with different modes
0406  * @param pfx queue prefix
0407  * @return EXSUCCEED/EXFAIL
0408  */
0409 int local_test_send(char *pfx)
0410 {
0411     int ret = EXSUCCEED;
0412     struct mq_attr attr, attrnew, attrold;
0413     char buffer[TEST_REPLY_SIZE];
0414     struct   timespec tm;
0415     int i;
0416     char qstr[128];
0417     mqd_t mq = (mqd_t)EXFAIL;
0418     int err;
0419     ndrx_stopwatch_t t;
0420     int tim;
0421     
0422     snprintf(qstr, sizeof(qstr), "/%s,test000,clt,snd", pfx);
0423     /* unlink the queue if something left from pervious tests... */
0424     ndrx_mq_unlink(qstr);
0425     
0426     for (i=0; i<4; i++)
0427     {
0428         /* initialize the queue attributes */
0429         attr.mq_flags = 0;
0430         attr.mq_maxmsg = 10;
0431         attr.mq_msgsize = TEST_REPLY_SIZE;
0432         attr.mq_curmsgs = 0;
0433 
0434         /* create the message queue */
0435         if ((mqd_t)EXFAIL==(mq = ndrx_mq_open(qstr, O_CREAT | O_RDWR, 0644, &attr)))
0436         {
0437             NDRX_LOG(log_error, "Failed to open queue: [%s]: %s", 
0438                     SV_QUEUE_NAME, strerror(errno));
0439             EXFAIL_OUT(ret);
0440         }
0441 
0442         NDRX_LOG(log_debug, ">>> send: timed + blocked - ok (first msg)");
0443 
0444         ndrx_stopwatch_reset(&t);
0445         /* send the message 
0446          * Maybe have some timed receive
0447          */
0448         clock_gettime(CLOCK_REALTIME, &tm);
0449         tm.tv_sec += 2;  /* Set for 2 seconds */
0450 
0451         if (EXSUCCEED!=ndrx_mq_timedsend(mq, buffer, 
0452                 TEST_REPLY_SIZE, 0, &tm))
0453         {
0454             NDRX_LOG(log_error, "Failed to send 1: %s!", strerror(errno));
0455             EXFAIL_OUT(ret);
0456         }
0457         
0458         /* test the timeout */
0459         tim = ndrx_stopwatch_get_delta_sec(&t);
0460 
0461         if (0!=tim)
0462         {
0463             NDRX_LOG(log_error, "Expected send time 0, but got: %d", tim);
0464             EXFAIL_OUT(ret);
0465         }
0466         
0467         NDRX_LOG(log_debug, ">>> send: timed + non blocked - ok (second msg)");
0468 
0469         /* we should get EAGAIN and time shall be less than second */
0470         memcpy(&attrnew, &attr, sizeof(attr));
0471 
0472         attrnew.mq_flags = O_NONBLOCK;
0473 
0474         if (EXSUCCEED!=ndrx_mq_setattr(mq, &attrnew, NULL))
0475         {
0476             NDRX_LOG(log_error, "Failed to set new attr: %s", strerror(errno));
0477             EXFAIL_OUT(ret);
0478         }
0479 
0480         clock_gettime(CLOCK_REALTIME, &tm);
0481         tm.tv_sec += 2;  /* Set for 20 seconds */
0482         ndrx_stopwatch_reset(&t);
0483 
0484         if (EXSUCCEED!=ndrx_mq_timedsend(mq, buffer, 
0485                 TEST_REPLY_SIZE, 0, &tm))
0486         {
0487             NDRX_LOG(log_error, "Failed to send 2: %s!", strerror(errno));
0488             EXFAIL_OUT(ret);
0489         }
0490         
0491         /* test the timeout */
0492         tim = ndrx_stopwatch_get_delta_sec(&t);
0493 
0494         if (0!=tim)
0495         {
0496             NDRX_LOG(log_error, "Expected spent 0 as non blocked q, but got %d", 
0497                     tim);
0498             EXFAIL_OUT(ret);
0499         }
0500 
0501         NDRX_LOG(log_debug, ">>> send: non timed + blocked - ok (third msg)");
0502 
0503         attrnew.mq_flags = 0;
0504 
0505         if (EXSUCCEED!=ndrx_mq_setattr(mq, &attrnew, NULL))
0506         {
0507             NDRX_LOG(log_error, "Failed to set new attr 2: %s", strerror(errno));
0508             EXFAIL_OUT(ret);
0509         }
0510         
0511         if (EXSUCCEED!=ndrx_mq_send(mq, buffer, 
0512                 TEST_REPLY_SIZE, 0))
0513         {
0514             NDRX_LOG(log_error, "Failed to send 3: %s!", strerror(errno));
0515             EXFAIL_OUT(ret);
0516         }
0517 
0518         NDRX_LOG(log_debug, ">>> send: non timed + non blocked - ok (forth msg)");
0519 
0520         attrnew.mq_flags = O_NONBLOCK;
0521 
0522         if (EXSUCCEED!=ndrx_mq_setattr(mq, &attrnew, NULL))
0523         {
0524             NDRX_LOG(log_error, "Failed to set new attr 2: %s", strerror(errno));
0525             EXFAIL_OUT(ret);
0526         }
0527         
0528         if (EXSUCCEED!=ndrx_mq_send(mq, buffer, 
0529                 TEST_REPLY_SIZE, 0))
0530         {
0531             NDRX_LOG(log_error, "Failed to send 4: %s!", strerror(errno));
0532             EXFAIL_OUT(ret);
0533         }
0534         
0535         NDRX_LOG(log_debug, ">>> Test queue attributes...");
0536         memset(&attrold, 0, sizeof(attrold));
0537         if (EXSUCCEED!=ndrx_mq_getattr(mq, &attrold))
0538         {
0539             NDRX_LOG(log_error, "Failed to get queue attribs: %s", strerror(errno));
0540             EXFAIL_OUT(ret);
0541         }
0542         
0543         /* there must be 4 msgs */
0544         if (4!=attrold.mq_curmsgs)
0545         {
0546             NDRX_LOG(log_error, "Expected 4 msgs on queue but got: %d", attrold.mq_curmsgs);
0547             EXFAIL_OUT(ret);
0548         }
0549         
0550         if (10!=attrold.mq_maxmsg)
0551         {
0552             NDRX_LOG(log_error, "Expected maxmsg 10 but got %d", attrold.mq_maxmsg);
0553             EXFAIL_OUT(ret);
0554         }
0555         
0556         if (TEST_REPLY_SIZE!=attrold.mq_msgsize)
0557         {
0558             NDRX_LOG(log_error, "Expected msgsize %d but got %d", 
0559                     TEST_REPLY_SIZE, attrold.mq_msgsize);
0560             EXFAIL_OUT(ret);
0561         }
0562         
0563         /* cleanup */
0564         if ((mqd_t)EXFAIL!=mq && EXFAIL==ndrx_mq_close(mq))
0565         {
0566             NDRX_LOG(log_error, "Failed to close queue: %s", strerror(errno));
0567             ret=EXFAIL;
0568 
0569         }
0570 
0571         if (EXSUCCEED!=ndrx_mq_unlink(qstr))
0572         {
0573             NDRX_LOG(log_error, "Failed to unlink [%p]: %s", 
0574                     qstr, strerror(errno));
0575             ret=EXFAIL;
0576         }
0577     }
0578 
0579 out:
0580     return ret;
0581 
0582 }
0583 
0584 /**
0585  * Test what happens if queue is full
0586  * @param pfx queue prefix
0587  * @return EXSUCCEED/EXFAIL
0588  */
0589 int local_test_qfull(char *pfx)
0590 {
0591     int ret = EXSUCCEED;
0592     struct mq_attr attr;
0593     char buffer[TEST_REPLY_SIZE];
0594     struct   timespec tm;
0595     int i;
0596     char qstr[128];
0597     mqd_t mq = (mqd_t)EXFAIL;
0598     int err;
0599     ndrx_stopwatch_t t;
0600     int tim;
0601     
0602     snprintf(qstr, sizeof(qstr), "/%s,test000,clt,full", pfx);
0603     /* unlink the queue if something left from pervious tests... */
0604     ndrx_mq_unlink(qstr);
0605     
0606     for (i=0; i<4; i++)
0607     {
0608         /* initialize the queue attributes */
0609         attr.mq_flags = O_NONBLOCK;
0610         /* this does not matter, for system v the kernel config dictates the limit */
0611         attr.mq_maxmsg = 10;
0612         attr.mq_msgsize = TEST_REPLY_SIZE;
0613         attr.mq_curmsgs = 0;
0614 
0615         /* create the message queue */
0616         if ((mqd_t)EXFAIL==(mq = ndrx_mq_open(qstr, O_CREAT | O_RDWR | O_NONBLOCK, 0644, &attr)))
0617         {
0618             NDRX_LOG(log_error, "Failed to open queue: [%s]: %s", 
0619                     SV_QUEUE_NAME, strerror(errno));
0620             EXFAIL_OUT(ret);
0621         }
0622         
0623         /* fill it up... */
0624         
0625         NDRX_LOG(log_info, ">>> send: timed + non blocked fill up the queue, "
0626                 "will get EAGAIN in non timeout period (shorter)");
0627         
0628         while (EXSUCCEED==ndrx_mq_send(mq, buffer, 
0629                 TEST_REPLY_SIZE, 0))
0630         {
0631             NDRX_LOG(log_debug, "msg sent...");
0632         }
0633         err = errno;
0634         
0635         /* now it test the error code */
0636         if (EAGAIN!=err)
0637         {
0638             NDRX_LOG(log_error, "Expected error %d (EAGAIN) but got %d",
0639                     EAGAIN, err);
0640             EXFAIL_OUT(ret);
0641         }
0642         
0643         /* test the blocked mode, should get some timeout... */
0644         NDRX_LOG(log_debug, ">>> send: timed + blocked fill up the queue, "
0645                 "will get timeout");
0646         
0647         /* switch to blocked */
0648         attr.mq_flags = 0;
0649         if (EXSUCCEED!=ndrx_mq_setattr(mq, &attr, NULL))
0650         {
0651             NDRX_LOG(log_error, "Failed to set new attr 1: %s", strerror(errno));
0652             EXFAIL_OUT(ret);
0653         }
0654 
0655         ndrx_stopwatch_reset(&t);
0656         /* receive the message 
0657          * Maybe have some timed receive
0658          */
0659         clock_gettime(CLOCK_REALTIME, &tm);
0660         tm.tv_sec += 2;  /* Set for 20 seconds */
0661 
0662         if (EXSUCCEED==ndrx_mq_timedsend(mq, buffer, 
0663                 TEST_REPLY_SIZE, 0, &tm))
0664         {
0665             NDRX_LOG(log_error, "The queue is full but msg sent for some error reason!");
0666             EXFAIL_OUT(ret);
0667         }
0668         err = errno;
0669         NDRX_LOG(log_error, "got err: %s", strerror(errno));
0670         /* now it test the error code */
0671         if (ETIMEDOUT!=err)
0672         {
0673             NDRX_LOG(log_error, "Expected error %d (EAGAIN) but got %d",
0674                     ETIMEDOUT, err);
0675             EXFAIL_OUT(ret);
0676         }
0677         
0678         /* test the timeout */
0679         tim = ndrx_stopwatch_get_delta_sec(&t);
0680 
0681         if (tim < 2 || tim > 20)
0682         {
0683             NDRX_LOG(log_error, "Expected send time atleast 2 and less "
0684         "than 20, but got: %d", tim);
0685             EXFAIL_OUT(ret);
0686         }
0687 
0688         NDRX_LOG(log_debug, ">>> send: non timed + non blocked fill "
0689                 "up the queue, will EAGAIN");
0690         
0691         
0692         attr.mq_flags = O_NONBLOCK;
0693         if (EXSUCCEED!=ndrx_mq_setattr(mq, &attr, NULL))
0694         {
0695             NDRX_LOG(log_error, "Failed to set new attr 2: %s", strerror(errno));
0696             EXFAIL_OUT(ret);
0697         }
0698         
0699         if (EXSUCCEED==ndrx_mq_send(mq, buffer, 
0700                 TEST_REPLY_SIZE, 0))
0701         {
0702             NDRX_LOG(log_error, "Sending shall fail, but was ok!");
0703             EXFAIL_OUT(ret);
0704         }
0705         
0706         err = errno;
0707         
0708         /* now it test the error code */
0709         if (EAGAIN!=err)
0710         {
0711             NDRX_LOG(log_error, "Expected error %d (EAGAIN) but got %d",
0712                     EAGAIN, err);
0713             EXFAIL_OUT(ret);
0714         }
0715         
0716         /* cleanup */
0717         if ((mqd_t)EXFAIL!=mq && EXFAIL==ndrx_mq_close(mq))
0718         {
0719             NDRX_LOG(log_error, "Failed to close queue: %s", strerror(errno));
0720             ret=EXFAIL;
0721 
0722         }
0723 
0724         if (EXSUCCEED!=ndrx_mq_unlink(qstr))
0725         {
0726             NDRX_LOG(log_error, "Failed to unlink [%p]: %s", 
0727                     qstr, strerror(errno));
0728             ret=EXFAIL;
0729         }
0730     }
0731 
0732 out:
0733     return ret;
0734 
0735 }
0736 
0737 /**
0738  * Perform local tests, we will run in threads too. so that we see that
0739  * threaded mode is ok for our queues.
0740  */
0741 void *local_test(void *vargp) 
0742 {
0743     int ret = EXSUCCEED;
0744     
0745     NDRX_LOG(log_info, "create queue + try exclusive access - shall fail properly");
0746     if (EXSUCCEED!=local_test_exlc((char *)vargp))
0747     {
0748         EXFAIL_OUT(ret);
0749     }
0750     
0751     NDRX_LOG(log_info, "Test delayed unlink...");
0752     
0753     if (EXSUCCEED!=local_test_unlink((char *)vargp))
0754     {
0755         EXFAIL_OUT(ret);
0756     }
0757     
0758     NDRX_LOG(log_info, "test open of non existing queue in not create mode");
0759     
0760     if (EXSUCCEED!=local_test_nonexists((char *)vargp))
0761     {
0762         EXFAIL_OUT(ret);
0763     }
0764     
0765     /* receive: timed + blocked */
0766     
0767     if (EXSUCCEED!=local_test_receive((char *)vargp))
0768     {
0769         EXFAIL_OUT(ret);
0770     }
0771     
0772     if (EXSUCCEED!=local_test_send((char *)vargp))
0773     {
0774         EXFAIL_OUT(ret);
0775     }
0776     
0777     if (EXSUCCEED!=local_test_qfull((char *)vargp))
0778     {
0779         EXFAIL_OUT(ret);
0780     }
0781 
0782 out:
0783     if (EXSUCCEED==ret)
0784     {
0785         MUTEX_LOCK_V(M_ok_lock);
0786         M_ok++;
0787         NDRX_LOG(log_debug, "Thread finished OK (%d)!", M_ok);
0788         MUTEX_UNLOCK_V(M_ok_lock);
0789     }
0790     else
0791     {
0792         NDRX_LOG(log_debug, "Thread failed!");
0793     }
0794 
0795     return NULL;
0796 }
0797 
0798 /**
0799  * Send some stuff to test000_server Q
0800  * TODO: Test cases
0801  * - queue not found in read mode... - fail
0802  * @param argc
0803  * @param argv
0804  * @return 
0805  */
0806 int main( int argc , char **argv )
0807 {
0808     int ret = EXSUCCEED;
0809     mqd_t mq = (mqd_t)EXFAIL;
0810     mqd_t mq_srv = (mqd_t)EXFAIL;
0811     struct mq_attr attr;
0812     char buffer[TEST_REPLY_SIZE];
0813     char buffer_rcv[TEST_REPLY_SIZE];
0814     struct   timespec tm;
0815     int i, j;
0816     char *pfx1="th1";
0817     pthread_t thread_id1;
0818     
0819     char *pfx2="th2";
0820     pthread_t thread_id2;
0821     
0822     
0823     /* we will run some detailed test and after wards the integration test... */
0824     pthread_create(&thread_id1, NULL, local_test, pfx1); 
0825     pthread_create(&thread_id2, NULL, local_test, pfx2); 
0826     
0827     /* wait for thread to complete.. */
0828     pthread_join(thread_id1, NULL); 
0829     pthread_join(thread_id2, NULL); 
0830     
0831     if (2!=M_ok)
0832     {
0833         NDRX_LOG(log_error, "unit test failed! %d", M_ok);
0834         EXFAIL_OUT(ret);
0835     }
0836     
0837     /* initialize the queue attributes */
0838     attr.mq_flags = 0;
0839     attr.mq_maxmsg = 10;
0840     attr.mq_msgsize = TEST_REPLY_SIZE;
0841     attr.mq_curmsgs = 0;
0842 
0843     /* create the message queue 
0844      * use mode flags!
0845      */
0846     if ((mqd_t)EXFAIL==(mq = ndrx_mq_open(CL_QUEUE_NAME, O_CREAT, 0644, &attr)))
0847     {
0848         NDRX_LOG(log_error, "Failed to open queue: [%s]: %s", 
0849                 CL_QUEUE_NAME, strerror(errno));
0850         EXFAIL_OUT(ret);
0851     }
0852     
0853     for (i=0; i<100; i++)
0854     {
0855         ssize_t bytes_read;
0856         /* receive the message 
0857          * Maybe have some timed receive
0858          */
0859         clock_gettime(CLOCK_REALTIME, &tm);
0860         tm.tv_sec += 5;  /* Set for 20 seconds */
0861         
0862         for (j=1; j<TEST_REPLY_SIZE; j++)
0863         {
0864             buffer[j] = (char)((i+j) & 0xff);
0865         }
0866 
0867         NDRX_LOG(log_debug, "About to SND!");
0868         
0869         /* open server queue */
0870         if ((mqd_t)EXFAIL==(mq_srv = ndrx_mq_open(SV_QUEUE_NAME, O_RDWR, 0644, &attr)))
0871         {
0872             NDRX_LOG(log_error, "Failed to open queue: [%s]: %s", 
0873                     SV_QUEUE_NAME, strerror(errno));
0874             EXFAIL_OUT(ret);
0875         }
0876         
0877         NDRX_DUMP(log_debug, "Sending data", buffer, TEST_REPLY_SIZE);
0878         
0879         
0880         if (EXFAIL==ndrx_mq_send(mq_srv, buffer, 
0881                 TEST_REPLY_SIZE, 0))
0882         {
0883             NDRX_LOG(log_error, "Failed to send message: %s", strerror(errno));
0884             EXFAIL_OUT(ret);
0885         }
0886         
0887         /* close server queue... */
0888         if (EXFAIL==ndrx_mq_close(mq_srv))
0889         {
0890             NDRX_LOG(log_error, "Failed to close server queue: %s", 
0891                     strerror(errno));
0892             EXFAIL_OUT(ret);
0893         }
0894         
0895         /* receive stuff back */
0896         if (EXFAIL==(bytes_read=ndrx_mq_timedreceive(mq, buffer_rcv, 
0897                 TEST_REPLY_SIZE, NULL, &tm)))
0898         {
0899             NDRX_LOG(log_error, "Failed to get message: %s", strerror(errno));
0900             EXFAIL_OUT(ret);
0901         }
0902         
0903         NDRX_LOG(log_debug, "Read bytes: %d", bytes_read);
0904         NDRX_DUMP(log_debug, "Got data", buffer_rcv, bytes_read);
0905         
0906         if (TEST_REPLY_SIZE!=bytes_read)
0907         {
0908             NDRX_LOG(log_error, "Invalid size received, expected %d but got %d",
0909                     TEST_REPLY_SIZE, bytes_read);
0910             EXFAIL_OUT(ret);
0911         }
0912         
0913         /* compare the message, byte by byte 
0914          * skip the message type...
0915          */
0916         for (j=sizeof(long); j<TEST_REPLY_SIZE; j++)
0917         {
0918             unsigned char expected = ((unsigned char)buffer[j])+1;
0919             unsigned char readb = (unsigned char)buffer_rcv[j];
0920             
0921             if (expected!=readb)
0922             {
0923                 NDRX_LOG(log_error, "Expected %x got %x at %d",
0924                         (int)expected, (int)readb, j);
0925                 EXFAIL_OUT(ret);
0926             }
0927         }
0928     }
0929 
0930 out:
0931     /* cleanup */
0932     
0933     if ((mqd_t)EXFAIL!=mq && EXFAIL==ndrx_mq_close(mq))
0934     {
0935         NDRX_LOG(log_error, "Failed to close queue: %s", strerror(errno));
0936     }
0937     
0938     if ((mqd_t)EXFAIL!=mq && EXFAIL==ndrx_mq_unlink(CL_QUEUE_NAME))
0939     {
0940         NDRX_LOG(log_error, "Failed to unlink q: %s", strerror(errno));
0941     }
0942 
0943 
0944     return ret;
0945 }
0946 
0947 
0948 /* vim: set ts=4 sw=4 et smartindent: */