Asynchronous message get handler class. You can implement your own asynchronous message get handler by extending this class. More...
Public Member Functions | |
abstract void | onGet (Message message) |
Called asynchronously when there is a message in the queue. |
Asynchronous message get handler class. You can implement your own asynchronous message get handler by extending this class.
import java.util.List; import mycq.*; class MyOnGetHandler extends OnGetHandler { public void onGet(Message msg) { try { // get message value by column index System.out.println("col 0:" + msg.getBoolean(0)); System.out.println("col 1:" + msg.getByte(1)); System.out.println("col 2:" + msg.getShort(2)); System.out.println("col 3:" + msg.getUShort(3)); System.out.println("col 4:" + msg.getInt(4)); System.out.println("col 5:" + msg.getLong(5)); System.out.println("col 6:" + msg.getFloat(6)); System.out.println("col 7:" + msg.getDouble(7)); System.out.println("col 8:" + msg.getDateString(8)); System.out.println("col 9:" + msg.getTimeString(9)); System.out.println("col10:" + msg.getDateTimeString(10)); System.out.println("col11:" + msg.getString(11)); System.out.println("col12:" + msg.getVarString(12)); System.out.println("col13:" + msg.getBinary(13)); System.out.println("col14:" + msg.getVarBinary(14)); // get message value by column name // System.out.println("col 0:" + msg.getBoolean("boolean")); // System.out.println("col 1:" + msg.getByte("byte")); // System.out.println("col 2:" + msg.getShort("short")); // System.out.println("col 3:" + msg.getUShort("ushort")); // System.out.println("col 4:" + msg.getInt("int")); // System.out.println("col 5:" + msg.getLong("long")); // System.out.println("col 6:" + msg.getFloat("float")); // System.out.println("col 7:" + msg.getDouble("double")); // System.out.println("col 8:" + msg.getDateString("date")); // System.out.println("col 9:" + msg.getTimeString("time")); // System.out.println("col10:" + msg.getDateTimeString("datetime")); // System.out.println("col11:" + msg.getString("string")); // System.out.println("col12:" + msg.getVarString("varstring")); // System.out.println("col13:" + msg.getBinary("binary").value ); // System.out.println("col14:" + msg.getVarBinary("varbinary").value // ); } catch (MyCQException ex) { System.out.println(ex.getCode() + ":" + ex.getMessage()); } } } class MyOnGetHandler2 extends OnGetHandler { 
public void onGet(Message msg) { try { // get message value by column index System.out.println("col 0:" + msg.getLong(0)); // get message value by column name // System.out.println("col 0:" + msg.getLong("long")); } catch (MyCQException ex) { System.out.println(ex.getCode() + ":" + ex.getMessage()); } } } public class ExamQueue { void examSetQueueSchema(QueueSchema queueSchema) { try { queueSchema.addBoolean("boolean"); queueSchema.addByte("byte"); queueSchema.addShort("short"); queueSchema.addUShort("ushort"); queueSchema.addInt("int"); queueSchema.addLong("long"); queueSchema.addFloat("float"); queueSchema.addDouble("double"); queueSchema.addDate("date"); queueSchema.addTime("time"); queueSchema.addDateTime("datetime"); queueSchema.addString("string", 100); queueSchema.addVarString("varstring"); queueSchema.addBinary("binary", 100); queueSchema.addVarBinary("varbinary"); // columns // col 0: boolean // col 1: byte // col 2: short // col 3: ushort // col 4: int // col 5: long // col 6: float // col 7: double // col 8: date // col 9: time // col10: datetime // col11: string // col12: varstring // col13: binary // col14: varbinary } catch (MyCQException ex) { System.out.println("code:" + ex.getCode() + ", message:" + ex.getMessage()); } } void examShowMessage(Message msg) { // get message value by column index try { System.out.println("col 0:" + msg.getBoolean(0)); System.out.println("col 1:" + msg.getByte(1)); System.out.println("col 2:" + msg.getShort(2)); System.out.println("col 3:" + msg.getUShort(3)); System.out.println("col 4:" + msg.getInt(4)); System.out.println("col 5:" + msg.getLong(5)); System.out.println("col 6:" + msg.getFloat(6)); System.out.println("col 7:" + msg.getDouble(7)); System.out.println("col 8:" + msg.getDateString(8)); System.out.println("col 9:" + msg.getTimeString(9)); System.out.println("col10:" + msg.getDateTimeString(10)); System.out.println("col11:" + msg.getString(11)); System.out.println("col12:" + msg.getVarString(12)); 
System.out.println("col13:" + msg.getBinary(13)); System.out.println("col14:" + msg.getVarBinary(14)); // get message value by column name // System.out.println("col 0:" + msg.getBoolean("boolean")); // System.out.println("col 1:" + msg.getByte("byte")); // System.out.println("col 2:" + msg.getShort("short")); // System.out.println("col 3:" + msg.getUShort("ushort")); // System.out.println("col 4:" + msg.getInt("int")); // System.out.println("col 5:" + msg.getLong("long")); // System.out.println("col 6:" + msg.getFloat("float")); // System.out.println("col 7:" + msg.getDouble("double")); // System.out.println("col 8:" + msg.getDateString("date")); // System.out.println("col 9:" + msg.getTimeString("time")); // System.out.println("col10:" + msg.getDateTimeString("datetime")); // System.out.println("col11:" + msg.getString("string")); // System.out.println("col12:" + msg.getVarString("varstring")); // System.out.println("col13:" + msg.getBinary("binary").value ); // System.out.println("col14:" + msg.getVarBinary("varbinary").value // ); } catch (MyCQException ex) { System.out.println("code:" + ex.getCode() + ", message:" + ex.getMessage()); } } void examP2PQueue() { // get client instance MyCQClient client = MyCQFactory.getMyCQClient(); try { // connect client.connect("localhost", 3030, "root", "1234"); // get queue manager QueueManager queueManager = client.getQueueManager(); // get p2p queue P2PQueue queue = queueManager.getP2PQueue("p2pQueue"); // prepare message Message putMsg = queue.getBlankMessage(); // set message value by column index putMsg.setBoolean(0, true); putMsg.setByte(1, (byte) 0x41); putMsg.setShort(2, (short) 100); putMsg.setUShort(3, (char) 200); putMsg.setInt(4, 1000); putMsg.setLong(5, 10000); putMsg.setFloat(6, 100.1f); putMsg.setDouble(7, 10000.1f); putMsg.setDate(8, 2010, 1, 1); putMsg.setTime(9, 12, 1, 10, 0); putMsg.setDateTime(10, 2010, 1, 1, 1, 1, 1, 1); putMsg.setString(11, "hello? string."); putMsg.setVarString(12, "hello? 
var string"); String temp = "hello?"; putMsg.setBinary(13, temp.getBytes()); putMsg.setVarBinary(14, temp.getBytes()); // put and get queue.put(putMsg); Message getMsg = queue.get(); if (getMsg != null) { examShowMessage(getMsg); } // put and get queue.putTx(putMsg); // put (transaction mode) getMsg = queue.getTx(); // get (transaction mode) if (getMsg != null) { examShowMessage(getMsg); } // get start (asychronous mode) MyOnGetHandler handler = new MyOnGetHandler(); queue.getStart(handler); // put messages for (int i = 0; i < 100; i++) { queue.put(putMsg); } // get stop while (!queue.getStop()) { try { Thread.sleep(10); } catch (InterruptedException e) { e.printStackTrace(); } } } catch (MyCQException ex) { System.out.println("code:" + ex.getCode() + ", message:" + ex.getMessage()); } // close connection client.close(); } void examTopicQueue() { // get client instance MyCQClient client = MyCQFactory.getMyCQClient(); try { // connect client.connect("localhost", 3030, "root", "1234"); // get queue manager QueueManager queueManager = client.getQueueManager(); // get p2p queue TopicQueue queue = queueManager.getTopicQueue("topicQueue"); // get start (asychronous mode) MyOnGetHandler handler = new MyOnGetHandler(); queue.getStart(handler); // prepare message Message putMsg = queue.getBlankMessage(); // set message value by column index putMsg.setBoolean(0, true); putMsg.setByte(1, (byte) 0x41); putMsg.setShort(2, (short) 100); putMsg.setUShort(3, (char) 200); putMsg.setInt(4, 1000); putMsg.setLong(5, 10000); putMsg.setFloat(6, 100.1f); putMsg.setDouble(7, 10000.1f); putMsg.setDate(8, 2010, 1, 1); putMsg.setTime(9, 12, 1, 10, 0); putMsg.setDateTime(10, 2010, 1, 1, 1, 1, 1, 1); putMsg.setString(11, "hello? string."); putMsg.setVarString(12, "hello? 
var string"); String temp = "hello"; putMsg.setBinary(13, temp.getBytes()); putMsg.setVarBinary(14, temp.getBytes()); // put messages for (int i = 0; i < 100; i++) { queue.put(putMsg); } // get stop while (!queue.getStop()) { try { Thread.sleep(10); } catch (InterruptedException e) { e.printStackTrace(); } } } catch (MyCQException ex) { System.out.println("code:" + ex.getCode() + ", message:" + ex.getMessage()); } // close connection client.close(); } void examCQResultQueue() { // get client instance MyCQClient client = MyCQFactory.getMyCQClient(); try { // connect client.connect("localhost", 3030, "root", "1234"); // get queue manager QueueManager queueManager = client.getQueueManager(); // get p2p queue CQResultQueue queue = queueManager.getCQResultQueue("cq1"); // see, queue schema columns QueueSchema schema = queue.getQueueSchema(); List<Column> columns = schema.getColumns(); // get start (asychronous mode) MyOnGetHandler2 handler = new MyOnGetHandler2(); queue.getStart(handler); // put messages for cq test P2PQueue p2pQueue = queueManager.getP2PQueue("p2pQueue"); // prepare message Message putMsg = p2pQueue.getBlankMessage(); // set message value by column index putMsg.setBoolean(0, true); putMsg.setByte(1, (byte) 0x41); putMsg.setShort(2, (short) 100); putMsg.setUShort(3, (char) 200); putMsg.setInt(4, 1000); putMsg.setLong(5, 10000); putMsg.setFloat(6, 100.1f); putMsg.setDouble(7, 10000.1f); putMsg.setDate(8, 2010, 1, 1); putMsg.setTime(9, 12, 1, 10, 0); putMsg.setDateTime(10, 2010, 1, 1, 1, 1, 1, 1); putMsg.setString(11, "hello? string."); putMsg.setVarString(12, "hello? var string"); String temp = "hello"; putMsg.setBinary(13, temp.getBytes()); putMsg.setVarBinary(14, temp.getBytes()); // put messages to p2p queue for (int i = 0; i < 100; i++) { p2pQueue.put(putMsg); } // wait for cq result. 
int count = 0; while (true) { try { Thread.sleep(1000); } catch (InterruptedException e) { e.printStackTrace(); } count++; if (count == 20) break; } // get stop, cq result queue while (!queue.getStop()) { try { Thread.sleep(10); } catch (InterruptedException e) { e.printStackTrace(); } } } catch (MyCQException ex) { System.out.println("code:" + ex.getCode() + ", message:" + ex.getMessage()); } // close connection client.close(); } public void sample() { // get client instance MyCQClient client = MyCQFactory.getMyCQClient(); try { // connect client.connect("localhost", 3030, "root", "1234"); // get cq manager CQManager cqManager = client.getCQManager(); // get queue manager QueueManager queueManager = client.getQueueManager(); cqManager.deleteAllCQ(); queueManager.deleteAllQueues(); // Queue Schema // get queue schema instance QueueSchema queueSchema = MyCQFactory.getQueueSchema(); // set queue schema examSetQueueSchema(queueSchema); // P2P Queue // create p2p queue queueManager.createP2PQueue("p2pQueue", queueSchema, 100); // p2p queue example examP2PQueue(); // TOPIC Queue // create a topic queue queueManager.createTopicQueue("topicQueue", queueSchema, 100); // topic queue example examTopicQueue(); // CQResult Queue // create cq String cq1 = "window p2pQueue as win1[size=5sec, slide=5sec] select count(*) from win1"; cqManager.createCQ("cq1", cq1, 100); // CQResult Queue // create cq String cq2 = "window topicQueue as win1[size=5sec, slide=5sec] select count(*) from win1"; cqManager.createCQ("cq2", cq2, 100); examCQResultQueue(); // delete all cq cqManager.deleteAllCQ(); // delete all queues queueManager.deleteAllQueues(); } catch (MyCQException ex) { System.out.println("code:" + ex.getCode() + ", message:" + ex.getMessage()); } // close connection client.close(); } }
abstract void mycq.OnGetHandler.onGet | ( | Message | message | ) | [pure virtual] |
Called asynchronously when there is a message in the queue.
message | A reference to a message instance. |