Provides methods for using a P2P queue in a MyCQ Server. More...
#include <P2PQueue.h>
Public Member Functions | |
virtual void | put (Message *message)=0 |
Put a message (fast, non-transaction mode). | |
virtual void | putTx (Message *message)=0 |
Put a message (transaction mode). | |
virtual Message * | get ()=0 |
Get a message (non-transaction mode). | |
virtual Message * | getTx ()=0 |
Get a message (transaction mode). | |
virtual void | getStart (OnGetHandler *handler)=0 |
Start the asynchronous message get handler. | |
virtual bool | getStop ()=0 |
Stop the asynchronous message get handler. | |
virtual Message * | getBlankMessage ()=0 |
Get a blank message. | |
virtual QueueSchema * | getQueueSchema ()=0 |
Get a queue schema of a queue. | |
virtual int | getQueueSize ()=0 |
Get the size of a queue. | |
virtual int | getMessageCount ()=0 |
Get the message count in this queue. |
Provides methods for using a P2P queue in a MyCQ Server.
#include <windows.h>
#include <iostream>
#include <string>
#include <sstream>
#include <vector>
#include "../MyCQClientCPP/MyCQClient.h"
#include "MyOnGetHandler.h"
#include "MyOnGetHandler2.h"

using namespace mycq;

/**
 * Adds the fifteen example columns to a queue schema, one per supported
 * column type. The order of the add calls fixes the column indexes that the
 * other example functions use.
 *
 * @param queueSchema Schema instance to populate.
 */
void examSetQueueSchema3(QueueSchema* queueSchema)
{
    queueSchema->addBoolean("boolean");      // col 0
    queueSchema->addByte("byte");            // col 1
    queueSchema->addShort("short");          // col 2
    queueSchema->addUShort("ushort");        // col 3
    queueSchema->addInt("int");              // col 4
    queueSchema->addLong("long");            // col 5
    queueSchema->addFloat("float");          // col 6
    queueSchema->addDouble("double");        // col 7
    queueSchema->addDate("date");            // col 8
    queueSchema->addTime("time");            // col 9
    queueSchema->addDateTime("datetime");    // col 10
    queueSchema->addString("string", 100);   // col 11
    queueSchema->addVarString("varstring");  // col 12
    queueSchema->addBinary("binary", 100);   // col 13
    queueSchema->addVarBinary("varbinary");  // col 14
}

/**
 * Prints every column of a message to std::cout, one column per line,
 * reading the values by column index.
 *
 * @param msg Message laid out according to examSetQueueSchema3().
 */
void examShowMessage(Message* msg)
{
    // get message value by column index
    std::cout << "col 0:" << msg->getBoolean(0) << std::endl;
    std::cout << "col 1:" << msg->getByte(1) << std::endl;
    std::cout << "col 2:" << msg->getShort(2) << std::endl;
    std::cout << "col 3:" << msg->getUShort(3) << std::endl;
    std::cout << "col 4:" << msg->getInt(4) << std::endl;
    std::cout << "col 5:" << msg->getLong(5) << std::endl;
    std::cout << "col 6:" << msg->getFloat(6) << std::endl;
    std::cout << "col 7:" << msg->getDouble(7) << std::endl;
    std::cout << "col 8:" << msg->getDateString(8) << std::endl;
    std::cout << "col 9:" << msg->getTimeString(9) << std::endl;
    std::cout << "col10:" << msg->getDateTimeString(10) << std::endl;
    std::cout << "col11:" << msg->getString(11) << std::endl;
    std::cout << "col12:" << msg->getVarString(12) << std::endl;
    std::cout << "col13:" << msg->getBinary(13).value << std::endl;
    std::cout << "col14:" << msg->getVarBinary(14).value << std::endl;

    // get message value by column name
    //std::cout << "col 0:" << msg->getBoolean("boolean") << std::endl;
    //std::cout << "col 1:" << msg->getByte("byte") << std::endl;
    //std::cout << "col 2:" << msg->getShort("short") << std::endl;
    //std::cout << "col 3:" << msg->getUShort("ushort") << std::endl;
    //std::cout << "col 4:" << msg->getInt("int") << std::endl;
    //std::cout << "col 5:" << msg->getLong("long") << std::endl;
    //std::cout << "col 6:" << msg->getFloat("float") << std::endl;
    //std::cout << "col 7:" << msg->getDouble("double") << std::endl;
    //std::cout << "col 8:" << msg->getDateString("date") << std::endl;
    //std::cout << "col 9:" << msg->getTimeString("time") << std::endl;
    //std::cout << "col10:" << msg->getDateTimeString("datetime") << std::endl;
    //std::cout << "col11:" << msg->getString("string") << std::endl;
    //std::cout << "col12:" << msg->getVarString("varstring") << std::endl;
    //std::cout << "col13:" << msg->getBinary("binary").value << std::endl;
    //std::cout << "col14:" << msg->getVarBinary("varbinary").value << std::endl;
}

