When a nolocal durable consumer reconnects the new connectionId is properly captured for
the NoLocal expression so that nolocal works on reconnect.  Also fixed
the detection of the nolocal value changing on consumer connect.
This commit is contained in:
Christopher L. Shannon (cshannon) 2016-09-21 09:32:37 -04:00
parent 6c01b641b1
commit 7c293b661f
2 changed files with 152 additions and 4 deletions

View File

@ -162,6 +162,12 @@ public class TopicRegion extends AbstractRegion {
sub.context = context;
sub.deactivate(keepDurableSubsActive, info.getLastDeliveredSequenceId());
}
//If NoLocal we need to update the NoLocal selector with the new connectionId
//Simply setting the selector with the current one will trigger a
//refresh of of the connectionId for the NoLocal expression
if (info.isNoLocal()) {
sub.setSelector(sub.getSelector());
}
subscriptions.put(info.getConsumerId(), sub);
}
} else {
@ -189,8 +195,9 @@ public class TopicRegion extends AbstractRegion {
// deactivate only if given context is same
// as what is in the sub. otherwise, during linksteal
// sub will get new context, but will be removed here
if (sub.getContext() == context)
if (sub.getContext() == context) {
sub.deactivate(keepDurableSubsActive, info.getLastDeliveredSequenceId());
}
}
} else {
super.removeConsumer(context, info);
@ -373,6 +380,12 @@ public class TopicRegion extends AbstractRegion {
if (info1.getSelector() != null && !info1.getSelector().equals(info2.getSelector())) {
return true;
}
// Prior to V11 the broker did not store the noLocal value for durable subs.
if (broker.getBrokerService().getStoreOpenWireVersion() >= 11) {
if (info1.isNoLocal() ^ info2.isNoLocal()) {
return true;
}
}
return !info1.getDestination().equals(info2.getDestination());
}

View File

@ -20,18 +20,29 @@ import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertNull;
import java.io.File;
import java.util.Arrays;
import java.util.Collection;
import javax.jms.Destination;
import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.MessageListener;
import javax.jms.MessageProducer;
import javax.jms.Session;
import javax.jms.TextMessage;
import javax.jms.Topic;
import javax.jms.TopicConnection;
import javax.jms.TopicPublisher;
import javax.jms.TopicSession;
import javax.jms.TopicSubscriber;
import org.apache.activemq.ActiveMQConnection;
import org.apache.activemq.ActiveMQConnectionFactory;
import org.apache.activemq.broker.BrokerService;
import org.apache.activemq.broker.TransportConnector;
import org.apache.activemq.command.ActiveMQTopic;
import org.apache.activemq.command.CommandTypes;
import org.apache.activemq.network.DurableSyncNetworkBridgeTest.FLOW;
import org.apache.activemq.store.kahadb.KahaDBStore;
import org.junit.After;
import org.junit.Before;
@ -39,12 +50,16 @@ import org.junit.Ignore;
import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.TestName;
import org.junit.runner.RunWith;
import org.junit.runners.Parameterized;
import org.junit.runners.Parameterized.Parameters;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
* Test for spec compliance for durable subscriptions that change the noLocal flag.
*/
@RunWith(Parameterized.class)
public class DurableSubscriptionWithNoLocalTest {
private static final Logger LOG = LoggerFactory.getLogger(DurableSubscriptionWithNoLocalTest.class);
@ -57,6 +72,19 @@ public class DurableSubscriptionWithNoLocalTest {
private BrokerService brokerService;
private String connectionUri;
private ActiveMQConnectionFactory factory;
private final boolean keepDurableSubsActive;
@Parameters(name="keepDurableSubsActive={0}")
public static Collection<Object[]> data() {
return Arrays.asList(new Object[][] {
{true},
{false}
});
}
public DurableSubscriptionWithNoLocalTest(final boolean keepDurableSubsActive) {
this.keepDurableSubsActive = keepDurableSubsActive;
}
@Before
public void setUp() throws Exception {
@ -69,7 +97,115 @@ public class DurableSubscriptionWithNoLocalTest {
brokerService.waitUntilStopped();
}
@Ignore("Requires Broker be able to remove and recreate on noLocal change")
/**
* Make sure that NoLocal works for connection started/stopped
*
* @throws JMSException
*/
@Test(timeout = 60000)
public void testNoLocalStillWorkWithConnectionRestart() throws Exception {
ActiveMQConnection connection = null;
try {
connection = (ActiveMQConnection) factory.createConnection();
connection.setClientID("test-client");
connection.start();
test(connection, "test message 1");
connection.stop();
connection.start();
test(connection, "test message 2");
} finally {
if (connection != null) {
connection.close();
}
}
}
/**
* Make sure that NoLocal works for multiple connections to the same subscription
*
* @throws JMSException
*/
@Test(timeout = 60000)
public void testNoLocalStillWorksNewConnection() throws Exception {
ActiveMQConnection connection = null;
try {
connection = (ActiveMQConnection) factory.createConnection();
connection.setClientID("test-client");
connection.start();
test(connection, "test message 1");
} finally {
if (connection != null) {
connection.close();
}
}
try {
connection = (ActiveMQConnection) factory.createConnection();
connection.setClientID("test-client");
connection.start();
test(connection, "test message 2");
} finally {
if (connection != null) {
connection.close();
}
}
}
/**
* Make sure that NoLocal works after restart
*
* @throws JMSException
*/
@Test(timeout = 60000)
public void testNoLocalStillWorksRestartBroker() throws Exception {
ActiveMQConnection connection = null;
try {
connection = (ActiveMQConnection) factory.createConnection();
connection.setClientID("test-client");
connection.start();
test(connection, "test message 1");
} finally {
if (connection != null) {
connection.close();
}
}
tearDown();
createBroker(false);
try {
connection = (ActiveMQConnection) factory.createConnection();
connection.setClientID("test-client");
connection.start();
test(connection, "test message 2");
} finally {
if (connection != null) {
connection.close();
}
}
}
void test(final ActiveMQConnection connection, final String body) throws Exception {
Session incomingMessagesSession = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
Topic topic = incomingMessagesSession.createTopic("test.topic");
TopicSubscriber consumer = incomingMessagesSession.createDurableSubscriber(topic, "test-subscription", null, true);
Session outgoingMessagesSession = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
Destination destination = outgoingMessagesSession.createTopic("test.topic");
MessageProducer producer = outgoingMessagesSession.createProducer(destination);
TextMessage textMessage = outgoingMessagesSession.createTextMessage(body);
producer.send(textMessage);
producer.close();
System.out.println("message sent: " + textMessage.getJMSMessageID() + "; body: " + textMessage.getText());
outgoingMessagesSession.close();
assertNull(consumer.receive(2000));
consumer.close();
incomingMessagesSession.close();
}
@Test(timeout = 60000)
public void testDurableSubWithNoLocalChange() throws Exception {
TopicConnection connection = factory.createTopicConnection();
@ -126,7 +262,6 @@ public class DurableSubscriptionWithNoLocalTest {
assertNull(durableSub.receive(100));
}
@Ignore("Requires Broker be able to remove and recreate on noLocal change")
@Test(timeout = 60000)
public void testInvertedDurableSubWithNoLocalChange() throws Exception {
TopicConnection connection = factory.createTopicConnection();
@ -247,7 +382,6 @@ public class DurableSubscriptionWithNoLocalTest {
assertNull(durableSub.receive(100));
}
@Ignore("Requires Broker be able to remove and recreate on noLocal change")
@Test(timeout = 60000)
public void testInvertedDurableSubWithNoLocalChangeAfterRestart() throws Exception {
TopicConnection connection = factory.createTopicConnection();
@ -322,6 +456,7 @@ public class DurableSubscriptionWithNoLocalTest {
brokerService.setStoreOpenWireVersion(CommandTypes.PROTOCOL_VERSION);
brokerService.setUseJmx(false);
brokerService.setDeleteAllMessagesOnStartup(deleteAllMessages);
brokerService.setKeepDurableSubsActive(keepDurableSubsActive);
TransportConnector connector = brokerService.addConnector("tcp://0.0.0.0:0");
brokerService.start();