added another option that allows you to basically drain a destination for while messages are being produced.

git-svn-id: https://svn.apache.org/repos/asf/incubator/activemq/trunk@382964 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
Hiram R. Chirino 2006-03-03 23:21:37 +00:00
parent 7f9f040835
commit da26ccbd32
1 changed files with 30 additions and 6 deletions

View File

@ -42,7 +42,7 @@ public class ConsumerTool extends ToolSupport implements MessageListener, Except
private boolean running;
private Session session;
private long sleepTime=0;
private long receiveTimeOut=0;
public static void main(String[] args) {
ConsumerTool tool = new ConsumerTool();
@ -70,6 +70,10 @@ public class ConsumerTool extends ToolSupport implements MessageListener, Except
if (args.length > 7) {
tool.sleepTime = Long.parseLong(args[7]);
}
if (args.length > 8) {
tool.receiveTimeOut = Long.parseLong(args[8]);
}
tool.run();
}
@ -92,12 +96,14 @@ public class ConsumerTool extends ToolSupport implements MessageListener, Except
else {
consumer = session.createConsumer(destination);
}
if (maxiumMessages <= 0) {
consumer.setMessageListener(this);
}
if (maxiumMessages > 0) {
if ( maxiumMessages > 0 ) {
consumeMessagesAndClose(connection, session, consumer);
} else {
if(receiveTimeOut==0) {
consumer.setMessageListener(this);
} else {
consumeMessagesAndClose(connection, session, consumer, receiveTimeOut);
}
}
}
catch (Exception e) {
@ -175,5 +181,23 @@ public class ConsumerTool extends ToolSupport implements MessageListener, Except
System.in.read();
}
}
protected void consumeMessagesAndClose(Connection connection, Session session, MessageConsumer consumer, long timeout) throws JMSException, IOException {
System.out.println("We will consume messages while they continue to be delivered within: " + timeout + " ms, and then we will shutdown");
Message message;
while ( (message = consumer.receive(timeout)) != null ) {
onMessage(message);
}
System.out.println("Closing connection");
consumer.close();
session.close();
connection.close();
if (pauseBeforeShutdown) {
System.out.println("Press return to shut down");
System.in.read();
}
}
}