Additional fixes for https://issues.apache.org/jira/browse/AMQ-3855 - timing issue in adding wildcard subscriptions can result in duplicate messages sent to MQTT

git-svn-id: https://svn.apache.org/repos/asf/activemq/trunk@1341820 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
Robert Davies 2012-05-23 10:25:34 +00:00
parent 1b28cf1792
commit 144dda0308
2 changed files with 17 additions and 9 deletions

View File

@ -109,9 +109,6 @@ public class Topic extends BaseDestination implements Task {
} }
public void addSubscription(ConnectionContext context, final Subscription sub) throws Exception { public void addSubscription(ConnectionContext context, final Subscription sub) throws Exception {
super.addSubscription(context, sub);
if (!sub.getConsumerInfo().isDurable()) { if (!sub.getConsumerInfo().isDurable()) {
// Do a retroactive recovery if needed. // Do a retroactive recovery if needed.
@ -121,23 +118,34 @@ public class Topic extends BaseDestination implements Task {
// while we are recovering a subscription to avoid out of order messages. // while we are recovering a subscription to avoid out of order messages.
dispatchLock.writeLock().lock(); dispatchLock.writeLock().lock();
try { try {
boolean applyRecovery = false;
synchronized (consumers) { synchronized (consumers) {
if (!consumers.contains(sub)){
sub.add(context, this); sub.add(context, this);
consumers.add(sub); consumers.add(sub);
applyRecovery=true;
super.addSubscription(context, sub);
} }
}
if (applyRecovery){
subscriptionRecoveryPolicy.recover(context, this, sub); subscriptionRecoveryPolicy.recover(context, this, sub);
}
} finally { } finally {
dispatchLock.writeLock().unlock(); dispatchLock.writeLock().unlock();
} }
} else { } else {
synchronized (consumers) { synchronized (consumers) {
if (!consumers.contains(sub)){
sub.add(context, this); sub.add(context, this);
consumers.add(sub); consumers.add(sub);
super.addSubscription(context, sub);
}
} }
} }
} else { } else {
DurableTopicSubscription dsub = (DurableTopicSubscription) sub; DurableTopicSubscription dsub = (DurableTopicSubscription) sub;
super.addSubscription(context, sub);
sub.add(context, this); sub.add(context, this);
if(dsub.isActive()) { if(dsub.isActive()) {
synchronized (consumers) { synchronized (consumers) {

View File

@ -255,7 +255,7 @@ public class MQTTTest {
javax.jms.Topic jmsTopic = s.createTopic("foo.far"); javax.jms.Topic jmsTopic = s.createTopic("foo.far");
MessageProducer producer = s.createProducer(jmsTopic); MessageProducer producer = s.createProducer(jmsTopic);
Topic[] topics = {new Topic(utf8("foo/far"), QoS.AT_MOST_ONCE)}; Topic[] topics = {new Topic(utf8("foo/+"), QoS.AT_MOST_ONCE)};
connection.subscribe(topics); connection.subscribe(topics);
for (int i = 0; i < numberOfMessages; i++) { for (int i = 0; i < numberOfMessages; i++) {
String payload = "This is Test Message: " + i; String payload = "This is Test Message: " + i;