import java.util.List;
import mycq.*;
/**
 * Get-callback that writes columns 0 through 14 of a message to standard
 * output, one line per column.
 *
 * <p>Instances are handed to {@code getStart} on the P2P and topic queues in
 * {@code ExamQueue}.
 */
class MyOnGetHandler extends OnGetHandler {
    /**
     * Prints every column of {@code msg} in index order. If reading a column
     * raises a {@code MyCQException}, the columns already read stay printed and
     * the exception's code and message are printed as {@code code:message}.
     *
     * @param msg the message whose columns are printed
     */
    public void onGet(Message msg) {
        try {
            printColumn("col 0:", msg.getBoolean(0));
            printColumn("col 1:", msg.getByte(1));
            printColumn("col 2:", msg.getShort(2));
            printColumn("col 3:", msg.getUShort(3));
            printColumn("col 4:", msg.getInt(4));
            printColumn("col 5:", msg.getLong(5));
            printColumn("col 6:", msg.getFloat(6));
            printColumn("col 7:", msg.getDouble(7));
            printColumn("col 8:", msg.getDateString(8));
            printColumn("col 9:", msg.getTimeString(9));
            printColumn("col10:", msg.getDateTimeString(10));
            printColumn("col11:", msg.getString(11));
            printColumn("col12:", msg.getVarString(12));
            printColumn("col13:", msg.getBinary(13));
            printColumn("col14:", msg.getVarBinary(14));
        } catch (MyCQException failure) {
            String report = failure.getCode() + ":" + failure.getMessage();
            System.out.println(report);
        }
    }

    /**
     * Writes {@code label} immediately followed by the string form of
     * {@code value} as a single line on standard output.
     */
    private static void printColumn(String label, Object value) {
        System.out.println(label + value);
    }
}
/**
 * Get-callback that prints only column 0 of a message, read as a long.
 *
 * <p>An instance is handed to {@code getStart} on the CQ result queue in
 * {@code ExamQueue.examCQResultQueue}.
 */
class MyOnGetHandler2 extends OnGetHandler {
    /**
     * Prints column 0 of {@code msg}. If reading it raises a
     * {@code MyCQException}, prints the exception as {@code code:message}.
     *
     * @param msg the message whose first column is printed
     */
    public void onGet(Message msg) {
        try {
            String line = "col 0:" + msg.getLong(0);
            System.out.println(line);
        } catch (MyCQException failure) {
            String report = failure.getCode() + ":" + failure.getMessage();
            System.out.println(report);
        }
    }
}
/**
 * Sample driver for the MyCQ client API.
 *
 * <p>{@link #sample()} is the entry point: it clears existing CQs and queues,
 * creates a P2P queue and a topic queue that share a 15-column schema, runs
 * the per-queue examples, registers two CQs and finally cleans up again.
 * Every example opens its own client connection and always closes it.
 */
public class ExamQueue {
    /** Connection settings used by every example in this class. */
    private static final String HOST = "localhost";
    private static final int PORT = 3030;
    private static final String USER = "root";
    private static final String PASSWORD = "1234";

    /** Names of the queues and the CQ that the examples create and look up. */
    private static final String P2P_QUEUE_NAME = "p2pQueue";
    private static final String TOPIC_QUEUE_NAME = "topicQueue";
    private static final String CQ_RESULT_NAME = "cq1";

    /** Number of messages each example puts after starting its handler. */
    private static final int MESSAGES_PER_BURST = 100;

    /** Sleep between two getStop() attempts, in milliseconds. */
    private static final long STOP_POLL_MILLIS = 10;

    /** How many one-second pauses examCQResultQueue takes before stopping. */
    private static final int CQ_WAIT_SECONDS = 20;

    /**
     * Adds one column of every supported type to {@code queueSchema}.
     *
     * <p>The columns are added in the same order in which the other examples
     * address them by index (0 = boolean ... 14 = varbinary); presumably the
     * add order determines the column index - confirm against the library.
     * A {@code MyCQException} is reported on standard output, not propagated.
     *
     * @param queueSchema the schema to populate
     */
    void examSetQueueSchema(QueueSchema queueSchema) {
        try {
            queueSchema.addBoolean("boolean");
            queueSchema.addByte("byte");
            queueSchema.addShort("short");
            queueSchema.addUShort("ushort");
            queueSchema.addInt("int");
            queueSchema.addLong("long");
            queueSchema.addFloat("float");
            queueSchema.addDouble("double");
            queueSchema.addDate("date");
            queueSchema.addTime("time");
            queueSchema.addDateTime("datetime");
            queueSchema.addString("string", 100);
            queueSchema.addVarString("varstring");
            queueSchema.addBinary("binary", 100);
            queueSchema.addVarBinary("varbinary");
        } catch (MyCQException ex) {
            report(ex);
        }
    }

