diff --git a/artemis-ra/src/main/java/org/apache/activemq/artemis/ra/ActiveMQResourceAdapter.java b/artemis-ra/src/main/java/org/apache/activemq/artemis/ra/ActiveMQResourceAdapter.java index ba6127bae5..4d19cfde41 100644 --- a/artemis-ra/src/main/java/org/apache/activemq/artemis/ra/ActiveMQResourceAdapter.java +++ b/artemis-ra/src/main/java/org/apache/activemq/artemis/ra/ActiveMQResourceAdapter.java @@ -1796,7 +1796,7 @@ public class ActiveMQResourceAdapter implements ResourceAdapter, Serializable { cf.setUseTopologyForLoadBalancing(raProperties.isUseTopologyForLoadBalancing()); cf.setEnableSharedClientID(true); - cf.setEnable1xPrefixes(raProperties.isEnable1xPrefixes() == null ? false : raProperties.isEnable1xPrefixes()); + cf.setEnable1xPrefixes(overrideProperties.isEnable1xPrefixes() != null ? overrideProperties.isEnable1xPrefixes() : raProperties.isEnable1xPrefixes() == null ? false : raProperties.isEnable1xPrefixes()); setParams(cf, overrideProperties); return cf; } diff --git a/artemis-ra/src/main/java/org/apache/activemq/artemis/ra/inflow/ActiveMQMessageHandler.java b/artemis-ra/src/main/java/org/apache/activemq/artemis/ra/inflow/ActiveMQMessageHandler.java index 1cb6bb431e..68bfe14fbf 100644 --- a/artemis-ra/src/main/java/org/apache/activemq/artemis/ra/inflow/ActiveMQMessageHandler.java +++ b/artemis-ra/src/main/java/org/apache/activemq/artemis/ra/inflow/ActiveMQMessageHandler.java @@ -41,6 +41,7 @@ import org.apache.activemq.artemis.api.core.client.MessageHandler; import org.apache.activemq.artemis.core.client.impl.ClientConsumerInternal; import org.apache.activemq.artemis.core.client.impl.ClientSessionFactoryInternal; import org.apache.activemq.artemis.core.client.impl.ClientSessionInternal; +import org.apache.activemq.artemis.core.protocol.core.impl.PacketImpl; import org.apache.activemq.artemis.jms.client.ActiveMQDestination; import org.apache.activemq.artemis.jms.client.ActiveMQMessage; import org.apache.activemq.artemis.jms.client.ConnectionFactoryOptions; @@ -140,7 +141,7 @@ public class ActiveMQMessageHandler implements MessageHandler, FailoverEventList boolean selectorChanged = selector == null && oldFilterString != null || oldFilterString == null && selector != null || (oldFilterString != null && selector != null && !oldFilterString.toString().equals(selector)); - SimpleString oldTopicName = subResponse.getAddress(); + SimpleString oldTopicName = (enable1XPrefix ? PacketImpl.OLD_TOPIC_PREFIX : SimpleString.toSimpleString("")).concat(subResponse.getAddress()); boolean topicChanged = !oldTopicName.equals(activation.getAddress()); diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/ra/ActiveMQMessageHandlerTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/ra/ActiveMQMessageHandlerTest.java index d372054956..776187ce72 100644 --- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/ra/ActiveMQMessageHandlerTest.java +++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/ra/ActiveMQMessageHandlerTest.java @@ -85,6 +85,59 @@ public class ActiveMQMessageHandlerTest extends ActiveMQRATestBase { qResourceAdapter.stop(); } + @Test + public void testDurableTopicSubscriptionWith1xPrefixesOnSpec() throws Exception { + internalTestDurableTopicSubscriptionWith1xPrefixes(false); + } + + @Test + public void testDurableTopicSubscriptionWith1xPrefixesOnRA() throws Exception { + internalTestDurableTopicSubscriptionWith1xPrefixes(true); + } + + public void internalTestDurableTopicSubscriptionWith1xPrefixes(boolean ra) throws Exception { + server.getRemotingService().createAcceptor("test", "tcp://localhost:61617?anycastPrefix=jms.queue.;multicastPrefix=jms.topic.").start(); + ActiveMQResourceAdapter qResourceAdapter = newResourceAdapter(); + if (ra) { + qResourceAdapter.setEnable1xPrefixes(true); + } + MyBootstrapContext ctx = new MyBootstrapContext(); + qResourceAdapter.start(ctx); + ActiveMQActivationSpec spec = new ActiveMQActivationSpec(); + spec.setSetupAttempts(1); + spec.setSetupInterval(500L); + spec.setResourceAdapter(qResourceAdapter); + spec.setUseJNDI(false); + spec.setDestinationType("javax.jms.Topic"); + spec.setDestination("jms.topic.MyTopic"); + if (!ra) { + spec.setEnable1xPrefixes(true); + } + spec.setSubscriptionDurability("Durable"); + spec.setClientId("myClientId"); + spec.setSubscriptionName("mySubscriptionName"); + qResourceAdapter.setConnectorClassName(NETTY_CONNECTOR_FACTORY); + qResourceAdapter.setConnectionParameters("host=localhost;port=61617"); + CountDownLatch latch = new CountDownLatch(1); + DummyMessageEndpoint endpoint = new DummyMessageEndpoint(latch); + DummyMessageEndpointFactory endpointFactory = new DummyMessageEndpointFactory(endpoint, false); + qResourceAdapter.endpointActivation(endpointFactory, spec); + ClientSession session = locator.createSessionFactory().createSession(); + ClientProducer clientProducer = session.createProducer("MyTopic"); + ClientMessage message = session.createMessage(true); + message.getBodyBuffer().writeString("teststring"); + clientProducer.send(message); + session.close(); + latch.await(5, TimeUnit.SECONDS); + + assertNotNull(endpoint.lastMessage); + assertEquals(endpoint.lastMessage.getCoreMessage().getBodyBuffer().readString(), "teststring"); + + qResourceAdapter.endpointDeactivation(endpointFactory, spec); + + qResourceAdapter.stop(); + } + @Test public void testObjectMessageReceiveSerializationControl() throws Exception { String blackList = "org.apache.activemq.artemis.tests.integration.ra";