From fa80c03049d1faaca5e3f00a68b5ac438cd40548 Mon Sep 17 00:00:00 2001 From: gtully Date: Fri, 9 Apr 2021 19:51:09 +0100 Subject: [PATCH] ARTEMIS-3234 - fix and test, the existing tests suffered with suppressInternalManagementObjects defaulting to true. credit accounting is now independent of the ack list such that preack for advisories can work --- .../protocol/openwire/amq/AMQConsumer.java | 100 +++++++++-------- .../openwire/AdvisoryOpenWireTest.java | 106 +++++++++++------- 2 files changed, 115 insertions(+), 91 deletions(-) diff --git a/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/amq/AMQConsumer.java b/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/amq/AMQConsumer.java index 6a00f022fa..89eafe7a47 100644 --- a/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/amq/AMQConsumer.java +++ b/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/amq/AMQConsumer.java @@ -70,7 +70,7 @@ public class AMQConsumer { private int prefetchSize; private final AtomicInteger currentWindow; - private int deliveredAcks; + private int deliveredAcksCreditExtension = 0; private long messagePullSequence = 0; private final AtomicReference messagePullHandler = new AtomicReference<>(null); //internal means we don't expose @@ -90,7 +90,6 @@ public class AMQConsumer { this.scheduledPool = scheduledPool; this.prefetchSize = info.getPrefetchSize(); this.currentWindow = new AtomicInteger(prefetchSize); - this.deliveredAcks = 0; if (prefetchSize == 0) { messagePullHandler.set(new MessagePullHandler()); } @@ -295,6 +294,28 @@ public class AMQConsumer { */ public void acknowledge(MessageAck ack) throws Exception { + if (ack.isRedeliveredAck()) { + // we don't mind if the client thinks it is a redelivery + return; + } + + final int ackMessageCount = ack.getMessageCount(); + acquireCredit(ackMessageCount); + + if (ack.isDeliveredAck()) { + deliveredAcksCreditExtension += ackMessageCount; + // our work is done + return; + } + + // some sort of real ack, rebalance deliveredAcksCreditExtension + if (deliveredAcksCreditExtension > 0) { + deliveredAcksCreditExtension -= ackMessageCount; + if (deliveredAcksCreditExtension >= 0) { + currentWindow.addAndGet(-ackMessageCount); + } + } + final MessageId startID, lastID; if (ack.getFirstMessageId() == null) { @@ -309,59 +330,42 @@ public class AMQConsumer { if (serverConsumer.getQueue().isNonDestructive()) { removeReferences = false; } - if (ack.isRedeliveredAck() || ack.isDeliveredAck() || ack.isExpiredAck()) { - removeReferences = false; - } - List ackList = serverConsumer.scanDeliveringReferences(removeReferences, reference -> startID.equals(reference.getProtocolData()), reference -> lastID.equals(reference.getProtocolData())); + final List ackList = serverConsumer.scanDeliveringReferences(removeReferences, reference -> startID.equals(reference.getProtocolData()), reference -> lastID.equals(reference.getProtocolData())); - if (removeReferences && (ack.isIndividualAck() || ack.isStandardAck() || ack.isPoisonAck())) { - if (deliveredAcks < ackList.size()) { - acquireCredit(ackList.size() - deliveredAcks); - deliveredAcks = 0; - } else { - deliveredAcks -= ackList.size(); - } - } else { - if (ack.isDeliveredAck()) { - this.deliveredAcks += ack.getMessageCount(); - } - - acquireCredit(ack.getMessageCount()); - } - - if (removeReferences) { - - Transaction originalTX = session.getCoreSession().getCurrentTransaction(); - Transaction transaction; - - if (originalTX == null) { - transaction = session.getCoreSession().newTransaction(); - } else { - transaction = originalTX; - } - - if (ack.isIndividualAck() || ack.isStandardAck()) { + if (!ackList.isEmpty()) { + if (ack.isExpiredAck()) { for (MessageReference ref : ackList) { - ref.acknowledge(transaction, serverConsumer); + ref.getQueue().expire(ref, serverConsumer); } - } else if (ack.isPoisonAck()) { - for (MessageReference ref : ackList) { - Throwable poisonCause = ack.getPoisonCause(); - if (poisonCause != null) { - ref.getMessage().putStringProperty(OpenWireMessageConverter.AMQ_MSG_DLQ_DELIVERY_FAILURE_CAUSE_PROPERTY, new SimpleString(poisonCause.toString())); + } else if (removeReferences) { + + Transaction originalTX = session.getCoreSession().getCurrentTransaction(); + Transaction transaction; + + if (originalTX == null) { + transaction = session.getCoreSession().newTransaction(); + } else { + transaction = originalTX; + } + + if (ack.isIndividualAck() || ack.isStandardAck()) { + for (MessageReference ref : ackList) { + ref.acknowledge(transaction, serverConsumer); + } + } else if (ack.isPoisonAck()) { + for (MessageReference ref : ackList) { + Throwable poisonCause = ack.getPoisonCause(); + if (poisonCause != null) { + ref.getMessage().putStringProperty(OpenWireMessageConverter.AMQ_MSG_DLQ_DELIVERY_FAILURE_CAUSE_PROPERTY, new SimpleString(poisonCause.toString())); + } + ref.getQueue().sendToDeadLetterAddress(transaction, ref); } - ref.getQueue().sendToDeadLetterAddress(transaction, ref); } - } - if (originalTX == null) { - transaction.commit(true); - } - } - if (ack.isExpiredAck()) { - for (MessageReference ref : ackList) { - ref.getQueue().expire(ref, serverConsumer); + if (originalTX == null) { + transaction.commit(true); + } } } } diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/openwire/AdvisoryOpenWireTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/openwire/AdvisoryOpenWireTest.java index a76f5e322d..fb80e2eeb8 100644 --- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/openwire/AdvisoryOpenWireTest.java +++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/openwire/AdvisoryOpenWireTest.java @@ -18,7 +18,6 @@ package org.apache.activemq.artemis.tests.integration.openwire; import org.apache.activemq.advisory.AdvisorySupport; import org.apache.activemq.artemis.api.core.management.AddressControl; -import org.apache.activemq.artemis.api.core.management.QueueControl; import org.apache.activemq.artemis.tests.util.Wait; import org.junit.Before; import org.junit.Test; @@ -30,6 +29,8 @@ import javax.jms.TemporaryTopic; import java.util.concurrent.CountDownLatch; import java.util.concurrent.TimeUnit; +import org.apache.activemq.artemis.core.config.Configuration; + public class AdvisoryOpenWireTest extends BasicOpenWireTest { @Override @@ -44,12 +45,17 @@ public class AdvisoryOpenWireTest extends BasicOpenWireTest { super.setUp(); } + @Override + protected void extraServerConfig(Configuration serverConfig) { + // ensure advisory addresses are visible + serverConfig.getAcceptorConfigurations().iterator().next().getExtraParams().put("suppressInternalManagementObjects", "false"); + super.extraServerConfig(serverConfig); + } + @Test public void testTempTopicLeak() throws Exception { - Connection connection = null; - try { - connection = factory.createConnection(); + try (Connection connection = factory.createConnection()) { connection.start(); Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); @@ -57,32 +63,32 @@ public class AdvisoryOpenWireTest extends BasicOpenWireTest { TemporaryTopic temporaryTopic = session.createTemporaryTopic(); temporaryTopic.delete(); - Object[] queueResources = server.getManagementService().getResources(QueueControl.class); + AddressControl advisoryAddress = assertNonNullAddressControl("ActiveMQ.Advisory.TempTopic"); + Wait.waitFor(() -> advisoryAddress.getMessageCount() == 0); - for (Object queueResource : queueResources) { + Wait.assertEquals(0, advisoryAddress::getMessageCount); + Wait.assertEquals(2, advisoryAddress::getRoutedMessageCount); - if (((QueueControl) queueResource).getAddress().equals("ActiveMQ.Advisory.TempTopic")) { - QueueControl queueControl = (QueueControl) queueResource; - Wait.waitFor(() -> queueControl.getMessageCount() == 0); - assertNotNull("addressControl for temp advisory", queueControl); + } + } - Wait.assertEquals(0, queueControl::getMessageCount); - Wait.assertEquals(2, queueControl::getMessagesAdded); - } - } - } finally { - if (connection != null) { - connection.close(); + private AddressControl assertNonNullAddressControl(String match) { + AddressControl advisoryAddressControl = null; + Object[] addressResources = server.getManagementService().getResources(AddressControl.class); + + for (Object addressResource : addressResources) { + if (((AddressControl) addressResource).getAddress().equals(match)) { + advisoryAddressControl = (AddressControl) addressResource; } } + assertNotNull("addressControl for temp advisory", advisoryAddressControl); + return advisoryAddressControl; } @Test public void testTempQueueLeak() throws Exception { - Connection connection = null; - try { - connection = factory.createConnection(); + try (Connection connection = factory.createConnection()) { connection.start(); Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); @@ -90,23 +96,12 @@ public class AdvisoryOpenWireTest extends BasicOpenWireTest { TemporaryQueue temporaryQueue = session.createTemporaryQueue(); temporaryQueue.delete(); - Object[] queueResources = server.getManagementService().getResources(QueueControl.class); + AddressControl advisoryAddress = assertNonNullAddressControl("ActiveMQ.Advisory.TempQueue"); + Wait.waitFor(() -> advisoryAddress.getMessageCount() == 0); - for (Object queueResource : queueResources) { + Wait.assertEquals(0, advisoryAddress::getMessageCount); + Wait.assertEquals(2, advisoryAddress::getRoutedMessageCount); - if (((QueueControl) queueResource).getAddress().equals("ActiveMQ.Advisory.TempQueue")) { - QueueControl queueControl = (QueueControl) queueResource; - Wait.waitFor(() -> queueControl.getMessageCount() == 0); - assertNotNull("addressControl for temp advisory", queueControl); - Wait.assertEquals(0, queueControl::getMessageCount); - Wait.assertEquals(2, queueControl::getMessagesAdded); - - } - } - } finally { - if (connection != null) { - connection.close(); - } } } @@ -127,20 +122,45 @@ public class AdvisoryOpenWireTest extends BasicOpenWireTest { temporaryQueue.delete(); } - Object[] addressResources = server.getManagementService().getResources(AddressControl.class); + AddressControl advisoryAddress = assertNonNullAddressControl("ActiveMQ.Advisory.TempQueue"); - for (Object addressResource : addressResources) { + Wait.waitFor(() -> advisoryAddress.getMessageCount() == 0); + Wait.assertEquals(0, advisoryAddress::getMessageCount); - if (((AddressControl) addressResource).getAddress().equals("ActiveMQ.Advisory.TempQueue")) { - AddressControl addressControl = (AddressControl) addressResource; - Wait.waitFor(() -> addressControl.getMessageCount() == 0); - assertNotNull("addressControl for temp advisory", addressControl); - Wait.assertEquals(0, addressControl::getMessageCount); + } finally { + for (Connection conn : connections) { + if (conn != null) { + conn.close(); } } + } + } + @Test + public void testLongLivedConnectionGetsAllPastPrefetch() throws Exception { + final Connection[] connections = new Connection[2]; + + final int numTempDestinations = 600; // such that 2x exceeds default 1k prefetch for advisory consumer + try { + for (int i = 0; i < connections.length; i++) { + connections[i] = factory.createConnection(); + connections[i].start(); + } + + Session session = connections[0].createSession(false, Session.AUTO_ACKNOWLEDGE); + + for (int i = 0; i < numTempDestinations; i++) { + TemporaryQueue temporaryQueue = session.createTemporaryQueue(); + temporaryQueue.delete(); + } + + AddressControl advisoryAddress = assertNonNullAddressControl("ActiveMQ.Advisory.TempQueue"); + Wait.waitFor(() -> advisoryAddress.getMessageCount() == 0); + Wait.assertEquals(0, advisoryAddress::getMessageCount); + + // there is an advisory for create and another for delete + assertEquals("all routed", numTempDestinations * 2, advisoryAddress.getRoutedMessageCount()); - //sleep a bit to allow message count to go down. } finally { for (Connection conn : connections) { if (conn != null) {