Added support for sync and async.

git-svn-id: https://svn.apache.org/repos/asf/incubator/activemq/trunk@411085 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
Frederick G. Oconer 2006-06-02 08:24:47 +00:00
parent 3a92ec3802
commit 16439a4b4e
1 changed files with 47 additions and 25 deletions

View File

@ -29,6 +29,7 @@ public class JmsConsumerClient extends JmsPerfClientSupport implements MessageLi
private Destination destination = null; private Destination destination = null;
private boolean isDurable = false; private boolean isDurable = false;
private boolean isAsync = true;
public JmsConsumerClient(ConnectionFactory factory) { public JmsConsumerClient(ConnectionFactory factory) {
this.factory = factory; this.factory = factory;
@ -66,48 +67,53 @@ public class JmsConsumerClient extends JmsPerfClientSupport implements MessageLi
setDestination(getDestinationName()); setDestination(getDestinationName());
} }
System.out.println("Connecting to URL: " + brokerUrl);
System.out.println("Consuming: " + destination);
System.out.println("Using " + (isDurable ? "durable" : "non-durable") + " subscription");
if (isDurable) { if (isDurable) {
createDurableSubscriber((Topic) getDestination(), getClass().getName()); createDurableSubscriber((Topic) getDestination(), getClass().getName());
} else { } else {
createMessageConsumer(getDestination()); createMessageConsumer(getDestination());
} }
getMessageConsumer().setMessageListener(this); if (isAsync) {
getConnection().start(); getMessageConsumer().setMessageListener(this);
getConnection().start();
try { try {
Thread.sleep(duration); Thread.sleep(duration);
} catch (InterruptedException e) { } catch (InterruptedException e) {
throw new JMSException("Error while consumer is sleeping " + e.getMessage()); throw new JMSException("Error while consumer is sleeping " + e.getMessage());
}
} else {
getConnection().start();
consumeMessages(getMessageConsumer(), duration);
} }
getMessageConsumer().close(); close(); //close consumer, session, and connection.
getConnection().close();
System.out.println("Throughput : " + this.getThroughput());
listener.onConfigEnd(this); listener.onConfigEnd(this);
} }
//Increments throughput
public void onMessage(Message message) { public void onMessage(Message message) {
try { System.out.println(message.toString());
TextMessage textMessage = (TextMessage) message; this.incThroughput();
}
// lets force the content to be deserialized protected void consumeMessages(MessageConsumer consumer, long duration) throws JMSException {
String text = textMessage.getText();
System.out.println("message: " + text + ":" + this.getThroughput()); long currentTime = System.currentTimeMillis();
this.incThroughput(); long endTime = currentTime + duration;
} catch (JMSException e) {
// TODO Auto-generated catch block while (System.currentTimeMillis() <= endTime) {
e.printStackTrace(); Message message = consumer.receive();
onMessage(message);
} }
} }
protected void close() throws JMSException {
getMessageConsumer().close();
getSession().close();
getConnection().close();
}
public static void main(String[] args) throws Exception { public static void main(String[] args) throws Exception {
JmsConsumerClient cons = new JmsConsumerClient("org.apache.activemq.ActiveMQConnectionFactory", "tcp://localhost:61616", "topic://TEST.FOO"); JmsConsumerClient cons = new JmsConsumerClient("org.apache.activemq.ActiveMQConnectionFactory", "tcp://localhost:61616", "topic://TEST.FOO");
cons.setPerfEventListener(new PerfEventAdapter()); cons.setPerfEventListener(new PerfEventAdapter());
@ -116,6 +122,22 @@ public class JmsConsumerClient extends JmsPerfClientSupport implements MessageLi
// Helper Methods // Helper Methods
public boolean isDurable() {
return isDurable;
}
public void setDurable(boolean durable) {
isDurable = durable;
}
public boolean isAsync() {
return isAsync;
}
public void setAsync(boolean async) {
isAsync = async;
}
public String getDestinationName() { public String getDestinationName() {
return this.destinationName; return this.destinationName;
} }