// Provides example methods for using P2P, Topic, and CQResult queues in a MyCQ Server.
#include <windows.h>
#include <iostream>
#include <string>
#include <sstream>
#include <vector>
#include "../MyCQClientCPP/MyCQClient.h"
#include "MyOnGetHandler.h"
#include "MyOnGetHandler2.h"
using namespace mycq;
// Populates the given schema with the 15 example columns, one per supported
// column kind. The order of the calls fixes the column indexes (0..14) that
// the other examples in this file use when setting and reading messages.
//
// queueSchema: schema object to fill; must not be null (it is dereferenced
//              unconditionally).
void examSetQueueSchema3(QueueSchema* queueSchema)
{
    QueueSchema& schema = *queueSchema;

    // Columns 0-7: boolean and numeric kinds.
    schema.addBoolean("boolean");
    schema.addByte("byte");
    schema.addShort("short");
    schema.addUShort("ushort");
    schema.addInt("int");
    schema.addLong("long");
    schema.addFloat("float");
    schema.addDouble("double");

    // Columns 8-10: date / time kinds.
    schema.addDate("date");
    schema.addTime("time");
    schema.addDateTime("datetime");

    // Columns 11-14: text and binary kinds. The second argument (100) is
    // presumably the fixed length of the column -- TODO confirm against the API.
    schema.addString("string", 100);
    schema.addVarString("varstring");
    schema.addBinary("binary", 100);
    schema.addVarBinary("varbinary");
}
// Writes every column of a message to std::cout, one "colN:value" line per
// column, reading columns 0..14 with the getter matching the layout built by
// examSetQueueSchema3().
//
// msg: message to print; must not be null (it is dereferenced unconditionally).
void examShowMessage(Message* msg)
{
    Message& row = *msg;
    std::ostream& out = std::cout;

    // Boolean and numeric columns.
    out << "col 0:" << row.getBoolean(0) << std::endl;
    out << "col 1:" << row.getByte(1) << std::endl;
    out << "col 2:" << row.getShort(2) << std::endl;
    out << "col 3:" << row.getUShort(3) << std::endl;
    out << "col 4:" << row.getInt(4) << std::endl;
    out << "col 5:" << row.getLong(5) << std::endl;
    out << "col 6:" << row.getFloat(6) << std::endl;
    out << "col 7:" << row.getDouble(7) << std::endl;

    // Date / time columns, read through their string getters.
    out << "col 8:" << row.getDateString(8) << std::endl;
    out << "col 9:" << row.getTimeString(9) << std::endl;
    out << "col10:" << row.getDateTimeString(10) << std::endl;

    // Text columns.
    out << "col11:" << row.getString(11) << std::endl;
    out << "col12:" << row.getVarString(12) << std::endl;

    // Binary columns: only the `value` member of the returned object is printed.
    out << "col13:" << row.getBinary(13).value << std::endl;
    out << "col14:" << row.getVarBinary(14).value << std::endl;
}
// Demonstrates the P2P queue API against the queue named "p2pQueue".
//
// Steps:
//   1. Connect to localhost:3030 and look up the queue.
//   2. Fill one blank message with a value for each of the 15 columns.
//   3. put()/get() round trip, then putTx()/getTx() round trip, printing any
//      message that comes back.
//   4. Register a MyOnGetHandler with getStart(), put the message 100 times,
//      then poll getStop() until it returns true.
//
// A MyCQException is reported on std::cout. On normal completion or after a
// MyCQException, the client is closed and the handler is released.
void examP2PQueue()
{
    MyCQClient* client = MyCQFactory::getMyCQClient();
    // Declared outside the try block so that it is also released when an
    // exception is thrown after it has been allocated.
    MyOnGetHandler* handler = 0;
    try
    {
        client->connect("localhost", 3030, "root", "1234");
        QueueManager* queueManager = client->getQueueManager();
        P2PQueue* queue = queueManager->getP2PQueue("p2pQueue");

        // Build the message; column indexes follow examSetQueueSchema3().
        Message* putMsg = queue->getBlankMessage();
        putMsg->setBoolean(0, true);
        putMsg->setByte(1, 'a');
        putMsg->setShort(2, 100);
        putMsg->setUShort(3, 200);
        putMsg->setInt(4, 1000);
        putMsg->setLong(5, 10000);
        putMsg->setFloat(6, 100.1);
        putMsg->setDouble(7, 10000.1);
        putMsg->setDate(8, 2010, 1, 1);
        putMsg->setTime(9, 12, 1, 10, 0);
        putMsg->setDateTime(10, 2010, 1, 1, 1, 1, 1, 1);
        putMsg->setString(11, "hello? string.");
        putMsg->setVarString(12, "hello? var string");
        BinData binData;
        binData.value = "binary data";
        binData.size = 12; // 11 characters plus the terminating NUL
        putMsg->setBinary(13, binData);
        putMsg->setVarBinary(14, binData);

        // Plain put/get round trip.
        queue->put(putMsg);
        Message* getMsg = queue->get();
        if(getMsg != 0)
        {
            examShowMessage(getMsg);
        }

        // Same round trip through the Tx variants of put/get.
        queue->putTx(putMsg);
        getMsg = queue->getTx();
        if(getMsg != 0)
        {
            examShowMessage(getMsg);
        }

        // Handler-based receive: register the handler, then feed the queue.
        handler = new MyOnGetHandler();
        queue->getStart(handler);
        for(int i=0; i<100; i++)
        {
            queue->put(putMsg);
        }

        // Keep retrying until getStop() reports true.
        while(!queue->getStop())
        {
        }
    }
    catch(MyCQException& ex)
    {
        std::cout << "code:" << ex.getCode() << ", message:" << ex.getMessage();
    }
    client->close();
    MyCQFactory::close(client);
    // Released only after the client is closed, so that it presumably can no
    // longer be invoked -- TODO confirm. Deleting a null pointer is a no-op,
    // which covers failures before the handler was created.
    delete handler;
}
// Demonstrates the topic queue API against the queue named "topicQueue".
//
// Steps:
//   1. Connect to localhost:3030 and look up the queue.
//   2. Register a MyOnGetHandler with getStart() before anything is put.
//   3. Fill one blank message with a value for each of the 15 columns and
//      put it 100 times.
//   4. Poll getStop() until it returns true.
//
// A MyCQException is reported on std::cout. On normal completion or after a
// MyCQException, the client is closed and the handler is released.
void examTopicQueue()
{
    MyCQClient* client = MyCQFactory::getMyCQClient();
    // Declared outside the try block so that it is also released when an
    // exception is thrown after it has been allocated.
    MyOnGetHandler* handler = 0;
    try
    {
        client->connect("localhost", 3030, "root", "1234");
        QueueManager* queueManager = client->getQueueManager();
        TopicQueue* queue = queueManager->getTopicQueue("topicQueue");

        handler = new MyOnGetHandler();
        queue->getStart(handler);

        // Build the message; column indexes follow examSetQueueSchema3().
        Message* putMsg = queue->getBlankMessage();
        putMsg->setBoolean(0, true);
        putMsg->setByte(1, 'a');
        putMsg->setShort(2, 100);
        putMsg->setUShort(3, 200);
        putMsg->setInt(4, 1000);
        putMsg->setLong(5, 10000);
        putMsg->setFloat(6, 100.1);
        putMsg->setDouble(7, 10000.1);
        putMsg->setDate(8, 2010, 1, 1);
        putMsg->setTime(9, 12, 1, 10, 0);
        putMsg->setDateTime(10, 2010, 1, 1, 1, 1, 1, 1);
        putMsg->setString(11, "hello? string.");
        putMsg->setVarString(12, "hello? var string");
        BinData binData;
        binData.value = "binary data";
        binData.size = 12; // 11 characters plus the terminating NUL
        putMsg->setBinary(13, binData);
        putMsg->setVarBinary(14, binData);

        for(int i=0; i<100; i++)
        {
            queue->put(putMsg);
        }

        // Keep retrying until getStop() reports true.
        while(!queue->getStop())
        {
        }
    }
    catch(MyCQException& ex)
    {
        std::cout << "code:" << ex.getCode() << ", message:" << ex.getMessage();
    }
    client->close();
    MyCQFactory::close(client);
    // Released only after the client is closed, so that it presumably can no
    // longer be invoked -- TODO confirm. Deleting a null pointer is a no-op,
    // which covers failures before the handler was created.
    delete handler;
}
// Demonstrates the CQ result queue API against the result queue of "cq1".
//
// Steps:
//   1. Connect to localhost:3030, look up the "cq1" result queue and read its
//      schema and column list.
//   2. Register a MyOnGetHandler2 on the result queue with getStart().
//   3. Put one fully populated message 100 times into "p2pQueue".
//   4. Wait 20 seconds, then poll getStop() on the result queue until it
//      returns true.
//
// A MyCQException is reported on std::cout. On normal completion or after a
// MyCQException, the client is closed and the handler is released.
void examCQResultQueue()
{
    MyCQClient* client = MyCQFactory::getMyCQClient();
    // Declared outside the try block so that it is also released when an
    // exception is thrown after it has been allocated.
    MyOnGetHandler2* handler = 0;
    try
    {
        client->connect("localhost", 3030, "root", "1234");
        QueueManager* queueManager = client->getQueueManager();
        CQResultQueue* queue = queueManager->getCQResultQueue("cq1");

        // Shows how to obtain the result schema and its columns; the values
        // are not used further in this example.
        QueueSchema* schema = queue->getQueueSchema();
        vector<Column> columns = schema->getColumns();

        handler = new MyOnGetHandler2();
        queue->getStart(handler);

        // Messages are put into "p2pQueue", not into the result queue itself.
        P2PQueue* p2pQueue = queueManager->getP2PQueue("p2pQueue");
        Message* putMsg = p2pQueue->getBlankMessage();
        putMsg->setBoolean(0, true);
        putMsg->setByte(1, 'a');
        putMsg->setShort(2, 100);
        putMsg->setUShort(3, 200);
        putMsg->setInt(4, 1000);
        putMsg->setLong(5, 10000);
        putMsg->setFloat(6, 100.1);
        putMsg->setDouble(7, 10000.1);
        putMsg->setDate(8, 2010, 1, 1);
        putMsg->setTime(9, 12, 1, 10, 0);
        putMsg->setDateTime(10, 2010, 1, 1, 1, 1, 1, 1);
        putMsg->setString(11, "hello? string.");
        putMsg->setVarString(12, "hello? var string");
        BinData binData;
        binData.value = "binary data";
        binData.size = 12; // 11 characters plus the terminating NUL
        putMsg->setBinary(13, binData);
        putMsg->setVarBinary(14, binData);
        for(int i=0; i<100; i++)
        {
            p2pQueue->put(putMsg);
        }

        // Wait 20 x 1 second before stopping, presumably so that the handler
        // has time to receive results -- TODO confirm the intended duration.
        for(int second=0; second<20; second++)
        {
            Sleep(1000);
        }

        // Keep retrying until getStop() reports true.
        while(!queue->getStop())
        {
        }
    }
    catch(MyCQException& ex)
    {
        std::cout << "code:" << ex.getCode() << ", message:" << ex.getMessage();
    }
    client->close();
    MyCQFactory::close(client);
    // Released only after the client is closed, so that it presumably can no
    // longer be invoked -- TODO confirm. Deleting a null pointer is a no-op,
    // which covers failures before the handler was created.
    delete handler;
}
// Top-level example driver.
//
// Steps:
//   1. Connect to localhost:3030 and delete all existing queues.
//   2. Build the 15-column example schema and create "p2pQueue" with it,
//      then run examP2PQueue().
//   3. Create "topicQueue" with the same schema.
//   4. Create the continuous queries "cq1" (over p2pQueue) and "cq2" (over
//      topicQueue), then run examCQResultQueue().
//   5. Delete all CQs and queues again.
//
// A MyCQException is reported on std::cout. On normal completion or after a
// MyCQException, the schema object is released and the client is closed.
//
// NOTE(review): "topicQueue" and "cq2" are created but examTopicQueue() is
// never called from here -- confirm whether that is intentional.
void examQueue()
{
    MyCQClient* client = MyCQFactory::getMyCQClient();
    // Declared outside the try block so that it is also released when one of
    // the calls below throws.
    QueueSchema* queueSchema = 0;
    try
    {
        client->connect("localhost", 3030, "root", "1234");
        QueueManager* queueManager = client->getQueueManager();
        queueManager->deleteAllQueues();

        queueSchema = MyCQFactory::getQueueSchema();
        examSetQueueSchema3(queueSchema);
        queueManager->createP2PQueue("p2pQueue", queueSchema, 100);
        examP2PQueue();
        queueManager->createTopicQueue("topicQueue", queueSchema, 100);

        CQManager* cqManager = client->getCQManager();
        // Adjacent string literals are concatenated by the compiler; unlike a
        // backslash line continuation inside a literal, source indentation
        // cannot end up inside the query text.
        string cq1 = "window p2pQueue as win1[size=5sec, slide=5sec] "
                     "select count(*) from win1";
        cqManager->createCQ("cq1", cq1, 100);
        string cq2 = "window topicQueue as win1[size=5sec, slide=5sec] "
                     "select count(*) from win1";
        cqManager->createCQ("cq2", cq2, 100);
        examCQResultQueue();

        cqManager->deleteAllCQ();
        queueManager->deleteAllQueues();
    }
    catch(MyCQException& ex)
    {
        std::cout << "code:" << ex.getCode() << ", message:" << ex.getMessage();
    }
    // Release the schema before the client is closed, as the original success
    // path did; skipped when the schema was never obtained.
    if(queueSchema != 0)
    {
        MyCQFactory::close(queueSchema);
    }
    client->close();
    MyCQFactory::close(client);
}