mycq::CQResultQueue Class Reference

Provides methods for using a CQResult queue in a MyCQ Server. More...

#include <CQResultQueue.h>

List of all members.

Public Member Functions

virtual void getStart (OnGetHandler *handler)=0
 Set asychronous message get handler to start.
virtual bool getStop ()=0
 Set asychronous message get handler to stop.
virtual QueueSchemagetQueueSchema ()=0
 Get a queue schema of a queue.
virtual int getQueueSize ()=0
 Get the size of a queue.
virtual int getMessageCount ()=0
 Get the message count in this queue.

Detailed Description

Provides methods for using a CQResult queue in a MyCQ Server.

#include <windows.h>
#include <iostream>
#include <string>
#include <sstream>
#include <vector>
#include "../MyCQClientCPP/MyCQClient.h"
#include "MyOnGetHandler.h"
#include "MyOnGetHandler2.h"

using namespace mycq;

void examSetQueueSchema3(QueueSchema* queueSchema)
{
            queueSchema->addBoolean("boolean");
            queueSchema->addByte("byte");
            queueSchema->addShort("short");
            queueSchema->addUShort("ushort");
            queueSchema->addInt("int");
            queueSchema->addLong("long");
            queueSchema->addFloat("float");
            queueSchema->addDouble("double");
            queueSchema->addDate("date");
            queueSchema->addTime("time");
            queueSchema->addDateTime("datetime");
            queueSchema->addString("string", 100);
            queueSchema->addVarString("varstring");
            queueSchema->addBinary("binary", 100);
            queueSchema->addVarBinary("varbinary");

            //columns
            //col 0: boolean
            //col 1: byte
            //col 2: short
            //col 3: ushort
            //col 4: int
            //col 5: long
            //col 6: float
            //col 7: double
            //col 8: date
            //col 9: time
            //col10: datetime
            //col11: string
            //col12: varstring
            //col13: binary
            //col14: varbinary
}

void examShowMessage(Message* msg)
{
            //get message value by column index
            std::cout << "col 0:" << msg->getBoolean(0) << std::endl;
            std::cout << "col 1:" << msg->getByte(1) << std::endl;
            std::cout << "col 2:" << msg->getShort(2) << std::endl;
            std::cout << "col 3:" << msg->getUShort(3) << std::endl;
            std::cout << "col 4:" << msg->getInt(4) << std::endl;
            std::cout << "col 5:" << msg->getLong(5) << std::endl;
            std::cout << "col 6:" << msg->getFloat(6) << std::endl;
            std::cout << "col 7:" << msg->getDouble(7) << std::endl;
            std::cout << "col 8:" << msg->getDateString(8) << std::endl;
            std::cout << "col 9:" << msg->getTimeString(9) << std::endl;
            std::cout << "col10:" << msg->getDateTimeString(10) << std::endl;
            std::cout << "col11:" << msg->getString(11) << std::endl;
            std::cout << "col12:" << msg->getVarString(12) << std::endl;
            std::cout << "col13:" << msg->getBinary(13).value << std::endl;
            std::cout << "col14:" << msg->getVarBinary(14).value << std::endl;

            //get message value by column name
            //std::cout << "col 0:" << msg->getBoolean("boolean") << std::endl;
            //std::cout << "col 1:" << msg->getByte("byte") << std::endl;
            //std::cout << "col 2:" << msg->getShort("short") << std::endl;
            //std::cout << "col 3:" << msg->getUShort("ushort") << std::endl;
            //std::cout << "col 4:" << msg->getInt("int") << std::endl;
            //std::cout << "col 5:" << msg->getLong("long") << std::endl;
            //std::cout << "col 6:" << msg->getFloat("float") << std::endl;
            //std::cout << "col 7:" << msg->getDouble("double") << std::endl;
            //std::cout << "col 8:" << msg->getDateString("date") << std::endl;
            //std::cout << "col 9:" << msg->getTimeString("time") << std::endl;
            //std::cout << "col10:" << msg->getDateTimeString("datetime") << std::endl;
            //std::cout << "col11:" << msg->getString("string") << std::endl;
            //std::cout << "col12:" << msg->getVarString("varstring") << std::endl;
            //std::cout << "col13:" << msg->getBinary("binary").value << std::endl;
            //std::cout << "col14:" << msg->getVarBinary("varbinary").value << std::endl;
}

