ARTEMIS-4362 Produce log.warn when the system cannot depage because of pending acks

This commit is contained in:
Clebert Suconic 2023-07-12 16:47:12 -04:00 committed by clebertsuconic
parent 451d03fd75
commit b967e6d940
4 changed files with 207 additions and 8 deletions

View File

@ -0,0 +1,40 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.activemq.artemis.core.server;
import org.apache.activemq.artemis.logs.BundleFactory;
import org.apache.activemq.artemis.logs.annotation.LogBundle;
import org.apache.activemq.artemis.logs.annotation.LogMessage;
/**
* This is using a separate Logger specific for Queue / QueueImpl.
* It's using Queue.class.getName() as the category as it would be possible to disable this logger with log4j.
* This is sharing the codes with ActiveMQServerLogger (meaning the codes between here and ActiveMQServerLogger have to be unique).
*/
@LogBundle(projectCode = "AMQ", regexID = "22[0-9]{4}")
public interface ActiveMQQueueLogger {
ActiveMQQueueLogger LOGGER = BundleFactory.newBundle(ActiveMQQueueLogger.class, Queue.class.getName());
@LogMessage(id = 224127, value = "Message dispatch from paging is blocked. Address {}/Queue {} will not read any more messages from paging until pending messages are acknowledged. There are currently {} messages pending ({} bytes) with max reads at maxPageReadMessages({}) and maxPageReadBytes({}). Either increase reading attributes at the address-settings or change your consumers to acknowledge more often.", level = LogMessage.Level.WARN)
void warnPageFlowControl(String address,
String queue,
long messageCount,
long messageBytes,
long maxMessages,
long maxMessagesBytes);
}

View File