    /**
     * Prints columns 0 through 14 of {@code msg} to standard output, one line
     * per column. If a column read raises a {@code MyCQException}, the columns
     * already read stay printed and the exception is reported instead.
     *
     * @param msg the message to print
     */
    void examShowMessage(Message msg) {
        try {
            System.out.println("col 0:" + msg.getBoolean(0));
            System.out.println("col 1:" + msg.getByte(1));
            System.out.println("col 2:" + msg.getShort(2));
            System.out.println("col 3:" + msg.getUShort(3));
            System.out.println("col 4:" + msg.getInt(4));
            System.out.println("col 5:" + msg.getLong(5));
            System.out.println("col 6:" + msg.getFloat(6));
            System.out.println("col 7:" + msg.getDouble(7));
            System.out.println("col 8:" + msg.getDateString(8));
            System.out.println("col 9:" + msg.getTimeString(9));
            System.out.println("col10:" + msg.getDateTimeString(10));
            System.out.println("col11:" + msg.getString(11));
            System.out.println("col12:" + msg.getVarString(12));
            System.out.println("col13:" + msg.getBinary(13));
            System.out.println("col14:" + msg.getVarBinary(14));
        } catch (MyCQException ex) {
            report(ex);
        }
    }

    /**
     * P2P queue example: one plain put/get, one putTx/getTx, then a handler is
     * started, 100 messages are put, and getStop() is polled until it returns
     * true. The client is closed even if an unexpected exception escapes.
     */
    void examP2PQueue() {
        MyCQClient client = MyCQFactory.getMyCQClient();
        try {
            client.connect(HOST, PORT, USER, PASSWORD);
            QueueManager queueManager = client.getQueueManager();
            P2PQueue queue = queueManager.getP2PQueue(P2P_QUEUE_NAME);

            Message putMsg = queue.getBlankMessage();
            fillSampleMessage(putMsg, "hello?");

            // Plain put followed by a plain get.
            queue.put(putMsg);
            Message getMsg = queue.get();
            if (getMsg != null) {
                examShowMessage(getMsg);
            }

            // Same round trip through the Tx variants.
            queue.putTx(putMsg);
            getMsg = queue.getTx();
            if (getMsg != null) {
                examShowMessage(getMsg);
            }

            // Handler-based consumption.
            MyOnGetHandler handler = new MyOnGetHandler();
            queue.getStart(handler);
            for (int i = 0; i < MESSAGES_PER_BURST; i++) {
                queue.put(putMsg);
            }
            while (!queue.getStop()) {
                if (!pause(STOP_POLL_MILLIS)) {
                    break; // interrupted: give up waiting, flag is restored
                }
            }
        } catch (MyCQException ex) {
            report(ex);
        } finally {
            client.close();
        }
    }

    /**
     * Topic queue example: starts a handler first, puts 100 messages, then
     * polls getStop() until it returns true. The client is closed even if an
     * unexpected exception escapes.
     */
    void examTopicQueue() {
        MyCQClient client = MyCQFactory.getMyCQClient();
        try {
            client.connect(HOST, PORT, USER, PASSWORD);
            QueueManager queueManager = client.getQueueManager();
            TopicQueue queue = queueManager.getTopicQueue(TOPIC_QUEUE_NAME);

            MyOnGetHandler handler = new MyOnGetHandler();
            queue.getStart(handler);

            Message putMsg = queue.getBlankMessage();
            fillSampleMessage(putMsg, "hello");
            for (int i = 0; i < MESSAGES_PER_BURST; i++) {
                queue.put(putMsg);
            }
            while (!queue.getStop()) {
                if (!pause(STOP_POLL_MILLIS)) {
                    break; // interrupted: give up waiting, flag is restored
                }
            }
        } catch (MyCQException ex) {
            report(ex);
        } finally {
            client.close();
        }
    }

