When possible check for noLocl changes on durable subscription
reactivation and recreate the durable sub if it changes.  For both
selector change and noLocal change also update the AbstractSubscription
selectorExpression so it matches with what was requested.
This commit is contained in:
Timothy Bish 2015-10-01 19:23:48 -04:00
parent 7c7c505057
commit 81b4b9ae3d
5 changed files with 565 additions and 44 deletions
activemq-amqp/src/test/java/org/apache/activemq/transport/amqp
activemq-broker/src/main/java/org/apache/activemq/broker/region
activemq-unit-tests/src/test/java/org/apache/activemq/usecases

View File

@ -22,8 +22,6 @@ import static org.junit.Assert.assertNull;
import static org.junit.Assert.assertTrue; import static org.junit.Assert.assertTrue;
import java.net.URI; import java.net.URI;
import java.util.ArrayList;
import java.util.List;
import javax.jms.JMSException; import javax.jms.JMSException;
import javax.jms.Message; import javax.jms.Message;
@ -38,7 +36,6 @@ import org.apache.activemq.broker.BrokerService;
import org.apache.activemq.util.Wait; import org.apache.activemq.util.Wait;
import org.junit.After; import org.junit.After;
import org.junit.Before; import org.junit.Before;
import org.junit.Ignore;
import org.junit.Rule; import org.junit.Rule;
import org.junit.Test; import org.junit.Test;
import org.junit.rules.TestName; import org.junit.rules.TestName;
@ -53,7 +50,7 @@ public class JMSDurableSubNoLocalChangedTest {
private static final Logger LOG = LoggerFactory.getLogger(JMSDurableSubNoLocalChangedTest.class); private static final Logger LOG = LoggerFactory.getLogger(JMSDurableSubNoLocalChangedTest.class);
private final int MSG_COUNT = 10; private final int MSG_COUNT = 5;
private BrokerService brokerService; private BrokerService brokerService;
private URI connectionUri; private URI connectionUri;
@ -62,7 +59,7 @@ public class JMSDurableSubNoLocalChangedTest {
private String subscriptionName; private String subscriptionName;
private String topicName; private String topicName;
private final List<TopicConnection> connections = new ArrayList<TopicConnection>(); private TopicConnection connection;
@Rule public TestName name = new TestName(); @Rule public TestName name = new TestName();
@ -70,49 +67,27 @@ public class JMSDurableSubNoLocalChangedTest {
TopicConnection connection = JMSClientContext.INSTANCE.createTopicConnection(connectionUri, null, null, clientId, true); TopicConnection connection = JMSClientContext.INSTANCE.createTopicConnection(connectionUri, null, null, clientId, true);
connection.start(); connection.start();
connections.add(connection);
return connection; return connection;
} }
@Before @Before
public void setUp() throws Exception { public void setUp() throws Exception {
brokerService = new BrokerService(); startBroker();
brokerService.setUseJmx(true);
brokerService.getManagementContext().setCreateMBeanServer(false);
brokerService.setPersistent(false);
brokerService.setAdvisorySupport(false);
brokerService.setSchedulerSupport(false);
brokerService.setKeepDurableSubsActive(false);
brokerService.addConnector("amqp://0.0.0.0:0");
brokerService.start();
connectionUri = new URI("amqp://localhost:" +
brokerService.getTransportConnectorByScheme("amqp").getPublishableConnectURI().getPort());
clientId = name.getMethodName() + "-ClientId";
subscriptionName = name.getMethodName() + "-Subscription";
topicName = name.getMethodName();
} }
@After @After
public void tearDown() throws Exception { public void tearDown() throws Exception {
for (TopicConnection connection : connections) {
try { try {
connection.close(); connection.close();
} catch (Exception e) {} } catch (Exception e) {
} }
connections.clear(); stopBroker();
brokerService.stop();
brokerService.waitUntilStopped();
} }
@Ignore("Not yet working with current QPid JMS client")
@Test(timeout = 60000) @Test(timeout = 60000)
public void testDurableResubscribeWithNewNoLocalValue() throws Exception { public void testResubscribeWithNewNoLocalValueNoBrokerRestart() throws Exception {
TopicConnection connection = createConnection(); connection = createConnection();
TopicSession session = connection.createTopicSession(false, Session.AUTO_ACKNOWLEDGE); TopicSession session = connection.createTopicSession(false, Session.AUTO_ACKNOWLEDGE);
Topic topic = session.createTopic(topicName); Topic topic = session.createTopic(topicName);
@ -130,13 +105,13 @@ public class JMSDurableSubNoLocalChangedTest {
// Standard subscriber should receive them // Standard subscriber should receive them
for (int i = 0; i < MSG_COUNT; ++i) { for (int i = 0; i < MSG_COUNT; ++i) {
Message message = nonDurableSubscriber.receive(5000); Message message = nonDurableSubscriber.receive(2000);
assertNotNull(message); assertNotNull(message);
} }
// Durable noLocal=true subscription should not receive them // Durable noLocal=true subscription should not receive them
{ {
Message message = durableSubscriber.receive(2000); Message message = durableSubscriber.receive(500);
assertNull(message); assertNull(message);
} }
@ -175,7 +150,7 @@ public class JMSDurableSubNoLocalChangedTest {
// Durable noLocal=false subscription should not receive them as the subscriptions should // Durable noLocal=false subscription should not receive them as the subscriptions should
// have been removed and recreated to update the noLocal flag. // have been removed and recreated to update the noLocal flag.
{ {
Message message = durableSubscriber.receive(2000); Message message = durableSubscriber.receive(500);
assertNull(message); assertNull(message);
} }
@ -184,7 +159,98 @@ public class JMSDurableSubNoLocalChangedTest {
// Durable subscriber should receive them // Durable subscriber should receive them
for (int i = 0; i < MSG_COUNT; ++i) { for (int i = 0; i < MSG_COUNT; ++i) {
Message message = durableSubscriber.receive(5000); Message message = durableSubscriber.receive(2000);
assertNotNull("Should get local messages now", message);
}
}
@Test(timeout = 60000)
public void testDurableResubscribeWithNewNoLocalValueWithBrokerRestart() throws Exception {
connection = createConnection();
TopicSession session = connection.createTopicSession(false, Session.AUTO_ACKNOWLEDGE);
Topic topic = session.createTopic(topicName);
// Create a Durable Topic Subscription with noLocal set to true.
TopicSubscriber durableSubscriber = session.createDurableSubscriber(topic, subscriptionName, null, true);
// Create a Durable Topic Subscription with noLocal set to true.
TopicSubscriber nonDurableSubscriber = session.createSubscriber(topic);
// Public first set, only the non durable sub should get these.
publishToTopic(session, topic);
LOG.debug("Testing that noLocal=true subscription doesn't get any messages.");
// Standard subscriber should receive them
for (int i = 0; i < MSG_COUNT; ++i) {
Message message = nonDurableSubscriber.receive(2000);
assertNotNull(message);
}
// Durable noLocal=true subscription should not receive them
{
Message message = durableSubscriber.receive(500);
assertNull(message);
}
// Public second set for testing durable sub changed.
publishToTopic(session, topic);
assertEquals(1, brokerService.getAdminView().getDurableTopicSubscribers().length);
assertEquals(0, brokerService.getAdminView().getInactiveDurableTopicSubscribers().length);
// Durable now goes inactive.
durableSubscriber.close();
assertTrue("Should have no durables.", Wait.waitFor(new Wait.Condition() {
@Override
public boolean isSatisified() throws Exception {
return brokerService.getAdminView().getDurableTopicSubscribers().length == 0;
}
}));
assertTrue("Should have an inactive sub.", Wait.waitFor(new Wait.Condition() {
@Override
public boolean isSatisified() throws Exception {
return brokerService.getAdminView().getInactiveDurableTopicSubscribers().length == 1;
}
}));
LOG.debug("Testing that updated noLocal=false subscription does get any messages.");
connection.close();
restartBroker();
connection = createConnection();
session = connection.createTopicSession(false, Session.AUTO_ACKNOWLEDGE);
// The previous subscription should be restored as an offline subscription.
assertEquals(0, brokerService.getAdminView().getDurableTopicSubscribers().length);
assertEquals(1, brokerService.getAdminView().getInactiveDurableTopicSubscribers().length);
// Recreate a Durable Topic Subscription with noLocal set to false.
durableSubscriber = session.createDurableSubscriber(topic, subscriptionName, null, false);
assertEquals(1, brokerService.getAdminView().getDurableTopicSubscribers().length);
assertEquals(0, brokerService.getAdminView().getInactiveDurableTopicSubscribers().length);
// Durable noLocal=false subscription should not receive them as the subscriptions should
// have been removed and recreated to update the noLocal flag.
{
Message message = durableSubscriber.receive(500);
assertNull(message);
}
// Public third set which should get queued for the durable sub with noLocal=false
publishToTopic(session, topic);
// Durable subscriber should receive them
for (int i = 0; i < MSG_COUNT; ++i) {
Message message = durableSubscriber.receive(2000);
assertNotNull("Should get local messages now", message); assertNotNull("Should get local messages now", message);
} }
} }
@ -197,4 +263,41 @@ public class JMSDurableSubNoLocalChangedTest {
publisher.close(); publisher.close();
} }
private void startBroker() throws Exception {
createBroker(true);
}
private void restartBroker() throws Exception {
stopBroker();
createBroker(false);
}
private void stopBroker() throws Exception {
if (brokerService != null) {
brokerService.stop();
brokerService.waitUntilStopped();
brokerService = null;
}
}
private void createBroker(boolean deleteMessages) throws Exception {
brokerService = new BrokerService();
brokerService.setUseJmx(true);
brokerService.getManagementContext().setCreateMBeanServer(false);
brokerService.setPersistent(true);
brokerService.setDeleteAllMessagesOnStartup(deleteMessages);
brokerService.setAdvisorySupport(false);
brokerService.setSchedulerSupport(false);
brokerService.setKeepDurableSubsActive(false);
brokerService.addConnector("amqp://0.0.0.0:0");
brokerService.start();
connectionUri = new URI("amqp://localhost:" +
brokerService.getTransportConnectorByScheme("amqp").getPublishableConnectURI().getPort());
clientId = name.getMethodName() + "-ClientId";
subscriptionName = name.getMethodName() + "-Subscription";
topicName = name.getMethodName();
}
} }

View File

@ -20,7 +20,7 @@ import java.io.IOException;
import java.util.Collections; import java.util.Collections;
import java.util.List; import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList; import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.atomic.AtomicLong;
import javax.jms.InvalidSelectorException; import javax.jms.InvalidSelectorException;
import javax.jms.JMSException; import javax.jms.JMSException;
import javax.management.ObjectName; import javax.management.ObjectName;
@ -43,11 +43,13 @@ import org.slf4j.LoggerFactory;
public abstract class AbstractSubscription implements Subscription { public abstract class AbstractSubscription implements Subscription {
private static final Logger LOG = LoggerFactory.getLogger(AbstractSubscription.class); private static final Logger LOG = LoggerFactory.getLogger(AbstractSubscription.class);
protected Broker broker; protected Broker broker;
protected ConnectionContext context; protected ConnectionContext context;
protected ConsumerInfo info; protected ConsumerInfo info;
protected final DestinationFilter destinationFilter; protected final DestinationFilter destinationFilter;
protected final CopyOnWriteArrayList<Destination> destinations = new CopyOnWriteArrayList<Destination>(); protected final CopyOnWriteArrayList<Destination> destinations = new CopyOnWriteArrayList<Destination>();
private BooleanExpression selectorExpression; private BooleanExpression selectorExpression;
private ObjectName objectName; private ObjectName objectName;
private int cursorMemoryHighWaterMark = 70; private int cursorMemoryHighWaterMark = 70;
@ -180,6 +182,7 @@ public abstract class AbstractSubscription implements Subscription {
public int getPrefetchSize() { public int getPrefetchSize() {
return info.getPrefetchSize(); return info.getPrefetchSize();
} }
public void setPrefetchSize(int newSize) { public void setPrefetchSize(int newSize) {
info.setPrefetchSize(newSize); info.setPrefetchSize(newSize);
} }
@ -210,7 +213,6 @@ public abstract class AbstractSubscription implements Subscription {
if (result) { if (result) {
doAddRecoveredMessage(message); doAddRecoveredMessage(message);
} }
} finally { } finally {
msgContext.clear(); msgContext.clear();
} }
@ -245,7 +247,6 @@ public abstract class AbstractSubscription implements Subscription {
* @param destination * @param destination
*/ */
public void addDestination(Destination destination) { public void addDestination(Destination destination) {
} }
/** /**
@ -253,7 +254,6 @@ public abstract class AbstractSubscription implements Subscription {
* @param destination * @param destination
*/ */
public void removeDestination(Destination destination) { public void removeDestination(Destination destination) {
} }
@Override @Override
@ -289,14 +289,17 @@ public abstract class AbstractSubscription implements Subscription {
this.lastAckTime = value; this.lastAckTime = value;
} }
@Override
public long getConsumedCount(){ public long getConsumedCount(){
return subscriptionStatistics.getConsumedCount().getCount(); return subscriptionStatistics.getConsumedCount().getCount();
} }
@Override
public void incrementConsumedCount(){ public void incrementConsumedCount(){
subscriptionStatistics.getConsumedCount().increment(); subscriptionStatistics.getConsumedCount().increment();
} }
@Override
public void resetConsumedCount(){ public void resetConsumedCount(){
subscriptionStatistics.getConsumedCount().reset(); subscriptionStatistics.getConsumedCount().reset();
} }

View File

@ -307,7 +307,11 @@ public class DurableTopicSubscription extends PrefetchSubscription implements Us
@Override @Override
public void setSelector(String selector) throws InvalidSelectorException { public void setSelector(String selector) throws InvalidSelectorException {
if (active.get()) {
throw new UnsupportedOperationException("You cannot dynamically change the selector for durable topic subscriptions"); throw new UnsupportedOperationException("You cannot dynamically change the selector for durable topic subscriptions");
} else {
super.setSelector(getSelector());
}
} }
@Override @Override
@ -348,7 +352,6 @@ public class DurableTopicSubscription extends PrefetchSubscription implements Us
MessageReference node = pending.next(); MessageReference node = pending.next();
node.decrementReferenceCount(); node.decrementReferenceCount();
} }
} finally { } finally {
pending.release(); pending.release();
pending.clear(); pending.clear();

View File

@ -212,9 +212,29 @@ public class Topic extends BaseDestination implements Task {
} }
private boolean hasDurableSubChanged(SubscriptionInfo info1, ConsumerInfo info2) { private boolean hasDurableSubChanged(SubscriptionInfo info1, ConsumerInfo info2) {
if (hasSelectorChanged(info1, info2)) {
return true;
}
return hasNoLocalChanged(info1, info2);
}
private boolean hasNoLocalChanged(SubscriptionInfo info1, ConsumerInfo info2) {
// Prior to V11 the broker did not store the noLocal value for durable subs.
if (brokerService.getStoreOpenWireVersion() >= 11) {
if (info1.isNoLocal() ^ info2.isNoLocal()) {
return true;
}
}
return false;
}
private boolean hasSelectorChanged(SubscriptionInfo info1, ConsumerInfo info2) {
if (info1.getSelector() != null ^ info2.getSelector() != null) { if (info1.getSelector() != null ^ info2.getSelector() != null) {
return true; return true;
} }
if (info1.getSelector() != null && !info1.getSelector().equals(info2.getSelector())) { if (info1.getSelector() != null && !info1.getSelector().equals(info2.getSelector())) {
return true; return true;
} }
@ -242,6 +262,10 @@ public class Topic extends BaseDestination implements Task {
// Need to delete the subscription // Need to delete the subscription
topicStore.deleteSubscription(clientId, subscriptionName); topicStore.deleteSubscription(clientId, subscriptionName);
info = null; info = null;
// Force a rebuild of the selector chain for the subscription otherwise
// the stored subscription is updated but the selector expression is not
// and the subscription will not behave according to the new configuration.
subscription.setSelector(subscription.getConsumerInfo().getSelector());
synchronized (consumers) { synchronized (consumers) {
consumers.remove(subscription); consumers.remove(subscription);
} }

View File

@ -0,0 +1,388 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.activemq.usecases;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertNull;
import static org.junit.Assert.assertTrue;
import java.net.URI;
import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.Session;
import javax.jms.Topic;
import javax.jms.TopicConnection;
import javax.jms.TopicPublisher;
import javax.jms.TopicSession;
import javax.jms.TopicSubscriber;
import org.apache.activemq.ActiveMQConnectionFactory;
import org.apache.activemq.broker.BrokerService;
import org.apache.activemq.broker.TransportConnector;
import org.apache.activemq.util.Wait;
import org.junit.After;
import org.junit.Before;
import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.TestName;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
* Test that the durable sub updates when the offline sub is reactivated with new values.
*/
public class DurableSubscriptionUpdatesTest {
private static final Logger LOG = LoggerFactory.getLogger(DurableSubscriptionUpdatesTest.class);
private final int MSG_COUNT = 5;
private BrokerService brokerService;
private URI connectionUri;
private String clientId;
private String subscriptionName;
private String topicName;
private TopicConnection connection;
@Rule public TestName name = new TestName();
protected TopicConnection createConnection() throws JMSException {
ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory(connectionUri);
factory.setUseAsyncSend(true);
TopicConnection connection = factory.createTopicConnection();
connection.setClientID(clientId);
connection.start();
return connection;
}
@Before
public void setUp() throws Exception {
startBroker();
}
@After
public void tearDown() throws Exception {
try {
connection.close();
} catch (Exception e) {
}
stopBroker();
}
@Test(timeout = 60000)
public void testSelectorChange() throws Exception {
connection = createConnection();
TopicSession session = connection.createTopicSession(false, Session.AUTO_ACKNOWLEDGE);
Topic topic = session.createTopic(topicName);
// Create a Durable Topic Subscription with noLocal set to true.
TopicSubscriber durableSubscriber = session.createDurableSubscriber(topic, subscriptionName, "JMSPriority > 8", false);
// Public first set, only the non durable sub should get these.
publishToTopic(session, topic, 9);
publishToTopic(session, topic, 8);
// Standard subscriber should receive them
for (int i = 0; i < MSG_COUNT; ++i) {
Message message = durableSubscriber.receive(2000);
assertNotNull(message);
assertEquals(9, message.getJMSPriority());
}
// Subscriber should not receive the others.
{
Message message = durableSubscriber.receive(500);
assertNull(message);
}
// Public second set for testing durable sub changed.
publishToTopic(session, topic, 9);
assertEquals(1, brokerService.getAdminView().getDurableTopicSubscribers().length);
assertEquals(0, brokerService.getAdminView().getInactiveDurableTopicSubscribers().length);
// Durable now goes inactive.
durableSubscriber.close();
assertTrue("Should have no durables.", Wait.waitFor(new Wait.Condition() {
@Override
public boolean isSatisified() throws Exception {
return brokerService.getAdminView().getDurableTopicSubscribers().length == 0;
}
}));
assertTrue("Should have an inactive sub.", Wait.waitFor(new Wait.Condition() {
@Override
public boolean isSatisified() throws Exception {
return brokerService.getAdminView().getInactiveDurableTopicSubscribers().length == 1;
}
}));
LOG.debug("Testing that updated selector subscription does get any messages.");
// Recreate a Durable Topic Subscription with noLocal set to false.
durableSubscriber = session.createDurableSubscriber(topic, subscriptionName, "JMSPriority > 7", false);
assertEquals(1, brokerService.getAdminView().getDurableTopicSubscribers().length);
assertEquals(0, brokerService.getAdminView().getInactiveDurableTopicSubscribers().length);
// Durable subscription should not receive them as the subscriptions should
// have been removed and recreated to update the noLocal flag.
{
Message message = durableSubscriber.receive(500);
assertNull(message);
}
// Public third set which should get queued for the durable sub with noLocal=false
publishToTopic(session, topic, 8);
// Durable subscriber should receive them
for (int i = 0; i < MSG_COUNT; ++i) {
Message message = durableSubscriber.receive(5000);
assertNotNull("Should get messages now", message);
assertEquals(8, message.getJMSPriority());
}
}
@Test(timeout = 60000)
public void testResubscribeWithNewNoLocalValueNoBrokerRestart() throws Exception {
connection = createConnection();
TopicSession session = connection.createTopicSession(false, Session.AUTO_ACKNOWLEDGE);
Topic topic = session.createTopic(topicName);
// Create a Durable Topic Subscription with noLocal set to true.
TopicSubscriber durableSubscriber = session.createDurableSubscriber(topic, subscriptionName, null, true);
// Create a Durable Topic Subscription with noLocal set to true.
TopicSubscriber nonDurableSubscriber = session.createSubscriber(topic);
// Public first set, only the non durable sub should get these.
publishToTopic(session, topic);
LOG.debug("Testing that noLocal=true subscription doesn't get any messages.");
// Standard subscriber should receive them
for (int i = 0; i < MSG_COUNT; ++i) {
Message message = nonDurableSubscriber.receive(2000);
assertNotNull(message);
}
// Durable noLocal=true subscription should not receive them
{
Message message = durableSubscriber.receive(500);
assertNull(message);
}
// Public second set for testing durable sub changed.
publishToTopic(session, topic);
assertEquals(1, brokerService.getAdminView().getDurableTopicSubscribers().length);
assertEquals(0, brokerService.getAdminView().getInactiveDurableTopicSubscribers().length);
// Durable now goes inactive.
durableSubscriber.close();
assertTrue("Should have no durables.", Wait.waitFor(new Wait.Condition() {
@Override
public boolean isSatisified() throws Exception {
return brokerService.getAdminView().getDurableTopicSubscribers().length == 0;
}
}));
assertTrue("Should have an inactive sub.", Wait.waitFor(new Wait.Condition() {
@Override
public boolean isSatisified() throws Exception {
return brokerService.getAdminView().getInactiveDurableTopicSubscribers().length == 1;
}
}));
LOG.debug("Testing that updated noLocal=false subscription does get any messages.");
// Recreate a Durable Topic Subscription with noLocal set to false.
durableSubscriber = session.createDurableSubscriber(topic, subscriptionName, null, false);
assertEquals(1, brokerService.getAdminView().getDurableTopicSubscribers().length);
assertEquals(0, brokerService.getAdminView().getInactiveDurableTopicSubscribers().length);
// Durable noLocal=false subscription should not receive them as the subscriptions should
// have been removed and recreated to update the noLocal flag.
{
Message message = durableSubscriber.receive(500);
assertNull(message);
}
// Public third set which should get queued for the durable sub with noLocal=false
publishToTopic(session, topic);
// Durable subscriber should receive them
for (int i = 0; i < MSG_COUNT; ++i) {
Message message = durableSubscriber.receive(5000);
assertNotNull("Should get local messages now", message);
}
}
@Test(timeout = 60000)
public void testDurableResubscribeWithNewNoLocalValueWithBrokerRestart() throws Exception {
connection = createConnection();
TopicSession session = connection.createTopicSession(false, Session.AUTO_ACKNOWLEDGE);
Topic topic = session.createTopic(topicName);
// Create a Durable Topic Subscription with noLocal set to true.
TopicSubscriber durableSubscriber = session.createDurableSubscriber(topic, subscriptionName, null, true);
// Create a Durable Topic Subscription with noLocal set to true.
TopicSubscriber nonDurableSubscriber = session.createSubscriber(topic);
// Public first set, only the non durable sub should get these.
publishToTopic(session, topic);
LOG.debug("Testing that noLocal=true subscription doesn't get any messages.");
// Standard subscriber should receive them
for (int i = 0; i < MSG_COUNT; ++i) {
Message message = nonDurableSubscriber.receive(2000);
assertNotNull(message);
}
// Durable noLocal=true subscription should not receive them
{
Message message = durableSubscriber.receive(500);
assertNull(message);
}
// Public second set for testing durable sub changed.
publishToTopic(session, topic);
assertEquals(1, brokerService.getAdminView().getDurableTopicSubscribers().length);
assertEquals(0, brokerService.getAdminView().getInactiveDurableTopicSubscribers().length);
// Durable now goes inactive.
durableSubscriber.close();
assertTrue("Should have no durables.", Wait.waitFor(new Wait.Condition() {
@Override
public boolean isSatisified() throws Exception {
return brokerService.getAdminView().getDurableTopicSubscribers().length == 0;
}
}));
assertTrue("Should have an inactive sub.", Wait.waitFor(new Wait.Condition() {
@Override
public boolean isSatisified() throws Exception {
return brokerService.getAdminView().getInactiveDurableTopicSubscribers().length == 1;
}
}));
LOG.debug("Testing that updated noLocal=false subscription does get any messages.");
connection.close();
restartBroker();
connection = createConnection();
session = connection.createTopicSession(false, Session.AUTO_ACKNOWLEDGE);
// The previous subscription should be restored as an offline subscription.
assertEquals(0, brokerService.getAdminView().getDurableTopicSubscribers().length);
assertEquals(1, brokerService.getAdminView().getInactiveDurableTopicSubscribers().length);
// Recreate a Durable Topic Subscription with noLocal set to false.
durableSubscriber = session.createDurableSubscriber(topic, subscriptionName, null, false);
assertEquals(1, brokerService.getAdminView().getDurableTopicSubscribers().length);
assertEquals(0, brokerService.getAdminView().getInactiveDurableTopicSubscribers().length);
// Durable noLocal=false subscription should not receive them as the subscriptions should
// have been removed and recreated to update the noLocal flag.
{
Message message = durableSubscriber.receive(500);
assertNull(message);
}
// Public third set which should get queued for the durable sub with noLocal=false
publishToTopic(session, topic);
// Durable subscriber should receive them
for (int i = 0; i < MSG_COUNT; ++i) {
Message message = durableSubscriber.receive(2000);
assertNotNull("Should get local messages now", message);
}
}
private void publishToTopic(TopicSession session, Topic destination) throws Exception {
publishToTopic(session, destination, Message.DEFAULT_PRIORITY);
}
private void publishToTopic(TopicSession session, Topic destination, int priority) throws Exception {
TopicPublisher publisher = session.createPublisher(destination);
for (int i = 0; i < MSG_COUNT; ++i) {
publisher.send(session.createMessage(), Message.DEFAULT_DELIVERY_MODE, priority, Message.DEFAULT_TIME_TO_LIVE);
}
publisher.close();
}
private void startBroker() throws Exception {
createBroker(true);
}
private void restartBroker() throws Exception {
stopBroker();
createBroker(false);
}
private void stopBroker() throws Exception {
if (brokerService != null) {
brokerService.stop();
brokerService.waitUntilStopped();
brokerService = null;
}
}
private void createBroker(boolean deleteMessages) throws Exception {
brokerService = new BrokerService();
brokerService.setUseJmx(true);
brokerService.getManagementContext().setCreateMBeanServer(false);
brokerService.setPersistent(true);
brokerService.setDeleteAllMessagesOnStartup(deleteMessages);
brokerService.setAdvisorySupport(false);
brokerService.setSchedulerSupport(false);
brokerService.setKeepDurableSubsActive(false);
TransportConnector connector = brokerService.addConnector("tcp://0.0.0.0:0");
brokerService.start();
connectionUri = connector.getPublishableConnectURI();
clientId = name.getMethodName() + "-ClientId";
subscriptionName = name.getMethodName() + "-Subscription";
topicName = name.getMethodName();
}
}