using System;
using System.Collections.Generic;
using System.Threading;
using MyCQ;
namespace MyCQClientNETExample
{
class MyOnGetHandler : OnGetHandler
{
public override void onGet(Message msg)
{
try
{
Console.WriteLine("col 0:" + msg.getBoolean(0));
Console.WriteLine("col 1:" + msg.getByte(1));
Console.WriteLine("col 2:" + msg.getShort(2));
Console.WriteLine("col 3:" + msg.getUShort(3));
Console.WriteLine("col 4:" + msg.getInt(4));
Console.WriteLine("col 5:" + msg.getLong(5));
Console.WriteLine("col 6:" + msg.getFloat(6));
Console.WriteLine("col 7:" + msg.getDouble(7));
Console.WriteLine("col 8:" + msg.getDateString(8));
Console.WriteLine("col 9:" + msg.getTimeString(9));
Console.WriteLine("col10:" + msg.getDateTimeString(10));
Console.WriteLine("col11:" + msg.getString(11));
Console.WriteLine("col12:" + msg.getVarString(12));
Console.WriteLine("col13:" + msg.getBinary(13));
Console.WriteLine("col14:" + msg.getVarBinary(14));
}
catch (MyCQException ex)
{
Console.WriteLine(ex.getCode() + ":" + ex.getMessage());
}
}
}
class MyOnGetHandler2 : OnGetHandler
{
public override void onGet(Message msg)
{
try
{
Console.WriteLine("col 0:" + msg.getLong(0));
}
catch (MyCQException ex)
{
Console.WriteLine(ex.getCode() + ":" + ex.getMessage());
}
}
}
class ExamQueue
{
void examSetQueueSchema3(QueueSchema queueSchema)
{
queueSchema.addBoolean("boolean");
queueSchema.addByte("byte");
queueSchema.addShort("short");
queueSchema.addUShort("ushort");
queueSchema.addInt("int");
queueSchema.addLong("long");
queueSchema.addFloat("float");
queueSchema.addDouble("double");
queueSchema.addDate("date");
queueSchema.addTime("time");
queueSchema.addDateTime("datetime");
queueSchema.addString("string", 100);
queueSchema.addVarString("varstring");
queueSchema.addBinary("binary", 100);
queueSchema.addVarBinary("varbinary");
}
void examShowMessage(Message msg)
{
Console.WriteLine("col 0:" + msg.getBoolean(0));
Console.WriteLine("col 1:" + msg.getByte(1));
Console.WriteLine("col 2:" + msg.getShort(2));
Console.WriteLine("col 3:" + msg.getUShort(3));
Console.WriteLine("col 4:" + msg.getInt(4));
Console.WriteLine("col 5:" + msg.getLong(5));
Console.WriteLine("col 6:" + msg.getFloat(6));
Console.WriteLine("col 7:" + msg.getDouble(7));
Console.WriteLine("col 8:" + msg.getDateString(8));
Console.WriteLine("col 9:" + msg.getTimeString(9));
Console.WriteLine("col10:" + msg.getDateTimeString(10));
Console.WriteLine("col11:" + msg.getString(11));
Console.WriteLine("col12:" + msg.getVarString(12));
Console.WriteLine("col13:" + msg.getBinary(13));
Console.WriteLine("col14:" + msg.getVarBinary(14));
}
void examP2PQueue()
{
MyCQClient client = MyCQFactory.getMyCQClient();
try
{
client.connect("localhost", 3030, "root", "1234");
QueueManager queueManager = client.getQueueManager();
P2PQueue queue = queueManager.getP2PQueue("p2pQueue");
Message putMsg = queue.getBlankMessage();
putMsg.setBoolean(0, true);
putMsg.setByte(1, 0x41);
putMsg.setShort(2, 100);
putMsg.setUShort(3, 200);
putMsg.setInt(4, 1000);
putMsg.setLong(5, 10000);
putMsg.setFloat(6, 100.1f);
putMsg.setDouble(7, 10000.1f);
putMsg.setDate(8, 2010, 1, 1);
putMsg.setTime(9, 12, 1, 10, 0);
putMsg.setDateTime(10, 2010, 1, 1, 1, 1, 1, 1);
putMsg.setString(11, "hello? string.");
putMsg.setVarString(12, "hello? var string");
int temp = 1000;
putMsg.setBinary(13, BitConverter.GetBytes(temp));
putMsg.setVarBinary(14, BitConverter.GetBytes(temp));
queue.put(putMsg);
Message getMsg = queue.get();
if (getMsg != null)
{
examShowMessage(getMsg);
}
queue.putTx(putMsg);
getMsg = queue.getTx();
if (getMsg != null)
{
examShowMessage(getMsg);
}
MyOnGetHandler handler = new MyOnGetHandler();
queue.getStart(handler);
for (int i = 0; i < 100; i++)
{
queue.put(putMsg);
}
while (!queue.getStop())
{
Thread.Sleep(10);
}
}
catch (MyCQException ex)
{
Console.WriteLine("code:" + ex.getCode() + ", message:" + ex.getMessage());
}
client.close();
}
void examTopicQueue()
{
MyCQClient client = MyCQFactory.getMyCQClient();
try
{
client.connect("localhost", 3030, "root", "1234");
QueueManager queueManager = client.getQueueManager();
TopicQueue queue = queueManager.getTopicQueue("topicQueue");
MyOnGetHandler handler = new MyOnGetHandler();
queue.getStart(handler);
Message putMsg = queue.getBlankMessage();
putMsg.setBoolean(0, true);
putMsg.setByte(1, 0x41);
putMsg.setShort(2, 100);
putMsg.setUShort(3, 200);
putMsg.setInt(4, 1000);
putMsg.setLong(5, 10000);
putMsg.setFloat(6, 100.1f);
putMsg.setDouble(7, 10000.1f);
putMsg.setDate(8, 2010, 1, 1);
putMsg.setTime(9, 12, 1, 10, 0);
putMsg.setDateTime(10, 2010, 1, 1, 1, 1, 1, 1);
putMsg.setString(11, "hello? string.");
putMsg.setVarString(12, "hello? var string");
int temp = 1000;
putMsg.setBinary(13, BitConverter.GetBytes(temp));
putMsg.setVarBinary(14, BitConverter.GetBytes(temp));
for (int i = 0; i < 100; i++)
{
queue.put(putMsg);
}
while (!queue.getStop())
{
Thread.Sleep(10);
}
}
catch (MyCQException ex)
{
Console.WriteLine("code:" + ex.getCode() + ", message:" + ex.getMessage());
}
client.close();
}
void examCQResultQueue()
{
MyCQClient client = MyCQFactory.getMyCQClient();
try
{
client.connect("localhost", 3030, "root", "1234");
QueueManager queueManager = client.getQueueManager();
CQResultQueue queue = queueManager.getCQResultQueue("cq1");
QueueSchema schema = queue.getQueueSchema();
List<Column> columns = schema.getColumns();
MyOnGetHandler2 handler = new MyOnGetHandler2();
queue.getStart(handler);
P2PQueue p2pQueue = queueManager.getP2PQueue("p2pQueue");
Message putMsg = p2pQueue.getBlankMessage();
putMsg.setBoolean(0, true);
putMsg.setByte(1, 0x41);
putMsg.setShort(2, 100);
putMsg.setUShort(3, 200);
putMsg.setInt(4, 1000);
putMsg.setLong(5, 10000);
putMsg.setFloat(6, 100.1f);
putMsg.setDouble(7, 10000.1f);
putMsg.setDate(8, 2010, 1, 1);
putMsg.setTime(9, 12, 1, 10, 0);
putMsg.setDateTime(10, 2010, 1, 1, 1, 1, 1, 1);
putMsg.setString(11, "hello? string.");
putMsg.setVarString(12, "hello? var string");
int temp = 1000;
putMsg.setBinary(13, BitConverter.GetBytes(temp));
putMsg.setVarBinary(14, BitConverter.GetBytes(temp));
for (int i = 0; i < 100; i++)
{
p2pQueue.put(putMsg);
}
int count = 0;
while (true)
{
Thread.Sleep(1000);
count++;
if (count == 20)
break;
}
while (!queue.getStop())
{
Thread.Sleep(10);
}
}
catch (MyCQException ex)
{
Console.WriteLine("code:" + ex.getCode() + ", message:" + ex.getMessage());
}
client.close();
}
public void sample()
{
MyCQClient client = MyCQFactory.getMyCQClient();
try
{
client.connect("localhost", 3030, "root", "1234");
QueueManager queueManager = client.getQueueManager();
queueManager.deleteAllQueues();
QueueSchema queueSchema = MyCQFactory.getQueueSchema();
examSetQueueSchema3(queueSchema);
queueManager.createP2PQueue("p2pQueue", queueSchema, 100);
examP2PQueue();
queueManager.createTopicQueue("topicQueue", queueSchema, 100);
CQManager cqManager = client.getCQManager();
string cq1 = "window p2pQueue as win1[size=5sec, slide=5sec] select count(*) from win1";
cqManager.createCQ("cq1", cq1, 100);
string cq2 = "window topicQueue as win1[size=5sec, slide=5sec] select count(*) from win1";
cqManager.createCQ("cq2", cq2, 100);
examCQResultQueue();
cqManager.deleteAllCQ();
queueManager.deleteAllQueues();
}
catch (MyCQException ex)
{
Console.WriteLine("code:" + ex.getCode() + ", message:" + ex.getMessage());
}
client.close();
}
}
}