This commit is contained in:
Clebert Suconic 2021-08-18 12:41:19 -04:00
commit 995ee5688b
2 changed files with 49 additions and 1 deletions

View File

@ -1111,7 +1111,11 @@ public class ProtonServerSenderContext extends ProtonInitializable implements Pr
if (result.getConsumerCount() == 0) {
sessionSPI.deleteQueue(queue);
sessionSPI.createUnsharedDurableQueue(addressToUse, RoutingType.MULTICAST, queue, simpleStringSelector);
if (shared) {
sessionSPI.createSharedDurableQueue(addressToUse, RoutingType.MULTICAST, queue, simpleStringSelector);
} else {
sessionSPI.createUnsharedDurableQueue(addressToUse, RoutingType.MULTICAST, queue, simpleStringSelector);
}
} else {
throw new ActiveMQAMQPIllegalStateException("Unable to recreate subscription, consumers already exist");
}

View File

@ -29,7 +29,9 @@ import javax.jms.Topic;
import java.util.Arrays;
import java.util.Collection;
import org.apache.activemq.artemis.api.core.SimpleString;
import org.apache.activemq.artemis.core.server.ActiveMQServer;
import org.apache.activemq.artemis.core.server.impl.QueueImpl;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.junit.runners.Parameterized;
@ -138,4 +140,46 @@ public class JMSSharedDurableConsumerTest extends JMSClientTestSupport {
testSharedDurableConsumer(connection, connection2);
}
@Test(timeout = 30000)
public void testSharedDurableConsumerWithSelector() throws JMSException {
SimpleString qName = amqpUseCoreSubscriptionNaming ? new SimpleString("SharedConsumer") : new SimpleString("SharedConsumer:global");
Connection connection = createConnection(true);
try {
Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
Topic topic = session.createTopic(getTopicName());
MessageConsumer consumer = session.createSharedDurableConsumer(topic, "SharedConsumer", "a=b");
QueueImpl queue = (QueueImpl) server.getPostOffice().getBinding(qName).getBindable();
assertEquals(-1, queue.getMaxConsumers());
consumer.close();
MessageConsumer consumer2 = session.createSharedDurableConsumer(topic, "SharedConsumer", "a=b and b=c");
queue = (QueueImpl) server.getPostOffice().getBinding(qName).getBindable();
assertEquals(-1, queue.getMaxConsumers());
} finally {
connection.close();
}
}
@Test(timeout = 30000)
public void testUnSharedDurableConsumerWithSelector() throws JMSException {
SimpleString qName = new SimpleString("foo.SharedConsumer");
Connection connection = createConnection("foo", true);
try {
Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
Topic topic = session.createTopic(getTopicName());
MessageConsumer consumer = session.createDurableConsumer(topic, "SharedConsumer", "a=b", false);
QueueImpl queue = (QueueImpl) server.getPostOffice().getBinding(qName).getBindable();
assertEquals(1, queue.getMaxConsumers());
consumer.close();
MessageConsumer consumer2 = session.createDurableConsumer(topic, "SharedConsumer", "a=b and b=c", false);
queue = (QueueImpl) server.getPostOffice().getBinding(qName).getBindable();
assertEquals(1, queue.getMaxConsumers());
} finally {
connection.close();
}
}
}