resolution to duplicate issue from https://issues.apache.org/activemq/browse/AMQ-2980 - contention over active flag caused premature dispatch on reactivation such that there could be duplicates, fixing the contention sorts this, final piece of the puzzle, test now works as expected

git-svn-id: https://svn.apache.org/repos/asf/activemq/trunk@1033607 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
Gary Tully 2010-11-10 17:44:06 +00:00
parent 6dea944be6
commit 6aacb034ee
2 changed files with 14 additions and 13 deletions

View File

@ -19,6 +19,7 @@ package org.apache.activemq.broker.region;
import java.io.IOException;
import java.util.Iterator;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicBoolean;
import javax.jms.InvalidSelectorException;
import javax.jms.JMSException;
@ -48,7 +49,7 @@ public class DurableTopicSubscription extends PrefetchSubscription implements Us
private final ConcurrentHashMap<ActiveMQDestination, Destination> destinations = new ConcurrentHashMap<ActiveMQDestination, Destination>();
private final SubscriptionKey subscriptionKey;
private final boolean keepDurableSubsActive;
private boolean active;
private AtomicBoolean active = new AtomicBoolean();
public DurableTopicSubscription(Broker broker, SystemUsage usageManager, ConnectionContext context, ConsumerInfo info, boolean keepDurableSubsActive)
throws JMSException {
@ -62,11 +63,11 @@ public class DurableTopicSubscription extends PrefetchSubscription implements Us
}
public boolean isActive() {
return active;
return active.get();
}
public boolean isFull() {
return !active || super.isFull();
return !active.get() || super.isFull();
}
public void gc() {
@ -100,7 +101,7 @@ public class DurableTopicSubscription extends PrefetchSubscription implements Us
throw jmsEx;
}
}
if (active || keepDurableSubsActive) {
if (active.get() || keepDurableSubsActive) {
Topic topic = (Topic)destination;
topic.activate(context, this);
if (pending.isEmpty(topic)) {
@ -112,8 +113,7 @@ public class DurableTopicSubscription extends PrefetchSubscription implements Us
public void activate(SystemUsage memoryManager, ConnectionContext context,
ConsumerInfo info) throws Exception {
if (!active) {
this.active = true;
if (!active.get()) {
this.context = context;
this.info = info;
LOG.debug("Activating " + this);
@ -145,6 +145,7 @@ public class DurableTopicSubscription extends PrefetchSubscription implements Us
}
}
}
this.active.set(true);
dispatchPending();
this.usageManager.getMemoryUsage().addUsageListener(this);
}
@ -152,7 +153,7 @@ public class DurableTopicSubscription extends PrefetchSubscription implements Us
public void deactivate(boolean keepDurableSubsActive) throws Exception {
LOG.debug("Deactivating " + this);
active = false;
active.set(false);
this.usageManager.getMemoryUsage().removeUsageListener(this);
synchronized (pending) {
pending.stop();
@ -211,7 +212,7 @@ public class DurableTopicSubscription extends PrefetchSubscription implements Us
}
public void add(MessageReference node) throws Exception {
if (!active && !keepDurableSubsActive) {
if (!active.get() && !keepDurableSubsActive) {
return;
}
super.add(node);
@ -224,7 +225,7 @@ public class DurableTopicSubscription extends PrefetchSubscription implements Us
}
public int getPendingQueueSize() {
if (active || keepDurableSubsActive) {
if (active.get() || keepDurableSubsActive) {
return super.getPendingQueueSize();
}
// TODO: need to get from store
@ -236,7 +237,7 @@ public class DurableTopicSubscription extends PrefetchSubscription implements Us
}
protected boolean canDispatch(MessageReference node) {
return active;
return active.get();
}
protected void acknowledge(ConnectionContext context, MessageAck ack, MessageReference node) throws IOException {

View File

@ -81,7 +81,7 @@ public class JDBCMessagePriorityTest extends MessagePriorityTest {
sub = sess.createDurableSubscriber(topic, subName);
for (int i = 0; i < MSG_NUM * 4; i++) {
Message msg = sub.receive(10000);
LOG.info("received i=" + i + ", m=" + (msg!=null?
LOG.debug("received i=" + i + ", m=" + (msg!=null?
msg.getJMSMessageID() + ", priority: " + msg.getJMSPriority()
: null) );
assertNotNull("Message " + i + " was null", msg);
@ -129,10 +129,10 @@ public class JDBCMessagePriorityTest extends MessagePriorityTest {
sub = consumerSession.createDurableSubscriber(topic, subName);
for (int i=0; i < MSG_NUM * maxPriority; i++) {
Message msg = sub.receive(10000);
assertNull("no duplicate message", dups.put(msg.getJMSMessageID(), subName));
LOG.info("received i=" + i + ", m=" + (msg!=null?
LOG.debug("received i=" + i + ", m=" + (msg!=null?
msg.getJMSMessageID() + ", priority: " + msg.getJMSPriority()
: null) );
assertNull("no duplicate message failed on : " + msg.getJMSMessageID(), dups.put(msg.getJMSMessageID(), subName));
assertNotNull("Message " + i + " was null", msg);
messageCounts[msg.getJMSPriority()].incrementAndGet();
if (i > 0 && i % closeFrequency == 0) {