/**
 * Fills all fifteen columns of a blank message (by column index) with the
 * sample values shared by the queue examples.
 *
 * @param msg     Blank message to populate.
 * @param binData Scratch binary descriptor used for columns 13 and 14. It is
 *                owned by the caller so that it stays alive for as long as
 *                the message is used; whether the message copies the data is
 *                not visible from this example.
 */
static void examFillMessage(Message* msg, BinData& binData)
{
    msg->setBoolean(0, true);
    msg->setByte(1, 'a');
    msg->setShort(2, 100);
    msg->setUShort(3, 200);
    msg->setInt(4, 1000);
    msg->setLong(5, 10000);
    msg->setFloat(6, 100.1);
    msg->setDouble(7, 10000.1);
    msg->setDate(8, 2010, 1, 1);
    msg->setTime(9, 12, 1, 10, 0);
    msg->setDateTime(10, 2010, 1, 1, 1, 1, 1, 1);
    msg->setString(11, "hello? string.");
    msg->setVarString(12, "hello? var string");

    binData.value = "binary data";
    binData.size = 12;
    msg->setBinary(13, binData);
    msg->setVarBinary(14, binData);
}

/**
 * Polls queue->getStop() until it returns true, sleeping briefly between
 * attempts instead of spinning at full CPU.
 *
 * @param queue Any queue type that exposes `bool getStop()`.
 */
template <typename QueueT>
static void examWaitForStop(QueueT* queue)
{
    while (!queue->getStop())
    {
        Sleep(1);  // milliseconds; yield between polls
    }
}

/**
 * Writes the code and message of a MyCQException to std::cout.
 *
 * @param ex Exception caught by one of the example functions.
 */
static void examReportError(MyCQException& ex)
{
    std::cout << "code:" << ex.getCode() << ", message:" << ex.getMessage() << std::endl;
}

/**
 * P2P queue example: puts and gets one message in non-transaction mode, then
 * in transaction mode, then starts an asynchronous get handler, puts 100
 * messages and stops the handler. Errors are reported via examReportError().
 */
void examP2PQueue()
{
    // get client instance
    MyCQClient* client = MyCQFactory::getMyCQClient();

    // Declared outside the try block so the handler is released on the
    // error path as well.
    MyOnGetHandler* handler = 0;

    try
    {
        // connect
        client->connect("localhost", 3030, "root", "1234");

        // get queue manager
        QueueManager* queueManager = client->getQueueManager();

        // get p2p queue
        P2PQueue* queue = queueManager->getP2PQueue("p2pQueue");

        // prepare message
        Message* putMsg = queue->getBlankMessage();
        BinData binData;
        examFillMessage(putMsg, binData);

        // put and get (non transaction mode)
        queue->put(putMsg);
        Message* getMsg = queue->get();
        if (getMsg != 0)
        {
            examShowMessage(getMsg);
        }

        // put and get (transaction mode)
        queue->putTx(putMsg);
        getMsg = queue->getTx();
        if (getMsg != 0)
        {
            examShowMessage(getMsg);
        }

        // get start (asynchronous mode)
        handler = new MyOnGetHandler();
        queue->getStart(handler);

        // put messages
        for (int i = 0; i < 100; i++)
        {
            queue->put(putMsg);
        }

        // get stop
        examWaitForStop(queue);
    }
    catch (MyCQException& ex)
    {
        examReportError(ex);
    }

    // close connection
    client->close();

    // delete client instance
    MyCQFactory::close(client);

    // NOTE(review): the handler is deleted only after the client is closed,
    // on the assumption that no callbacks arrive after that point - confirm.
    delete handler;
}

/**
 * Topic queue example: starts an asynchronous get handler on "topicQueue",
 * puts 100 messages and stops the handler. Errors are reported via
 * examReportError().
 */
