diff --git a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/proton/ProtonServerSenderContext.java b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/proton/ProtonServerSenderContext.java index 6a6b246620..6412af589a 100644 --- a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/proton/ProtonServerSenderContext.java +++ b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/proton/ProtonServerSenderContext.java @@ -1111,7 +1111,11 @@ public class ProtonServerSenderContext extends ProtonInitializable implements Pr if (result.getConsumerCount() == 0) { sessionSPI.deleteQueue(queue); - sessionSPI.createUnsharedDurableQueue(addressToUse, RoutingType.MULTICAST, queue, simpleStringSelector); + if (shared) { + sessionSPI.createSharedDurableQueue(addressToUse, RoutingType.MULTICAST, queue, simpleStringSelector); + } else { + sessionSPI.createUnsharedDurableQueue(addressToUse, RoutingType.MULTICAST, queue, simpleStringSelector); + } } else { throw new ActiveMQAMQPIllegalStateException("Unable to recreate subscription, consumers already exist"); } diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/JMSSharedDurableConsumerTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/JMSSharedDurableConsumerTest.java index ad0d9dd66e..02a5424f2a 100644 --- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/JMSSharedDurableConsumerTest.java +++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/JMSSharedDurableConsumerTest.java @@ -29,7 +29,9 @@ import javax.jms.Topic; import java.util.Arrays; import java.util.Collection; +import org.apache.activemq.artemis.api.core.SimpleString; import org.apache.activemq.artemis.core.server.ActiveMQServer; +import org.apache.activemq.artemis.core.server.impl.QueueImpl; import org.junit.Test; import org.junit.runner.RunWith; import org.junit.runners.Parameterized; @@ -138,4 +140,46 @@ public class JMSSharedDurableConsumerTest extends JMSClientTestSupport { testSharedDurableConsumer(connection, connection2); } + + @Test(timeout = 30000) + public void testSharedDurableConsumerWithSelector() throws JMSException { + SimpleString qName = amqpUseCoreSubscriptionNaming ? new SimpleString("SharedConsumer") : new SimpleString("SharedConsumer:global"); + Connection connection = createConnection(true); + try { + Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); + + Topic topic = session.createTopic(getTopicName()); + + MessageConsumer consumer = session.createSharedDurableConsumer(topic, "SharedConsumer", "a=b"); + QueueImpl queue = (QueueImpl) server.getPostOffice().getBinding(qName).getBindable(); + assertEquals(-1, queue.getMaxConsumers()); + consumer.close(); + MessageConsumer consumer2 = session.createSharedDurableConsumer(topic, "SharedConsumer", "a=b and b=c"); + queue = (QueueImpl) server.getPostOffice().getBinding(qName).getBindable(); + assertEquals(-1, queue.getMaxConsumers()); + } finally { + connection.close(); + } + } + + @Test(timeout = 30000) + public void testUnSharedDurableConsumerWithSelector() throws JMSException { + SimpleString qName = new SimpleString("foo.SharedConsumer"); + Connection connection = createConnection("foo", true); + try { + Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); + + Topic topic = session.createTopic(getTopicName()); + + MessageConsumer consumer = session.createDurableConsumer(topic, "SharedConsumer", "a=b", false); + QueueImpl queue = (QueueImpl) server.getPostOffice().getBinding(qName).getBindable(); + assertEquals(1, queue.getMaxConsumers()); + consumer.close(); + MessageConsumer consumer2 = session.createDurableConsumer(topic, "SharedConsumer", "a=b and b=c", false); + queue = (QueueImpl) server.getPostOffice().getBinding(qName).getBindable(); + assertEquals(1, queue.getMaxConsumers()); + } finally { + connection.close(); + } + } }