mirror of https://github.com/apache/activemq.git
When a nolocal durable consumer reconnects the new connectionId is properly captured for
the NoLocal expression so that nolocal works on reconnect. Also fixed
the detection of the nolocal value changing on consumer connect.
(cherry picked from commit 7c293b661f
)
This commit is contained in:
parent
48036bff05
commit
d3b86e77dd
|
@ -162,6 +162,12 @@ public class TopicRegion extends AbstractRegion {
|
||||||
sub.context = context;
|
sub.context = context;
|
||||||
sub.deactivate(keepDurableSubsActive, info.getLastDeliveredSequenceId());
|
sub.deactivate(keepDurableSubsActive, info.getLastDeliveredSequenceId());
|
||||||
}
|
}
|
||||||
|
//If NoLocal we need to update the NoLocal selector with the new connectionId
|
||||||
|
//Simply setting the selector with the current one will trigger a
|
||||||
|
//refresh of of the connectionId for the NoLocal expression
|
||||||
|
if (info.isNoLocal()) {
|
||||||
|
sub.setSelector(sub.getSelector());
|
||||||
|
}
|
||||||
subscriptions.put(info.getConsumerId(), sub);
|
subscriptions.put(info.getConsumerId(), sub);
|
||||||
}
|
}
|
||||||
} else {
|
} else {
|
||||||
|
@ -189,8 +195,9 @@ public class TopicRegion extends AbstractRegion {
|
||||||
// deactivate only if given context is same
|
// deactivate only if given context is same
|
||||||
// as what is in the sub. otherwise, during linksteal
|
// as what is in the sub. otherwise, during linksteal
|
||||||
// sub will get new context, but will be removed here
|
// sub will get new context, but will be removed here
|
||||||
if (sub.getContext() == context)
|
if (sub.getContext() == context) {
|
||||||
sub.deactivate(keepDurableSubsActive, info.getLastDeliveredSequenceId());
|
sub.deactivate(keepDurableSubsActive, info.getLastDeliveredSequenceId());
|
||||||
|
}
|
||||||
}
|
}
|
||||||
} else {
|
} else {
|
||||||
super.removeConsumer(context, info);
|
super.removeConsumer(context, info);
|
||||||
|
@ -373,6 +380,12 @@ public class TopicRegion extends AbstractRegion {
|
||||||
if (info1.getSelector() != null && !info1.getSelector().equals(info2.getSelector())) {
|
if (info1.getSelector() != null && !info1.getSelector().equals(info2.getSelector())) {
|
||||||
return true;
|
return true;
|
||||||
}
|
}
|
||||||
|
// Prior to V11 the broker did not store the noLocal value for durable subs.
|
||||||
|
if (broker.getBrokerService().getStoreOpenWireVersion() >= 11) {
|
||||||
|
if (info1.isNoLocal() ^ info2.isNoLocal()) {
|
||||||
|
return true;
|
||||||
|
}
|
||||||
|
}
|
||||||
return !info1.getDestination().equals(info2.getDestination());
|
return !info1.getDestination().equals(info2.getDestination());
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -20,18 +20,29 @@ import static org.junit.Assert.assertNotNull;
|
||||||
import static org.junit.Assert.assertNull;
|
import static org.junit.Assert.assertNull;
|
||||||
|
|
||||||
import java.io.File;
|
import java.io.File;
|
||||||
|
import java.util.Arrays;
|
||||||
|
import java.util.Collection;
|
||||||
|
|
||||||
|
import javax.jms.Destination;
|
||||||
|
import javax.jms.JMSException;
|
||||||
|
import javax.jms.Message;
|
||||||
|
import javax.jms.MessageListener;
|
||||||
|
import javax.jms.MessageProducer;
|
||||||
import javax.jms.Session;
|
import javax.jms.Session;
|
||||||
|
import javax.jms.TextMessage;
|
||||||
import javax.jms.Topic;
|
import javax.jms.Topic;
|
||||||
import javax.jms.TopicConnection;
|
import javax.jms.TopicConnection;
|
||||||
import javax.jms.TopicPublisher;
|
import javax.jms.TopicPublisher;
|
||||||
import javax.jms.TopicSession;
|
import javax.jms.TopicSession;
|
||||||
import javax.jms.TopicSubscriber;
|
import javax.jms.TopicSubscriber;
|
||||||
|
|
||||||
|
import org.apache.activemq.ActiveMQConnection;
|
||||||
import org.apache.activemq.ActiveMQConnectionFactory;
|
import org.apache.activemq.ActiveMQConnectionFactory;
|
||||||
import org.apache.activemq.broker.BrokerService;
|
import org.apache.activemq.broker.BrokerService;
|
||||||
import org.apache.activemq.broker.TransportConnector;
|
import org.apache.activemq.broker.TransportConnector;
|
||||||
|
import org.apache.activemq.command.ActiveMQTopic;
|
||||||
import org.apache.activemq.command.CommandTypes;
|
import org.apache.activemq.command.CommandTypes;
|
||||||
|
import org.apache.activemq.network.DurableSyncNetworkBridgeTest.FLOW;
|
||||||
import org.apache.activemq.store.kahadb.KahaDBStore;
|
import org.apache.activemq.store.kahadb.KahaDBStore;
|
||||||
import org.junit.After;
|
import org.junit.After;
|
||||||
import org.junit.Before;
|
import org.junit.Before;
|
||||||
|
@ -39,12 +50,16 @@ import org.junit.Ignore;
|
||||||
import org.junit.Rule;
|
import org.junit.Rule;
|
||||||
import org.junit.Test;
|
import org.junit.Test;
|
||||||
import org.junit.rules.TestName;
|
import org.junit.rules.TestName;
|
||||||
|
import org.junit.runner.RunWith;
|
||||||
|
import org.junit.runners.Parameterized;
|
||||||
|
import org.junit.runners.Parameterized.Parameters;
|
||||||
import org.slf4j.Logger;
|
import org.slf4j.Logger;
|
||||||
import org.slf4j.LoggerFactory;
|
import org.slf4j.LoggerFactory;
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Test for spec compliance for durable subscriptions that change the noLocal flag.
|
* Test for spec compliance for durable subscriptions that change the noLocal flag.
|
||||||
*/
|
*/
|
||||||
|
@RunWith(Parameterized.class)
|
||||||
public class DurableSubscriptionWithNoLocalTest {
|
public class DurableSubscriptionWithNoLocalTest {
|
||||||
|
|
||||||
private static final Logger LOG = LoggerFactory.getLogger(DurableSubscriptionWithNoLocalTest.class);
|
private static final Logger LOG = LoggerFactory.getLogger(DurableSubscriptionWithNoLocalTest.class);
|
||||||
|
@ -57,6 +72,19 @@ public class DurableSubscriptionWithNoLocalTest {
|
||||||
private BrokerService brokerService;
|
private BrokerService brokerService;
|
||||||
private String connectionUri;
|
private String connectionUri;
|
||||||
private ActiveMQConnectionFactory factory;
|
private ActiveMQConnectionFactory factory;
|
||||||
|
private final boolean keepDurableSubsActive;
|
||||||
|
|
||||||
|
@Parameters(name="keepDurableSubsActive={0}")
|
||||||
|
public static Collection<Object[]> data() {
|
||||||
|
return Arrays.asList(new Object[][] {
|
||||||
|
{true},
|
||||||
|
{false}
|
||||||
|
});
|
||||||
|
}
|
||||||
|
|
||||||
|
public DurableSubscriptionWithNoLocalTest(final boolean keepDurableSubsActive) {
|
||||||
|
this.keepDurableSubsActive = keepDurableSubsActive;
|
||||||
|
}
|
||||||
|
|
||||||
@Before
|
@Before
|
||||||
public void setUp() throws Exception {
|
public void setUp() throws Exception {
|
||||||
|
@ -69,7 +97,115 @@ public class DurableSubscriptionWithNoLocalTest {
|
||||||
brokerService.waitUntilStopped();
|
brokerService.waitUntilStopped();
|
||||||
}
|
}
|
||||||
|
|
||||||
@Ignore("Requires Broker be able to remove and recreate on noLocal change")
|
/**
|
||||||
|
* Make sure that NoLocal works for connection started/stopped
|
||||||
|
*
|
||||||
|
* @throws JMSException
|
||||||
|
*/
|
||||||
|
@Test(timeout = 60000)
|
||||||
|
public void testNoLocalStillWorkWithConnectionRestart() throws Exception {
|
||||||
|
ActiveMQConnection connection = null;
|
||||||
|
try {
|
||||||
|
connection = (ActiveMQConnection) factory.createConnection();
|
||||||
|
connection.setClientID("test-client");
|
||||||
|
connection.start();
|
||||||
|
test(connection, "test message 1");
|
||||||
|
connection.stop();
|
||||||
|
connection.start();
|
||||||
|
test(connection, "test message 2");
|
||||||
|
} finally {
|
||||||
|
if (connection != null) {
|
||||||
|
connection.close();
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Make sure that NoLocal works for multiple connections to the same subscription
|
||||||
|
*
|
||||||
|
* @throws JMSException
|
||||||
|
*/
|
||||||
|
@Test(timeout = 60000)
|
||||||
|
public void testNoLocalStillWorksNewConnection() throws Exception {
|
||||||
|
ActiveMQConnection connection = null;
|
||||||
|
try {
|
||||||
|
connection = (ActiveMQConnection) factory.createConnection();
|
||||||
|
connection.setClientID("test-client");
|
||||||
|
connection.start();
|
||||||
|
test(connection, "test message 1");
|
||||||
|
} finally {
|
||||||
|
if (connection != null) {
|
||||||
|
connection.close();
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
try {
|
||||||
|
connection = (ActiveMQConnection) factory.createConnection();
|
||||||
|
connection.setClientID("test-client");
|
||||||
|
connection.start();
|
||||||
|
test(connection, "test message 2");
|
||||||
|
} finally {
|
||||||
|
if (connection != null) {
|
||||||
|
connection.close();
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Make sure that NoLocal works after restart
|
||||||
|
*
|
||||||
|
* @throws JMSException
|
||||||
|
*/
|
||||||
|
@Test(timeout = 60000)
|
||||||
|
public void testNoLocalStillWorksRestartBroker() throws Exception {
|
||||||
|
ActiveMQConnection connection = null;
|
||||||
|
try {
|
||||||
|
connection = (ActiveMQConnection) factory.createConnection();
|
||||||
|
connection.setClientID("test-client");
|
||||||
|
connection.start();
|
||||||
|
test(connection, "test message 1");
|
||||||
|
} finally {
|
||||||
|
if (connection != null) {
|
||||||
|
connection.close();
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
tearDown();
|
||||||
|
createBroker(false);
|
||||||
|
|
||||||
|
try {
|
||||||
|
connection = (ActiveMQConnection) factory.createConnection();
|
||||||
|
connection.setClientID("test-client");
|
||||||
|
connection.start();
|
||||||
|
test(connection, "test message 2");
|
||||||
|
} finally {
|
||||||
|
if (connection != null) {
|
||||||
|
connection.close();
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
void test(final ActiveMQConnection connection, final String body) throws Exception {
|
||||||
|
|
||||||
|
Session incomingMessagesSession = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
|
||||||
|
Topic topic = incomingMessagesSession.createTopic("test.topic");
|
||||||
|
TopicSubscriber consumer = incomingMessagesSession.createDurableSubscriber(topic, "test-subscription", null, true);
|
||||||
|
|
||||||
|
Session outgoingMessagesSession = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
|
||||||
|
Destination destination = outgoingMessagesSession.createTopic("test.topic");
|
||||||
|
MessageProducer producer = outgoingMessagesSession.createProducer(destination);
|
||||||
|
TextMessage textMessage = outgoingMessagesSession.createTextMessage(body);
|
||||||
|
producer.send(textMessage);
|
||||||
|
producer.close();
|
||||||
|
System.out.println("message sent: " + textMessage.getJMSMessageID() + "; body: " + textMessage.getText());
|
||||||
|
outgoingMessagesSession.close();
|
||||||
|
|
||||||
|
assertNull(consumer.receive(2000));
|
||||||
|
|
||||||
|
consumer.close();
|
||||||
|
incomingMessagesSession.close();
|
||||||
|
}
|
||||||
|
|
||||||
@Test(timeout = 60000)
|
@Test(timeout = 60000)
|
||||||
public void testDurableSubWithNoLocalChange() throws Exception {
|
public void testDurableSubWithNoLocalChange() throws Exception {
|
||||||
TopicConnection connection = factory.createTopicConnection();
|
TopicConnection connection = factory.createTopicConnection();
|
||||||
|
@ -126,7 +262,6 @@ public class DurableSubscriptionWithNoLocalTest {
|
||||||
assertNull(durableSub.receive(100));
|
assertNull(durableSub.receive(100));
|
||||||
}
|
}
|
||||||
|
|
||||||
@Ignore("Requires Broker be able to remove and recreate on noLocal change")
|
|
||||||
@Test(timeout = 60000)
|
@Test(timeout = 60000)
|
||||||
public void testInvertedDurableSubWithNoLocalChange() throws Exception {
|
public void testInvertedDurableSubWithNoLocalChange() throws Exception {
|
||||||
TopicConnection connection = factory.createTopicConnection();
|
TopicConnection connection = factory.createTopicConnection();
|
||||||
|
@ -247,7 +382,6 @@ public class DurableSubscriptionWithNoLocalTest {
|
||||||
assertNull(durableSub.receive(100));
|
assertNull(durableSub.receive(100));
|
||||||
}
|
}
|
||||||
|
|
||||||
@Ignore("Requires Broker be able to remove and recreate on noLocal change")
|
|
||||||
@Test(timeout = 60000)
|
@Test(timeout = 60000)
|
||||||
public void testInvertedDurableSubWithNoLocalChangeAfterRestart() throws Exception {
|
public void testInvertedDurableSubWithNoLocalChangeAfterRestart() throws Exception {
|
||||||
TopicConnection connection = factory.createTopicConnection();
|
TopicConnection connection = factory.createTopicConnection();
|
||||||
|
@ -322,6 +456,7 @@ public class DurableSubscriptionWithNoLocalTest {
|
||||||
brokerService.setStoreOpenWireVersion(CommandTypes.PROTOCOL_VERSION);
|
brokerService.setStoreOpenWireVersion(CommandTypes.PROTOCOL_VERSION);
|
||||||
brokerService.setUseJmx(false);
|
brokerService.setUseJmx(false);
|
||||||
brokerService.setDeleteAllMessagesOnStartup(deleteAllMessages);
|
brokerService.setDeleteAllMessagesOnStartup(deleteAllMessages);
|
||||||
|
brokerService.setKeepDurableSubsActive(keepDurableSubsActive);
|
||||||
TransportConnector connector = brokerService.addConnector("tcp://0.0.0.0:0");
|
TransportConnector connector = brokerService.addConnector("tcp://0.0.0.0:0");
|
||||||
|
|
||||||
brokerService.start();
|
brokerService.start();
|
||||||
|
|
Loading…
Reference in New Issue