diff --git a/activemq-core/src/main/java/org/apache/activemq/broker/region/DurableTopicSubscription.java b/activemq-core/src/main/java/org/apache/activemq/broker/region/DurableTopicSubscription.java index 527066aed0..9504aca71f 100755 --- a/activemq-core/src/main/java/org/apache/activemq/broker/region/DurableTopicSubscription.java +++ b/activemq-core/src/main/java/org/apache/activemq/broker/region/DurableTopicSubscription.java @@ -19,6 +19,7 @@ package org.apache.activemq.broker.region; import java.io.IOException; import java.util.Iterator; import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.atomic.AtomicBoolean; import javax.jms.InvalidSelectorException; import javax.jms.JMSException; @@ -48,7 +49,7 @@ public class DurableTopicSubscription extends PrefetchSubscription implements Us private final ConcurrentHashMap destinations = new ConcurrentHashMap(); private final SubscriptionKey subscriptionKey; private final boolean keepDurableSubsActive; - private boolean active; + private AtomicBoolean active = new AtomicBoolean(); public DurableTopicSubscription(Broker broker, SystemUsage usageManager, ConnectionContext context, ConsumerInfo info, boolean keepDurableSubsActive) throws JMSException { @@ -62,11 +63,11 @@ public class DurableTopicSubscription extends PrefetchSubscription implements Us } public boolean isActive() { - return active; + return active.get(); } public boolean isFull() { - return !active || super.isFull(); + return !active.get() || super.isFull(); } public void gc() { @@ -100,7 +101,7 @@ public class DurableTopicSubscription extends PrefetchSubscription implements Us throw jmsEx; } } - if (active || keepDurableSubsActive) { + if (active.get() || keepDurableSubsActive) { Topic topic = (Topic)destination; topic.activate(context, this); if (pending.isEmpty(topic)) { @@ -112,8 +113,7 @@ public class DurableTopicSubscription extends PrefetchSubscription implements Us public void activate(SystemUsage memoryManager, ConnectionContext context, ConsumerInfo info) throws Exception { - if (!active) { - this.active = true; + if (!active.get()) { this.context = context; this.info = info; LOG.debug("Activating " + this); @@ -145,6 +145,7 @@ public class DurableTopicSubscription extends PrefetchSubscription implements Us } } } + this.active.set(true); dispatchPending(); this.usageManager.getMemoryUsage().addUsageListener(this); } @@ -152,7 +153,7 @@ public class DurableTopicSubscription extends PrefetchSubscription implements Us public void deactivate(boolean keepDurableSubsActive) throws Exception { LOG.debug("Deactivating " + this); - active = false; + active.set(false); this.usageManager.getMemoryUsage().removeUsageListener(this); synchronized (pending) { pending.stop(); @@ -211,7 +212,7 @@ public class DurableTopicSubscription extends PrefetchSubscription implements Us } public void add(MessageReference node) throws Exception { - if (!active && !keepDurableSubsActive) { + if (!active.get() && !keepDurableSubsActive) { return; } super.add(node); @@ -224,7 +225,7 @@ public class DurableTopicSubscription extends PrefetchSubscription implements Us } public int getPendingQueueSize() { - if (active || keepDurableSubsActive) { + if (active.get() || keepDurableSubsActive) { return super.getPendingQueueSize(); } // TODO: need to get from store @@ -236,7 +237,7 @@ public class DurableTopicSubscription extends PrefetchSubscription implements Us } protected boolean canDispatch(MessageReference node) { - return active; + return active.get(); } protected void acknowledge(ConnectionContext context, MessageAck ack, MessageReference node) throws IOException { diff --git a/activemq-core/src/test/java/org/apache/activemq/store/jdbc/JDBCMessagePriorityTest.java b/activemq-core/src/test/java/org/apache/activemq/store/jdbc/JDBCMessagePriorityTest.java index 2d6a683942..1327bb38d1 100644 --- a/activemq-core/src/test/java/org/apache/activemq/store/jdbc/JDBCMessagePriorityTest.java +++ b/activemq-core/src/test/java/org/apache/activemq/store/jdbc/JDBCMessagePriorityTest.java @@ -81,7 +81,7 @@ public class JDBCMessagePriorityTest extends MessagePriorityTest { sub = sess.createDurableSubscriber(topic, subName); for (int i = 0; i < MSG_NUM * 4; i++) { Message msg = sub.receive(10000); - LOG.info("received i=" + i + ", m=" + (msg!=null? + LOG.debug("received i=" + i + ", m=" + (msg!=null? msg.getJMSMessageID() + ", priority: " + msg.getJMSPriority() : null) ); assertNotNull("Message " + i + " was null", msg); @@ -129,10 +129,10 @@ public class JDBCMessagePriorityTest extends MessagePriorityTest { sub = consumerSession.createDurableSubscriber(topic, subName); for (int i=0; i < MSG_NUM * maxPriority; i++) { Message msg = sub.receive(10000); - assertNull("no duplicate message", dups.put(msg.getJMSMessageID(), subName)); - LOG.info("received i=" + i + ", m=" + (msg!=null? + LOG.debug("received i=" + i + ", m=" + (msg!=null? msg.getJMSMessageID() + ", priority: " + msg.getJMSPriority() : null) ); + assertNull("no duplicate message failed on : " + msg.getJMSMessageID(), dups.put(msg.getJMSMessageID(), subName)); assertNotNull("Message " + i + " was null", msg); messageCounts[msg.getJMSPriority()].incrementAndGet(); if (i > 0 && i % closeFrequency == 0) {