diff --git a/activemq-broker/src/main/java/org/apache/activemq/broker/region/Topic.java b/activemq-broker/src/main/java/org/apache/activemq/broker/region/Topic.java index bf803c104f..2aa6e18cb5 100755 --- a/activemq-broker/src/main/java/org/apache/activemq/broker/region/Topic.java +++ b/activemq-broker/src/main/java/org/apache/activemq/broker/region/Topic.java @@ -51,6 +51,8 @@ import org.apache.activemq.command.SubscriptionInfo; import org.apache.activemq.filter.MessageEvaluationContext; import org.apache.activemq.filter.NonCachedMessageEvaluationContext; import org.apache.activemq.store.MessageRecoveryListener; +import org.apache.activemq.store.NoLocalSubscriptionAware; +import org.apache.activemq.store.PersistenceAdapter; import org.apache.activemq.store.TopicMessageStore; import org.apache.activemq.thread.Task; import org.apache.activemq.thread.TaskRunner; @@ -211,7 +213,7 @@ public class Topic extends BaseDestination implements Task { } } - private boolean hasDurableSubChanged(SubscriptionInfo info1, ConsumerInfo info2) { + private boolean hasDurableSubChanged(SubscriptionInfo info1, ConsumerInfo info2) throws IOException { if (hasSelectorChanged(info1, info2)) { return true; } @@ -219,9 +221,10 @@ public class Topic extends BaseDestination implements Task { return hasNoLocalChanged(info1, info2); } - private boolean hasNoLocalChanged(SubscriptionInfo info1, ConsumerInfo info2) { - // Prior to V11 the broker did not store the noLocal value for durable subs. - if (brokerService.getStoreOpenWireVersion() >= 11) { + private boolean hasNoLocalChanged(SubscriptionInfo info1, ConsumerInfo info2) throws IOException { + //Not all persistence adapters store the noLocal value for a subscription + PersistenceAdapter adapter = broker.getBrokerService().getPersistenceAdapter(); + if (adapter instanceof NoLocalSubscriptionAware) { if (info1.isNoLocal() ^ info2.isNoLocal()) { return true; } diff --git a/activemq-broker/src/main/java/org/apache/activemq/broker/region/TopicRegion.java b/activemq-broker/src/main/java/org/apache/activemq/broker/region/TopicRegion.java index eca344919b..1ea3e62063 100755 --- a/activemq-broker/src/main/java/org/apache/activemq/broker/region/TopicRegion.java +++ b/activemq-broker/src/main/java/org/apache/activemq/broker/region/TopicRegion.java @@ -16,6 +16,7 @@ */ package org.apache.activemq.broker.region; +import java.io.IOException; import java.util.ArrayList; import java.util.HashSet; import java.util.Iterator; @@ -40,6 +41,8 @@ import org.apache.activemq.command.ConsumerInfo; import org.apache.activemq.command.RemoveSubscriptionInfo; import org.apache.activemq.command.SessionId; import org.apache.activemq.command.SubscriptionInfo; +import org.apache.activemq.store.NoLocalSubscriptionAware; +import org.apache.activemq.store.PersistenceAdapter; import org.apache.activemq.store.TopicMessageStore; import org.apache.activemq.thread.TaskRunnerFactory; import org.apache.activemq.usage.SystemUsage; @@ -373,15 +376,16 @@ public class TopicRegion extends AbstractRegion { } } - private boolean hasDurableSubChanged(ConsumerInfo info1, ConsumerInfo info2) { + private boolean hasDurableSubChanged(ConsumerInfo info1, ConsumerInfo info2) throws IOException { if (info1.getSelector() != null ^ info2.getSelector() != null) { return true; } if (info1.getSelector() != null && !info1.getSelector().equals(info2.getSelector())) { return true; } - // Prior to V11 the broker did not store the noLocal value for durable subs. - if (broker.getBrokerService().getStoreOpenWireVersion() >= 11) { + //Not all persistence adapters store the noLocal value for a subscription + PersistenceAdapter adapter = broker.getBrokerService().getPersistenceAdapter(); + if (adapter instanceof NoLocalSubscriptionAware) { if (info1.isNoLocal() ^ info2.isNoLocal()) { return true; } diff --git a/activemq-broker/src/main/java/org/apache/activemq/store/NoLocalSubscriptionAware.java b/activemq-broker/src/main/java/org/apache/activemq/store/NoLocalSubscriptionAware.java new file mode 100644 index 0000000000..ae58cba7a3 --- /dev/null +++ b/activemq-broker/src/main/java/org/apache/activemq/store/NoLocalSubscriptionAware.java @@ -0,0 +1,22 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.activemq.store; + +public interface NoLocalSubscriptionAware { + + public boolean isPersistNoLocal(); +} diff --git a/activemq-broker/src/main/java/org/apache/activemq/store/memory/MemoryPersistenceAdapter.java b/activemq-broker/src/main/java/org/apache/activemq/store/memory/MemoryPersistenceAdapter.java index c16ea141d2..5c073c3283 100755 --- a/activemq-broker/src/main/java/org/apache/activemq/store/memory/MemoryPersistenceAdapter.java +++ b/activemq-broker/src/main/java/org/apache/activemq/store/memory/MemoryPersistenceAdapter.java @@ -31,6 +31,7 @@ import org.apache.activemq.command.ActiveMQQueue; import org.apache.activemq.command.ActiveMQTopic; import org.apache.activemq.command.ProducerId; import org.apache.activemq.store.MessageStore; +import org.apache.activemq.store.NoLocalSubscriptionAware; import org.apache.activemq.store.PersistenceAdapter; import org.apache.activemq.store.ProxyMessageStore; import org.apache.activemq.store.TopicMessageStore; @@ -42,7 +43,7 @@ import org.slf4j.LoggerFactory; /** * @org.apache.xbean.XBean */ -public class MemoryPersistenceAdapter implements PersistenceAdapter { +public class MemoryPersistenceAdapter implements PersistenceAdapter, NoLocalSubscriptionAware { private static final Logger LOG = LoggerFactory.getLogger(MemoryPersistenceAdapter.class); MemoryTransactionStore transactionStore; @@ -241,4 +242,12 @@ public class MemoryPersistenceAdapter implements PersistenceAdapter { // We could eventuall implement an in memory scheduler. throw new UnsupportedOperationException(); } + + /* (non-Javadoc) + * @see org.apache.activemq.store.NoLocalSubscriptionAware#isPersistNoLocal() + */ + @Override + public boolean isPersistNoLocal() { + return true; + } } diff --git a/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/KahaDBPersistenceAdapter.java b/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/KahaDBPersistenceAdapter.java index 2fbdcaef35..5eef7500b6 100644 --- a/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/KahaDBPersistenceAdapter.java +++ b/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/KahaDBPersistenceAdapter.java @@ -42,6 +42,7 @@ import org.apache.activemq.command.XATransactionId; import org.apache.activemq.protobuf.Buffer; import org.apache.activemq.store.JournaledStore; import org.apache.activemq.store.MessageStore; +import org.apache.activemq.store.NoLocalSubscriptionAware; import org.apache.activemq.store.PersistenceAdapter; import org.apache.activemq.store.SharedFileLocker; import org.apache.activemq.store.TopicMessageStore; @@ -61,7 +62,9 @@ import org.apache.activemq.util.ServiceStopper; * @org.apache.xbean.XBean element="kahaDB" * */ -public class KahaDBPersistenceAdapter extends LockableServiceSupport implements PersistenceAdapter, JournaledStore, TransactionIdTransformerAware { +public class KahaDBPersistenceAdapter extends LockableServiceSupport implements PersistenceAdapter, + JournaledStore, TransactionIdTransformerAware, NoLocalSubscriptionAware { + private final KahaDBStore letter = new KahaDBStore(); /** @@ -788,4 +791,12 @@ public class KahaDBPersistenceAdapter extends LockableServiceSupport implements public JobSchedulerStore createJobSchedulerStore() throws IOException, UnsupportedOperationException { return this.letter.createJobSchedulerStore(); } + + /* (non-Javadoc) + * @see org.apache.activemq.store.NoLocalSubscriptionAware#isPersistNoLocal() + */ + @Override + public boolean isPersistNoLocal() { + return this.letter.isPersistNoLocal(); + } } diff --git a/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/KahaDBStore.java b/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/KahaDBStore.java index 24862e3ded..66d616b78c 100644 --- a/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/KahaDBStore.java +++ b/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/KahaDBStore.java @@ -63,6 +63,7 @@ import org.apache.activemq.store.MessageRecoveryListener; import org.apache.activemq.store.MessageStore; import org.apache.activemq.store.MessageStoreStatistics; import org.apache.activemq.store.MessageStoreSubscriptionStatistics; +import org.apache.activemq.store.NoLocalSubscriptionAware; import org.apache.activemq.store.PersistenceAdapter; import org.apache.activemq.store.TopicMessageStore; import org.apache.activemq.store.TransactionIdTransformer; @@ -86,7 +87,7 @@ import org.apache.activemq.wireformat.WireFormat; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -public class KahaDBStore extends MessageDatabase implements PersistenceAdapter { +public class KahaDBStore extends MessageDatabase implements PersistenceAdapter, NoLocalSubscriptionAware { static final Logger LOG = LoggerFactory.getLogger(KahaDBStore.class); private static final int MAX_ASYNC_JOBS = BaseDestination.MAX_AUDIT_DEPTH; @@ -1567,4 +1568,13 @@ public class KahaDBStore extends MessageDatabase implements PersistenceAdapter { public JobSchedulerStore createJobSchedulerStore() throws IOException, UnsupportedOperationException { return new JobSchedulerStoreImpl(); } + + /* (non-Javadoc) + * @see org.apache.activemq.store.NoLocalSubscriptionAware#isPersistNoLocal() + */ + @Override + public boolean isPersistNoLocal() { + // Prior to v11 the broker did not store the noLocal value for durable subs. + return brokerService.getStoreOpenWireVersion() >= 11; + } } diff --git a/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/MultiKahaDBPersistenceAdapter.java b/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/MultiKahaDBPersistenceAdapter.java index 31268a3509..68f0ed66cc 100644 --- a/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/MultiKahaDBPersistenceAdapter.java +++ b/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/MultiKahaDBPersistenceAdapter.java @@ -48,6 +48,7 @@ import org.apache.activemq.filter.AnyDestination; import org.apache.activemq.filter.DestinationMap; import org.apache.activemq.filter.DestinationMapEntry; import org.apache.activemq.store.MessageStore; +import org.apache.activemq.store.NoLocalSubscriptionAware; import org.apache.activemq.store.PersistenceAdapter; import org.apache.activemq.store.SharedFileLocker; import org.apache.activemq.store.TopicMessageStore; @@ -69,7 +70,9 @@ import org.slf4j.LoggerFactory; * * @org.apache.xbean.XBean element="mKahaDB" */ -public class MultiKahaDBPersistenceAdapter extends LockableServiceSupport implements PersistenceAdapter, BrokerServiceAware { +public class MultiKahaDBPersistenceAdapter extends LockableServiceSupport implements PersistenceAdapter, + BrokerServiceAware, NoLocalSubscriptionAware { + static final Logger LOG = LoggerFactory.getLogger(MultiKahaDBPersistenceAdapter.class); final static ActiveMQDestination matchAll = new AnyDestination(new ActiveMQDestination[]{new ActiveMQQueue(">"), new ActiveMQTopic(">")}); @@ -532,4 +535,13 @@ public class MultiKahaDBPersistenceAdapter extends LockableServiceSupport implem public JobSchedulerStore createJobSchedulerStore() throws IOException, UnsupportedOperationException { return new JobSchedulerStoreImpl(); } + + /* (non-Javadoc) + * @see org.apache.activemq.store.NoLocalSubscriptionAware#isPersistNoLocal() + */ + @Override + public boolean isPersistNoLocal() { + // Prior to v11 the broker did not store the noLocal value for durable subs. + return brokerService.getStoreOpenWireVersion() >= 11; + } } diff --git a/activemq-unit-tests/src/test/java/org/apache/activemq/usecases/DurableSubscriptionOffline3Test.java b/activemq-unit-tests/src/test/java/org/apache/activemq/usecases/DurableSubscriptionOffline3Test.java index c0aee13e80..899c6a36b6 100644 --- a/activemq-unit-tests/src/test/java/org/apache/activemq/usecases/DurableSubscriptionOffline3Test.java +++ b/activemq-unit-tests/src/test/java/org/apache/activemq/usecases/DurableSubscriptionOffline3Test.java @@ -388,7 +388,7 @@ public class DurableSubscriptionOffline3Test extends DurableSubscriptionOfflineT // test offline subs con = createConnection("offCli1"); session = con.createSession(false, Session.AUTO_ACKNOWLEDGE); - consumer = session.createDurableSubscriber(topic, "SubsId", null, true); + consumer = session.createDurableSubscriber(topic, "SubsId", null, false); consumer.setMessageListener(listener); Thread.sleep(3 * 1000); diff --git a/activemq-unit-tests/src/test/java/org/apache/activemq/usecases/DurableSubscriptionWithNoLocalTest.java b/activemq-unit-tests/src/test/java/org/apache/activemq/usecases/DurableSubscriptionWithNoLocalTest.java index ecbfac1128..55da21b797 100644 --- a/activemq-unit-tests/src/test/java/org/apache/activemq/usecases/DurableSubscriptionWithNoLocalTest.java +++ b/activemq-unit-tests/src/test/java/org/apache/activemq/usecases/DurableSubscriptionWithNoLocalTest.java @@ -25,8 +25,6 @@ import java.util.Collection; import javax.jms.Destination; import javax.jms.JMSException; -import javax.jms.Message; -import javax.jms.MessageListener; import javax.jms.MessageProducer; import javax.jms.Session; import javax.jms.TextMessage; @@ -40,13 +38,10 @@ import org.apache.activemq.ActiveMQConnection; import org.apache.activemq.ActiveMQConnectionFactory; import org.apache.activemq.broker.BrokerService; import org.apache.activemq.broker.TransportConnector; -import org.apache.activemq.command.ActiveMQTopic; import org.apache.activemq.command.CommandTypes; -import org.apache.activemq.network.DurableSyncNetworkBridgeTest.FLOW; import org.apache.activemq.store.kahadb.KahaDBStore; import org.junit.After; import org.junit.Before; -import org.junit.Ignore; import org.junit.Rule; import org.junit.Test; import org.junit.rules.TestName;