From d1ff5abf481aba0d30ac60a56191ebf6a898f0b1 Mon Sep 17 00:00:00 2001 From: "Adrian T. Co" Date: Tue, 11 Jul 2006 09:27:17 +0000 Subject: [PATCH] Allow option to unsubscribe durable subscriptions after each run git-svn-id: https://svn.apache.org/repos/asf/incubator/activemq/trunk@420774 13f79535-47bb-0310-9956-ffa450edef68 --- .../activemq/tool/JmsConsumerClient.java | 22 +++++++++++++++++-- .../properties/JmsConsumerProperties.java | 9 ++++++++ 2 files changed, 29 insertions(+), 2 deletions(-) diff --git a/tooling/maven-activemq-perf-plugin/src/main/java/org/apache/activemq/tool/JmsConsumerClient.java b/tooling/maven-activemq-perf-plugin/src/main/java/org/apache/activemq/tool/JmsConsumerClient.java index a78a831d49..e8afa690e9 100644 --- a/tooling/maven-activemq-perf-plugin/src/main/java/org/apache/activemq/tool/JmsConsumerClient.java +++ b/tooling/maven-activemq-perf-plugin/src/main/java/org/apache/activemq/tool/JmsConsumerClient.java @@ -88,6 +88,10 @@ public class JmsConsumerClient extends AbstractJmsMeasurableClient { incThroughput(); } } finally { + if (client.isDurable() && client.isUnsubscribe()) { + log.info("Unsubscribing durable subscriber: " + getClientName()); + getSession().unsubscribe(getClientName()); + } getConnection().close(); } } @@ -108,6 +112,10 @@ public class JmsConsumerClient extends AbstractJmsMeasurableClient { recvCount++; } } finally { + if (client.isDurable() && client.isUnsubscribe()) { + log.info("Unsubscribing durable subscriber: " + getClientName()); + getSession().unsubscribe(getClientName()); + } getConnection().close(); } } @@ -132,6 +140,10 @@ public class JmsConsumerClient extends AbstractJmsMeasurableClient { throw new JMSException("JMS consumer thread sleep has been interrupted. Message: " + e.getMessage()); } } finally { + if (client.isDurable() && client.isUnsubscribe()) { + log.info("Unsubscribing durable subscriber: " + getClientName()); + getSession().unsubscribe(getClientName()); + } getConnection().close(); } } @@ -161,6 +173,10 @@ public class JmsConsumerClient extends AbstractJmsMeasurableClient { throw new JMSException("JMS consumer thread wait has been interrupted. Message: " + e.getMessage()); } } finally { + if (client.isDurable() && client.isUnsubscribe()) { + log.info("Unsubscribing durable subscriber: " + getClientName()); + getSession().unsubscribe(getClientName()); + } getConnection().close(); } } @@ -175,8 +191,9 @@ public class JmsConsumerClient extends AbstractJmsMeasurableClient { String clientName = getClientName(); if (clientName == null) { clientName = "JmsConsumer"; + setClientName(clientName); } - log.info("Creating durable subscriber (" + getConnection().getClientID() + ") to: " + dest.toString()); + log.info("Creating durable subscriber (" + clientName + ") to: " + dest.toString()); jmsConsumer = getSession().createDurableSubscriber((Topic) dest, clientName); } else { log.info("Creating non-durable consumer to: " + dest.toString()); @@ -190,8 +207,9 @@ public class JmsConsumerClient extends AbstractJmsMeasurableClient { String clientName = getClientName(); if (clientName == null) { clientName = "JmsConsumer"; + setClientName(clientName); } - log.info("Creating durable subscriber (" + getConnection().getClientID() + ") to: " + dest.toString()); + log.info("Creating durable subscriber (" + clientName + ") to: " + dest.toString()); jmsConsumer = getSession().createDurableSubscriber((Topic) dest, clientName, selector, noLocal); } else { log.info("Creating non-durable consumer to: " + dest.toString()); diff --git a/tooling/maven-activemq-perf-plugin/src/main/java/org/apache/activemq/tool/properties/JmsConsumerProperties.java b/tooling/maven-activemq-perf-plugin/src/main/java/org/apache/activemq/tool/properties/JmsConsumerProperties.java index d2ff637930..e9208c595d 100644 --- a/tooling/maven-activemq-perf-plugin/src/main/java/org/apache/activemq/tool/properties/JmsConsumerProperties.java +++ b/tooling/maven-activemq-perf-plugin/src/main/java/org/apache/activemq/tool/properties/JmsConsumerProperties.java @@ -20,6 +20,7 @@ public class JmsConsumerProperties extends JmsClientProperties { public static final String COUNT_BASED_RECEIVING = "count"; // Receive a specific count of messages protected boolean durable = false; // Consumer is a durable subscriber + protected boolean unsubscribe = true; // If true, unsubscribe a durable subscriber after it finishes running protected boolean asyncRecv = true; // If true, use onMessage() to receive messages, else use receive() protected long recvCount = 1000000; // Receive a million messages by default @@ -34,6 +35,14 @@ public class JmsConsumerProperties extends JmsClientProperties { this.durable = durable; } + public boolean isUnsubscribe() { + return unsubscribe; + } + + public void setUnsubscribe(boolean unsubscribe) { + this.unsubscribe = unsubscribe; + } + public boolean isAsyncRecv() { return asyncRecv; }