void examP2PQueue()
{
            //get client instance
            MyCQClient* client = MyCQFactory::getMyCQClient();

            try
            {
                        //connect
                        client->connect("localhost", 3030, "root", "1234");

                        //get queue manager
                        QueueManager* queueManager = client->getQueueManager();

                        //get p2p queue
                        P2PQueue* queue = queueManager->getP2PQueue("p2pQueue");

                        //prepare message
                        Message* putMsg = queue->getBlankMessage();

                        //set message value by column index
                        putMsg->setBoolean(0, true);
                        putMsg->setByte(1, 'a');
                        putMsg->setShort(2, 100);
                        putMsg->setUShort(3, 200);
                        putMsg->setInt(4, 1000);
                        putMsg->setLong(5, 10000);
                        putMsg->setFloat(6, 100.1);
                        putMsg->setDouble(7, 10000.1);
                        putMsg->setDate(8, 2010, 1, 1);
                        putMsg->setTime(9, 12, 1, 10, 0);
                        putMsg->setDateTime(10, 2010, 1, 1, 1, 1, 1, 1);
                        putMsg->setString(11, "hello? string.");
                        putMsg->setVarString(12, "hello? var string");

                        BinData binData;
                        binData.value = "binary data";
                        binData.size = 12;
                        putMsg->setBinary(13, binData);
                        putMsg->setVarBinary(14, binData);

                        //put and get
                        queue->put(putMsg);
                        Message* getMsg = queue->get();

                        if(getMsg != 0)
                        {
                                    examShowMessage(getMsg);
                        }

                        //put and get
                        queue->putTx(putMsg); //put (transaction mode)
                        getMsg = queue->getTx(); //get (transaction mode)

                        if(getMsg != 0)
                        {
                                    examShowMessage(getMsg);
                        }

                        //get start (asychronous mode)
                        MyOnGetHandler* handler = new MyOnGetHandler();
                        queue->getStart(handler);

                        //put messages
                        for(int i=0; i<100; i++)
                        {
                                    queue->put(putMsg);
                        }

                        //get stop
                        while(!queue->getStop())
                        {
                                    //sleep(1);
                        }

                        delete handler;

            }
            catch(MyCQException& ex)
            {
                        std::cout << "code:" << ex.getCode() << ", message:" << ex.getMessage();
            }

            //close connection
            client->close();

            //delete client instance
            MyCQFactory::close(client);
}

void examTopicQueue()
{
            //get client instance
            MyCQClient* client = MyCQFactory::getMyCQClient();

            try
            {
                        //connect
                        client->connect("localhost", 3030, "root", "1234");

                        //get queue manager
                        QueueManager* queueManager = client->getQueueManager();

                        //get p2p queue
                        TopicQueue* queue = queueManager->getTopicQueue("topicQueue");

                        //get start (asychronous mode)
                        MyOnGetHandler* handler = new MyOnGetHandler();
                        queue->getStart(handler);

                        //prepare message
                        Message* putMsg = queue->getBlankMessage();

                        //set message value by column index
                        putMsg->setBoolean(0, true);
                        putMsg->setByte(1, 'a');
                        putMsg->setShort(2, 100);
                        putMsg->setUShort(3, 200);
                        putMsg->setInt(4, 1000);
                        putMsg->setLong(5, 10000);
                        putMsg->setFloat(6, 100.1);
                        putMsg->setDouble(7, 10000.1);
                        putMsg->setDate(8, 2010, 1, 1);
                        putMsg->setTime(9, 12, 1, 10, 0);
                        putMsg->setDateTime(10, 2010, 1, 1, 1, 1, 1, 1);
                        putMsg->setString(11, "hello? string.");
                        putMsg->setVarString(12, "hello? var string");

                        BinData binData;
                        binData.value = "binary data";
                        binData.size = 12;
                        putMsg->setBinary(13, binData);
                        putMsg->setVarBinary(14, binData);

                        //put messages
                        for(int i=0; i<100; i++)
                        {
                                    queue->put(putMsg);
                        }

                        //get stop
                        while(!queue->getStop())
                        {
                                    //sleep(1);
                        }

                        delete handler;

            }
            catch(MyCQException& ex)
            {
                        std::cout << "code:" << ex.getCode() << ", message:" << ex.getMessage();
            }

            //close connection
            client->close();

            //delete client instance
            MyCQFactory::close(client);
}

