ARTEMIS-4971 Warning on Unacked messages through mirror AckManager

This commit is contained in:
Clebert Suconic 2024-09-23 17:02:30 -04:00 committed by clebertsuconic
parent e7ed4700e1
commit 1d1f03ad5e
12 changed files with 117 additions and 10 deletions

View File

@ -715,6 +715,7 @@ public final class ActiveMQDefaultConfiguration {
private static final int DEFAULT_MIRROR_ACK_MANAGER_RETRY_DELAY = Integer.parseInt(System.getProperty(FORMER_ACK_RETRY_CLASS_NAME + ".RETRY_DELAY", "100"));;
private static final boolean DEFAULT_MIRROR_ACK_MANAGER_WARN_UNACKED = false;
private static final boolean DEFAULT_MIRROR_PAGE_TRANSACTION = false;
/**
@ -1961,6 +1962,10 @@ public final class ActiveMQDefaultConfiguration {
return DEFAULT_MIRROR_ACK_MANAGER_PAGE_ATTEMPTS;
}
public static boolean getMirrorAckManagerWarnUnacked() {
return DEFAULT_MIRROR_ACK_MANAGER_WARN_UNACKED;
}
public static int getMirrorAckManagerRetryDelay() {
return DEFAULT_MIRROR_ACK_MANAGER_RETRY_DELAY;
}

View File

@ -57,6 +57,7 @@ import org.apache.activemq.artemis.core.server.impl.AddressInfo;
import org.apache.activemq.artemis.core.server.mirror.MirrorController;
import org.apache.activemq.artemis.core.transaction.Transaction;
import org.apache.activemq.artemis.core.transaction.impl.TransactionImpl;
import org.apache.activemq.artemis.protocol.amqp.logger.ActiveMQAMQPProtocolLogger;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@ -78,6 +79,7 @@ public class AckManager implements ActiveMQComponent {
ActiveMQScheduledComponent scheduledComponent;
public AckManager(ActiveMQServer server) {
assert server != null && server.getConfiguration() != null;
this.server = server;
this.configuration = server.getConfiguration();
this.ioCriticalErrorListener = server.getIoCriticalErrorListener();
@ -260,7 +262,7 @@ public class AckManager implements ActiveMQComponent {
page.usageDown();
}
}
validateExpiredSet(acksToRetry);
validateExpiredSet(address, acksToRetry);
} else {
logger.trace("Page Scan not required for address {}", address);
}
@ -283,16 +285,19 @@ public class AckManager implements ActiveMQComponent {
}
private void validateExpiredSet(LongObjectHashMap<JournalHashMap<AckRetry, AckRetry, Queue>> queuesToRetry) {
queuesToRetry.forEach(this::validateExpireSet);
private void validateExpiredSet(SimpleString address, LongObjectHashMap<JournalHashMap<AckRetry, AckRetry, Queue>> queuesToRetry) {
queuesToRetry.forEach((q, r) -> this.validateExpireSet(address, q, r));
}
private void validateExpireSet(long queueID, JournalHashMap<AckRetry, AckRetry, Queue> retries) {
private void validateExpireSet(SimpleString address, long queueID, JournalHashMap<AckRetry, AckRetry, Queue> retries) {
for (AckRetry retry : retries.valuesCopy()) {
if (retry.getQueueAttempts() >= configuration.getMirrorAckManagerQueueAttempts()) {
if (retry.attemptedPage() >= configuration.getMirrorAckManagerPageAttempts()) {
if (configuration.isMirrorAckManagerWarnUnacked()) {
ActiveMQAMQPProtocolLogger.LOGGER.ackRetryFailed(retry, address, queueID);
}
if (logger.isDebugEnabled()) {
logger.debug("Retried {} {} times, giving up on the entry now. Configuration Page Attempts={}", retry, retry.getPageAttempts(), configuration.getMirrorAckManagerPageAttempts());
logger.debug("Retried {} {} times, giving up on the entry now. Configured Page Attempts={}", retry, retry.getPageAttempts(), configuration.getMirrorAckManagerPageAttempts());
}
retries.remove(retry);
} else {
@ -300,6 +305,8 @@ public class AckManager implements ActiveMQComponent {
logger.debug("Retry {} attempted {} times on paging, Configuration Page Attempts={}", retry, retry.getPageAttempts(), configuration.getMirrorAckManagerPageAttempts());
}
}
} else {
logger.debug("Retry {} queue attempted {} times on paging, QueueAttempts {} Configuration Page Attempts={}", retry, retry.getQueueAttempts(), retry.getPageAttempts(), configuration.getMirrorAckManagerPageAttempts());
}
}
}
@ -418,9 +425,14 @@ public class AckManager implements ActiveMQComponent {
if (reference == null) {
if (logger.isDebugEnabled()) {
logger.debug("ACK Manager could not find reference nodeID={} (while localID={}), messageID={} on queue {}, server={}. Adding retry with minQueue={}, maxPage={}, delay={}", nodeID, referenceIDSupplier.getDefaultNodeID(), messageID, targetQueue.getName(), server, configuration.getMirrorAckManagerQueueAttempts(), configuration.getMirrorAckManagerPageAttempts(), configuration.getMirrorAckManagerRetryDelay());
printQueueDebug(targetQueue);
}
if (allowRetry) {
if (configuration != null && configuration.isMirrorAckManagerWarnUnacked() && targetQueue.getConsumerCount() > 0) {
ActiveMQAMQPProtocolLogger.LOGGER.unackWithConsumer(targetQueue.getConsumerCount(), targetQueue.getName(), nodeID, messageID);
} else {
logger.debug("There are {} consumers on queue {}, what made Ack for message with nodeID={}, messageID={} enter a retry list", targetQueue.getConsumerCount(), targetQueue.getName(), nodeID, messageID);
}
addRetry(nodeID, targetQueue, messageID, reason);
}
return false;
@ -436,10 +448,6 @@ public class AckManager implements ActiveMQComponent {
}
}
private void printQueueDebug(Queue targetQueue) {
logger.debug("... queue {}/{} had {} consumers, {} messages, {} scheduled messages, {} delivering messages, paging={}", targetQueue.getID(), targetQueue.getName(), targetQueue.getConsumerCount(), targetQueue.getMessageCount(), targetQueue.getScheduledCount(), targetQueue.getDeliveringCount(), targetQueue.getPagingStore().isPaging());
}
private void doACK(Queue targetQueue, MessageReference reference, AckReason reason) {
try {
switch (reason) {

View File

@ -67,4 +67,10 @@ public interface ActiveMQAMQPProtocolLogger {
@LogMessage(id = 111010, value = "Duplicate AckManager node detected. Queue={}, ServerID={}, recordID={}", level = LogMessage.Level.WARN)
void duplicateNodeStoreID(String queue, String serverId, long recordID, Exception trace);
@LogMessage(id = 111011, value = "There are {} consumers on queue {}, what made the Ack for message with nodeID={}, messageID={} enter a retry list", level = LogMessage.Level.WARN)
void unackWithConsumer(int numberOfConsumers, Object queueName, String nodeID, long messageID);
@LogMessage(id = 111012, value = "Acknowledgement retry failed for {} on address {}, queueID={}", level = LogMessage.Level.WARN)
void ackRetryFailed(Object ackRetryInformation, Object address, long queueID);
}

View File

@ -1535,4 +1535,13 @@ public interface Configuration {
boolean isMirrorPageTransaction();
Configuration setMirrorPageTransaction(boolean ignorePageTransactions);
/**
* should log.warn when ack retries failed.
* @param warnUnacked
* @return
*/
Configuration setMirrorAckManagerWarnUnacked(boolean warnUnacked);
boolean isMirrorAckManagerWarnUnacked();
}