void examTopicQueue()
{
    // get client instance
    MyCQClient* client = MyCQFactory::getMyCQClient();

    // Declared outside the try block so the handler is released on the
    // error path as well.
    MyOnGetHandler* handler = 0;

    try
    {
        // connect
        client->connect("localhost", 3030, "root", "1234");

        // get queue manager
        QueueManager* queueManager = client->getQueueManager();

        // get topic queue
        TopicQueue* queue = queueManager->getTopicQueue("topicQueue");

        // get start (asynchronous mode)
        handler = new MyOnGetHandler();
        queue->getStart(handler);

        // prepare message
        Message* putMsg = queue->getBlankMessage();
        BinData binData;
        examFillMessage(putMsg, binData);

        // put messages
        for (int i = 0; i < 100; i++)
        {
            queue->put(putMsg);
        }

        // get stop
        examWaitForStop(queue);
    }
    catch (MyCQException& ex)
    {
        examReportError(ex);
    }

    // close connection
    client->close();

    // delete client instance
    MyCQFactory::close(client);

    // NOTE(review): the handler is deleted only after the client is closed,
    // on the assumption that no callbacks arrive after that point - confirm.
    delete handler;
}

/**
 * CQ result queue example: starts an asynchronous get handler on the result
 * queue of "cq1", puts 100 messages into "p2pQueue", waits 20 seconds for
 * results and stops the handler. Errors are reported via examReportError().
 */
void examCQResultQueue()
{
    // get client instance
    MyCQClient* client = MyCQFactory::getMyCQClient();

    // Declared outside the try block so the handler is released on the
    // error path as well.
    MyOnGetHandler2* handler = 0;

    try
    {
        // connect
        client->connect("localhost", 3030, "root", "1234");

        // get queue manager
        QueueManager* queueManager = client->getQueueManager();

        // get cq result queue
        CQResultQueue* queue = queueManager->getCQResultQueue("cq1");

        // see, queue schema columns (fetched for illustration only; unused below)
        QueueSchema* schema = queue->getQueueSchema();
        std::vector<Column> columns = schema->getColumns();

        // get start (asynchronous mode)
        handler = new MyOnGetHandler2();
        queue->getStart(handler);

        // put messages for cq test
        P2PQueue* p2pQueue = queueManager->getP2PQueue("p2pQueue");

        // prepare message
        Message* putMsg = p2pQueue->getBlankMessage();
        BinData binData;
        examFillMessage(putMsg, binData);

        // put messages to p2p queue
        for (int i = 0; i < 100; i++)
        {
            p2pQueue->put(putMsg);
        }

        // wait for cq result: 20 sleeps of one second each
        for (int waited = 0; waited < 20; waited++)
        {
            Sleep(1000);
        }

        // get stop, cq result queue
        examWaitForStop(queue);
    }
    catch (MyCQException& ex)
    {
        examReportError(ex);
    }

    // close connection
    client->close();

    // delete client instance
    MyCQFactory::close(client);

    // NOTE(review): the handler is deleted only after the client is closed,
    // on the assumption that no callbacks arrive after that point - confirm.
    delete handler;
}

/**
 * Entry point of the example: deletes all queues, creates "p2pQueue" and
 * "topicQueue" from the example schema, runs the P2P example, creates the
 * continuous queries "cq1" and "cq2", runs the CQ result example, and then
 * deletes all CQs and queues. Errors are reported via examReportError().
 */
