From d9e114da551a9e35e8dcefa1d25bad3125dcd038 Mon Sep 17 00:00:00 2001 From: Justin Bertram Date: Fri, 29 Jan 2021 15:17:23 -0600 Subject: [PATCH] ARTEMIS-3089 direct delivery can break LVQ+non-destructive --- .../core/server/impl/LastValueQueue.java | 4 +- .../amqp/JMSNonDestructiveTest.java | 72 +++++++++++++++++++ 2 files changed, 74 insertions(+), 2 deletions(-) diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/LastValueQueue.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/LastValueQueue.java index d0ee9719ba..1df1bceee6 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/LastValueQueue.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/LastValueQueue.java @@ -174,10 +174,10 @@ public class LastValueQueue extends QueueImpl { map.put(prop, hr); - super.addTail(hr, direct); + super.addTail(hr, isNonDestructive() ? false : direct); } } else { - super.addTail(ref, direct); + super.addTail(ref, isNonDestructive() ? false : direct); } } diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/JMSNonDestructiveTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/JMSNonDestructiveTest.java index 7d96e287c4..c39381af1d 100644 --- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/JMSNonDestructiveTest.java +++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/JMSNonDestructiveTest.java @@ -16,6 +16,7 @@ */ package org.apache.activemq.artemis.tests.integration.amqp; +import javax.jms.BytesMessage; import javax.jms.Connection; import javax.jms.JMSException; import javax.jms.MessageConsumer; @@ -25,6 +26,10 @@ import javax.jms.Session; import javax.jms.TextMessage; import java.util.Arrays; import java.util.Collection; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.TimeUnit; import org.apache.activemq.artemis.api.core.Message; import org.apache.activemq.artemis.api.core.QueueConfiguration; @@ -129,6 +134,16 @@ public class JMSNonDestructiveTest extends JMSClientTestSupport { testNonDestructive(AMQPConnection, CoreConnection); } + @Test + public void testNonDestructiveLVQWithConsumerFirstCore() throws Exception { + testNonDestructiveLVQWithConsumerFirst(CoreConnection); + } + + @Test + public void testNonDestructiveLVQWithConsumerFirstAMQP() throws Exception { + testNonDestructiveLVQWithConsumerFirst(AMQPConnection); + } + public void testNonDestructive(ConnectionSupplier producerConnectionSupplier, ConnectionSupplier consumerConnectionSupplier) throws Exception { testNonDestructiveSingle(producerConnectionSupplier, consumerConnectionSupplier); testNonDestructiveDualConsumer(producerConnectionSupplier, consumerConnectionSupplier); @@ -286,6 +301,63 @@ public class JMSNonDestructiveTest extends JMSClientTestSupport { assertEquals("Message count after clearing queue via queue control should be 0", 0, queueBinding.getQueue().getMessageCount()); } + public void testNonDestructiveLVQWithConsumerFirst(ConnectionSupplier connectionSupplier) throws Exception { + ExecutorService executor = Executors.newFixedThreadPool(1); + CountDownLatch consumerSetup = new CountDownLatch(1); + CountDownLatch consumerComplete = new CountDownLatch(1); + + /* + * Create the consumer before any messages are sent and keep it there so that the first message which arrives + * on the queue triggers direct delivery. Without the fix in this commit this essentially "poisons" the queue + * so that consumers can't get messages later. + */ + executor.submit(() -> { + try (Connection connection = connectionSupplier.createConnection(); + Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); + MessageConsumer messageConsumer = session.createConsumer(session.createQueue(NON_DESTRUCTIVE_LVQ_QUEUE_NAME))) { + connection.start(); + consumerSetup.countDown(); + BytesMessage messageReceived = (BytesMessage) messageConsumer.receive(5000); + assertNotNull(messageReceived); + consumerComplete.countDown(); + } catch (Exception e) { + fail(e.getMessage()); + } + + consumerComplete.countDown(); + }); + + // wait for the consumer thread to start and get everything setup + consumerSetup.await(5, TimeUnit.SECONDS); + + try (Connection connection = connectionSupplier.createConnection(); + Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE)) { + MessageProducer producer = session.createProducer(session.createQueue(NON_DESTRUCTIVE_LVQ_QUEUE_NAME)); + BytesMessage message = session.createBytesMessage(); + message.writeUTF("mills " + System.currentTimeMillis()); + message.setStringProperty("_AMQ_LVQ_NAME", "STOCK_NAME"); + producer.send(message); + + // wait for the consumer to close then send another message + consumerComplete.await(5, TimeUnit.SECONDS); + + message = session.createBytesMessage(); + message.writeUTF("mills " + System.currentTimeMillis()); + message.setStringProperty("_AMQ_LVQ_NAME", "STOCK_NAME"); + producer.send(message); + } + + try (Connection connection = connectionSupplier.createConnection(); + Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); + MessageConsumer messageConsumer = session.createConsumer(session.createQueue(NON_DESTRUCTIVE_LVQ_QUEUE_NAME))) { + connection.start(); + BytesMessage messageReceived = (BytesMessage) messageConsumer.receive(5000); + assertNotNull(messageReceived); + } + + executor.shutdownNow(); + } + public void testNonDestructiveLVQTombstone(ConnectionSupplier producerConnectionSupplier, ConnectionSupplier consumerConnectionSupplier) throws Exception { int tombstoneTimeToLive = 500;