From b11945e0c7376db01f25aaf1b61934bda95cb80e Mon Sep 17 00:00:00 2001 From: Gary Tully Date: Wed, 30 Aug 2023 17:52:33 +0100 Subject: [PATCH] ARTEMIS-4410 - process deliveries before removing consumer on session close, ensure strict order for a single consumer --- .../core/server/impl/ServerConsumerImpl.java | 5 +- .../integration/client/JMSOrderTest.java | 19 ++++- .../PrefetchRedeliveryCountOpenwireTest.java | 75 ++++++++++++++++++- 3 files changed, 92 insertions(+), 7 deletions(-) diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ServerConsumerImpl.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ServerConsumerImpl.java index b720baf64c..fa389b0912 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ServerConsumerImpl.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ServerConsumerImpl.java @@ -573,8 +573,6 @@ public class ServerConsumerImpl implements ServerConsumer, ReadyListener { del.finish(); } - removeItself(); - List refs = cancelRefs(failed, false, null); Transaction tx = new TransactionImpl(storageManager); @@ -587,6 +585,9 @@ public class ServerConsumerImpl implements ServerConsumer, ReadyListener { tx.rollback(); + // started is false, leaving remove till after cancel ensures order for a single exclusive consumer + removeItself(); + addLingerRefs(); if (!browseOnly) { diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/client/JMSOrderTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/client/JMSOrderTest.java index 333bcda901..5e642d2562 100644 --- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/client/JMSOrderTest.java +++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/client/JMSOrderTest.java @@ -35,6 +35,9 @@ import java.util.concurrent.atomic.AtomicInteger; import org.apache.activemq.artemis.api.core.QueueConfiguration; import org.apache.activemq.artemis.api.core.RoutingType; +import org.apache.activemq.artemis.api.core.SimpleString; +import org.apache.activemq.artemis.core.server.ActiveMQServer; +import org.apache.activemq.artemis.core.settings.impl.AddressSettings; import org.apache.activemq.artemis.tests.util.CFUtil; import org.apache.activemq.artemis.tests.util.JMSTestBase; import org.apache.activemq.artemis.tests.util.Wait; @@ -50,11 +53,12 @@ import static org.apache.activemq.artemis.tests.util.CFUtil.createConnectionFact public class JMSOrderTest extends JMSTestBase { String protocol; - + boolean exclusive; ConnectionFactory protocolCF; - public JMSOrderTest(String protocol) { + public JMSOrderTest(String protocol, boolean exclusive) { this.protocol = protocol; + this.exclusive = exclusive; } @Before @@ -62,9 +66,16 @@ public class JMSOrderTest extends JMSTestBase { protocolCF = createConnectionFactory(protocol, "tcp://localhost:61616"); } - @Parameterized.Parameters(name = "protocol={0}") + @Parameterized.Parameters(name = "protocol={0}&exclusive={1}") public static Collection getParameters() { - return Arrays.asList(new Object[][]{{"AMQP"}, {"OPENWIRE"}, {"CORE"}}); + return Arrays.asList(new Object[][]{{"AMQP", true}, {"AMQP", false}, {"OPENWIRE", true}, {"OPENWIRE", false}, {"CORE", true}, {"CORE", false}}); + } + + @Override + protected void extraServerConfig(ActiveMQServer server) { + if (exclusive) { + server.getConfiguration().getAddressSettings().put("#", new AddressSettings().setAutoCreateQueues(true).setAutoCreateAddresses(true).setDeadLetterAddress(new SimpleString("ActiveMQ.DLQ")).setDefaultExclusiveQueue(true)); + } } protected void sendToAmqQueue(int count) throws Exception { diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/openwire/PrefetchRedeliveryCountOpenwireTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/openwire/PrefetchRedeliveryCountOpenwireTest.java index 5fef3c4a9c..b5dab5f344 100644 --- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/openwire/PrefetchRedeliveryCountOpenwireTest.java +++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/openwire/PrefetchRedeliveryCountOpenwireTest.java @@ -26,11 +26,14 @@ import javax.jms.TextMessage; import java.util.Map; import org.apache.activemq.ActiveMQConnectionFactory; +import org.apache.activemq.ActiveMQPrefetchPolicy; +import org.apache.activemq.RedeliveryPolicy; import org.apache.activemq.artemis.api.core.QueueConfiguration; import org.apache.activemq.artemis.api.core.RoutingType; import org.apache.activemq.artemis.api.core.SimpleString; import org.apache.activemq.artemis.core.settings.impl.AddressSettings; import org.apache.activemq.command.ActiveMQQueue; +import org.apache.activemq.transport.failover.FailoverTransport; import org.junit.Assert; import org.junit.Test; @@ -46,7 +49,9 @@ public class PrefetchRedeliveryCountOpenwireTest extends OpenWireTestBase { protected void configureAddressSettings(Map addressSettingsMap) { super.configureAddressSettings(addressSettingsMap); // force send to dlq early - addressSettingsMap.get("#").setMaxDeliveryAttempts(2); + addressSettingsMap.put("exampleQueue", new AddressSettings().setAutoCreateQueues(false).setAutoCreateAddresses(false).setDeadLetterAddress(new SimpleString("ActiveMQ.DLQ")).setAutoCreateAddresses(true).setMaxDeliveryAttempts(2)); + // force send to dlq late + addressSettingsMap.put("exampleQueueTwo", new AddressSettings().setAutoCreateQueues(false).setAutoCreateAddresses(false).setDeadLetterAddress(new SimpleString("ActiveMQ.DLQ")).setAutoCreateAddresses(true).setMaxDeliveryAttempts(4000)); } @Test(timeout = 60_000) @@ -93,4 +98,72 @@ public class PrefetchRedeliveryCountOpenwireTest extends OpenWireTestBase { } } } + + @Test(timeout = 60_000) + public void testExclusiveConsumerOrderOnReconnectionLargePrefetch() throws Exception { + Connection exConn = null; + + SimpleString durableQueue = new SimpleString("exampleQueueTwo"); + this.server.createQueue(new QueueConfiguration(durableQueue).setRoutingType(RoutingType.ANYCAST).setExclusive(true)); + + try { + ActiveMQConnectionFactory exFact = new ActiveMQConnectionFactory(); + exFact.setWatchTopicAdvisories(false); + + ActiveMQPrefetchPolicy prefetchPastMaxDeliveriesInLoop = new ActiveMQPrefetchPolicy(); + prefetchPastMaxDeliveriesInLoop.setAll(2000); + exFact.setPrefetchPolicy(prefetchPastMaxDeliveriesInLoop); + + RedeliveryPolicy redeliveryPolicy = new RedeliveryPolicy(); + redeliveryPolicy.setMaximumRedeliveries(4000); + exFact.setRedeliveryPolicy(redeliveryPolicy); + + Queue queue = new ActiveMQQueue("exampleQueueTwo"); + + exConn = exFact.createConnection(); + + exConn.start(); + + Session session = exConn.createSession(true, Session.AUTO_ACKNOWLEDGE); + + MessageProducer producer = session.createProducer(queue); + + TextMessage message = session.createTextMessage("This is a text message"); + + int numMessages = 2000; + for (int i = 0; i < numMessages; i++) { + message.setIntProperty("SEQ", i); + producer.send(message); + } + session.commit(); + exConn.close(); + + final int batch = 100; + for (int i = 0; i < numMessages; i += batch) { + // connection per batch + exConn = exFact.createConnection(); + exConn.start(); + + session = exConn.createSession(true, Session.SESSION_TRANSACTED); + + MessageConsumer messageConsumer = session.createConsumer(queue); + TextMessage messageReceived = null; + for (int j = 0; j < batch; j++) { // a small batch + messageReceived = (TextMessage) messageConsumer.receive(5000); + Assert.assertNotNull("null @ i=" + i, messageReceived); + Assert.assertEquals(i + j, messageReceived.getIntProperty("SEQ")); + + assertEquals("This is a text message", messageReceived.getText()); + } + session.commit(); + + ((FailoverTransport)((org.apache.activemq.ActiveMQConnection)exConn).getTransport().narrow(FailoverTransport.class)).stop(); + exConn.close(); + } + } finally { + if (exConn != null) { + exConn.close(); + } + } + } }