diff --git a/activemq-broker/src/main/java/org/apache/activemq/broker/region/TopicRegion.java b/activemq-broker/src/main/java/org/apache/activemq/broker/region/TopicRegion.java index 51c9beb5f6..eca344919b 100755 --- a/activemq-broker/src/main/java/org/apache/activemq/broker/region/TopicRegion.java +++ b/activemq-broker/src/main/java/org/apache/activemq/broker/region/TopicRegion.java @@ -162,6 +162,12 @@ public class TopicRegion extends AbstractRegion { sub.context = context; sub.deactivate(keepDurableSubsActive, info.getLastDeliveredSequenceId()); } + //If NoLocal we need to update the NoLocal selector with the new connectionId + //Simply setting the selector with the current one will trigger a + //refresh of of the connectionId for the NoLocal expression + if (info.isNoLocal()) { + sub.setSelector(sub.getSelector()); + } subscriptions.put(info.getConsumerId(), sub); } } else { @@ -189,8 +195,9 @@ public class TopicRegion extends AbstractRegion { // deactivate only if given context is same // as what is in the sub. otherwise, during linksteal // sub will get new context, but will be removed here - if (sub.getContext() == context) + if (sub.getContext() == context) { sub.deactivate(keepDurableSubsActive, info.getLastDeliveredSequenceId()); + } } } else { super.removeConsumer(context, info); @@ -373,6 +380,12 @@ public class TopicRegion extends AbstractRegion { if (info1.getSelector() != null && !info1.getSelector().equals(info2.getSelector())) { return true; } + // Prior to V11 the broker did not store the noLocal value for durable subs. + if (broker.getBrokerService().getStoreOpenWireVersion() >= 11) { + if (info1.isNoLocal() ^ info2.isNoLocal()) { + return true; + } + } return !info1.getDestination().equals(info2.getDestination()); } diff --git a/activemq-unit-tests/src/test/java/org/apache/activemq/usecases/DurableSubscriptionWithNoLocalTest.java b/activemq-unit-tests/src/test/java/org/apache/activemq/usecases/DurableSubscriptionWithNoLocalTest.java index 4ecf811954..ecbfac1128 100644 --- a/activemq-unit-tests/src/test/java/org/apache/activemq/usecases/DurableSubscriptionWithNoLocalTest.java +++ b/activemq-unit-tests/src/test/java/org/apache/activemq/usecases/DurableSubscriptionWithNoLocalTest.java @@ -20,18 +20,29 @@ import static org.junit.Assert.assertNotNull; import static org.junit.Assert.assertNull; import java.io.File; +import java.util.Arrays; +import java.util.Collection; +import javax.jms.Destination; +import javax.jms.JMSException; +import javax.jms.Message; +import javax.jms.MessageListener; +import javax.jms.MessageProducer; import javax.jms.Session; +import javax.jms.TextMessage; import javax.jms.Topic; import javax.jms.TopicConnection; import javax.jms.TopicPublisher; import javax.jms.TopicSession; import javax.jms.TopicSubscriber; +import org.apache.activemq.ActiveMQConnection; import org.apache.activemq.ActiveMQConnectionFactory; import org.apache.activemq.broker.BrokerService; import org.apache.activemq.broker.TransportConnector; +import org.apache.activemq.command.ActiveMQTopic; import org.apache.activemq.command.CommandTypes; +import org.apache.activemq.network.DurableSyncNetworkBridgeTest.FLOW; import org.apache.activemq.store.kahadb.KahaDBStore; import org.junit.After; import org.junit.Before; @@ -39,12 +50,16 @@ import org.junit.Ignore; import org.junit.Rule; import org.junit.Test; import org.junit.rules.TestName; +import org.junit.runner.RunWith; +import org.junit.runners.Parameterized; +import org.junit.runners.Parameterized.Parameters; import org.slf4j.Logger; import org.slf4j.LoggerFactory; /** * Test for spec compliance for durable subscriptions that change the noLocal flag. */ +@RunWith(Parameterized.class) public class DurableSubscriptionWithNoLocalTest { private static final Logger LOG = LoggerFactory.getLogger(DurableSubscriptionWithNoLocalTest.class); @@ -57,6 +72,19 @@ public class DurableSubscriptionWithNoLocalTest { private BrokerService brokerService; private String connectionUri; private ActiveMQConnectionFactory factory; + private final boolean keepDurableSubsActive; + + @Parameters(name="keepDurableSubsActive={0}") + public static Collection data() { + return Arrays.asList(new Object[][] { + {true}, + {false} + }); + } + + public DurableSubscriptionWithNoLocalTest(final boolean keepDurableSubsActive) { + this.keepDurableSubsActive = keepDurableSubsActive; + } @Before public void setUp() throws Exception { @@ -69,7 +97,115 @@ public class DurableSubscriptionWithNoLocalTest { brokerService.waitUntilStopped(); } - @Ignore("Requires Broker be able to remove and recreate on noLocal change") + /** + * Make sure that NoLocal works for connection started/stopped + * + * @throws JMSException + */ + @Test(timeout = 60000) + public void testNoLocalStillWorkWithConnectionRestart() throws Exception { + ActiveMQConnection connection = null; + try { + connection = (ActiveMQConnection) factory.createConnection(); + connection.setClientID("test-client"); + connection.start(); + test(connection, "test message 1"); + connection.stop(); + connection.start(); + test(connection, "test message 2"); + } finally { + if (connection != null) { + connection.close(); + } + } + } + + /** + * Make sure that NoLocal works for multiple connections to the same subscription + * + * @throws JMSException + */ + @Test(timeout = 60000) + public void testNoLocalStillWorksNewConnection() throws Exception { + ActiveMQConnection connection = null; + try { + connection = (ActiveMQConnection) factory.createConnection(); + connection.setClientID("test-client"); + connection.start(); + test(connection, "test message 1"); + } finally { + if (connection != null) { + connection.close(); + } + } + + try { + connection = (ActiveMQConnection) factory.createConnection(); + connection.setClientID("test-client"); + connection.start(); + test(connection, "test message 2"); + } finally { + if (connection != null) { + connection.close(); + } + } + } + + /** + * Make sure that NoLocal works after restart + * + * @throws JMSException + */ + @Test(timeout = 60000) + public void testNoLocalStillWorksRestartBroker() throws Exception { + ActiveMQConnection connection = null; + try { + connection = (ActiveMQConnection) factory.createConnection(); + connection.setClientID("test-client"); + connection.start(); + test(connection, "test message 1"); + } finally { + if (connection != null) { + connection.close(); + } + } + + tearDown(); + createBroker(false); + + try { + connection = (ActiveMQConnection) factory.createConnection(); + connection.setClientID("test-client"); + connection.start(); + test(connection, "test message 2"); + } finally { + if (connection != null) { + connection.close(); + } + } + } + + void test(final ActiveMQConnection connection, final String body) throws Exception { + + Session incomingMessagesSession = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); + Topic topic = incomingMessagesSession.createTopic("test.topic"); + TopicSubscriber consumer = incomingMessagesSession.createDurableSubscriber(topic, "test-subscription", null, true); + + Session outgoingMessagesSession = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); + Destination destination = outgoingMessagesSession.createTopic("test.topic"); + MessageProducer producer = outgoingMessagesSession.createProducer(destination); + TextMessage textMessage = outgoingMessagesSession.createTextMessage(body); + producer.send(textMessage); + producer.close(); + System.out.println("message sent: " + textMessage.getJMSMessageID() + "; body: " + textMessage.getText()); + outgoingMessagesSession.close(); + + assertNull(consumer.receive(2000)); + + consumer.close(); + incomingMessagesSession.close(); + } + @Test(timeout = 60000) public void testDurableSubWithNoLocalChange() throws Exception { TopicConnection connection = factory.createTopicConnection(); @@ -126,7 +262,6 @@ public class DurableSubscriptionWithNoLocalTest { assertNull(durableSub.receive(100)); } - @Ignore("Requires Broker be able to remove and recreate on noLocal change") @Test(timeout = 60000) public void testInvertedDurableSubWithNoLocalChange() throws Exception { TopicConnection connection = factory.createTopicConnection(); @@ -247,7 +382,6 @@ public class DurableSubscriptionWithNoLocalTest { assertNull(durableSub.receive(100)); } - @Ignore("Requires Broker be able to remove and recreate on noLocal change") @Test(timeout = 60000) public void testInvertedDurableSubWithNoLocalChangeAfterRestart() throws Exception { TopicConnection connection = factory.createTopicConnection(); @@ -322,6 +456,7 @@ public class DurableSubscriptionWithNoLocalTest { brokerService.setStoreOpenWireVersion(CommandTypes.PROTOCOL_VERSION); brokerService.setUseJmx(false); brokerService.setDeleteAllMessagesOnStartup(deleteAllMessages); + brokerService.setKeepDurableSubsActive(keepDurableSubsActive); TransportConnector connector = brokerService.addConnector("tcp://0.0.0.0:0"); brokerService.start();