void examQueue()
{
    // get client instance
    MyCQClient* client = MyCQFactory::getMyCQClient();

    // Declared outside the try block so the schema is released on the
    // error path as well.
    QueueSchema* queueSchema = 0;

    try
    {
        // connect
        client->connect("localhost", 3030, "root", "1234");

        // get queue manager
        QueueManager* queueManager = client->getQueueManager();
        queueManager->deleteAllQueues();

        // Queue Schema
        // get queue schema instance
        queueSchema = MyCQFactory::getQueueSchema();

        // set queue schema
        examSetQueueSchema3(queueSchema);

        // P2P Queue
        // create p2p queue
        queueManager->createP2PQueue("p2pQueue", queueSchema, 100);

        // p2p queue example
        examP2PQueue();

        // TOPIC Queue
        // create a topic queue
        queueManager->createTopicQueue("topicQueue", queueSchema, 100);

        CQManager* cqManager = client->getCQManager();

        // CQResult Queue
        // create cq over the p2p queue
        std::string cq1 = "window p2pQueue as win1[size=5sec, slide=5sec] "
                          "select count(*) from win1";
        cqManager->createCQ("cq1", cq1, 100);

        // CQResult Queue
        // create cq over the topic queue
        std::string cq2 = "window topicQueue as win1[size=5sec, slide=5sec] "
                          "select count(*) from win1";
        cqManager->createCQ("cq2", cq2, 100);

        examCQResultQueue();

        // delete all cq
        cqManager->deleteAllCQ();

        // delete all queues
        queueManager->deleteAllQueues();
    }
    catch (MyCQException& ex)
    {
        examReportError(ex);
    }

    // release the schema whether or not an exception occurred
    if (queueSchema != 0)
    {
        MyCQFactory::close(queueSchema);
    }

    // close connection
    client->close();

    // delete client instance
    MyCQFactory::close(client);
}
virtual Message* mycq::P2PQueue::get | ( | ) | [pure virtual] |
Get a message (non-transaction mode).
MyCQException | An error occurred; check the error code. |
virtual Message* mycq::P2PQueue::getBlankMessage | ( | ) | [pure virtual] |
Get a blank message.
virtual int mycq::P2PQueue::getMessageCount | ( | ) | [pure virtual] |
Get the message count in this queue.
MyCQException | An error occurred; check the error code. |
virtual QueueSchema* mycq::P2PQueue::getQueueSchema | ( | ) | [pure virtual] |
Get a queue schema of a queue.
virtual int mycq::P2PQueue::getQueueSize | ( | ) | [pure virtual] |
Get the size of a queue.
virtual void mycq::P2PQueue::getStart | ( | OnGetHandler * | handler | ) | [pure virtual] |
Start the asynchronous message get handler.
This sample shows how to use COnGetHandler. using System; using System.Text; using MyCQ; //MyCQ Client Library
// Sample handler: reads column 0 as an int and column 1 as a string from
// each message passed to onGet. The values are not used further here.
public class SampleOnGetHandler : COnGetHandler
{
    public override void onGet(CMessage message)
    {
        int firstColumn = message.getInt(0);
        string secondColumn = message.getString(1);
    }
}
class Program
{
    // Connects to the server, starts an asynchronous get handler on the P2P
    // queue "queueName", polls getStop() until it returns true, then closes
    // the queue and the client connection.
    public void p2pQueueSample()
    {
        CMyCQClient client = CMyCQFactory.getMyCQClient();

        CQueueManager manager = null;
        CP2PQueue p2pQueue = null;

        try
        {
            client.connect("localhost", 3030, "root", "1234");

            manager = client.getQueueManager();
            p2pQueue = manager.getP2PQueue("queueName");

            SampleOnGetHandler onGetHandler = new SampleOnGetHandler();

            p2pQueue.getStart(onGetHandler);

            // NOTE(review): sleep(n) is not defined in this sample -
            // presumably a placeholder for a real delay call; confirm.
            while (!p2pQueue.getStop())
            {
                sleep(n);
            }

            manager.close(p2pQueue);
        }
        catch (MyCQException ex)
        {
            Console.WriteLine("code:" + ex.getCode() + ", message:" + ex.getMessage());
        }

        client.close();
    }
}
handler | A pointer to an OnGetHandler instance. You can create a handler class by inheriting from the OnGetHandler class. |
MyCQException | An error occurred; check the error code. |
virtual bool mycq::P2PQueue::getStop | ( | ) | [pure virtual] |
Stop the asynchronous message get handler.
MyCQException | An error occurred; check the error code. |
virtual Message* mycq::P2PQueue::getTx | ( | ) | [pure virtual] |
Get a message (transaction mode).
MyCQException | An error occurred; check the error code. |
virtual void mycq::P2PQueue::put | ( | Message * | message | ) | [pure virtual] |
Put a message (fast, non-transaction mode).
message | A pointer to a message instance. P2PQueue's getBlankMessage() returns a pointer to a blank message. |
MyCQException | An error occurred; check the error code. |
virtual void mycq::P2PQueue::putTx | ( | Message * | message | ) | [pure virtual] |
Put a message (transaction mode).
message | A pointer to a message instance. P2PQueue's getBlankMessage() returns a pointer to a blank message. |
MyCQException | An error occurred; check the error code. |