ARTEMIS-3423 - create correct queue when durable subs recreated via AMQP

https://issues.apache.org/jira/browse/ARTEMIS-3423
This commit is contained in:
Andy Taylor 2021-08-18 12:47:28 +01:00 committed by Clebert Suconic
parent 82f0ece67c
commit 191cb34c54
2 changed files with 49 additions and 1 deletions

View File

@ -1111,7 +1111,11 @@ public class ProtonServerSenderContext extends ProtonInitializable implements Pr
if (result.getConsumerCount() == 0) { if (result.getConsumerCount() == 0) {
sessionSPI.deleteQueue(queue); sessionSPI.deleteQueue(queue);
sessionSPI.createUnsharedDurableQueue(addressToUse, RoutingType.MULTICAST, queue, simpleStringSelector); if (shared) {
sessionSPI.createSharedDurableQueue(addressToUse, RoutingType.MULTICAST, queue, simpleStringSelector);
} else {
sessionSPI.createUnsharedDurableQueue(addressToUse, RoutingType.MULTICAST, queue, simpleStringSelector);
}
} else { } else {
throw new ActiveMQAMQPIllegalStateException("Unable to recreate subscription, consumers already exist"); throw new ActiveMQAMQPIllegalStateException("Unable to recreate subscription, consumers already exist");
} }

View File

@ -29,7 +29,9 @@ import javax.jms.Topic;
import java.util.Arrays; import java.util.Arrays;
import java.util.Collection; import java.util.Collection;
import org.apache.activemq.artemis.api.core.SimpleString;
import org.apache.activemq.artemis.core.server.ActiveMQServer; import org.apache.activemq.artemis.core.server.ActiveMQServer;
import org.apache.activemq.artemis.core.server.impl.QueueImpl;
import org.junit.Test; import org.junit.Test;
import org.junit.runner.RunWith; import org.junit.runner.RunWith;
import org.junit.runners.Parameterized; import org.junit.runners.Parameterized;
@ -138,4 +140,46 @@ public class JMSSharedDurableConsumerTest extends JMSClientTestSupport {
testSharedDurableConsumer(connection, connection2); testSharedDurableConsumer(connection, connection2);
} }
@Test(timeout = 30000)
public void testSharedDurableConsumerWithSelector() throws JMSException {
SimpleString qName = amqpUseCoreSubscriptionNaming ? new SimpleString("SharedConsumer") : new SimpleString("SharedConsumer:global");
Connection connection = createConnection(true);
try {
Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
Topic topic = session.createTopic(getTopicName());
MessageConsumer consumer = session.createSharedDurableConsumer(topic, "SharedConsumer", "a=b");
QueueImpl queue = (QueueImpl) server.getPostOffice().getBinding(qName).getBindable();
assertEquals(-1, queue.getMaxConsumers());
consumer.close();
MessageConsumer consumer2 = session.createSharedDurableConsumer(topic, "SharedConsumer", "a=b and b=c");
queue = (QueueImpl) server.getPostOffice().getBinding(qName).getBindable();
assertEquals(-1, queue.getMaxConsumers());
} finally {
connection.close();
}
}
@Test(timeout = 30000)
public void testUnSharedDurableConsumerWithSelector() throws JMSException {
SimpleString qName = new SimpleString("foo.SharedConsumer");
Connection connection = createConnection("foo", true);
try {
Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
Topic topic = session.createTopic(getTopicName());
MessageConsumer consumer = session.createDurableConsumer(topic, "SharedConsumer", "a=b", false);
QueueImpl queue = (QueueImpl) server.getPostOffice().getBinding(qName).getBindable();
assertEquals(1, queue.getMaxConsumers());
consumer.close();
MessageConsumer consumer2 = session.createDurableConsumer(topic, "SharedConsumer", "a=b and b=c", false);
queue = (QueueImpl) server.getPostOffice().getBinding(qName).getBindable();
assertEquals(1, queue.getMaxConsumers());
} finally {
connection.close();
}
}
} }