@ -641,7 +641,9 @@ public interface ActiveMQServerLogger {
void timeoutLockingConsumer(String consumer, String remoteAddress);
@LogMessage(id = 222110, value = "no queue IDs defined!, originalMessage = {}, copiedMessage = {}, props={}", level = LogMessage.Level.WARN)
void noQueueIdDefined(org.apache.activemq.artemis.api.core.Message message, org.apache.activemq.artemis.api.core.Message messageCopy, SimpleString idsHeaderName);
void noQueueIdDefined(org.apache.activemq.artemis.api.core.Message message,
org.apache.activemq.artemis.api.core.Message messageCopy,
SimpleString idsHeaderName);
@LogMessage(id = 222111, value = "exception while invoking {} on {}", level = LogMessage.Level.TRACE)
void managementOperationError(String op, String resourceName, Exception e);
@ -905,10 +907,7 @@ public interface ActiveMQServerLogger {
@LogMessage(id = 222201, value = "Timed out waiting for activation to exit", level = LogMessage.Level.WARN)
void activationTimeout();
@LogMessage(id = 222202, value = "{}: <{}> should not be set to the same value as <{}>. " +
"If a system is under high load, or there is a minor network delay, " +
"there is a high probability of a cluster split/failure due to connection timeout.",
level = LogMessage.Level.WARN)
@LogMessage(id = 222202, value = "{}: <{}> should not be set to the same value as <{}>. " + "If a system is under high load, or there is a minor network delay, " + "there is a high probability of a cluster split/failure due to connection timeout.", level = LogMessage.Level.WARN)
void connectionTTLEqualsCheckPeriod(String connectionName, String ttl, String checkPeriod);
@LogMessage(id = 222203, value = "Classpath lacks a protocol-manager for protocol {}, Protocol being ignored on acceptor {}", level = LogMessage.Level.WARN)
@ -1180,7 +1179,6 @@ public interface ActiveMQServerLogger {
@LogMessage(id = 222295, value = "There is a possible split brain on nodeID {}. Topology update ignored", level = LogMessage.Level.WARN)
void possibleSplitBrain(String nodeID);
@LogMessage(id = 222296, value = "Unable to deploy Hawtio MBeam, console client side RBAC not available", level = LogMessage.Level.WARN)
void unableToDeployHawtioMBean(Throwable e);
@ -1584,4 +1582,6 @@ public interface ActiveMQServerLogger {
@LogMessage(id = 224126, value = "Failure during protocol handshake on connection to {} from {}", level = LogMessage.Level.ERROR)
void failureDuringProtocolHandshake(SocketAddress localAddress, SocketAddress remoteAddress, Throwable e);
// notice loggerID=224127 is reserved as it's been used at ActiveMQQueueLogger
}

View File

@ -77,6 +77,7 @@ import org.apache.activemq.artemis.core.postoffice.impl.LocalQueueBinding;
import org.apache.activemq.artemis.core.postoffice.impl.PostOfficeImpl;
import org.apache.activemq.artemis.core.remoting.server.RemotingService;
import org.apache.activemq.artemis.core.server.ActiveMQMessageBundle;
import org.apache.activemq.artemis.core.server.ActiveMQQueueLogger;
import org.apache.activemq.artemis.core.server.ActiveMQServer;
import org.apache.activemq.artemis.core.server.ActiveMQServerLogger;
import org.apache.activemq.artemis.core.server.Consumer;
@ -179,6 +180,16 @@ public class QueueImpl extends CriticalComponentImpl implements Queue {
private volatile boolean queueDestroyed = false;
// Variable to control if we should print a flow controlled message or not.
// Once it was flow controlled, we will stop warning until it's cleared once again
private volatile boolean pageFlowControlled = false;
private volatile long pageFlowControlledLastLog = 0;
// It is not expected to have an user really changing this. This is a system property now in case users disagree and find value on changing it.
// In case there is in fact value on changing it we may consider bringing it as an address-settings or broker.xml
private static final long PAGE_FLOW_CONTROL_PRINT_INTERVAL = Long.parseLong(System.getProperty("ARTEMIS_PAGE_FLOW_CONTROL_PRINT_INTERVAL", "60000"));
// once we delivered messages from paging, we need to call asyncDelivery upon acks
// if we flow control paging, ack more messages will open the space to deliver more messages
// hence we will need this flag to determine if it was paging before.
@ -3298,8 +3309,28 @@ public class QueueImpl extends CriticalComponentImpl implements Queue {
return queueMemorySize.getSize() < pageSubscription.getPagingStore().getMaxSize() &&
intermediateMessageReferences.size() + messageReferences.size() < MAX_DEPAGE_NUM;
} else {
return (maxReadBytes <= 0 || (queueMemorySize.getSize() + deliveringMetrics.getPersistentSize()) < maxReadBytes) &&
boolean needsDepageResult = (maxReadBytes <= 0 || (queueMemorySize.getSize() + deliveringMetrics.getPersistentSize()) < maxReadBytes) &&
(maxReadMessages <= 0 || (queueMemorySize.getElements() + deliveringMetrics.getMessageCount()) < maxReadMessages);
if (!needsDepageResult) {
if (!pageFlowControlled && (maxReadBytes > 0 && deliveringMetrics.getPersistentSize() >= maxReadBytes || maxReadMessages > 0 && deliveringMetrics.getMessageCount() >= maxReadMessages)) {
if (System.currentTimeMillis() - pageFlowControlledLastLog > PAGE_FLOW_CONTROL_PRINT_INTERVAL) {
pageFlowControlledLastLog = System.currentTimeMillis();
ActiveMQQueueLogger.LOGGER.warnPageFlowControl(String.valueOf(address), String.valueOf(name), deliveringMetrics.getMessageCount(), deliveringMetrics.getPersistentSize(), maxReadMessages, maxReadBytes);
}
if (logger.isDebugEnabled()) {
logger.debug("Message dispatch from paging is blocked. Address {}/Queue{} will not read any more messages from paging " +
"until pending messages are acknowledged. There are currently {} messages pending ({} bytes) with max reads at " +
"maxPageReadMessages({}) and maxPageReadBytes({}). Either increase reading attributes at the address-settings or change your consumers to acknowledge more often.",
address, name, deliveringMetrics.getMessageCount(), deliveringMetrics.getPersistentSize(), maxReadMessages, maxReadBytes);
}
pageFlowControlled = true;
}
} else {
pageFlowControlled = false;
}
return needsDepageResult;
}
}

View File

@ -0,0 +1,128 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.activemq.artemis.tests.integration.paging;
import javax.jms.Connection;
import javax.jms.ConnectionFactory;
import javax.jms.DeliveryMode;
import javax.jms.Message;
import javax.jms.MessageConsumer;
import javax.jms.MessageProducer;
import javax.jms.Queue;
import javax.jms.Session;
import java.lang.invoke.MethodHandles;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import org.apache.activemq.artemis.api.core.QueueConfiguration;
import org.apache.activemq.artemis.api.core.RoutingType;
import org.apache.activemq.artemis.core.config.Configuration;
import org.apache.activemq.artemis.core.server.ActiveMQServer;
import org.apache.activemq.artemis.core.server.impl.AddressInfo;
import org.apache.activemq.artemis.logs.AssertionLoggerHandler;
import org.apache.activemq.artemis.tests.util.ActiveMQTestBase;
import org.apache.activemq.artemis.tests.util.CFUtil;
import org.apache.activemq.artemis.tests.util.Wait;
import org.junit.Assert;
import org.junit.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
public class PagingMaxReadLimitTest extends ActiveMQTestBase {
private static final Logger logger = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
ActiveMQServer server;
@Test
public void testMaxReadPageMessages() throws Exception {
ExecutorService service = Executors.newSingleThreadExecutor();
runAfter(service::shutdownNow);
Configuration config = createDefaultConfig(true);
config.setJournalSyncTransactional(false).setJournalSyncTransactional(false);
final int PAGE_MAX = 20 * 1024;
final int PAGE_SIZE = 10 * 1024;
server = createServer(true, config, PAGE_SIZE, PAGE_MAX, 100, -1, (long) (PAGE_MAX * 10), null, null, null);
server.start();
server.addAddressInfo(new AddressInfo(getName()).addRoutingType(RoutingType.ANYCAST));
server.createQueue(new QueueConfiguration(getName()).setRoutingType(RoutingType.ANYCAST));
Wait.assertTrue(() -> server.locateQueue(getName()) != null);
org.apache.activemq.artemis.core.server.Queue serverQueue = server.locateQueue(getName());
ConnectionFactory connectionFactory = CFUtil.createConnectionFactory("CORE", "tcp://localhost:61616");
try (Connection connection = connectionFactory.createConnection()) {
Session session = connection.createSession(true, Session.SESSION_TRANSACTED);
Queue queue = session.createQueue(getName());
MessageProducer producer = session.createProducer(queue);
producer.setDeliveryMode(DeliveryMode.NON_PERSISTENT);
for (int i = 0; i < 500; i++) {
producer.send(session.createTextMessage("Hello " + i));
}
session.commit();
Assert.assertTrue(serverQueue.getPagingStore().isPaging());
}
AssertionLoggerHandler.startCapture();
runAfter(AssertionLoggerHandler::stopCapture);
AtomicInteger errorCounter = new AtomicInteger(0);
CountDownLatch done = new CountDownLatch(1);
service.execute(() -> {
try (Connection connection = connectionFactory.createConnection()) {
connection.start();
Session session = connection.createSession(true, Session.SESSION_TRANSACTED);
Queue queue = session.createQueue(getName());
MessageConsumer consumer = session.createConsumer(queue);
for (int i = 0; i < 500;) {
Message message = consumer.receive(10);
if (message == null) {
session.commit();
} else {
i++;
}
}
session.commit();
} catch (Throwable e) {
logger.debug(e.getMessage(), e);
errorCounter.incrementAndGet();
} finally {
done.countDown();
}
});
Assert.assertTrue(done.await(5, TimeUnit.SECONDS));
Wait.assertTrue(() -> AssertionLoggerHandler.findText("AMQ224127"), 2000, 10);
Assert.assertEquals(0, errorCounter.get());
}
}