ARTEMIS-2896 RA can double legacy prefix

This commit is contained in:
Justin Bertram 2020-09-08 11:31:48 -05:00 committed by Clebert Suconic
parent 7951e1688a
commit 676bb101d8
2 changed files with 27 additions and 7 deletions

View File

@ -141,7 +141,7 @@ public class ActiveMQMessageHandler implements MessageHandler, FailoverEventList
boolean selectorChanged = selector == null && oldFilterString != null || oldFilterString == null && selector != null || (oldFilterString != null && selector != null && !oldFilterString.toString().equals(selector)); boolean selectorChanged = selector == null && oldFilterString != null || oldFilterString == null && selector != null || (oldFilterString != null && selector != null && !oldFilterString.toString().equals(selector));
SimpleString oldTopicName = (enable1XPrefix ? PacketImpl.OLD_TOPIC_PREFIX : SimpleString.toSimpleString("")).concat(subResponse.getAddress()); SimpleString oldTopicName = (enable1XPrefix && !subResponse.getAddress().startsWith(PacketImpl.OLD_TOPIC_PREFIX) ? PacketImpl.OLD_TOPIC_PREFIX : SimpleString.toSimpleString("")).concat(subResponse.getAddress());
boolean topicChanged = !oldTopicName.equals(activation.getAddress()); boolean topicChanged = !oldTopicName.equals(activation.getAddress());

View File

@ -87,16 +87,28 @@ public class ActiveMQMessageHandlerTest extends ActiveMQRATestBase {
@Test @Test
public void testDurableTopicSubscriptionWith1xPrefixesOnSpec() throws Exception { public void testDurableTopicSubscriptionWith1xPrefixesOnSpec() throws Exception {
internalTestDurableTopicSubscriptionWith1xPrefixes(false); internalTestDurableTopicSubscriptionWith1xPrefixes(false, true);
} }
@Test @Test
public void testDurableTopicSubscriptionWith1xPrefixesOnRA() throws Exception { public void testDurableTopicSubscriptionWith1xPrefixesOnRA() throws Exception {
internalTestDurableTopicSubscriptionWith1xPrefixes(true); internalTestDurableTopicSubscriptionWith1xPrefixes(true, true);
} }
public void internalTestDurableTopicSubscriptionWith1xPrefixes(boolean ra) throws Exception { @Test
public void testDurableTopicSubscriptionWith1xPrefixesOnSpecWithoutBrokerPrefixes() throws Exception {
internalTestDurableTopicSubscriptionWith1xPrefixes(false, false);
}
@Test
public void testDurableTopicSubscriptionWith1xPrefixesOnRAWithoutBrokerPrefixes() throws Exception {
internalTestDurableTopicSubscriptionWith1xPrefixes(true, false);
}
public void internalTestDurableTopicSubscriptionWith1xPrefixes(boolean ra, boolean definePrefixesOnBroker) throws Exception {
if (definePrefixesOnBroker) {
server.getRemotingService().createAcceptor("test", "tcp://localhost:61617?anycastPrefix=jms.queue.;multicastPrefix=jms.topic.").start(); server.getRemotingService().createAcceptor("test", "tcp://localhost:61617?anycastPrefix=jms.queue.;multicastPrefix=jms.topic.").start();
}
ActiveMQResourceAdapter qResourceAdapter = newResourceAdapter(); ActiveMQResourceAdapter qResourceAdapter = newResourceAdapter();
if (ra) { if (ra) {
qResourceAdapter.setEnable1xPrefixes(true); qResourceAdapter.setEnable1xPrefixes(true);
@ -117,13 +129,21 @@ public class ActiveMQMessageHandlerTest extends ActiveMQRATestBase {
spec.setClientId("myClientId"); spec.setClientId("myClientId");
spec.setSubscriptionName("mySubscriptionName"); spec.setSubscriptionName("mySubscriptionName");
qResourceAdapter.setConnectorClassName(NETTY_CONNECTOR_FACTORY); qResourceAdapter.setConnectorClassName(NETTY_CONNECTOR_FACTORY);
qResourceAdapter.setConnectionParameters("host=localhost;port=61617"); String port = "61616";
if (definePrefixesOnBroker) {
port = "61617";
}
qResourceAdapter.setConnectionParameters("host=localhost;port=" + port);
CountDownLatch latch = new CountDownLatch(1); CountDownLatch latch = new CountDownLatch(1);
DummyMessageEndpoint endpoint = new DummyMessageEndpoint(latch); DummyMessageEndpoint endpoint = new DummyMessageEndpoint(latch);
DummyMessageEndpointFactory endpointFactory = new DummyMessageEndpointFactory(endpoint, false); DummyMessageEndpointFactory endpointFactory = new DummyMessageEndpointFactory(endpoint, false);
qResourceAdapter.endpointActivation(endpointFactory, spec); qResourceAdapter.endpointActivation(endpointFactory, spec);
ClientSession session = locator.createSessionFactory().createSession(); ClientSession session = locator.createSessionFactory().createSession();
ClientProducer clientProducer = session.createProducer("MyTopic"); String topic = "MyTopic";
if (!definePrefixesOnBroker) {
topic = "jms.topic." + topic;
}
ClientProducer clientProducer = session.createProducer(topic);
ClientMessage message = session.createMessage(true); ClientMessage message = session.createMessage(true);
message.getBodyBuffer().writeString("teststring"); message.getBodyBuffer().writeString("teststring");
clientProducer.send(message); clientProducer.send(message);