View File

@ -444,6 +444,8 @@ public class ConfigurationImpl implements Configuration, Serializable {
private int mirrorAckManagerPageAttempts = ActiveMQDefaultConfiguration.getMirrorAckManagerPageAttempts();
private boolean mirrorAckManagerWarnUnacked = ActiveMQDefaultConfiguration.getMirrorAckManagerWarnUnacked();
private int mirrorAckManagerRetryDelay = ActiveMQDefaultConfiguration.getMirrorAckManagerRetryDelay();
private boolean mirrorPageTransaction = ActiveMQDefaultConfiguration.getMirrorPageTransaction();
@ -3385,6 +3387,17 @@ public class ConfigurationImpl implements Configuration, Serializable {
return mirrorAckManagerQueueAttempts;
}
@Override
public boolean isMirrorAckManagerWarnUnacked() {
return mirrorAckManagerWarnUnacked;
}
@Override
public ConfigurationImpl setMirrorAckManagerWarnUnacked(boolean warnUnacked) {
this.mirrorAckManagerWarnUnacked = warnUnacked;
return this;
}
@Override
public ConfigurationImpl setMirrorAckManagerQueueAttempts(int minQueueAttempts) {
logger.debug("Setting mirrorAckManagerMinQueueAttempts = {}", minQueueAttempts);

View File

@ -386,6 +386,7 @@ public final class FileConfigurationParser extends XMLConfigurationUtil {
private static final String MIRROR_ACK_MANAGER_PAGE_ATTEMPTS = "mirror-ack-manager-page-attempts";
private static final String MIRROR_ACK_MANAGER_RETRY_DELAY = "mirror-ack-manager-retry-delay";
private static final String MIRROR_ACK_MANAGER_WARN_UNACKED = "mirror-ack-manager-warn-unacked";
private static final String MIRROR_PAGE_TRANSACTION = "mirror-page-transaction";
@ -868,6 +869,8 @@ public final class FileConfigurationParser extends XMLConfigurationUtil {
config.setMirrorAckManagerRetryDelay(getInteger(e, MIRROR_ACK_MANAGER_RETRY_DELAY, config.getMirrorAckManagerRetryDelay(), GT_ZERO));
config.setMirrorAckManagerWarnUnacked(getBoolean(e, MIRROR_ACK_MANAGER_WARN_UNACKED, config.isMirrorAckManagerWarnUnacked()));
parseAddressSettings(e, config);
parseResourceLimits(e, config);

View File

@ -944,6 +944,14 @@
</xsd:annotation>
</xsd:element>
<xsd:element name="mirror-ack-manager-warn-unacked" type="xsd:boolean" maxOccurs="1" minOccurs="0" default="false">
<xsd:annotation>
<xsd:documentation>
Should the system log.warn when an acknowledgment retry fails (unacked).
</xsd:documentation>
</xsd:annotation>
</xsd:element>
<xsd:element name="mirror-page-transaction" type="xsd:boolean" maxOccurs="1" minOccurs="0" default="false">
<xsd:annotation>
<xsd:documentation>

View File

@ -582,6 +582,7 @@ public class FileConfigurationTest extends AbstractConfigurationTestBase {
assertEquals(Integer.valueOf(128), conf.getAddressSettings().get("a2").getInitialQueueBufferSize());
assertEquals(111, conf.getMirrorAckManagerQueueAttempts());
assertTrue(conf.isMirrorAckManagerWarnUnacked());
assertEquals(222, conf.getMirrorAckManagerPageAttempts());
assertEquals(333, conf.getMirrorAckManagerRetryDelay());
assertTrue(conf.isMirrorPageTransaction());

View File

@ -558,6 +558,7 @@
<mirror-ack-manager-queue-attempts>111</mirror-ack-manager-queue-attempts>
<mirror-ack-manager-page-attempts>222</mirror-ack-manager-page-attempts>
<mirror-ack-manager-retry-delay>333</mirror-ack-manager-retry-delay>
<mirror-ack-manager-warn-unacked>true</mirror-ack-manager-warn-unacked>
<mirror-page-transaction>true</mirror-page-transaction>
<security-settings>

View File

@ -74,6 +74,7 @@
<mirror-ack-manager-queue-attempts>111</mirror-ack-manager-queue-attempts>
<mirror-ack-manager-page-attempts>222</mirror-ack-manager-page-attempts>
<mirror-ack-manager-retry-delay>333</mirror-ack-manager-retry-delay>
<mirror-ack-manager-warn-unacked>true</mirror-ack-manager-warn-unacked>
<mirror-page-transaction>true</mirror-page-transaction>
<remoting-incoming-interceptors>

View File

@ -74,6 +74,7 @@
<mirror-ack-manager-queue-attempts>111</mirror-ack-manager-queue-attempts>
<mirror-ack-manager-page-attempts>222</mirror-ack-manager-page-attempts>
<mirror-ack-manager-retry-delay>333</mirror-ack-manager-retry-delay>
<mirror-ack-manager-warn-unacked>true</mirror-ack-manager-warn-unacked>
<mirror-page-transaction>true</mirror-page-transaction>

View File

@ -18,6 +18,7 @@
package org.apache.activemq.artemis.tests.integration.amqp.connect;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertFalse;
import static org.junit.jupiter.api.Assertions.assertNull;
import static org.junit.jupiter.api.Assertions.assertNotNull;
import static org.junit.jupiter.api.Assertions.assertNotSame;
@ -45,11 +46,13 @@ import org.apache.activemq.artemis.core.paging.impl.Page;
import org.apache.activemq.artemis.core.persistence.impl.journal.JournalRecordIds;
import org.apache.activemq.artemis.core.persistence.impl.journal.codec.AckRetry;
import org.apache.activemq.artemis.core.server.ActiveMQServer;
import org.apache.activemq.artemis.core.server.Consumer;
import org.apache.activemq.artemis.core.server.Queue;
import org.apache.activemq.artemis.core.server.impl.AckReason;
import org.apache.activemq.artemis.core.server.impl.ActiveMQServerImpl;
import org.apache.activemq.artemis.core.server.impl.AddressInfo;
import org.apache.activemq.artemis.core.settings.impl.AddressSettings;
import org.apache.activemq.artemis.logs.AssertionLoggerHandler;
import org.apache.activemq.artemis.protocol.amqp.broker.ActiveMQProtonRemotingConnection;
import org.apache.activemq.artemis.protocol.amqp.connect.mirror.AMQPMirrorControllerTarget;
import org.apache.activemq.artemis.protocol.amqp.connect.mirror.AckManager;
@ -64,6 +67,7 @@ import org.apache.activemq.artemis.tests.util.RandomUtil;
import org.apache.activemq.artemis.tests.util.Wait;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.mockito.Mockito;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@ -294,6 +298,53 @@ public class AckManagerTest extends ActiveMQTestBase {
}
@Test
public void testLogUnack() throws Throwable {
String protocol = "AMQP";
SimpleString TOPIC_NAME = SimpleString.of("tp" + RandomUtil.randomString());
server1.addAddressInfo(new AddressInfo(TOPIC_NAME).addRoutingType(RoutingType.MULTICAST));
ConnectionFactory connectionFactory = CFUtil.createConnectionFactory(protocol, "tcp://localhost:61616");
// creating 5 subscriptions
for (int i = 0; i < 5; i++) {
try (Connection connection = connectionFactory.createConnection()) {
connection.setClientID("c" + i);
Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
Topic topic = session.createTopic(TOPIC_NAME.toString());
session.createDurableSubscriber(topic, "s" + i);
}
}
Queue c1s1 = server1.locateQueue("c1.s1");
assertNotNull(c1s1);
Queue c2s2 = server1.locateQueue("c2.s2");
assertNotNull(c2s2);
try (AssertionLoggerHandler assertionLoggerHandler = new AssertionLoggerHandler()) {
server1.getConfiguration().setMirrorAckManagerWarnUnacked(true);
c1s1.addConsumer(Mockito.mock(Consumer.class));
AckManager ackManager = AckManagerProvider.getManager(server1);
ackManager.ack("neverFound", c1s1, 1000, AckReason.NORMAL, true);
// ID for there are consumers
Wait.assertTrue(() -> assertionLoggerHandler.findText("AMQ111011"), 5000, 100);
// ID for give up retry
Wait.assertTrue(() -> assertionLoggerHandler.findText("AMQ111012"), 5000, 100);
server1.getConfiguration().setMirrorAckManagerWarnUnacked(false);
assertionLoggerHandler.clear();
ackManager.ack("neverFound", c1s1, 1000, AckReason.NORMAL, true);
// ID for there are consumers
assertFalse(assertionLoggerHandler.findText("AMQ111011"));
// ID for give up retry
assertFalse(assertionLoggerHandler.findText("AMQ111012"));
}
}
@Test
public void testRetryFromPaging() throws Throwable {