Asynchronous message get handler class. You can implement your own asynchronous message get handler by inheritance of this class. More...
Public Member Functions | |
abstract void | onGet (Message message) |
Get asynchronously, when there is a message in a queue. |
Asynchronous message get handler class. You can implement your own asynchronous message get handler by inheritance of this class.
using System; using System.Collections.Generic; using System.Threading; using MyCQ; namespace MyCQClientNETExample { class MyOnGetHandler : OnGetHandler { public override void onGet(Message msg) { try { //get message value by column index Console.WriteLine("col 0:" + msg.getBoolean(0)); Console.WriteLine("col 1:" + msg.getByte(1)); Console.WriteLine("col 2:" + msg.getShort(2)); Console.WriteLine("col 3:" + msg.getUShort(3)); Console.WriteLine("col 4:" + msg.getInt(4)); Console.WriteLine("col 5:" + msg.getLong(5)); Console.WriteLine("col 6:" + msg.getFloat(6)); Console.WriteLine("col 7:" + msg.getDouble(7)); Console.WriteLine("col 8:" + msg.getDateString(8)); Console.WriteLine("col 9:" + msg.getTimeString(9)); Console.WriteLine("col10:" + msg.getDateTimeString(10)); Console.WriteLine("col11:" + msg.getString(11)); Console.WriteLine("col12:" + msg.getVarString(12)); Console.WriteLine("col13:" + msg.getBinary(13)); Console.WriteLine("col14:" + msg.getVarBinary(14)); //get message value by column name //Console.WriteLine("col 0:" + msg.getBoolean("boolean")); //Console.WriteLine("col 1:" + msg.getByte("byte")); //Console.WriteLine("col 2:" + msg.getShort("short")); //Console.WriteLine("col 3:" + msg.getUShort("ushort")); //Console.WriteLine("col 4:" + msg.getInt("int")); //Console.WriteLine("col 5:" + msg.getLong("long")); //Console.WriteLine("col 6:" + msg.getFloat("float")); //Console.WriteLine("col 7:" + msg.getDouble("double")); //Console.WriteLine("col 8:" + msg.getDateString("date")); //Console.WriteLine("col 9:" + msg.getTimeString("time")); //Console.WriteLine("col10:" + msg.getDateTimeString("datetime")); //Console.WriteLine("col11:" + msg.getString("string")); //Console.WriteLine("col12:" + msg.getVarString("varstring")); //Console.WriteLine("col13:" + msg.getBinary("binary").value ); //Console.WriteLine("col14:" + msg.getVarBinary("varbinary").value ); } catch (MyCQException ex) { Console.WriteLine(ex.getCode() + ":" + ex.getMessage()); } } } class MyOnGetHandler2 : OnGetHandler { public override void onGet(Message msg) { try { //get message value by column index Console.WriteLine("col 0:" + msg.getLong(0)); //get message value by column name //Console.WriteLine("col 0:" + msg.getLong("long")); } catch (MyCQException ex) { Console.WriteLine(ex.getCode() + ":" + ex.getMessage()); } } } class ExamQueue { void examSetQueueSchema3(QueueSchema queueSchema) { queueSchema.addBoolean("boolean"); queueSchema.addByte("byte"); queueSchema.addShort("short"); queueSchema.addUShort("ushort"); queueSchema.addInt("int"); queueSchema.addLong("long"); queueSchema.addFloat("float"); queueSchema.addDouble("double"); queueSchema.addDate("date"); queueSchema.addTime("time"); queueSchema.addDateTime("datetime"); queueSchema.addString("string", 100); queueSchema.addVarString("varstring"); queueSchema.addBinary("binary", 100); queueSchema.addVarBinary("varbinary"); //columns //col 0: boolean //col 1: byte //col 2: short //col 3: ushort //col 4: int //col 5: long //col 6: float //col 7: double //col 8: date //col 9: time //col10: datetime //col11: string //col12: varstring //col13: binary //col14: varbinary } void examShowMessage(Message msg) { //get message value by column index Console.WriteLine("col 0:" + msg.getBoolean(0)); Console.WriteLine("col 1:" + msg.getByte(1)); Console.WriteLine("col 2:" + msg.getShort(2)); Console.WriteLine("col 3:" + msg.getUShort(3)); Console.WriteLine("col 4:" + msg.getInt(4)); Console.WriteLine("col 5:" + msg.getLong(5)); Console.WriteLine("col 6:" + msg.getFloat(6)); Console.WriteLine("col 7:" + msg.getDouble(7)); Console.WriteLine("col 8:" + msg.getDateString(8)); Console.WriteLine("col 9:" + msg.getTimeString(9)); Console.WriteLine("col10:" + msg.getDateTimeString(10)); Console.WriteLine("col11:" + msg.getString(11)); Console.WriteLine("col12:" + msg.getVarString(12)); Console.WriteLine("col13:" + msg.getBinary(13)); Console.WriteLine("col14:" + msg.getVarBinary(14)); //get message value by column name //Console.WriteLine("col 0:" + msg.getBoolean("boolean")); //Console.WriteLine("col 1:" + msg.getByte("byte")); //Console.WriteLine("col 2:" + msg.getShort("short")); //Console.WriteLine("col 3:" + msg.getUShort("ushort")); //Console.WriteLine("col 4:" + msg.getInt("int")); //Console.WriteLine("col 5:" + msg.getLong("long")); //Console.WriteLine("col 6:" + msg.getFloat("float")); //Console.WriteLine("col 7:" + msg.getDouble("double")); //Console.WriteLine("col 8:" + msg.getDateString("date")); //Console.WriteLine("col 9:" + msg.getTimeString("time")); //Console.WriteLine("col10:" + msg.getDateTimeString("datetime")); //Console.WriteLine("col11:" + msg.getString("string")); //Console.WriteLine("col12:" + msg.getVarString("varstring")); //Console.WriteLine("col13:" + msg.getBinary("binary").value ); //Console.WriteLine("col14:" + msg.getVarBinary("varbinary").value ); } void examP2PQueue() { //get client instance MyCQClient client = MyCQFactory.getMyCQClient(); try { //connect client.connect("localhost", 3030, "root", "1234"); //get queue manager QueueManager queueManager = client.getQueueManager(); //get p2p queue P2PQueue queue = queueManager.getP2PQueue("p2pQueue"); //prepare message Message putMsg = queue.getBlankMessage(); //set message value by column index putMsg.setBoolean(0, true); putMsg.setByte(1, 0x41); putMsg.setShort(2, 100); putMsg.setUShort(3, 200); putMsg.setInt(4, 1000); putMsg.setLong(5, 10000); putMsg.setFloat(6, 100.1f); putMsg.setDouble(7, 10000.1f); putMsg.setDate(8, 2010, 1, 1); putMsg.setTime(9, 12, 1, 10, 0); putMsg.setDateTime(10, 2010, 1, 1, 1, 1, 1, 1); putMsg.setString(11, "hello? string."); putMsg.setVarString(12, "hello? var string"); int temp = 1000; putMsg.setBinary(13, BitConverter.GetBytes(temp)); putMsg.setVarBinary(14, BitConverter.GetBytes(temp)); //put and get queue.put(putMsg); Message getMsg = queue.get(); if (getMsg != null) { examShowMessage(getMsg); } //put and get queue.putTx(putMsg); //put (transaction mode) getMsg = queue.getTx(); //get (transaction mode) if (getMsg != null) { examShowMessage(getMsg); } //get start (asychronous mode) MyOnGetHandler handler = new MyOnGetHandler(); queue.getStart(handler); //put messages for (int i = 0; i < 100; i++) { queue.put(putMsg); } //get stop while (!queue.getStop()) { Thread.Sleep(10); } } catch (MyCQException ex) { Console.WriteLine("code:" + ex.getCode() + ", message:" + ex.getMessage()); } //close connection client.close(); } void examTopicQueue() { //get client instance MyCQClient client = MyCQFactory.getMyCQClient(); try { //connect client.connect("localhost", 3030, "root", "1234"); //get queue manager QueueManager queueManager = client.getQueueManager(); //get p2p queue TopicQueue queue = queueManager.getTopicQueue("topicQueue"); //get start (asychronous mode) MyOnGetHandler handler = new MyOnGetHandler(); queue.getStart(handler); //prepare message Message putMsg = queue.getBlankMessage(); //set message value by column index putMsg.setBoolean(0, true); putMsg.setByte(1, 0x41); putMsg.setShort(2, 100); putMsg.setUShort(3, 200); putMsg.setInt(4, 1000); putMsg.setLong(5, 10000); putMsg.setFloat(6, 100.1f); putMsg.setDouble(7, 10000.1f); putMsg.setDate(8, 2010, 1, 1); putMsg.setTime(9, 12, 1, 10, 0); putMsg.setDateTime(10, 2010, 1, 1, 1, 1, 1, 1); putMsg.setString(11, "hello? string."); putMsg.setVarString(12, "hello? var string"); int temp = 1000; putMsg.setBinary(13, BitConverter.GetBytes(temp)); putMsg.setVarBinary(14, BitConverter.GetBytes(temp)); //put messages for (int i = 0; i < 100; i++) { queue.put(putMsg); } //get stop while (!queue.getStop()) { Thread.Sleep(10); } } catch (MyCQException ex) { Console.WriteLine("code:" + ex.getCode() + ", message:" + ex.getMessage()); } //close connection client.close(); } void examCQResultQueue() { //get client instance MyCQClient client = MyCQFactory.getMyCQClient(); try { //connect client.connect("localhost", 3030, "root", "1234"); //get queue manager QueueManager queueManager = client.getQueueManager(); //get p2p queue CQResultQueue queue = queueManager.getCQResultQueue("cq1"); //see, queue schema columns QueueSchema schema = queue.getQueueSchema(); List<Column> columns = schema.getColumns(); //get start (asychronous mode) MyOnGetHandler2 handler = new MyOnGetHandler2(); queue.getStart(handler); //put messages for cq test P2PQueue p2pQueue = queueManager.getP2PQueue("p2pQueue"); //prepare message Message putMsg = p2pQueue.getBlankMessage(); //set message value by column index putMsg.setBoolean(0, true); putMsg.setByte(1, 0x41); putMsg.setShort(2, 100); putMsg.setUShort(3, 200); putMsg.setInt(4, 1000); putMsg.setLong(5, 10000); putMsg.setFloat(6, 100.1f); putMsg.setDouble(7, 10000.1f); putMsg.setDate(8, 2010, 1, 1); putMsg.setTime(9, 12, 1, 10, 0); putMsg.setDateTime(10, 2010, 1, 1, 1, 1, 1, 1); putMsg.setString(11, "hello? string."); putMsg.setVarString(12, "hello? var string"); int temp = 1000; putMsg.setBinary(13, BitConverter.GetBytes(temp)); putMsg.setVarBinary(14, BitConverter.GetBytes(temp)); //put messages to p2p queue for (int i = 0; i < 100; i++) { p2pQueue.put(putMsg); } //wait for cq result. int count = 0; while (true) { Thread.Sleep(1000); count++; if (count == 20) break; } //get stop, cq result queue while (!queue.getStop()) { Thread.Sleep(10); } } catch (MyCQException ex) { Console.WriteLine("code:" + ex.getCode() + ", message:" + ex.getMessage()); } //close connection client.close(); } public void sample() { //get client instance MyCQClient client = MyCQFactory.getMyCQClient(); try { //connect client.connect("localhost", 3030, "root", "1234"); //get queue manager QueueManager queueManager = client.getQueueManager(); queueManager.deleteAllQueues(); //Queue Schema //get queue schema instance QueueSchema queueSchema = MyCQFactory.getQueueSchema(); //set queue schema examSetQueueSchema3(queueSchema); //P2P Queue //create p2p queue queueManager.createP2PQueue("p2pQueue", queueSchema, 100); //p2p queue example examP2PQueue(); //TOPIC Queue //create a topic queue queueManager.createTopicQueue("topicQueue", queueSchema, 100); CQManager cqManager = client.getCQManager(); //CQResult Queue //create cq string cq1 = "window p2pQueue as win1[size=5sec, slide=5sec] select count(*) from win1"; cqManager.createCQ("cq1", cq1, 100); //CQResult Queue //create cq string cq2 = "window topicQueue as win1[size=5sec, slide=5sec] select count(*) from win1"; cqManager.createCQ("cq2", cq2, 100); examCQResultQueue(); //delete all cq cqManager.deleteAllCQ(); //delete all queues queueManager.deleteAllQueues(); } catch (MyCQException ex) { Console.WriteLine("code:" + ex.getCode() + ", message:" + ex.getMessage()); } //close connection client.close(); } } }
abstract void MyCQ.OnGetHandler.onGet | ( | Message | message | ) | [pure virtual] |
Get asynchronously, when there is a message in a queue.
message | A reference of a message instance. |