From 16439a4b4e480c114521055f896e50405f921b1a Mon Sep 17 00:00:00 2001 From: "Frederick G. Oconer" Date: Fri, 2 Jun 2006 08:24:47 +0000 Subject: [PATCH] Added support for sync and async. git-svn-id: https://svn.apache.org/repos/asf/incubator/activemq/trunk@411085 13f79535-47bb-0310-9956-ffa450edef68 --- .../activemq/tool/JmsConsumerClient.java | 72 ++++++++++++------- 1 file changed, 47 insertions(+), 25 deletions(-) diff --git a/tooling/maven-activemq-perf-plugin/src/main/java/org/apache/activemq/tool/JmsConsumerClient.java b/tooling/maven-activemq-perf-plugin/src/main/java/org/apache/activemq/tool/JmsConsumerClient.java index 95faa4aa9b..aa4211a024 100644 --- a/tooling/maven-activemq-perf-plugin/src/main/java/org/apache/activemq/tool/JmsConsumerClient.java +++ b/tooling/maven-activemq-perf-plugin/src/main/java/org/apache/activemq/tool/JmsConsumerClient.java @@ -29,6 +29,7 @@ public class JmsConsumerClient extends JmsPerfClientSupport implements MessageLi private Destination destination = null; private boolean isDurable = false; + private boolean isAsync = true; public JmsConsumerClient(ConnectionFactory factory) { this.factory = factory; @@ -66,48 +67,53 @@ public class JmsConsumerClient extends JmsPerfClientSupport implements MessageLi setDestination(getDestinationName()); } - System.out.println("Connecting to URL: " + brokerUrl); - System.out.println("Consuming: " + destination); - System.out.println("Using " + (isDurable ? "durable" : "non-durable") + " subscription"); - - if (isDurable) { createDurableSubscriber((Topic) getDestination(), getClass().getName()); } else { createMessageConsumer(getDestination()); } - getMessageConsumer().setMessageListener(this); - getConnection().start(); + if (isAsync) { + getMessageConsumer().setMessageListener(this); + getConnection().start(); - try { - Thread.sleep(duration); - } catch (InterruptedException e) { - throw new JMSException("Error while consumer is sleeping " + e.getMessage()); + try { + Thread.sleep(duration); + } catch (InterruptedException e) { + throw new JMSException("Error while consumer is sleeping " + e.getMessage()); + } + } else { + getConnection().start(); + consumeMessages(getMessageConsumer(), duration); } - getMessageConsumer().close(); - getConnection().close(); - - System.out.println("Throughput : " + this.getThroughput()); - + close(); //close consumer, session, and connection. listener.onConfigEnd(this); } + //Increments throughput public void onMessage(Message message) { - try { - TextMessage textMessage = (TextMessage) message; + System.out.println(message.toString()); + this.incThroughput(); + } - // lets force the content to be deserialized - String text = textMessage.getText(); - System.out.println("message: " + text + ":" + this.getThroughput()); - this.incThroughput(); - } catch (JMSException e) { - // TODO Auto-generated catch block - e.printStackTrace(); + protected void consumeMessages(MessageConsumer consumer, long duration) throws JMSException { + + long currentTime = System.currentTimeMillis(); + long endTime = currentTime + duration; + + while (System.currentTimeMillis() <= endTime) { + Message message = consumer.receive(); + onMessage(message); } } + protected void close() throws JMSException { + getMessageConsumer().close(); + getSession().close(); + getConnection().close(); + } + public static void main(String[] args) throws Exception { JmsConsumerClient cons = new JmsConsumerClient("org.apache.activemq.ActiveMQConnectionFactory", "tcp://localhost:61616", "topic://TEST.FOO"); cons.setPerfEventListener(new PerfEventAdapter()); @@ -116,6 +122,22 @@ public class JmsConsumerClient extends JmsPerfClientSupport implements MessageLi // Helper Methods + public boolean isDurable() { + return isDurable; + } + + public void setDurable(boolean durable) { + isDurable = durable; + } + + public boolean isAsync() { + return isAsync; + } + + public void setAsync(boolean async) { + isAsync = async; + } + public String getDestinationName() { return this.destinationName; }