diff --git a/artemis-core-client/src/main/java/org/apache/activemq/artemis/api/core/client/ClientSession.java b/artemis-core-client/src/main/java/org/apache/activemq/artemis/api/core/client/ClientSession.java index d2c0f431b2..ad7e702dc8 100644 --- a/artemis-core-client/src/main/java/org/apache/activemq/artemis/api/core/client/ClientSession.java +++ b/artemis-core-client/src/main/java/org/apache/activemq/artemis/api/core/client/ClientSession.java @@ -177,6 +177,8 @@ public interface ClientSession extends XAResource, AutoCloseable { Long getRingSize(); Boolean isEnabled(); + + Boolean isConfigurationManaged(); } // Lifecycle operations ------------------------------------------ diff --git a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/impl/QueueQueryImpl.java b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/impl/QueueQueryImpl.java index d532dc01cf..5529448a4c 100644 --- a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/impl/QueueQueryImpl.java +++ b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/impl/QueueQueryImpl.java @@ -78,6 +78,8 @@ public class QueueQueryImpl implements ClientSession.QueueQuery { private final Boolean enabled; + private final Boolean configurationManaged; + private final Integer defaultConsumerWindowSize; @@ -164,7 +166,7 @@ public class QueueQueryImpl implements ClientSession.QueueQuery { final Long autoDeleteDelay, final Long autoDeleteMessageCount, final Integer defaultConsumerWindowSize) { - this(durable, temporary, consumerCount, messageCount, filterString, address, name, exists, autoCreateQueues, maxConsumers, autoCreated, purgeOnNoConsumers, routingType, exclusive, groupRebalance, null, groupBuckets, null, lastValue, lastValueKey, nonDestructive, consumersBeforeDispatch, delayBeforeDispatch, autoDelete, autoDeleteDelay, autoDeleteMessageCount, defaultConsumerWindowSize, null, null); + this(durable, temporary, consumerCount, messageCount, filterString, address, name, exists, autoCreateQueues, maxConsumers, autoCreated, purgeOnNoConsumers, routingType, exclusive, groupRebalance, null, groupBuckets, null, lastValue, lastValueKey, nonDestructive, consumersBeforeDispatch, delayBeforeDispatch, autoDelete, autoDeleteDelay, autoDeleteMessageCount, defaultConsumerWindowSize, null, null, null); } public QueueQueryImpl(final boolean durable, @@ -195,7 +197,8 @@ public class QueueQueryImpl implements ClientSession.QueueQuery { final Long autoDeleteMessageCount, final Integer defaultConsumerWindowSize, final Long ringSize, - final Boolean enabled) { + final Boolean enabled, + final Boolean configurationManaged) { this.durable = durable; this.temporary = temporary; this.consumerCount = consumerCount; @@ -225,6 +228,7 @@ public class QueueQueryImpl implements ClientSession.QueueQuery { this.defaultConsumerWindowSize = defaultConsumerWindowSize; this.ringSize = ringSize; this.enabled = enabled; + this.configurationManaged = configurationManaged; } @Override @@ -371,5 +375,10 @@ public class QueueQueryImpl implements ClientSession.QueueQuery { public Boolean isEnabled() { return enabled; } + + @Override + public Boolean isConfigurationManaged() { + return configurationManaged; + } } diff --git a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/wireformat/SessionQueueQueryResponseMessage_V3.java b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/wireformat/SessionQueueQueryResponseMessage_V3.java index bf8322217f..9deffe1b2e 100644 --- a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/wireformat/SessionQueueQueryResponseMessage_V3.java +++ b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/wireformat/SessionQueueQueryResponseMessage_V3.java @@ -66,12 +66,14 @@ public class SessionQueueQueryResponseMessage_V3 extends SessionQueueQueryRespon private Boolean enabled; + private Boolean configurationManaged; + public SessionQueueQueryResponseMessage_V3(final QueueQueryResult result) { - this(result.getName(), result.getAddress(), result.isDurable(), result.isTemporary(), result.getFilterString(), result.getConsumerCount(), result.getMessageCount(), result.isExists(), result.isAutoCreateQueues(), result.isAutoCreated(), result.isPurgeOnNoConsumers(), result.getRoutingType(), result.getMaxConsumers(), result.isExclusive(), result.isGroupRebalance(), result.isGroupRebalancePauseDispatch(), result.getGroupBuckets(), result.getGroupFirstKey(), result.isLastValue(), result.getLastValueKey(), result.isNonDestructive(), result.getConsumersBeforeDispatch(), result.getDelayBeforeDispatch(), result.isAutoDelete(), result.getAutoDeleteDelay(), result.getAutoDeleteMessageCount(), result.getDefaultConsumerWindowSize(), result.getRingSize(), result.isEnabled()); + this(result.getName(), result.getAddress(), result.isDurable(), result.isTemporary(), result.getFilterString(), result.getConsumerCount(), result.getMessageCount(), result.isExists(), result.isAutoCreateQueues(), result.isAutoCreated(), result.isPurgeOnNoConsumers(), result.getRoutingType(), result.getMaxConsumers(), result.isExclusive(), result.isGroupRebalance(), result.isGroupRebalancePauseDispatch(), result.getGroupBuckets(), result.getGroupFirstKey(), result.isLastValue(), result.getLastValueKey(), result.isNonDestructive(), result.getConsumersBeforeDispatch(), result.getDelayBeforeDispatch(), result.isAutoDelete(), result.getAutoDeleteDelay(), result.getAutoDeleteMessageCount(), result.getDefaultConsumerWindowSize(), result.getRingSize(), result.isEnabled(), result.isConfigurationManaged()); } public SessionQueueQueryResponseMessage_V3() { - this(null, null, false, false, null, 0, 0, false, false, false, false, RoutingType.MULTICAST, -1, null, null,null, null, null, null, null, null, null, null, null, null, null, null, null, null); + this(null, null, false, false, null, 0, 0, false, false, false, false, RoutingType.MULTICAST, -1, null, null,null, null, null, null, null, null, null, null, null, null, null, null, null, null, null); } private SessionQueueQueryResponseMessage_V3(final SimpleString name, @@ -102,7 +104,8 @@ public class SessionQueueQueryResponseMessage_V3 extends SessionQueueQueryRespon final Long autoDeleteMessageCount, final Integer defaultConsumerWindowSize, final Long ringSize, - final Boolean enabled) { + final Boolean enabled, + final Boolean configurationManaged) { super(SESS_QUEUEQUERY_RESP_V3); this.durable = durable; @@ -162,6 +165,8 @@ public class SessionQueueQueryResponseMessage_V3 extends SessionQueueQueryRespon this.ringSize = ringSize; this.enabled = enabled; + + this.configurationManaged = configurationManaged; } public boolean isAutoCreated() { @@ -312,6 +317,14 @@ public class SessionQueueQueryResponseMessage_V3 extends SessionQueueQueryRespon this.enabled = enabled; } + public Boolean isConfigurationManaged() { + return configurationManaged; + } + + public void setConfigurationManaged(Boolean configurationManaged) { + this.configurationManaged = configurationManaged; + } + @Override public void encodeRest(final ActiveMQBuffer buffer) { super.encodeRest(buffer); @@ -335,6 +348,7 @@ public class SessionQueueQueryResponseMessage_V3 extends SessionQueueQueryRespon BufferHelper.writeNullableLong(buffer, ringSize); BufferHelper.writeNullableBoolean(buffer, enabled); BufferHelper.writeNullableBoolean(buffer, groupRebalancePauseDispatch); + BufferHelper.writeNullableBoolean(buffer, configurationManaged); } @@ -375,6 +389,9 @@ public class SessionQueueQueryResponseMessage_V3 extends SessionQueueQueryRespon if (buffer.readableBytes() > 0) { groupRebalancePauseDispatch = BufferHelper.readNullableBoolean(buffer); } + if (buffer.readableBytes() > 0) { + configurationManaged = BufferHelper.readNullableBoolean(buffer); + } } @Override @@ -401,6 +418,7 @@ public class SessionQueueQueryResponseMessage_V3 extends SessionQueueQueryRespon result = prime * result + ((defaultConsumerWindowSize == null) ? 0 : defaultConsumerWindowSize.hashCode()); result = prime * result + (ringSize == null ? 0 : ringSize.hashCode()); result = prime * result + (enabled == null ? 0 : enabled ? 1231 : 1237); + result = prime * result + (configurationManaged == null ? 0 : configurationManaged ? 1231 : 1237); return result; } @@ -434,12 +452,13 @@ public class SessionQueueQueryResponseMessage_V3 extends SessionQueueQueryRespon buff.append(", defaultConsumerWindowSize=" + defaultConsumerWindowSize); buff.append(", ringSize=" + ringSize); buff.append(", enabled=" + enabled); + buff.append(", configurationManaged=" + configurationManaged); return buff.toString(); } @Override public ClientSession.QueueQuery toQueueQuery() { - return new QueueQueryImpl(isDurable(), isTemporary(), getConsumerCount(), getMessageCount(), getFilterString(), getAddress(), getName(), isExists(), isAutoCreateQueues(), getMaxConsumers(), isAutoCreated(), isPurgeOnNoConsumers(), getRoutingType(), isExclusive(), isGroupRebalance(), isGroupRebalancePauseDispatch(), getGroupBuckets(), getGroupFirstKey(), isLastValue(), getLastValueKey(), isNonDestructive(), getConsumersBeforeDispatch(), getDelayBeforeDispatch(), isAutoDelete(), getAutoDeleteDelay(), getAutoDeleteMessageCount(), getDefaultConsumerWindowSize(), getRingSize(), isEnabled()); + return new QueueQueryImpl(isDurable(), isTemporary(), getConsumerCount(), getMessageCount(), getFilterString(), getAddress(), getName(), isExists(), isAutoCreateQueues(), getMaxConsumers(), isAutoCreated(), isPurgeOnNoConsumers(), getRoutingType(), isExclusive(), isGroupRebalance(), isGroupRebalancePauseDispatch(), getGroupBuckets(), getGroupFirstKey(), isLastValue(), getLastValueKey(), isNonDestructive(), getConsumersBeforeDispatch(), getDelayBeforeDispatch(), isAutoDelete(), getAutoDeleteDelay(), getAutoDeleteMessageCount(), getDefaultConsumerWindowSize(), getRingSize(), isEnabled(), isConfigurationManaged()); } @Override @@ -542,6 +561,11 @@ public class SessionQueueQueryResponseMessage_V3 extends SessionQueueQueryRespon return false; if (maxConsumers != other.maxConsumers) return false; + if (configurationManaged == null) { + if (other.configurationManaged != null) + return false; + } else if (!configurationManaged.equals(other.configurationManaged)) + return false; return true; } } diff --git a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/server/QueueQueryResult.java b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/server/QueueQueryResult.java index 798f4175d6..9fb07cadde 100644 --- a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/server/QueueQueryResult.java +++ b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/server/QueueQueryResult.java @@ -79,6 +79,8 @@ public class QueueQueryResult { private Boolean enabled; + private Boolean configurationManaged; + public QueueQueryResult(final SimpleString name, final SimpleString address, final boolean durable, @@ -107,7 +109,8 @@ public class QueueQueryResult { final Long autoDeleteMessageCount, final Integer defaultConsumerWindowSize, final Long ringSize, - final Boolean enabled) { + final Boolean enabled, + final Boolean configurationManaged) { this.durable = durable; this.temporary = temporary; @@ -165,6 +168,8 @@ public class QueueQueryResult { this.ringSize = ringSize; this.enabled = enabled; + + this.configurationManaged = configurationManaged; } public boolean isExists() { @@ -286,4 +291,8 @@ public class QueueQueryResult { public Boolean isEnabled() { return enabled; } + + public Boolean isConfigurationManaged() { + return configurationManaged; + } } diff --git a/artemis-jms-client/src/main/java/org/apache/activemq/artemis/jms/client/ActiveMQSession.java b/artemis-jms-client/src/main/java/org/apache/activemq/artemis/jms/client/ActiveMQSession.java index 3fd2bb4732..2076478267 100644 --- a/artemis-jms-client/src/main/java/org/apache/activemq/artemis/jms/client/ActiveMQSession.java +++ b/artemis-jms-client/src/main/java/org/apache/activemq/artemis/jms/client/ActiveMQSession.java @@ -912,7 +912,7 @@ public class ActiveMQSession implements QueueSession, TopicSession { boolean topicChanged = !oldTopicName.equals(dest.getSimpleAddress()); - if (selectorChanged || topicChanged) { + if ((selectorChanged || topicChanged) && !subResponse.isConfigurationManaged()) { // Delete the old durable sub session.deleteQueue(queueName); diff --git a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/proton/ProtonServerSenderContext.java b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/proton/ProtonServerSenderContext.java index 0e138d8072..c65d86d0e0 100644 --- a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/proton/ProtonServerSenderContext.java +++ b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/proton/ProtonServerSenderContext.java @@ -1098,9 +1098,12 @@ public class ProtonServerSenderContext extends ProtonInitializable implements Pr queue = createQueueName(connection.isUseCoreSubscriptionNaming(), clientId, pubId, shared, global, false); QueueQueryResult result = sessionSPI.queueQuery(queue, routingTypeToUse, false); if (result.isExists()) { - // If a client reattaches to a durable subscription with a different no-local - // filter value, selector or address then we must recreate the queue (JMS semantics). - if (!Objects.equals(result.getFilterString(), simpleStringSelector) || (sender.getSource() != null && !sender.getSource().getAddress().equals(result.getAddress().toString()))) { + /* + * If a client reattaches to a durable subscription with a different filter or address then we must + * recreate the queue (JMS semantics). However, if the corresponding queue is managed via the + * configuration then we don't want to change it + */ + if (!result.isConfigurationManaged() && (!Objects.equals(result.getFilterString(), simpleStringSelector) || (sender.getSource() != null && !sender.getSource().getAddress().equals(result.getAddress().toString())))) { if (result.getConsumerCount() == 0) { sessionSPI.deleteQueue(queue); diff --git a/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/amq/AMQConsumer.java b/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/amq/AMQConsumer.java index c06227e99a..d609051204 100644 --- a/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/amq/AMQConsumer.java +++ b/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/amq/AMQConsumer.java @@ -199,7 +199,7 @@ public class AMQConsumer { boolean topicChanged = !oldTopicName.equals(address); - if (selectorChanged || topicChanged) { + if ((selectorChanged || topicChanged) && !result.isConfigurationManaged()) { // Delete the old durable sub session.getCoreSession().deleteQueue(queueName); diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ActiveMQServerImpl.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ActiveMQServerImpl.java index c73e63e9c2..ba7fe1943d 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ActiveMQServerImpl.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ActiveMQServerImpl.java @@ -1060,12 +1060,12 @@ public class ActiveMQServerImpl implements ActiveMQServer { SimpleString filterString = filter == null ? null : filter.getFilterString(); - response = new QueueQueryResult(realName, binding.getAddress(), queue.isDurable(), queue.isTemporary(), filterString, queue.getConsumerCount(), queue.getMessageCount(), autoCreateQueues, true, queue.isAutoCreated(), queue.isPurgeOnNoConsumers(), queue.getRoutingType(), queue.getMaxConsumers(), queue.isExclusive(), queue.isGroupRebalance(), queue.isGroupRebalancePauseDispatch(), queue.getGroupBuckets(), queue.getGroupFirstKey(), queue.isLastValue(), queue.getLastValueKey(), queue.isNonDestructive(), queue.getConsumersBeforeDispatch(), queue.getDelayBeforeDispatch(), queue.isAutoDelete(), queue.getAutoDeleteDelay(), queue.getAutoDeleteMessageCount(), defaultConsumerWindowSize, queue.getRingSize(), queue.isEnabled()); + response = new QueueQueryResult(realName, binding.getAddress(), queue.isDurable(), queue.isTemporary(), filterString, queue.getConsumerCount(), queue.getMessageCount(), autoCreateQueues, true, queue.isAutoCreated(), queue.isPurgeOnNoConsumers(), queue.getRoutingType(), queue.getMaxConsumers(), queue.isExclusive(), queue.isGroupRebalance(), queue.isGroupRebalancePauseDispatch(), queue.getGroupBuckets(), queue.getGroupFirstKey(), queue.isLastValue(), queue.getLastValueKey(), queue.isNonDestructive(), queue.getConsumersBeforeDispatch(), queue.getDelayBeforeDispatch(), queue.isAutoDelete(), queue.getAutoDeleteDelay(), queue.getAutoDeleteMessageCount(), defaultConsumerWindowSize, queue.getRingSize(), queue.isEnabled(), queue.isConfigurationManaged()); } else if (realName.equals(managementAddress)) { // make an exception for the management address (see HORNETQ-29) - response = new QueueQueryResult(realName, managementAddress, true, false, null, -1, -1, autoCreateQueues, true, false, false, RoutingType.MULTICAST, -1, false, false, false, null, null, null,null, null, null, null, null, null, null, defaultConsumerWindowSize, null, null); + response = new QueueQueryResult(realName, managementAddress, true, false, null, -1, -1, autoCreateQueues, true, false, false, RoutingType.MULTICAST, -1, false, false, false, null, null, null,null, null, null, null, null, null, null, defaultConsumerWindowSize, null, null, null); } else { - response = new QueueQueryResult(realName, addressName, true, false, null, 0, 0, autoCreateQueues, false, false, defaultPurgeOnNoConsumers, RoutingType.MULTICAST, defaultMaxConsumers, defaultExclusiveQueue, defaultGroupRebalance, defaultGroupRebalancePauseDispatch, defaultGroupBuckets, defaultGroupFirstKey, defaultLastValueQueue, defaultLastValueKey, defaultNonDestructive, defaultConsumersBeforeDispatch, defaultDelayBeforeDispatch, isAutoDelete(false, addressSettings), autoDeleteQueuesDelay, autoDeleteQueuesMessageCount, defaultConsumerWindowSize, defaultRingSize, defaultEnabled); + response = new QueueQueryResult(realName, addressName, true, false, null, 0, 0, autoCreateQueues, false, false, defaultPurgeOnNoConsumers, RoutingType.MULTICAST, defaultMaxConsumers, defaultExclusiveQueue, defaultGroupRebalance, defaultGroupRebalancePauseDispatch, defaultGroupBuckets, defaultGroupFirstKey, defaultLastValueQueue, defaultLastValueKey, defaultNonDestructive, defaultConsumersBeforeDispatch, defaultDelayBeforeDispatch, isAutoDelete(false, addressSettings), autoDeleteQueuesDelay, autoDeleteQueuesMessageCount, defaultConsumerWindowSize, defaultRingSize, defaultEnabled, false); } return response; diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/JMSClientTestSupport.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/JMSClientTestSupport.java index fbed4ec8b7..b3d1393928 100644 --- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/JMSClientTestSupport.java +++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/JMSClientTestSupport.java @@ -205,6 +205,10 @@ public abstract class JMSClientTestSupport extends AmqpClientTestSupport { return createCoreConnection(getBrokerCoreJMSConnectionString(), null, null, null, true); } + protected Connection createCoreConnection(boolean start) throws JMSException { + return createCoreConnection(getBrokerCoreJMSConnectionString(), null, null, null, start); + } + private Connection createCoreConnection(String connectionString, String username, String password, String clientId, boolean start) throws JMSException { ActiveMQJMSConnectionFactory factory = new ActiveMQJMSConnectionFactory(connectionString); @@ -257,6 +261,10 @@ public abstract class JMSClientTestSupport extends AmqpClientTestSupport { return createOpenWireConnection(getBrokerOpenWireJMSConnectionString(), null, null, null, true); } + protected Connection createOpenWireConnection(boolean start) throws JMSException { + return createOpenWireConnection(getBrokerOpenWireJMSConnectionString(), null, null, null, false); + } + private Connection createOpenWireConnection(String connectionString, String username, String password, String clientId, boolean start) throws JMSException { ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory(connectionString); diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/JMSMessageConsumerTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/JMSMessageConsumerTest.java index b5b170dac6..fab42318c3 100644 --- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/JMSMessageConsumerTest.java +++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/JMSMessageConsumerTest.java @@ -40,9 +40,12 @@ import javax.jms.Session; import javax.jms.TextMessage; import javax.jms.Topic; +import org.apache.activemq.artemis.api.core.QueueConfiguration; +import org.apache.activemq.artemis.api.core.RoutingType; import org.apache.activemq.artemis.core.server.Queue; import org.apache.activemq.artemis.core.settings.impl.AddressSettings; import org.apache.activemq.artemis.tests.util.Wait; +import org.apache.activemq.artemis.utils.DestinationUtil; import org.apache.qpid.jms.JmsConnection; import org.apache.qpid.jms.policy.JmsDefaultPrefetchPolicy; import org.junit.Assert; @@ -217,6 +220,47 @@ public class JMSMessageConsumerTest extends JMSClientTestSupport { } } + @Test(timeout = 60000) + public void testDurableSubscriptionWithConfigurationManagedQueueWithCore() throws Exception { + testDurableSubscriptionWithConfigurationManagedQueue(() -> createCoreConnection(false)); + + } + + @Test(timeout = 60000) + public void testDurableSubscriptionWithConfigurationManagedQueueWithOpenWire() throws Exception { + testDurableSubscriptionWithConfigurationManagedQueue(() -> createOpenWireConnection(false)); + + } + + @Test(timeout = 60000) + public void testDurableSubscriptionWithConfigurationManagedQueueWithAMQP() throws Exception { + testDurableSubscriptionWithConfigurationManagedQueue(() -> JMSMessageConsumerTest.super.createConnection(false)); + } + + private void testDurableSubscriptionWithConfigurationManagedQueue(ConnectionSupplier connectionSupplier) throws Exception { + final String clientId = "bar"; + final String subName = "foo"; + final String queueName = DestinationUtil.createQueueNameForSubscription(true, clientId, subName).toString(); + server.stop(); + server.getConfiguration().addQueueConfiguration(new QueueConfiguration(queueName).setAddress("myTopic").setFilterString("color = 'BLUE'").setRoutingType(RoutingType.MULTICAST)); + server.getConfiguration().setAmqpUseCoreSubscriptionNaming(true); + server.start(); + + try (Connection connection = connectionSupplier.createConnection()) { + connection.setClientID(clientId); + Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); + Topic destination = session.createTopic("myTopic"); + + MessageConsumer messageConsumer = session.createDurableSubscriber(destination, subName); + messageConsumer.close(); + + Queue queue = server.locateQueue(queueName); + assertNotNull(queue); + assertNotNull(queue.getFilter()); + assertEquals("color = 'BLUE'", queue.getFilter().getFilterString().toString()); + } + } + @Test(timeout = 30000) public void testSelectorsWithJMSTypeOnTopic() throws Exception { doTestSelectorsWithJMSType(true);