diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/QueueImpl.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/QueueImpl.java index 47cd68ef68..faac337cce 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/QueueImpl.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/QueueImpl.java @@ -895,7 +895,10 @@ public class QueueImpl extends CriticalComponentImpl implements Queue { // directDeliver flag to be re-computed resulting in direct delivery if the queue is empty // We don't recompute it on every delivery since executing isEmpty is expensive for a ConcurrentQueue - if (deliveriesInTransit.getCount() == 0 && getExecutor().isFlushed() && intermediateMessageReferences.isEmpty() && messageReferences.isEmpty() && !pageIterator.hasNext() && !pageSubscription.isPaging()) { + if (deliveriesInTransit.getCount() == 0 && getExecutor().isFlushed() && + intermediateMessageReferences.isEmpty() && messageReferences.isEmpty() && + pageIterator != null && !pageIterator.hasNext() && + pageSubscription != null && !pageSubscription.isPaging()) { // We must block on the executor to ensure any async deliveries have completed or we might get out of order // deliveries // Go into direct delivery mode diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/paging/GlobalPagingTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/paging/GlobalPagingTest.java index d4cbdd3331..10cf532e53 100644 --- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/paging/GlobalPagingTest.java +++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/paging/GlobalPagingTest.java @@ -19,8 +19,10 @@ package org.apache.activemq.artemis.tests.integration.paging; import java.nio.ByteBuffer; import java.util.Map; +import java.util.UUID; import java.util.concurrent.CountDownLatch; +import org.apache.activemq.artemis.api.config.ActiveMQDefaultConfiguration; import org.apache.activemq.artemis.api.core.ActiveMQAddressFullException; import org.apache.activemq.artemis.api.core.ActiveMQBuffer; import org.apache.activemq.artemis.api.core.ActiveMQException; @@ -33,11 +35,13 @@ import org.apache.activemq.artemis.api.core.client.ClientSession; import org.apache.activemq.artemis.api.core.client.ClientSessionFactory; import org.apache.activemq.artemis.api.core.client.ServerLocator; import org.apache.activemq.artemis.api.core.management.ManagementHelper; +import org.apache.activemq.artemis.core.client.impl.ClientMessageImpl; import org.apache.activemq.artemis.core.config.Configuration; import org.apache.activemq.artemis.core.config.StoreConfiguration; import org.apache.activemq.artemis.core.paging.PagingManager; import org.apache.activemq.artemis.core.server.ActiveMQServer; import org.apache.activemq.artemis.core.server.ActiveMQServers; +import org.apache.activemq.artemis.core.server.MessageReference; import org.apache.activemq.artemis.core.server.Queue; import org.apache.activemq.artemis.core.server.impl.ActiveMQServerImpl; import org.apache.activemq.artemis.core.settings.impl.AddressFullMessagePolicy; @@ -287,4 +291,62 @@ public class GlobalPagingTest extends PagingTest { } } + @Test + public void testManagementMessageRequestCannotFailAfterFailedDirectDeliver() throws Exception { + clearDataRecreateServerDirs(); + + Configuration config = createDefaultNettyConfig().setJournalSyncNonTransactional(false); + + final ActiveMQServer server = createServer(true, config, PagingTest.PAGE_SIZE, -1); + + try { + final SimpleString managementAddress = server.getConfiguration().getManagementAddress(); + server.start(); + //need to use Netty in order to have direct delivery available + final ServerLocator locator = createNettyNonHALocator() + .setBlockOnNonDurableSend(true) + .setBlockOnDurableSend(true) + .setBlockOnAcknowledge(true); + + try (ClientSessionFactory sf = createSessionFactory(locator); + + ClientSession session = sf.createSession(false, true, true)) { + + session.start(); + + if (server.locateQueue(managementAddress) == null) { + + session.createQueue(managementAddress, managementAddress, null, true); + } + + final SimpleString address = SimpleString.toSimpleString("queue"); + + if (server.locateQueue(address) == null) { + + session.createQueue(address, address, null, true); + } + + try (ClientProducer requestProducer = session.createProducer(managementAddress)) { + final SimpleString replyQueue = new SimpleString(managementAddress + "." + UUID.randomUUID().toString()); + session.createTemporaryQueue(replyQueue, ActiveMQDefaultConfiguration.getDefaultRoutingType(), replyQueue); + try (ClientConsumer consumer = session.createConsumer(replyQueue)) { + final Queue queue = server.locateQueue(replyQueue); + final MessageReference reference = MessageReference.Factory.createReference(session.createMessage(false), queue); + //it will cause QueueImpl::directDeliver -> false + queue.addHead(reference, false); + Assert.assertSame(reference, queue.removeReferenceWithID(reference.getMessageID())); + ClientMessage message = session.createMessage(false); + message.putStringProperty(ClientMessageImpl.REPLYTO_HEADER_NAME, replyQueue); + ManagementHelper.putAttribute(message, "queue." + address.toString(), "messageCount"); + requestProducer.send(message); + Assert.assertNotNull(consumer.receive()); + } + } + } + + } finally { + server.stop(true); + } + } + }