    /**
     * CQ result queue example: starts a handler on the result queue of "cq1",
     * feeds 100 messages into the P2P queue, pauses for up to 20 seconds, then
     * polls getStop() on the result queue until it returns true. The client is
     * closed even if an unexpected exception escapes.
     */
    void examCQResultQueue() {
        MyCQClient client = MyCQFactory.getMyCQClient();
        try {
            client.connect(HOST, PORT, USER, PASSWORD);
            QueueManager queueManager = client.getQueueManager();
            CQResultQueue queue = queueManager.getCQResultQueue(CQ_RESULT_NAME);

            // Schema lookup is kept from the original sample; the values are
            // not used further here. NOTE(review): the calls may contact the
            // server, so they are not removed - confirm whether they are needed.
            QueueSchema schema = queue.getQueueSchema();
            List<Column> columns = schema.getColumns();

            MyOnGetHandler2 handler = new MyOnGetHandler2();
            queue.getStart(handler);

            P2PQueue p2pQueue = queueManager.getP2PQueue(P2P_QUEUE_NAME);
            Message putMsg = p2pQueue.getBlankMessage();
            fillSampleMessage(putMsg, "hello");
            for (int i = 0; i < MESSAGES_PER_BURST; i++) {
                p2pQueue.put(putMsg);
            }

            // Presumably gives the 5-second CQ windows time to emit results
            // before the handler is stopped - confirm.
            for (int second = 0; second < CQ_WAIT_SECONDS; second++) {
                if (!pause(1000)) {
                    break;
                }
            }
            while (!queue.getStop()) {
                if (!pause(STOP_POLL_MILLIS)) {
                    break; // interrupted: give up waiting, flag is restored
                }
            }
        } catch (MyCQException ex) {
            report(ex);
        } finally {
            client.close();
        }
    }

    /**
     * Runs the complete sample: deletes all CQs and queues, creates the P2P and
     * topic queues from one shared schema and runs their examples, creates the
     * CQs "cq1" and "cq2", runs the CQ result example, then deletes everything
     * again. A {@code MyCQException} aborts the remaining steps and is reported
     * on standard output; the client is closed in every case.
     */
    public void sample() {
        MyCQClient client = MyCQFactory.getMyCQClient();
        try {
            client.connect(HOST, PORT, USER, PASSWORD);
            CQManager cqManager = client.getCQManager();
            QueueManager queueManager = client.getQueueManager();

            // Start from a clean slate.
            cqManager.deleteAllCQ();
            queueManager.deleteAllQueues();

            QueueSchema queueSchema = MyCQFactory.getQueueSchema();
            examSetQueueSchema(queueSchema);

            queueManager.createP2PQueue(P2P_QUEUE_NAME, queueSchema, 100);
            examP2PQueue();

            queueManager.createTopicQueue(TOPIC_QUEUE_NAME, queueSchema, 100);
            examTopicQueue();

            cqManager.createCQ(CQ_RESULT_NAME, countWindowQuery(P2P_QUEUE_NAME), 100);
            cqManager.createCQ("cq2", countWindowQuery(TOPIC_QUEUE_NAME), 100);
            examCQResultQueue();

            cqManager.deleteAllCQ();
            queueManager.deleteAllQueues();
        } catch (MyCQException ex) {
            report(ex);
        } finally {
            client.close();
        }
    }

    /**
     * Sets columns 0 through 14 of {@code msg} to the fixed sample values used
     * by all examples; columns 13 and 14 receive the bytes of
     * {@code binaryText}.
     */
    private static void fillSampleMessage(Message msg, String binaryText)
            throws MyCQException {
        msg.setBoolean(0, true);
        msg.setByte(1, (byte) 0x41);
        msg.setShort(2, (short) 100);
        msg.setUShort(3, (char) 200);
        msg.setInt(4, 1000);
        msg.setLong(5, 10000);
        msg.setFloat(6, 100.1f);
        // A double literal: a float literal here would be widened and store
        // 10000.099609375 instead of 10000.1.
        msg.setDouble(7, 10000.1);
        msg.setDate(8, 2010, 1, 1);
        msg.setTime(9, 12, 1, 10, 0);
        msg.setDateTime(10, 2010, 1, 1, 1, 1, 1, 1);
        msg.setString(11, "hello? string.");
        msg.setVarString(12, "hello? var string");
        // Two separate arrays, so the two columns never share a buffer.
        msg.setBinary(13, binaryText.getBytes());
        msg.setVarBinary(14, binaryText.getBytes());
    }

    /** Builds the count(*) query over a 5-second window on {@code queueName}. */
    private static String countWindowQuery(String queueName) {
        return "window " + queueName
                + " as win1[size=5sec, slide=5sec] select count(*) from win1";
    }

    /**
     * Sleeps for {@code millis} milliseconds.
     *
     * @return {@code true} if the full sleep completed; {@code false} if the
     *         thread was interrupted, in which case the interrupt flag is set
     *         again so callers further up can still observe it
     */
    private static boolean pause(long millis) {
        try {
            Thread.sleep(millis);
            return true;
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return false;
        }
    }

    /** Prints the code and message of {@code ex} on standard output. */
    private static void report(MyCQException ex) {
        System.out.println("code:" + ex.getCode() + ", message:"
                + ex.getMessage());
    }
}