mirror of https://github.com/apache/activemq.git
Allow option to unsubscribe durable subscriptions after each run
git-svn-id: https://svn.apache.org/repos/asf/incubator/activemq/trunk@420774 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
parent
6722a0327e
commit
d1ff5abf48
|
@ -88,6 +88,10 @@ public class JmsConsumerClient extends AbstractJmsMeasurableClient {
|
|||
incThroughput();
|
||||
}
|
||||
} finally {
|
||||
if (client.isDurable() && client.isUnsubscribe()) {
|
||||
log.info("Unsubscribing durable subscriber: " + getClientName());
|
||||
getSession().unsubscribe(getClientName());
|
||||
}
|
||||
getConnection().close();
|
||||
}
|
||||
}
|
||||
|
@ -108,6 +112,10 @@ public class JmsConsumerClient extends AbstractJmsMeasurableClient {
|
|||
recvCount++;
|
||||
}
|
||||
} finally {
|
||||
if (client.isDurable() && client.isUnsubscribe()) {
|
||||
log.info("Unsubscribing durable subscriber: " + getClientName());
|
||||
getSession().unsubscribe(getClientName());
|
||||
}
|
||||
getConnection().close();
|
||||
}
|
||||
}
|
||||
|
@ -132,6 +140,10 @@ public class JmsConsumerClient extends AbstractJmsMeasurableClient {
|
|||
throw new JMSException("JMS consumer thread sleep has been interrupted. Message: " + e.getMessage());
|
||||
}
|
||||
} finally {
|
||||
if (client.isDurable() && client.isUnsubscribe()) {
|
||||
log.info("Unsubscribing durable subscriber: " + getClientName());
|
||||
getSession().unsubscribe(getClientName());
|
||||
}
|
||||
getConnection().close();
|
||||
}
|
||||
}
|
||||
|
@ -161,6 +173,10 @@ public class JmsConsumerClient extends AbstractJmsMeasurableClient {
|
|||
throw new JMSException("JMS consumer thread wait has been interrupted. Message: " + e.getMessage());
|
||||
}
|
||||
} finally {
|
||||
if (client.isDurable() && client.isUnsubscribe()) {
|
||||
log.info("Unsubscribing durable subscriber: " + getClientName());
|
||||
getSession().unsubscribe(getClientName());
|
||||
}
|
||||
getConnection().close();
|
||||
}
|
||||
}
|
||||
|
@ -175,8 +191,9 @@ public class JmsConsumerClient extends AbstractJmsMeasurableClient {
|
|||
String clientName = getClientName();
|
||||
if (clientName == null) {
|
||||
clientName = "JmsConsumer";
|
||||
setClientName(clientName);
|
||||
}
|
||||
log.info("Creating durable subscriber (" + getConnection().getClientID() + ") to: " + dest.toString());
|
||||
log.info("Creating durable subscriber (" + clientName + ") to: " + dest.toString());
|
||||
jmsConsumer = getSession().createDurableSubscriber((Topic) dest, clientName);
|
||||
} else {
|
||||
log.info("Creating non-durable consumer to: " + dest.toString());
|
||||
|
@ -190,8 +207,9 @@ public class JmsConsumerClient extends AbstractJmsMeasurableClient {
|
|||
String clientName = getClientName();
|
||||
if (clientName == null) {
|
||||
clientName = "JmsConsumer";
|
||||
setClientName(clientName);
|
||||
}
|
||||
log.info("Creating durable subscriber (" + getConnection().getClientID() + ") to: " + dest.toString());
|
||||
log.info("Creating durable subscriber (" + clientName + ") to: " + dest.toString());
|
||||
jmsConsumer = getSession().createDurableSubscriber((Topic) dest, clientName, selector, noLocal);
|
||||
} else {
|
||||
log.info("Creating non-durable consumer to: " + dest.toString());
|
||||
|
|
|
@ -20,6 +20,7 @@ public class JmsConsumerProperties extends JmsClientProperties {
|
|||
public static final String COUNT_BASED_RECEIVING = "count"; // Receive a specific count of messages
|
||||
|
||||
protected boolean durable = false; // Consumer is a durable subscriber
|
||||
protected boolean unsubscribe = true; // If true, unsubscribe a durable subscriber after it finishes running
|
||||
protected boolean asyncRecv = true; // If true, use onMessage() to receive messages, else use receive()
|
||||
|
||||
protected long recvCount = 1000000; // Receive a million messages by default
|
||||
|
@ -34,6 +35,14 @@ public class JmsConsumerProperties extends JmsClientProperties {
|
|||
this.durable = durable;
|
||||
}
|
||||
|
||||
public boolean isUnsubscribe() {
|
||||
return unsubscribe;
|
||||
}
|
||||
|
||||
public void setUnsubscribe(boolean unsubscribe) {
|
||||
this.unsubscribe = unsubscribe;
|
||||
}
|
||||
|
||||
public boolean isAsyncRecv() {
|
||||
return asyncRecv;
|
||||
}
|
||||
|
|
Loading…
Reference in New Issue