ARTEMIS-2848 RA fails w/durable sub w/legacy prefix
This commit is contained in:
parent
e3d44eef1b
commit
52d3ed1c68
|
@ -1796,7 +1796,7 @@ public class ActiveMQResourceAdapter implements ResourceAdapter, Serializable {
|
||||||
cf.setUseTopologyForLoadBalancing(raProperties.isUseTopologyForLoadBalancing());
|
cf.setUseTopologyForLoadBalancing(raProperties.isUseTopologyForLoadBalancing());
|
||||||
|
|
||||||
cf.setEnableSharedClientID(true);
|
cf.setEnableSharedClientID(true);
|
||||||
cf.setEnable1xPrefixes(raProperties.isEnable1xPrefixes() == null ? false : raProperties.isEnable1xPrefixes());
|
cf.setEnable1xPrefixes(overrideProperties.isEnable1xPrefixes() != null ? overrideProperties.isEnable1xPrefixes() : raProperties.isEnable1xPrefixes() == null ? false : raProperties.isEnable1xPrefixes());
|
||||||
setParams(cf, overrideProperties);
|
setParams(cf, overrideProperties);
|
||||||
return cf;
|
return cf;
|
||||||
}
|
}
|
||||||
|
|
|
@ -41,6 +41,7 @@ import org.apache.activemq.artemis.api.core.client.MessageHandler;
|
||||||
import org.apache.activemq.artemis.core.client.impl.ClientConsumerInternal;
|
import org.apache.activemq.artemis.core.client.impl.ClientConsumerInternal;
|
||||||
import org.apache.activemq.artemis.core.client.impl.ClientSessionFactoryInternal;
|
import org.apache.activemq.artemis.core.client.impl.ClientSessionFactoryInternal;
|
||||||
import org.apache.activemq.artemis.core.client.impl.ClientSessionInternal;
|
import org.apache.activemq.artemis.core.client.impl.ClientSessionInternal;
|
||||||
|
import org.apache.activemq.artemis.core.protocol.core.impl.PacketImpl;
|
||||||
import org.apache.activemq.artemis.jms.client.ActiveMQDestination;
|
import org.apache.activemq.artemis.jms.client.ActiveMQDestination;
|
||||||
import org.apache.activemq.artemis.jms.client.ActiveMQMessage;
|
import org.apache.activemq.artemis.jms.client.ActiveMQMessage;
|
||||||
import org.apache.activemq.artemis.jms.client.ConnectionFactoryOptions;
|
import org.apache.activemq.artemis.jms.client.ConnectionFactoryOptions;
|
||||||
|
@ -140,7 +141,7 @@ public class ActiveMQMessageHandler implements MessageHandler, FailoverEventList
|
||||||
|
|
||||||
boolean selectorChanged = selector == null && oldFilterString != null || oldFilterString == null && selector != null || (oldFilterString != null && selector != null && !oldFilterString.toString().equals(selector));
|
boolean selectorChanged = selector == null && oldFilterString != null || oldFilterString == null && selector != null || (oldFilterString != null && selector != null && !oldFilterString.toString().equals(selector));
|
||||||
|
|
||||||
SimpleString oldTopicName = subResponse.getAddress();
|
SimpleString oldTopicName = (enable1XPrefix ? PacketImpl.OLD_TOPIC_PREFIX : SimpleString.toSimpleString("")).concat(subResponse.getAddress());
|
||||||
|
|
||||||
boolean topicChanged = !oldTopicName.equals(activation.getAddress());
|
boolean topicChanged = !oldTopicName.equals(activation.getAddress());
|
||||||
|
|
||||||
|
|
|
@ -85,6 +85,59 @@ public class ActiveMQMessageHandlerTest extends ActiveMQRATestBase {
|
||||||
qResourceAdapter.stop();
|
qResourceAdapter.stop();
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void testDurableTopicSubscriptionWith1xPrefixesOnSpec() throws Exception {
|
||||||
|
internalTestDurableTopicSubscriptionWith1xPrefixes(false);
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void testDurableTopicSubscriptionWith1xPrefixesOnRA() throws Exception {
|
||||||
|
internalTestDurableTopicSubscriptionWith1xPrefixes(true);
|
||||||
|
}
|
||||||
|
|
||||||
|
public void internalTestDurableTopicSubscriptionWith1xPrefixes(boolean ra) throws Exception {
|
||||||
|
server.getRemotingService().createAcceptor("test", "tcp://localhost:61617?anycastPrefix=jms.queue.;multicastPrefix=jms.topic.").start();
|
||||||
|
ActiveMQResourceAdapter qResourceAdapter = newResourceAdapter();
|
||||||
|
if (ra) {
|
||||||
|
qResourceAdapter.setEnable1xPrefixes(true);
|
||||||
|
}
|
||||||
|
MyBootstrapContext ctx = new MyBootstrapContext();
|
||||||
|
qResourceAdapter.start(ctx);
|
||||||
|
ActiveMQActivationSpec spec = new ActiveMQActivationSpec();
|
||||||
|
spec.setSetupAttempts(1);
|
||||||
|
spec.setSetupInterval(500L);
|
||||||
|
spec.setResourceAdapter(qResourceAdapter);
|
||||||
|
spec.setUseJNDI(false);
|
||||||
|
spec.setDestinationType("javax.jms.Topic");
|
||||||
|
spec.setDestination("jms.topic.MyTopic");
|
||||||
|
if (!ra) {
|
||||||
|
spec.setEnable1xPrefixes(true);
|
||||||
|
}
|
||||||
|
spec.setSubscriptionDurability("Durable");
|
||||||
|
spec.setClientId("myClientId");
|
||||||
|
spec.setSubscriptionName("mySubscriptionName");
|
||||||
|
qResourceAdapter.setConnectorClassName(NETTY_CONNECTOR_FACTORY);
|
||||||
|
qResourceAdapter.setConnectionParameters("host=localhost;port=61617");
|
||||||
|
CountDownLatch latch = new CountDownLatch(1);
|
||||||
|
DummyMessageEndpoint endpoint = new DummyMessageEndpoint(latch);
|
||||||
|
DummyMessageEndpointFactory endpointFactory = new DummyMessageEndpointFactory(endpoint, false);
|
||||||
|
qResourceAdapter.endpointActivation(endpointFactory, spec);
|
||||||
|
ClientSession session = locator.createSessionFactory().createSession();
|
||||||
|
ClientProducer clientProducer = session.createProducer("MyTopic");
|
||||||
|
ClientMessage message = session.createMessage(true);
|
||||||
|
message.getBodyBuffer().writeString("teststring");
|
||||||
|
clientProducer.send(message);
|
||||||
|
session.close();
|
||||||
|
latch.await(5, TimeUnit.SECONDS);
|
||||||
|
|
||||||
|
assertNotNull(endpoint.lastMessage);
|
||||||
|
assertEquals(endpoint.lastMessage.getCoreMessage().getBodyBuffer().readString(), "teststring");
|
||||||
|
|
||||||
|
qResourceAdapter.endpointDeactivation(endpointFactory, spec);
|
||||||
|
|
||||||
|
qResourceAdapter.stop();
|
||||||
|
}
|
||||||
|
|
||||||
@Test
|
@Test
|
||||||
public void testObjectMessageReceiveSerializationControl() throws Exception {
|
public void testObjectMessageReceiveSerializationControl() throws Exception {
|
||||||
String blackList = "org.apache.activemq.artemis.tests.integration.ra";
|
String blackList = "org.apache.activemq.artemis.tests.integration.ra";
|
||||||
|
|
Loading…
Reference in New Issue