diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/ActiveMQQueueLogger.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/ActiveMQQueueLogger.java new file mode 100644 index 0000000000..4e1c16cacc --- /dev/null +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/ActiveMQQueueLogger.java @@ -0,0 +1,40 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.activemq.artemis.core.server; + +import org.apache.activemq.artemis.logs.BundleFactory; +import org.apache.activemq.artemis.logs.annotation.LogBundle; +import org.apache.activemq.artemis.logs.annotation.LogMessage; + +/** + * This is using a separate Logger specific for Queue / QueueImpl. + * It's using Queue.class.getName() as the category as it would be possible to disable this logger with log4j. + * This is sharing the codes with ActiveMQServerLogger (meaning the codes between here and ActiveMQServerLogger have to be unique). + */ +@LogBundle(projectCode = "AMQ", regexID = "22[0-9]{4}") +public interface ActiveMQQueueLogger { + + ActiveMQQueueLogger LOGGER = BundleFactory.newBundle(ActiveMQQueueLogger.class, Queue.class.getName()); + + @LogMessage(id = 224127, value = "Message dispatch from paging is blocked. Address {}/Queue {} will not read any more messages from paging until pending messages are acknowledged. There are currently {} messages pending ({} bytes) with max reads at maxPageReadMessages({}) and maxPageReadBytes({}). Either increase reading attributes at the address-settings or change your consumers to acknowledge more often.", level = LogMessage.Level.WARN) + void warnPageFlowControl(String address, + String queue, + long messageCount, + long messageBytes, + long maxMessages, + long maxMessagesBytes); +} diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/ActiveMQServerLogger.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/ActiveMQServerLogger.java index aad5fd93f0..72ef079664 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/ActiveMQServerLogger.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/ActiveMQServerLogger.java @@ -641,7 +641,9 @@ public interface ActiveMQServerLogger { void timeoutLockingConsumer(String consumer, String remoteAddress); @LogMessage(id = 222110, value = "no queue IDs defined!, originalMessage = {}, copiedMessage = {}, props={}", level = LogMessage.Level.WARN) - void noQueueIdDefined(org.apache.activemq.artemis.api.core.Message message, org.apache.activemq.artemis.api.core.Message messageCopy, SimpleString idsHeaderName); + void noQueueIdDefined(org.apache.activemq.artemis.api.core.Message message, + org.apache.activemq.artemis.api.core.Message messageCopy, + SimpleString idsHeaderName); @LogMessage(id = 222111, value = "exception while invoking {} on {}", level = LogMessage.Level.TRACE) void managementOperationError(String op, String resourceName, Exception e); @@ -905,10 +907,7 @@ public interface ActiveMQServerLogger { @LogMessage(id = 222201, value = "Timed out waiting for activation to exit", level = LogMessage.Level.WARN) void activationTimeout(); - @LogMessage(id = 222202, value = "{}: <{}> should not be set to the same value as <{}>. " + - "If a system is under high load, or there is a minor network delay, " + - "there is a high probability of a cluster split/failure due to connection timeout.", - level = LogMessage.Level.WARN) + @LogMessage(id = 222202, value = "{}: <{}> should not be set to the same value as <{}>. " + "If a system is under high load, or there is a minor network delay, " + "there is a high probability of a cluster split/failure due to connection timeout.", level = LogMessage.Level.WARN) void connectionTTLEqualsCheckPeriod(String connectionName, String ttl, String checkPeriod); @LogMessage(id = 222203, value = "Classpath lacks a protocol-manager for protocol {}, Protocol being ignored on acceptor {}", level = LogMessage.Level.WARN) @@ -1118,7 +1117,7 @@ public interface ActiveMQServerLogger { @LogMessage(id = 222702, value = "Message ack in prepared tx for queue {} which does not exist. This ack will be ignored.", level = LogMessage.Level.WARN) void journalMessageAckMissingQueueInPreparedTX(Long queueID); - @LogMessage(id = 222703, value = "Address \"{}\" is full. Bridge {} will disconnect", level = LogMessage.Level.WARN) + @LogMessage(id = 222703, value = "Address \"{}\" is full. Bridge {} will disconnect", level = LogMessage.Level.WARN) void bridgeAddressFull(String addressName, String bridgeName); @LogMessage(id = 222274, value = "Failed to deploy address {}: {}", level = LogMessage.Level.WARN) @@ -1180,7 +1179,6 @@ public interface ActiveMQServerLogger { @LogMessage(id = 222295, value = "There is a possible split brain on nodeID {}. Topology update ignored", level = LogMessage.Level.WARN) void possibleSplitBrain(String nodeID); - @LogMessage(id = 222296, value = "Unable to deploy Hawtio MBeam, console client side RBAC not available", level = LogMessage.Level.WARN) void unableToDeployHawtioMBean(Throwable e); @@ -1584,4 +1582,6 @@ public interface ActiveMQServerLogger { @LogMessage(id = 224126, value = "Failure during protocol handshake on connection to {} from {}", level = LogMessage.Level.ERROR) void failureDuringProtocolHandshake(SocketAddress localAddress, SocketAddress remoteAddress, Throwable e); + + // notice loggerID=224127 is reserved as it's been used at ActiveMQQueueLogger } diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/QueueImpl.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/QueueImpl.java index 7ded73cf11..2ff193595b 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/QueueImpl.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/QueueImpl.java @@ -77,6 +77,7 @@ import org.apache.activemq.artemis.core.postoffice.impl.LocalQueueBinding; import org.apache.activemq.artemis.core.postoffice.impl.PostOfficeImpl; import org.apache.activemq.artemis.core.remoting.server.RemotingService; import org.apache.activemq.artemis.core.server.ActiveMQMessageBundle; +import org.apache.activemq.artemis.core.server.ActiveMQQueueLogger; import org.apache.activemq.artemis.core.server.ActiveMQServer; import org.apache.activemq.artemis.core.server.ActiveMQServerLogger; import org.apache.activemq.artemis.core.server.Consumer; @@ -179,6 +180,16 @@ public class QueueImpl extends CriticalComponentImpl implements Queue { private volatile boolean queueDestroyed = false; + // Variable to control if we should print a flow controlled message or not. + // Once it was flow controlled, we will stop warning until it's cleared once again + private volatile boolean pageFlowControlled = false; + + private volatile long pageFlowControlledLastLog = 0; + + // It is not expected to have an user really changing this. This is a system property now in case users disagree and find value on changing it. + // In case there is in fact value on changing it we may consider bringing it as an address-settings or broker.xml + private static final long PAGE_FLOW_CONTROL_PRINT_INTERVAL = Long.parseLong(System.getProperty("ARTEMIS_PAGE_FLOW_CONTROL_PRINT_INTERVAL", "60000")); + // once we delivered messages from paging, we need to call asyncDelivery upon acks // if we flow control paging, ack more messages will open the space to deliver more messages // hence we will need this flag to determine if it was paging before. @@ -3298,8 +3309,28 @@ public class QueueImpl extends CriticalComponentImpl implements Queue { return queueMemorySize.getSize() < pageSubscription.getPagingStore().getMaxSize() && intermediateMessageReferences.size() + messageReferences.size() < MAX_DEPAGE_NUM; } else { - return (maxReadBytes <= 0 || (queueMemorySize.getSize() + deliveringMetrics.getPersistentSize()) < maxReadBytes) && + boolean needsDepageResult = (maxReadBytes <= 0 || (queueMemorySize.getSize() + deliveringMetrics.getPersistentSize()) < maxReadBytes) && (maxReadMessages <= 0 || (queueMemorySize.getElements() + deliveringMetrics.getMessageCount()) < maxReadMessages); + + if (!needsDepageResult) { + if (!pageFlowControlled && (maxReadBytes > 0 && deliveringMetrics.getPersistentSize() >= maxReadBytes || maxReadMessages > 0 && deliveringMetrics.getMessageCount() >= maxReadMessages)) { + if (System.currentTimeMillis() - pageFlowControlledLastLog > PAGE_FLOW_CONTROL_PRINT_INTERVAL) { + pageFlowControlledLastLog = System.currentTimeMillis(); + ActiveMQQueueLogger.LOGGER.warnPageFlowControl(String.valueOf(address), String.valueOf(name), deliveringMetrics.getMessageCount(), deliveringMetrics.getPersistentSize(), maxReadMessages, maxReadBytes); + } + if (logger.isDebugEnabled()) { + logger.debug("Message dispatch from paging is blocked. Address {}/Queue{} will not read any more messages from paging " + + "until pending messages are acknowledged. There are currently {} messages pending ({} bytes) with max reads at " + + "maxPageReadMessages({}) and maxPageReadBytes({}). Either increase reading attributes at the address-settings or change your consumers to acknowledge more often.", + address, name, deliveringMetrics.getMessageCount(), deliveringMetrics.getPersistentSize(), maxReadMessages, maxReadBytes); + } + pageFlowControlled = true; + } + } else { + pageFlowControlled = false; + } + + return needsDepageResult; } } diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/paging/PagingMaxReadLimitTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/paging/PagingMaxReadLimitTest.java new file mode 100644 index 0000000000..a814b17cc0 --- /dev/null +++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/paging/PagingMaxReadLimitTest.java @@ -0,0 +1,128 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.activemq.artemis.tests.integration.paging; + +import javax.jms.Connection; +import javax.jms.ConnectionFactory; +import javax.jms.DeliveryMode; +import javax.jms.Message; +import javax.jms.MessageConsumer; +import javax.jms.MessageProducer; +import javax.jms.Queue; +import javax.jms.Session; +import java.lang.invoke.MethodHandles; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicInteger; + +import org.apache.activemq.artemis.api.core.QueueConfiguration; +import org.apache.activemq.artemis.api.core.RoutingType; +import org.apache.activemq.artemis.core.config.Configuration; +import org.apache.activemq.artemis.core.server.ActiveMQServer; +import org.apache.activemq.artemis.core.server.impl.AddressInfo; +import org.apache.activemq.artemis.logs.AssertionLoggerHandler; +import org.apache.activemq.artemis.tests.util.ActiveMQTestBase; +import org.apache.activemq.artemis.tests.util.CFUtil; +import org.apache.activemq.artemis.tests.util.Wait; +import org.junit.Assert; +import org.junit.Test; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +public class PagingMaxReadLimitTest extends ActiveMQTestBase { + + private static final Logger logger = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass()); + + ActiveMQServer server; + + @Test + public void testMaxReadPageMessages() throws Exception { + + ExecutorService service = Executors.newSingleThreadExecutor(); + runAfter(service::shutdownNow); + + Configuration config = createDefaultConfig(true); + config.setJournalSyncTransactional(false).setJournalSyncTransactional(false); + + final int PAGE_MAX = 20 * 1024; + + final int PAGE_SIZE = 10 * 1024; + + server = createServer(true, config, PAGE_SIZE, PAGE_MAX, 100, -1, (long) (PAGE_MAX * 10), null, null, null); + server.start(); + + server.addAddressInfo(new AddressInfo(getName()).addRoutingType(RoutingType.ANYCAST)); + server.createQueue(new QueueConfiguration(getName()).setRoutingType(RoutingType.ANYCAST)); + + Wait.assertTrue(() -> server.locateQueue(getName()) != null); + + org.apache.activemq.artemis.core.server.Queue serverQueue = server.locateQueue(getName()); + + ConnectionFactory connectionFactory = CFUtil.createConnectionFactory("CORE", "tcp://localhost:61616"); + + try (Connection connection = connectionFactory.createConnection()) { + Session session = connection.createSession(true, Session.SESSION_TRANSACTED); + Queue queue = session.createQueue(getName()); + MessageProducer producer = session.createProducer(queue); + producer.setDeliveryMode(DeliveryMode.NON_PERSISTENT); + for (int i = 0; i < 500; i++) { + producer.send(session.createTextMessage("Hello " + i)); + } + session.commit(); + + Assert.assertTrue(serverQueue.getPagingStore().isPaging()); + } + + AssertionLoggerHandler.startCapture(); + runAfter(AssertionLoggerHandler::stopCapture); + + AtomicInteger errorCounter = new AtomicInteger(0); + CountDownLatch done = new CountDownLatch(1); + + service.execute(() -> { + try (Connection connection = connectionFactory.createConnection()) { + connection.start(); + Session session = connection.createSession(true, Session.SESSION_TRANSACTED); + Queue queue = session.createQueue(getName()); + MessageConsumer consumer = session.createConsumer(queue); + + for (int i = 0; i < 500;) { + Message message = consumer.receive(10); + if (message == null) { + session.commit(); + } else { + i++; + } + } + session.commit(); + } catch (Throwable e) { + logger.debug(e.getMessage(), e); + errorCounter.incrementAndGet(); + } finally { + done.countDown(); + } + }); + + Assert.assertTrue(done.await(5, TimeUnit.SECONDS)); + Wait.assertTrue(() -> AssertionLoggerHandler.findText("AMQ224127"), 2000, 10); + Assert.assertEquals(0, errorCounter.get()); + + } + +} \ No newline at end of file