void examCQResultQueue()
{
            //get client instance
            MyCQClient* client = MyCQFactory::getMyCQClient();

            try
            {
                        //connect
                        client->connect("localhost", 3030, "root", "1234");

                        //get queue manager
                        QueueManager* queueManager = client->getQueueManager();

                        //get p2p queue
                        CQResultQueue* queue = queueManager->getCQResultQueue("cq1");

                        //see, queue schema columns
                        QueueSchema* schema = queue->getQueueSchema();
                        vector<Column> columns = schema->getColumns();

                        //get start (asychronous mode)
                        MyOnGetHandler2* handler = new MyOnGetHandler2();
                        queue->getStart(handler);


                        //put messages for cq test
                        P2PQueue* p2pQueue = queueManager->getP2PQueue("p2pQueue");

                        //prepare message
                        Message* putMsg = p2pQueue->getBlankMessage();

                        //set message value by column index
                        putMsg->setBoolean(0, true);
                        putMsg->setByte(1, 'a');
                        putMsg->setShort(2, 100);
                        putMsg->setUShort(3, 200);
                        putMsg->setInt(4, 1000);
                        putMsg->setLong(5, 10000);
                        putMsg->setFloat(6, 100.1);
                        putMsg->setDouble(7, 10000.1);
                        putMsg->setDate(8, 2010, 1, 1);
                        putMsg->setTime(9, 12, 1, 10, 0);
                        putMsg->setDateTime(10, 2010, 1, 1, 1, 1, 1, 1);
                        putMsg->setString(11, "hello? string.");
                        putMsg->setVarString(12, "hello? var string");

                        BinData binData;
                        binData.value = "binary data";
                        binData.size = 12;
                        putMsg->setBinary(13, binData);
                        putMsg->setVarBinary(14, binData);

                        //put messages to p2p queue
                        for(int i=0; i<100; i++)
                        {
                                    p2pQueue->put(putMsg);
                        }

                        //wait for cq result.
                        int count = 0;
                        while(true)
                        {
                                    Sleep(1000);
                                    count++;

                                    if(count == 20)
                                                break;
                        }

                        //get stop, cq result queue
                        while(!queue->getStop())
                        {
                                    //sleep(1);
                        }

                        delete handler;

            }
            catch(MyCQException& ex)
            {
                        std::cout << "code:" << ex.getCode() << ", message:" << ex.getMessage();
            }

            //close connection
            client->close();

            //delete client instance
            MyCQFactory::close(client);
}

void examQueue()
{
            //get client instance
            MyCQClient* client = MyCQFactory::getMyCQClient();

            try
            {
                        //connect
                        client->connect("localhost", 3030, "root", "1234");

                        //get queue manager
                        QueueManager* queueManager = client->getQueueManager();

                        queueManager->deleteAllQueues();

                        //Queue Schema
                        //get queue schema instance
                        QueueSchema* queueSchema = MyCQFactory::getQueueSchema();

                        //set queue schema
                        examSetQueueSchema3(queueSchema);


                        //P2P Queue
                        //create p2p queue
                        queueManager->createP2PQueue("p2pQueue", queueSchema, 100);

                        //p2p queue example
                        examP2PQueue();


                        //TOPIC Queue
                        //create a topic queue
                        queueManager->createTopicQueue("topicQueue", queueSchema, 100);


                        CQManager* cqManager = client->getCQManager();

                        //CQResult Queue
                        //create cq
                        string cq1 = "window p2pQueue as win1[size=5sec, slide=5sec] \
                                                                                     select count(*) from win1";
                        cqManager->createCQ("cq1", cq1, 100);

                        //CQResult Queue
                        //create cq
                        string cq2 = "window topicQueue as win1[size=5sec, slide=5sec] \
                                                                                     select count(*) from win1";
                        cqManager->createCQ("cq2", cq2, 100);

                        examCQResultQueue();

                        //delete all cq
                        cqManager->deleteAllCQ();

                        //delete all queues
                        queueManager->deleteAllQueues();

                        MyCQFactory::close(queueSchema);
            }
            catch(MyCQException& ex)
            {
                        std::cout << "code:" << ex.getCode() << ", message:" << ex.getMessage();
            }

            //close connection
            client->close();

            //delete client instance
            MyCQFactory::close(client);
}

Member Function Documentation

virtual int mycq::CQResultQueue::getMessageCount (  )  [pure virtual]

Get the message count in this queue.

Returns:
Message count integer.
Exceptions:
MyCQException An error occurred. check error code.
virtual QueueSchema* mycq::CQResultQueue::getQueueSchema (  )  [pure virtual]

Get a queue schema of a queue.

Returns:
Queue schema.
virtual int mycq::CQResultQueue::getQueueSize (  )  [pure virtual]

Get the size of a queue.

Returns:
Queue size integer.
virtual void mycq::CQResultQueue::getStart ( OnGetHandler handler  )  [pure virtual]

Set asychronous message get handler to start.

Parameters:
handler A reference of a OnGetHandler class instance. You can create a handler class by inheritance of OnGetHandler class.
Exceptions:
MyCQException An error occurred. check error code.
virtual bool mycq::CQResultQueue::getStop (  )  [pure virtual]

Set asychronous message get handler to stop.

Returns:
Bool value. true(succss), false(failed)
Exceptions:
MyCQException An error occurred. check error code.

The documentation for this class was generated from the following file:
 All Classes Namespaces Functions Variables Enumerations Enumerator
MyCQ C++ User's Document. Copyright@MyCQ Inc., All Rights Reserved.