diff --git a/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/JMSDurableSubNoLocalChangedTest.java b/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/JMSDurableSubNoLocalChangedTest.java index 823f4e7438..a3989341de 100644 --- a/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/JMSDurableSubNoLocalChangedTest.java +++ b/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/JMSDurableSubNoLocalChangedTest.java @@ -22,8 +22,6 @@ import static org.junit.Assert.assertNull; import static org.junit.Assert.assertTrue; import java.net.URI; -import java.util.ArrayList; -import java.util.List; import javax.jms.JMSException; import javax.jms.Message; @@ -38,7 +36,6 @@ import org.apache.activemq.broker.BrokerService; import org.apache.activemq.util.Wait; import org.junit.After; import org.junit.Before; -import org.junit.Ignore; import org.junit.Rule; import org.junit.Test; import org.junit.rules.TestName; @@ -53,7 +50,7 @@ public class JMSDurableSubNoLocalChangedTest { private static final Logger LOG = LoggerFactory.getLogger(JMSDurableSubNoLocalChangedTest.class); - private final int MSG_COUNT = 10; + private final int MSG_COUNT = 5; private BrokerService brokerService; private URI connectionUri; @@ -62,7 +59,7 @@ public class JMSDurableSubNoLocalChangedTest { private String subscriptionName; private String topicName; - private final List connections = new ArrayList(); + private TopicConnection connection; @Rule public TestName name = new TestName(); @@ -70,49 +67,27 @@ public class JMSDurableSubNoLocalChangedTest { TopicConnection connection = JMSClientContext.INSTANCE.createTopicConnection(connectionUri, null, null, clientId, true); connection.start(); - connections.add(connection); - return connection; } @Before public void setUp() throws Exception { - brokerService = new BrokerService(); - brokerService.setUseJmx(true); - brokerService.getManagementContext().setCreateMBeanServer(false); - brokerService.setPersistent(false); - brokerService.setAdvisorySupport(false); - brokerService.setSchedulerSupport(false); - brokerService.setKeepDurableSubsActive(false); - brokerService.addConnector("amqp://0.0.0.0:0"); - brokerService.start(); - - connectionUri = new URI("amqp://localhost:" + - brokerService.getTransportConnectorByScheme("amqp").getPublishableConnectURI().getPort()); - - clientId = name.getMethodName() + "-ClientId"; - subscriptionName = name.getMethodName() + "-Subscription"; - topicName = name.getMethodName(); + startBroker(); } @After public void tearDown() throws Exception { - for (TopicConnection connection : connections) { - try { - connection.close(); - } catch (Exception e) {} + try { + connection.close(); + } catch (Exception e) { } - connections.clear(); - - brokerService.stop(); - brokerService.waitUntilStopped(); + stopBroker(); } - @Ignore("Not yet working with current QPid JMS client") @Test(timeout = 60000) - public void testDurableResubscribeWithNewNoLocalValue() throws Exception { - TopicConnection connection = createConnection(); + public void testResubscribeWithNewNoLocalValueNoBrokerRestart() throws Exception { + connection = createConnection(); TopicSession session = connection.createTopicSession(false, Session.AUTO_ACKNOWLEDGE); Topic topic = session.createTopic(topicName); @@ -130,13 +105,13 @@ public class JMSDurableSubNoLocalChangedTest { // Standard subscriber should receive them for (int i = 0; i < MSG_COUNT; ++i) { - Message message = nonDurableSubscriber.receive(5000); + Message message = nonDurableSubscriber.receive(2000); assertNotNull(message); } // Durable noLocal=true subscription should not receive them { - Message message = durableSubscriber.receive(2000); + Message message = durableSubscriber.receive(500); assertNull(message); } @@ -175,7 +150,7 @@ public class JMSDurableSubNoLocalChangedTest { // Durable noLocal=false subscription should not receive them as the subscriptions should // have been removed and recreated to update the noLocal flag. { - Message message = durableSubscriber.receive(2000); + Message message = durableSubscriber.receive(500); assertNull(message); } @@ -184,7 +159,98 @@ public class JMSDurableSubNoLocalChangedTest { // Durable subscriber should receive them for (int i = 0; i < MSG_COUNT; ++i) { - Message message = durableSubscriber.receive(5000); + Message message = durableSubscriber.receive(2000); + assertNotNull("Should get local messages now", message); + } + } + + @Test(timeout = 60000) + public void testDurableResubscribeWithNewNoLocalValueWithBrokerRestart() throws Exception { + connection = createConnection(); + TopicSession session = connection.createTopicSession(false, Session.AUTO_ACKNOWLEDGE); + + Topic topic = session.createTopic(topicName); + + // Create a Durable Topic Subscription with noLocal set to true. + TopicSubscriber durableSubscriber = session.createDurableSubscriber(topic, subscriptionName, null, true); + + // Create a Durable Topic Subscription with noLocal set to true. + TopicSubscriber nonDurableSubscriber = session.createSubscriber(topic); + + // Public first set, only the non durable sub should get these. + publishToTopic(session, topic); + + LOG.debug("Testing that noLocal=true subscription doesn't get any messages."); + + // Standard subscriber should receive them + for (int i = 0; i < MSG_COUNT; ++i) { + Message message = nonDurableSubscriber.receive(2000); + assertNotNull(message); + } + + // Durable noLocal=true subscription should not receive them + { + Message message = durableSubscriber.receive(500); + assertNull(message); + } + + // Public second set for testing durable sub changed. + publishToTopic(session, topic); + + assertEquals(1, brokerService.getAdminView().getDurableTopicSubscribers().length); + assertEquals(0, brokerService.getAdminView().getInactiveDurableTopicSubscribers().length); + + // Durable now goes inactive. + durableSubscriber.close(); + + assertTrue("Should have no durables.", Wait.waitFor(new Wait.Condition() { + + @Override + public boolean isSatisified() throws Exception { + return brokerService.getAdminView().getDurableTopicSubscribers().length == 0; + } + })); + assertTrue("Should have an inactive sub.", Wait.waitFor(new Wait.Condition() { + + @Override + public boolean isSatisified() throws Exception { + return brokerService.getAdminView().getInactiveDurableTopicSubscribers().length == 1; + } + })); + + LOG.debug("Testing that updated noLocal=false subscription does get any messages."); + + connection.close(); + + restartBroker(); + + connection = createConnection(); + + session = connection.createTopicSession(false, Session.AUTO_ACKNOWLEDGE); + + // The previous subscription should be restored as an offline subscription. + assertEquals(0, brokerService.getAdminView().getDurableTopicSubscribers().length); + assertEquals(1, brokerService.getAdminView().getInactiveDurableTopicSubscribers().length); + + // Recreate a Durable Topic Subscription with noLocal set to false. + durableSubscriber = session.createDurableSubscriber(topic, subscriptionName, null, false); + + assertEquals(1, brokerService.getAdminView().getDurableTopicSubscribers().length); + assertEquals(0, brokerService.getAdminView().getInactiveDurableTopicSubscribers().length); + + // Durable noLocal=false subscription should not receive them as the subscriptions should + // have been removed and recreated to update the noLocal flag. + { + Message message = durableSubscriber.receive(500); + assertNull(message); + } + + // Public third set which should get queued for the durable sub with noLocal=false + publishToTopic(session, topic); + + // Durable subscriber should receive them + for (int i = 0; i < MSG_COUNT; ++i) { + Message message = durableSubscriber.receive(2000); assertNotNull("Should get local messages now", message); } } @@ -197,4 +263,41 @@ public class JMSDurableSubNoLocalChangedTest { publisher.close(); } + + private void startBroker() throws Exception { + createBroker(true); + } + + private void restartBroker() throws Exception { + stopBroker(); + createBroker(false); + } + + private void stopBroker() throws Exception { + if (brokerService != null) { + brokerService.stop(); + brokerService.waitUntilStopped(); + brokerService = null; + } + } + + private void createBroker(boolean deleteMessages) throws Exception { + brokerService = new BrokerService(); + brokerService.setUseJmx(true); + brokerService.getManagementContext().setCreateMBeanServer(false); + brokerService.setPersistent(true); + brokerService.setDeleteAllMessagesOnStartup(deleteMessages); + brokerService.setAdvisorySupport(false); + brokerService.setSchedulerSupport(false); + brokerService.setKeepDurableSubsActive(false); + brokerService.addConnector("amqp://0.0.0.0:0"); + brokerService.start(); + + connectionUri = new URI("amqp://localhost:" + + brokerService.getTransportConnectorByScheme("amqp").getPublishableConnectURI().getPort()); + + clientId = name.getMethodName() + "-ClientId"; + subscriptionName = name.getMethodName() + "-Subscription"; + topicName = name.getMethodName(); + } } diff --git a/activemq-broker/src/main/java/org/apache/activemq/broker/region/AbstractSubscription.java b/activemq-broker/src/main/java/org/apache/activemq/broker/region/AbstractSubscription.java index d22801c935..e03ca32ea1 100755 --- a/activemq-broker/src/main/java/org/apache/activemq/broker/region/AbstractSubscription.java +++ b/activemq-broker/src/main/java/org/apache/activemq/broker/region/AbstractSubscription.java @@ -20,7 +20,7 @@ import java.io.IOException; import java.util.Collections; import java.util.List; import java.util.concurrent.CopyOnWriteArrayList; -import java.util.concurrent.atomic.AtomicLong; + import javax.jms.InvalidSelectorException; import javax.jms.JMSException; import javax.management.ObjectName; @@ -43,11 +43,13 @@ import org.slf4j.LoggerFactory; public abstract class AbstractSubscription implements Subscription { private static final Logger LOG = LoggerFactory.getLogger(AbstractSubscription.class); + protected Broker broker; protected ConnectionContext context; protected ConsumerInfo info; protected final DestinationFilter destinationFilter; protected final CopyOnWriteArrayList destinations = new CopyOnWriteArrayList(); + private BooleanExpression selectorExpression; private ObjectName objectName; private int cursorMemoryHighWaterMark = 70; @@ -180,6 +182,7 @@ public abstract class AbstractSubscription implements Subscription { public int getPrefetchSize() { return info.getPrefetchSize(); } + public void setPrefetchSize(int newSize) { info.setPrefetchSize(newSize); } @@ -210,7 +213,6 @@ public abstract class AbstractSubscription implements Subscription { if (result) { doAddRecoveredMessage(message); } - } finally { msgContext.clear(); } @@ -245,7 +247,6 @@ public abstract class AbstractSubscription implements Subscription { * @param destination */ public void addDestination(Destination destination) { - } /** @@ -253,7 +254,6 @@ public abstract class AbstractSubscription implements Subscription { * @param destination */ public void removeDestination(Destination destination) { - } @Override @@ -289,14 +289,17 @@ public abstract class AbstractSubscription implements Subscription { this.lastAckTime = value; } + @Override public long getConsumedCount(){ return subscriptionStatistics.getConsumedCount().getCount(); } + @Override public void incrementConsumedCount(){ subscriptionStatistics.getConsumedCount().increment(); } + @Override public void resetConsumedCount(){ subscriptionStatistics.getConsumedCount().reset(); } diff --git a/activemq-broker/src/main/java/org/apache/activemq/broker/region/DurableTopicSubscription.java b/activemq-broker/src/main/java/org/apache/activemq/broker/region/DurableTopicSubscription.java index 0107c589dc..44228d755e 100755 --- a/activemq-broker/src/main/java/org/apache/activemq/broker/region/DurableTopicSubscription.java +++ b/activemq-broker/src/main/java/org/apache/activemq/broker/region/DurableTopicSubscription.java @@ -307,7 +307,11 @@ public class DurableTopicSubscription extends PrefetchSubscription implements Us @Override public void setSelector(String selector) throws InvalidSelectorException { - throw new UnsupportedOperationException("You cannot dynamically change the selector for durable topic subscriptions"); + if (active.get()) { + throw new UnsupportedOperationException("You cannot dynamically change the selector for durable topic subscriptions"); + } else { + super.setSelector(getSelector()); + } } @Override @@ -347,7 +351,6 @@ public class DurableTopicSubscription extends PrefetchSubscription implements Us MessageReference node = pending.next(); node.decrementReferenceCount(); } - } finally { pending.release(); pending.clear(); diff --git a/activemq-broker/src/main/java/org/apache/activemq/broker/region/Topic.java b/activemq-broker/src/main/java/org/apache/activemq/broker/region/Topic.java index 61c62ce963..02c5fbe7e3 100755 --- a/activemq-broker/src/main/java/org/apache/activemq/broker/region/Topic.java +++ b/activemq-broker/src/main/java/org/apache/activemq/broker/region/Topic.java @@ -212,9 +212,29 @@ public class Topic extends BaseDestination implements Task { } private boolean hasDurableSubChanged(SubscriptionInfo info1, ConsumerInfo info2) { + if (hasSelectorChanged(info1, info2)) { + return true; + } + + return hasNoLocalChanged(info1, info2); + } + + private boolean hasNoLocalChanged(SubscriptionInfo info1, ConsumerInfo info2) { + // Prior to V11 the broker did not store the noLocal value for durable subs. + if (brokerService.getStoreOpenWireVersion() >= 11) { + if (info1.isNoLocal() ^ info2.isNoLocal()) { + return true; + } + } + + return false; + } + + private boolean hasSelectorChanged(SubscriptionInfo info1, ConsumerInfo info2) { if (info1.getSelector() != null ^ info2.getSelector() != null) { return true; } + if (info1.getSelector() != null && !info1.getSelector().equals(info2.getSelector())) { return true; } @@ -242,6 +262,10 @@ public class Topic extends BaseDestination implements Task { // Need to delete the subscription topicStore.deleteSubscription(clientId, subscriptionName); info = null; + // Force a rebuild of the selector chain for the subscription otherwise + // the stored subscription is updated but the selector expression is not + // and the subscription will not behave according to the new configuration. + subscription.setSelector(subscription.getConsumerInfo().getSelector()); synchronized (consumers) { consumers.remove(subscription); } diff --git a/activemq-unit-tests/src/test/java/org/apache/activemq/usecases/DurableSubscriptionUpdatesTest.java b/activemq-unit-tests/src/test/java/org/apache/activemq/usecases/DurableSubscriptionUpdatesTest.java new file mode 100644 index 0000000000..73d509917f --- /dev/null +++ b/activemq-unit-tests/src/test/java/org/apache/activemq/usecases/DurableSubscriptionUpdatesTest.java @@ -0,0 +1,388 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.activemq.usecases; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertNotNull; +import static org.junit.Assert.assertNull; +import static org.junit.Assert.assertTrue; + +import java.net.URI; + +import javax.jms.JMSException; +import javax.jms.Message; +import javax.jms.Session; +import javax.jms.Topic; +import javax.jms.TopicConnection; +import javax.jms.TopicPublisher; +import javax.jms.TopicSession; +import javax.jms.TopicSubscriber; + +import org.apache.activemq.ActiveMQConnectionFactory; +import org.apache.activemq.broker.BrokerService; +import org.apache.activemq.broker.TransportConnector; +import org.apache.activemq.util.Wait; +import org.junit.After; +import org.junit.Before; +import org.junit.Rule; +import org.junit.Test; +import org.junit.rules.TestName; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * Test that the durable sub updates when the offline sub is reactivated with new values. + */ +public class DurableSubscriptionUpdatesTest { + + private static final Logger LOG = LoggerFactory.getLogger(DurableSubscriptionUpdatesTest.class); + + private final int MSG_COUNT = 5; + + private BrokerService brokerService; + private URI connectionUri; + + private String clientId; + private String subscriptionName; + private String topicName; + + private TopicConnection connection; + + @Rule public TestName name = new TestName(); + + protected TopicConnection createConnection() throws JMSException { + ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory(connectionUri); + factory.setUseAsyncSend(true); + + TopicConnection connection = factory.createTopicConnection(); + connection.setClientID(clientId); + connection.start(); + + return connection; + } + + @Before + public void setUp() throws Exception { + startBroker(); + } + + @After + public void tearDown() throws Exception { + try { + connection.close(); + } catch (Exception e) { + } + + stopBroker(); + } + + @Test(timeout = 60000) + public void testSelectorChange() throws Exception { + connection = createConnection(); + TopicSession session = connection.createTopicSession(false, Session.AUTO_ACKNOWLEDGE); + + Topic topic = session.createTopic(topicName); + + // Create a Durable Topic Subscription with noLocal set to true. + TopicSubscriber durableSubscriber = session.createDurableSubscriber(topic, subscriptionName, "JMSPriority > 8", false); + + // Public first set, only the non durable sub should get these. + publishToTopic(session, topic, 9); + publishToTopic(session, topic, 8); + + // Standard subscriber should receive them + for (int i = 0; i < MSG_COUNT; ++i) { + Message message = durableSubscriber.receive(2000); + assertNotNull(message); + assertEquals(9, message.getJMSPriority()); + } + + // Subscriber should not receive the others. + { + Message message = durableSubscriber.receive(500); + assertNull(message); + } + + // Public second set for testing durable sub changed. + publishToTopic(session, topic, 9); + + assertEquals(1, brokerService.getAdminView().getDurableTopicSubscribers().length); + assertEquals(0, brokerService.getAdminView().getInactiveDurableTopicSubscribers().length); + + // Durable now goes inactive. + durableSubscriber.close(); + + assertTrue("Should have no durables.", Wait.waitFor(new Wait.Condition() { + + @Override + public boolean isSatisified() throws Exception { + return brokerService.getAdminView().getDurableTopicSubscribers().length == 0; + } + })); + assertTrue("Should have an inactive sub.", Wait.waitFor(new Wait.Condition() { + + @Override + public boolean isSatisified() throws Exception { + return brokerService.getAdminView().getInactiveDurableTopicSubscribers().length == 1; + } + })); + + LOG.debug("Testing that updated selector subscription does get any messages."); + + // Recreate a Durable Topic Subscription with noLocal set to false. + durableSubscriber = session.createDurableSubscriber(topic, subscriptionName, "JMSPriority > 7", false); + + assertEquals(1, brokerService.getAdminView().getDurableTopicSubscribers().length); + assertEquals(0, brokerService.getAdminView().getInactiveDurableTopicSubscribers().length); + + // Durable subscription should not receive them as the subscriptions should + // have been removed and recreated to update the noLocal flag. + { + Message message = durableSubscriber.receive(500); + assertNull(message); + } + + // Public third set which should get queued for the durable sub with noLocal=false + publishToTopic(session, topic, 8); + + // Durable subscriber should receive them + for (int i = 0; i < MSG_COUNT; ++i) { + Message message = durableSubscriber.receive(5000); + assertNotNull("Should get messages now", message); + assertEquals(8, message.getJMSPriority()); + } + } + + @Test(timeout = 60000) + public void testResubscribeWithNewNoLocalValueNoBrokerRestart() throws Exception { + connection = createConnection(); + TopicSession session = connection.createTopicSession(false, Session.AUTO_ACKNOWLEDGE); + + Topic topic = session.createTopic(topicName); + + // Create a Durable Topic Subscription with noLocal set to true. + TopicSubscriber durableSubscriber = session.createDurableSubscriber(topic, subscriptionName, null, true); + + // Create a Durable Topic Subscription with noLocal set to true. + TopicSubscriber nonDurableSubscriber = session.createSubscriber(topic); + + // Public first set, only the non durable sub should get these. + publishToTopic(session, topic); + + LOG.debug("Testing that noLocal=true subscription doesn't get any messages."); + + // Standard subscriber should receive them + for (int i = 0; i < MSG_COUNT; ++i) { + Message message = nonDurableSubscriber.receive(2000); + assertNotNull(message); + } + + // Durable noLocal=true subscription should not receive them + { + Message message = durableSubscriber.receive(500); + assertNull(message); + } + + // Public second set for testing durable sub changed. + publishToTopic(session, topic); + + assertEquals(1, brokerService.getAdminView().getDurableTopicSubscribers().length); + assertEquals(0, brokerService.getAdminView().getInactiveDurableTopicSubscribers().length); + + // Durable now goes inactive. + durableSubscriber.close(); + + assertTrue("Should have no durables.", Wait.waitFor(new Wait.Condition() { + + @Override + public boolean isSatisified() throws Exception { + return brokerService.getAdminView().getDurableTopicSubscribers().length == 0; + } + })); + assertTrue("Should have an inactive sub.", Wait.waitFor(new Wait.Condition() { + + @Override + public boolean isSatisified() throws Exception { + return brokerService.getAdminView().getInactiveDurableTopicSubscribers().length == 1; + } + })); + + LOG.debug("Testing that updated noLocal=false subscription does get any messages."); + + // Recreate a Durable Topic Subscription with noLocal set to false. + durableSubscriber = session.createDurableSubscriber(topic, subscriptionName, null, false); + + assertEquals(1, brokerService.getAdminView().getDurableTopicSubscribers().length); + assertEquals(0, brokerService.getAdminView().getInactiveDurableTopicSubscribers().length); + + // Durable noLocal=false subscription should not receive them as the subscriptions should + // have been removed and recreated to update the noLocal flag. + { + Message message = durableSubscriber.receive(500); + assertNull(message); + } + + // Public third set which should get queued for the durable sub with noLocal=false + publishToTopic(session, topic); + + // Durable subscriber should receive them + for (int i = 0; i < MSG_COUNT; ++i) { + Message message = durableSubscriber.receive(5000); + assertNotNull("Should get local messages now", message); + } + } + + @Test(timeout = 60000) + public void testDurableResubscribeWithNewNoLocalValueWithBrokerRestart() throws Exception { + connection = createConnection(); + TopicSession session = connection.createTopicSession(false, Session.AUTO_ACKNOWLEDGE); + + Topic topic = session.createTopic(topicName); + + // Create a Durable Topic Subscription with noLocal set to true. + TopicSubscriber durableSubscriber = session.createDurableSubscriber(topic, subscriptionName, null, true); + + // Create a Durable Topic Subscription with noLocal set to true. + TopicSubscriber nonDurableSubscriber = session.createSubscriber(topic); + + // Public first set, only the non durable sub should get these. + publishToTopic(session, topic); + + LOG.debug("Testing that noLocal=true subscription doesn't get any messages."); + + // Standard subscriber should receive them + for (int i = 0; i < MSG_COUNT; ++i) { + Message message = nonDurableSubscriber.receive(2000); + assertNotNull(message); + } + + // Durable noLocal=true subscription should not receive them + { + Message message = durableSubscriber.receive(500); + assertNull(message); + } + + // Public second set for testing durable sub changed. + publishToTopic(session, topic); + + assertEquals(1, brokerService.getAdminView().getDurableTopicSubscribers().length); + assertEquals(0, brokerService.getAdminView().getInactiveDurableTopicSubscribers().length); + + // Durable now goes inactive. + durableSubscriber.close(); + + assertTrue("Should have no durables.", Wait.waitFor(new Wait.Condition() { + + @Override + public boolean isSatisified() throws Exception { + return brokerService.getAdminView().getDurableTopicSubscribers().length == 0; + } + })); + assertTrue("Should have an inactive sub.", Wait.waitFor(new Wait.Condition() { + + @Override + public boolean isSatisified() throws Exception { + return brokerService.getAdminView().getInactiveDurableTopicSubscribers().length == 1; + } + })); + + LOG.debug("Testing that updated noLocal=false subscription does get any messages."); + + connection.close(); + + restartBroker(); + + connection = createConnection(); + + session = connection.createTopicSession(false, Session.AUTO_ACKNOWLEDGE); + + // The previous subscription should be restored as an offline subscription. + assertEquals(0, brokerService.getAdminView().getDurableTopicSubscribers().length); + assertEquals(1, brokerService.getAdminView().getInactiveDurableTopicSubscribers().length); + + // Recreate a Durable Topic Subscription with noLocal set to false. + durableSubscriber = session.createDurableSubscriber(topic, subscriptionName, null, false); + + assertEquals(1, brokerService.getAdminView().getDurableTopicSubscribers().length); + assertEquals(0, brokerService.getAdminView().getInactiveDurableTopicSubscribers().length); + + // Durable noLocal=false subscription should not receive them as the subscriptions should + // have been removed and recreated to update the noLocal flag. + { + Message message = durableSubscriber.receive(500); + assertNull(message); + } + + // Public third set which should get queued for the durable sub with noLocal=false + publishToTopic(session, topic); + + // Durable subscriber should receive them + for (int i = 0; i < MSG_COUNT; ++i) { + Message message = durableSubscriber.receive(2000); + assertNotNull("Should get local messages now", message); + } + } + + private void publishToTopic(TopicSession session, Topic destination) throws Exception { + publishToTopic(session, destination, Message.DEFAULT_PRIORITY); + } + + private void publishToTopic(TopicSession session, Topic destination, int priority) throws Exception { + TopicPublisher publisher = session.createPublisher(destination); + for (int i = 0; i < MSG_COUNT; ++i) { + publisher.send(session.createMessage(), Message.DEFAULT_DELIVERY_MODE, priority, Message.DEFAULT_TIME_TO_LIVE); + } + + publisher.close(); + } + + private void startBroker() throws Exception { + createBroker(true); + } + + private void restartBroker() throws Exception { + stopBroker(); + createBroker(false); + } + + private void stopBroker() throws Exception { + if (brokerService != null) { + brokerService.stop(); + brokerService.waitUntilStopped(); + brokerService = null; + } + } + + private void createBroker(boolean deleteMessages) throws Exception { + brokerService = new BrokerService(); + brokerService.setUseJmx(true); + brokerService.getManagementContext().setCreateMBeanServer(false); + brokerService.setPersistent(true); + brokerService.setDeleteAllMessagesOnStartup(deleteMessages); + brokerService.setAdvisorySupport(false); + brokerService.setSchedulerSupport(false); + brokerService.setKeepDurableSubsActive(false); + TransportConnector connector = brokerService.addConnector("tcp://0.0.0.0:0"); + brokerService.start(); + + connectionUri = connector.getPublishableConnectURI(); + + clientId = name.getMethodName() + "-ClientId"; + subscriptionName = name.getMethodName() + "-Subscription"; + topicName = name.getMethodName(); + } +}