From 1d1f03ad5e2644b558b5eb65f737665b24697689 Mon Sep 17 00:00:00 2001 From: Clebert Suconic Date: Mon, 23 Sep 2024 17:02:30 -0400 Subject: [PATCH] ARTEMIS-4971 Warning on Unacked messages through mirror AckManager --- .../config/ActiveMQDefaultConfiguration.java | 5 ++ .../amqp/connect/mirror/AckManager.java | 28 ++++++---- .../logger/ActiveMQAMQPProtocolLogger.java | 6 +++ .../artemis/core/config/Configuration.java | 9 ++++ .../core/config/impl/ConfigurationImpl.java | 13 +++++ .../impl/FileConfigurationParser.java | 3 ++ .../schema/artemis-configuration.xsd | 8 +++ .../config/impl/FileConfigurationTest.java | 1 + .../ConfigurationTest-full-config.xml | 1 + .../ConfigurationTest-xinclude-config.xml | 1 + ...nfigurationTest-xinclude-schema-config.xml | 1 + .../amqp/connect/AckManagerTest.java | 51 +++++++++++++++++++ 12 files changed, 117 insertions(+), 10 deletions(-) diff --git a/artemis-core-client/src/main/java/org/apache/activemq/artemis/api/config/ActiveMQDefaultConfiguration.java b/artemis-core-client/src/main/java/org/apache/activemq/artemis/api/config/ActiveMQDefaultConfiguration.java index d8bf42940d..11de47c758 100644 --- a/artemis-core-client/src/main/java/org/apache/activemq/artemis/api/config/ActiveMQDefaultConfiguration.java +++ b/artemis-core-client/src/main/java/org/apache/activemq/artemis/api/config/ActiveMQDefaultConfiguration.java @@ -715,6 +715,7 @@ public final class ActiveMQDefaultConfiguration { private static final int DEFAULT_MIRROR_ACK_MANAGER_RETRY_DELAY = Integer.parseInt(System.getProperty(FORMER_ACK_RETRY_CLASS_NAME + ".RETRY_DELAY", "100"));; + private static final boolean DEFAULT_MIRROR_ACK_MANAGER_WARN_UNACKED = false; private static final boolean DEFAULT_MIRROR_PAGE_TRANSACTION = false; /** @@ -1961,6 +1962,10 @@ public final class ActiveMQDefaultConfiguration { return DEFAULT_MIRROR_ACK_MANAGER_PAGE_ATTEMPTS; } + public static boolean getMirrorAckManagerWarnUnacked() { + return DEFAULT_MIRROR_ACK_MANAGER_WARN_UNACKED; + } + public static int getMirrorAckManagerRetryDelay() { return DEFAULT_MIRROR_ACK_MANAGER_RETRY_DELAY; } diff --git a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/connect/mirror/AckManager.java b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/connect/mirror/AckManager.java index 3dc5322136..0ef1a9b649 100644 --- a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/connect/mirror/AckManager.java +++ b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/connect/mirror/AckManager.java @@ -57,6 +57,7 @@ import org.apache.activemq.artemis.core.server.impl.AddressInfo; import org.apache.activemq.artemis.core.server.mirror.MirrorController; import org.apache.activemq.artemis.core.transaction.Transaction; import org.apache.activemq.artemis.core.transaction.impl.TransactionImpl; +import org.apache.activemq.artemis.protocol.amqp.logger.ActiveMQAMQPProtocolLogger; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -78,6 +79,7 @@ public class AckManager implements ActiveMQComponent { ActiveMQScheduledComponent scheduledComponent; public AckManager(ActiveMQServer server) { + assert server != null && server.getConfiguration() != null; this.server = server; this.configuration = server.getConfiguration(); this.ioCriticalErrorListener = server.getIoCriticalErrorListener(); @@ -260,7 +262,7 @@ public class AckManager implements ActiveMQComponent { page.usageDown(); } } - validateExpiredSet(acksToRetry); + validateExpiredSet(address, acksToRetry); } else { logger.trace("Page Scan not required for address {}", address); } @@ -283,16 +285,19 @@ public class AckManager implements ActiveMQComponent { } - private void validateExpiredSet(LongObjectHashMap> queuesToRetry) { - queuesToRetry.forEach(this::validateExpireSet); + private void validateExpiredSet(SimpleString address, LongObjectHashMap> queuesToRetry) { + queuesToRetry.forEach((q, r) -> this.validateExpireSet(address, q, r)); } - private void validateExpireSet(long queueID, JournalHashMap retries) { + private void validateExpireSet(SimpleString address, long queueID, JournalHashMap retries) { for (AckRetry retry : retries.valuesCopy()) { if (retry.getQueueAttempts() >= configuration.getMirrorAckManagerQueueAttempts()) { if (retry.attemptedPage() >= configuration.getMirrorAckManagerPageAttempts()) { + if (configuration.isMirrorAckManagerWarnUnacked()) { + ActiveMQAMQPProtocolLogger.LOGGER.ackRetryFailed(retry, address, queueID); + } if (logger.isDebugEnabled()) { - logger.debug("Retried {} {} times, giving up on the entry now. Configuration Page Attempts={}", retry, retry.getPageAttempts(), configuration.getMirrorAckManagerPageAttempts()); + logger.debug("Retried {} {} times, giving up on the entry now. Configured Page Attempts={}", retry, retry.getPageAttempts(), configuration.getMirrorAckManagerPageAttempts()); } retries.remove(retry); } else { @@ -300,6 +305,8 @@ public class AckManager implements ActiveMQComponent { logger.debug("Retry {} attempted {} times on paging, Configuration Page Attempts={}", retry, retry.getPageAttempts(), configuration.getMirrorAckManagerPageAttempts()); } } + } else { + logger.debug("Retry {} queue attempted {} times on paging, QueueAttempts {} Configuration Page Attempts={}", retry, retry.getQueueAttempts(), retry.getPageAttempts(), configuration.getMirrorAckManagerPageAttempts()); } } } @@ -418,9 +425,14 @@ public class AckManager implements ActiveMQComponent { if (reference == null) { if (logger.isDebugEnabled()) { logger.debug("ACK Manager could not find reference nodeID={} (while localID={}), messageID={} on queue {}, server={}. Adding retry with minQueue={}, maxPage={}, delay={}", nodeID, referenceIDSupplier.getDefaultNodeID(), messageID, targetQueue.getName(), server, configuration.getMirrorAckManagerQueueAttempts(), configuration.getMirrorAckManagerPageAttempts(), configuration.getMirrorAckManagerRetryDelay()); - printQueueDebug(targetQueue); } + if (allowRetry) { + if (configuration != null && configuration.isMirrorAckManagerWarnUnacked() && targetQueue.getConsumerCount() > 0) { + ActiveMQAMQPProtocolLogger.LOGGER.unackWithConsumer(targetQueue.getConsumerCount(), targetQueue.getName(), nodeID, messageID); + } else { + logger.debug("There are {} consumers on queue {}, what made Ack for message with nodeID={}, messageID={} enter a retry list", targetQueue.getConsumerCount(), targetQueue.getName(), nodeID, messageID); + } addRetry(nodeID, targetQueue, messageID, reason); } return false; @@ -436,10 +448,6 @@ public class AckManager implements ActiveMQComponent { } } - private void printQueueDebug(Queue targetQueue) { - logger.debug("... queue {}/{} had {} consumers, {} messages, {} scheduled messages, {} delivering messages, paging={}", targetQueue.getID(), targetQueue.getName(), targetQueue.getConsumerCount(), targetQueue.getMessageCount(), targetQueue.getScheduledCount(), targetQueue.getDeliveringCount(), targetQueue.getPagingStore().isPaging()); - } - private void doACK(Queue targetQueue, MessageReference reference, AckReason reason) { try { switch (reason) { diff --git a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/logger/ActiveMQAMQPProtocolLogger.java b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/logger/ActiveMQAMQPProtocolLogger.java index dc7dbda0ac..02e78a783b 100644 --- a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/logger/ActiveMQAMQPProtocolLogger.java +++ b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/logger/ActiveMQAMQPProtocolLogger.java @@ -67,4 +67,10 @@ public interface ActiveMQAMQPProtocolLogger { @LogMessage(id = 111010, value = "Duplicate AckManager node detected. Queue={}, ServerID={}, recordID={}", level = LogMessage.Level.WARN) void duplicateNodeStoreID(String queue, String serverId, long recordID, Exception trace); + + @LogMessage(id = 111011, value = "There are {} consumers on queue {}, what made the Ack for message with nodeID={}, messageID={} enter a retry list", level = LogMessage.Level.WARN) + void unackWithConsumer(int numberOfConsumers, Object queueName, String nodeID, long messageID); + + @LogMessage(id = 111012, value = "Acknowledgement retry failed for {} on address {}, queueID={}", level = LogMessage.Level.WARN) + void ackRetryFailed(Object ackRetryInformation, Object address, long queueID); } diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/config/Configuration.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/config/Configuration.java index 1df52bd0bc..17765802b5 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/config/Configuration.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/config/Configuration.java @@ -1535,4 +1535,13 @@ public interface Configuration { boolean isMirrorPageTransaction(); Configuration setMirrorPageTransaction(boolean ignorePageTransactions); + + /** + * should log.warn when ack retries failed. + * @param warnUnacked + * @return + */ + Configuration setMirrorAckManagerWarnUnacked(boolean warnUnacked); + + boolean isMirrorAckManagerWarnUnacked(); } diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/config/impl/ConfigurationImpl.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/config/impl/ConfigurationImpl.java index 4983527b6b..88b7721099 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/config/impl/ConfigurationImpl.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/config/impl/ConfigurationImpl.java @@ -444,6 +444,8 @@ public class ConfigurationImpl implements Configuration, Serializable { private int mirrorAckManagerPageAttempts = ActiveMQDefaultConfiguration.getMirrorAckManagerPageAttempts(); + private boolean mirrorAckManagerWarnUnacked = ActiveMQDefaultConfiguration.getMirrorAckManagerWarnUnacked(); + private int mirrorAckManagerRetryDelay = ActiveMQDefaultConfiguration.getMirrorAckManagerRetryDelay(); private boolean mirrorPageTransaction = ActiveMQDefaultConfiguration.getMirrorPageTransaction(); @@ -3385,6 +3387,17 @@ public class ConfigurationImpl implements Configuration, Serializable { return mirrorAckManagerQueueAttempts; } + @Override + public boolean isMirrorAckManagerWarnUnacked() { + return mirrorAckManagerWarnUnacked; + } + + @Override + public ConfigurationImpl setMirrorAckManagerWarnUnacked(boolean warnUnacked) { + this.mirrorAckManagerWarnUnacked = warnUnacked; + return this; + } + @Override public ConfigurationImpl setMirrorAckManagerQueueAttempts(int minQueueAttempts) { logger.debug("Setting mirrorAckManagerMinQueueAttempts = {}", minQueueAttempts); diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/deployers/impl/FileConfigurationParser.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/deployers/impl/FileConfigurationParser.java index 588e0651fa..6819b307ad 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/deployers/impl/FileConfigurationParser.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/deployers/impl/FileConfigurationParser.java @@ -386,6 +386,7 @@ public final class FileConfigurationParser extends XMLConfigurationUtil { private static final String MIRROR_ACK_MANAGER_PAGE_ATTEMPTS = "mirror-ack-manager-page-attempts"; private static final String MIRROR_ACK_MANAGER_RETRY_DELAY = "mirror-ack-manager-retry-delay"; + private static final String MIRROR_ACK_MANAGER_WARN_UNACKED = "mirror-ack-manager-warn-unacked"; private static final String MIRROR_PAGE_TRANSACTION = "mirror-page-transaction"; @@ -868,6 +869,8 @@ public final class FileConfigurationParser extends XMLConfigurationUtil { config.setMirrorAckManagerRetryDelay(getInteger(e, MIRROR_ACK_MANAGER_RETRY_DELAY, config.getMirrorAckManagerRetryDelay(), GT_ZERO)); + config.setMirrorAckManagerWarnUnacked(getBoolean(e, MIRROR_ACK_MANAGER_WARN_UNACKED, config.isMirrorAckManagerWarnUnacked())); + parseAddressSettings(e, config); parseResourceLimits(e, config); diff --git a/artemis-server/src/main/resources/schema/artemis-configuration.xsd b/artemis-server/src/main/resources/schema/artemis-configuration.xsd index e16a7dc9b2..4905b77f98 100644 --- a/artemis-server/src/main/resources/schema/artemis-configuration.xsd +++ b/artemis-server/src/main/resources/schema/artemis-configuration.xsd @@ -944,6 +944,14 @@ + + + + Should the system log.warn when an acknowledgment retry fails (unacked). + + + + diff --git a/artemis-server/src/test/java/org/apache/activemq/artemis/core/config/impl/FileConfigurationTest.java b/artemis-server/src/test/java/org/apache/activemq/artemis/core/config/impl/FileConfigurationTest.java index 3d8fb9b033..e32527ee31 100644 --- a/artemis-server/src/test/java/org/apache/activemq/artemis/core/config/impl/FileConfigurationTest.java +++ b/artemis-server/src/test/java/org/apache/activemq/artemis/core/config/impl/FileConfigurationTest.java @@ -582,6 +582,7 @@ public class FileConfigurationTest extends AbstractConfigurationTestBase { assertEquals(Integer.valueOf(128), conf.getAddressSettings().get("a2").getInitialQueueBufferSize()); assertEquals(111, conf.getMirrorAckManagerQueueAttempts()); + assertTrue(conf.isMirrorAckManagerWarnUnacked()); assertEquals(222, conf.getMirrorAckManagerPageAttempts()); assertEquals(333, conf.getMirrorAckManagerRetryDelay()); assertTrue(conf.isMirrorPageTransaction()); diff --git a/artemis-server/src/test/resources/ConfigurationTest-full-config.xml b/artemis-server/src/test/resources/ConfigurationTest-full-config.xml index 36d43ad604..f53d5a4844 100644 --- a/artemis-server/src/test/resources/ConfigurationTest-full-config.xml +++ b/artemis-server/src/test/resources/ConfigurationTest-full-config.xml @@ -558,6 +558,7 @@ 111 222 333 + true true diff --git a/artemis-server/src/test/resources/ConfigurationTest-xinclude-config.xml b/artemis-server/src/test/resources/ConfigurationTest-xinclude-config.xml index 689ee7e436..99fac28fca 100644 --- a/artemis-server/src/test/resources/ConfigurationTest-xinclude-config.xml +++ b/artemis-server/src/test/resources/ConfigurationTest-xinclude-config.xml @@ -74,6 +74,7 @@ 111 222 333 + true true diff --git a/artemis-server/src/test/resources/ConfigurationTest-xinclude-schema-config.xml b/artemis-server/src/test/resources/ConfigurationTest-xinclude-schema-config.xml index 1c163b4a14..c26879b294 100644 --- a/artemis-server/src/test/resources/ConfigurationTest-xinclude-schema-config.xml +++ b/artemis-server/src/test/resources/ConfigurationTest-xinclude-schema-config.xml @@ -74,6 +74,7 @@ 111 222 333 + true true diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/connect/AckManagerTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/connect/AckManagerTest.java index 89f81fd898..fc3b53efa3 100644 --- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/connect/AckManagerTest.java +++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/connect/AckManagerTest.java @@ -18,6 +18,7 @@ package org.apache.activemq.artemis.tests.integration.amqp.connect; import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertFalse; import static org.junit.jupiter.api.Assertions.assertNull; import static org.junit.jupiter.api.Assertions.assertNotNull; import static org.junit.jupiter.api.Assertions.assertNotSame; @@ -45,11 +46,13 @@ import org.apache.activemq.artemis.core.paging.impl.Page; import org.apache.activemq.artemis.core.persistence.impl.journal.JournalRecordIds; import org.apache.activemq.artemis.core.persistence.impl.journal.codec.AckRetry; import org.apache.activemq.artemis.core.server.ActiveMQServer; +import org.apache.activemq.artemis.core.server.Consumer; import org.apache.activemq.artemis.core.server.Queue; import org.apache.activemq.artemis.core.server.impl.AckReason; import org.apache.activemq.artemis.core.server.impl.ActiveMQServerImpl; import org.apache.activemq.artemis.core.server.impl.AddressInfo; import org.apache.activemq.artemis.core.settings.impl.AddressSettings; +import org.apache.activemq.artemis.logs.AssertionLoggerHandler; import org.apache.activemq.artemis.protocol.amqp.broker.ActiveMQProtonRemotingConnection; import org.apache.activemq.artemis.protocol.amqp.connect.mirror.AMQPMirrorControllerTarget; import org.apache.activemq.artemis.protocol.amqp.connect.mirror.AckManager; @@ -64,6 +67,7 @@ import org.apache.activemq.artemis.tests.util.RandomUtil; import org.apache.activemq.artemis.tests.util.Wait; import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Test; +import org.mockito.Mockito; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -294,6 +298,53 @@ public class AckManagerTest extends ActiveMQTestBase { } + @Test + public void testLogUnack() throws Throwable { + String protocol = "AMQP"; + + SimpleString TOPIC_NAME = SimpleString.of("tp" + RandomUtil.randomString()); + + server1.addAddressInfo(new AddressInfo(TOPIC_NAME).addRoutingType(RoutingType.MULTICAST)); + + ConnectionFactory connectionFactory = CFUtil.createConnectionFactory(protocol, "tcp://localhost:61616"); + + // creating 5 subscriptions + for (int i = 0; i < 5; i++) { + try (Connection connection = connectionFactory.createConnection()) { + connection.setClientID("c" + i); + Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); + Topic topic = session.createTopic(TOPIC_NAME.toString()); + session.createDurableSubscriber(topic, "s" + i); + } + } + + Queue c1s1 = server1.locateQueue("c1.s1"); + assertNotNull(c1s1); + Queue c2s2 = server1.locateQueue("c2.s2"); + assertNotNull(c2s2); + + try (AssertionLoggerHandler assertionLoggerHandler = new AssertionLoggerHandler()) { + server1.getConfiguration().setMirrorAckManagerWarnUnacked(true); + c1s1.addConsumer(Mockito.mock(Consumer.class)); + AckManager ackManager = AckManagerProvider.getManager(server1); + ackManager.ack("neverFound", c1s1, 1000, AckReason.NORMAL, true); + + // ID for there are consumers + Wait.assertTrue(() -> assertionLoggerHandler.findText("AMQ111011"), 5000, 100); + // ID for give up retry + Wait.assertTrue(() -> assertionLoggerHandler.findText("AMQ111012"), 5000, 100); + + server1.getConfiguration().setMirrorAckManagerWarnUnacked(false); + assertionLoggerHandler.clear(); + + ackManager.ack("neverFound", c1s1, 1000, AckReason.NORMAL, true); + + // ID for there are consumers + assertFalse(assertionLoggerHandler.findText("AMQ111011")); + // ID for give up retry + assertFalse(assertionLoggerHandler.findText("AMQ111012")); + } + } @Test public void testRetryFromPaging() throws Throwable {