diff --git a/artemis-cli/src/main/resources/org/apache/activemq/artemis/cli/commands/etc/broker.xml b/artemis-cli/src/main/resources/org/apache/activemq/artemis/cli/commands/etc/broker.xml index 6f01a437fa..0c4dbcd95b 100644 --- a/artemis-cli/src/main/resources/org/apache/activemq/artemis/cli/commands/etc/broker.xml +++ b/artemis-cli/src/main/resources/org/apache/activemq/artemis/cli/commands/etc/broker.xml @@ -133,8 +133,14 @@ ${cluster-security.settings}${cluster.settings}${replicated.settings}${shared-st DLQ ExpiryQueue 0 - + + + + -1 + + -1 10 ${full-policy} ${auto-create} diff --git a/artemis-cli/src/main/resources/org/apache/activemq/artemis/cli/commands/etc/global-max-default.txt b/artemis-cli/src/main/resources/org/apache/activemq/artemis/cli/commands/etc/global-max-default.txt index 2d029ac24b..7b29efe529 100644 --- a/artemis-cli/src/main/resources/org/apache/activemq/artemis/cli/commands/etc/global-max-default.txt +++ b/artemis-cli/src/main/resources/org/apache/activemq/artemis/cli/commands/etc/global-max-default.txt @@ -1,9 +1,6 @@ - + 100Mb --> diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/deployers/impl/FileConfigurationParser.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/deployers/impl/FileConfigurationParser.java index 8ffc514604..c12fbcb1a1 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/deployers/impl/FileConfigurationParser.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/deployers/impl/FileConfigurationParser.java @@ -452,10 +452,7 @@ public final class FileConfigurationParser extends XMLConfigurationUtil { long globalMaxMessages = getLong(e, GLOBAL_MAX_MESSAGES, -1, Validators.MINUS_ONE_OR_GT_ZERO); - if (globalMaxSize > 0) { - config.setGlobalMaxMessages(globalMaxMessages); - } - + config.setGlobalMaxMessages(globalMaxMessages); config.setMaxDiskUsage(getInteger(e, MAX_DISK_USAGE, config.getMaxDiskUsage(), Validators.PERCENTAGE_OR_MINUS_ONE)); diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/PagingManager.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/PagingManager.java index d6bd9c261c..89ff513566 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/PagingManager.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/PagingManager.java @@ -131,6 +131,10 @@ public interface PagingManager extends ActiveMQComponent, HierarchicalRepository return 0; } + default long getGlobalMessages() { + return 0; + } + /** * Use this when you have no refernce of an address. (anonymous AMQP Producers for example) * @param runWhenAvailable @@ -149,4 +153,8 @@ public interface PagingManager extends ActiveMQComponent, HierarchicalRepository default long getMaxSize() { return 0; } + + default long getMaxMessages() { + return 0; + } } diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/impl/PagingManagerImpl.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/impl/PagingManagerImpl.java index 16aec18601..b04bbd7c2f 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/impl/PagingManagerImpl.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/impl/PagingManagerImpl.java @@ -80,6 +80,8 @@ public final class PagingManagerImpl implements PagingManager { private long maxSize; + private long maxMessages; + private volatile boolean cleanupEnabled = true; private volatile boolean diskFull = false; @@ -117,6 +119,7 @@ public final class PagingManagerImpl implements PagingManager { this.addressSettingsRepository = addressSettingsRepository; addressSettingsRepository.registerListener(this); this.maxSize = maxSize; + this.maxMessages = maxMessages; this.globalSizeMetric = new SizeAwareMetric(maxSize, maxSize, maxMessages, maxMessages); globalSizeMetric.setSizeEnabled(maxSize >= 0); globalSizeMetric.setElementsEnabled(maxMessages >= 0); @@ -132,9 +135,10 @@ public final class PagingManagerImpl implements PagingManager { /** To be used in tests only called through PagingManagerTestAccessor */ - void resetMaxSize(long maxSize, long maxElements) { + void resetMaxSize(long maxSize, long maxMessages) { this.maxSize = maxSize; - this.globalSizeMetric.setMax(maxSize, maxSize, maxElements, maxElements); + this.maxMessages = maxMessages; + this.globalSizeMetric.setMax(maxSize, maxSize, maxMessages, maxMessages); } @Override @@ -142,6 +146,11 @@ public final class PagingManagerImpl implements PagingManager { return maxSize; } + @Override + public long getMaxMessages() { + return maxMessages; + } + public PagingManagerImpl(final PagingStoreFactory pagingSPI, final HierarchicalRepository addressSettingsRepository) { this(pagingSPI, addressSettingsRepository, -1, -1, null); @@ -186,6 +195,11 @@ public final class PagingManagerImpl implements PagingManager { return globalSizeMetric.getSize(); } + @Override + public long getGlobalMessages() { + return globalSizeMetric.getElements(); + } + protected void checkMemoryRelease() { if (!diskFull && (maxSize < 0 || !globalFull) && !blockedStored.isEmpty()) { if (!memoryCallback.isEmpty()) { diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/impl/PagingStoreImpl.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/impl/PagingStoreImpl.java index f5a03c6396..d5132e705e 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/impl/PagingStoreImpl.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/impl/PagingStoreImpl.java @@ -530,7 +530,7 @@ public class PagingStoreImpl implements PagingStore { final boolean isPaging = this.paging; if (isPaging) { paging = false; - ActiveMQServerLogger.LOGGER.pageStoreStop(storeName, size.getSize(), maxSize, pagingManager.getGlobalSize()); + ActiveMQServerLogger.LOGGER.pageStoreStop(storeName, getPageInfo()); } this.cursorProvider.onPageModeCleared(); } finally { @@ -538,6 +538,10 @@ public class PagingStoreImpl implements PagingStore { } } + private String getPageInfo() { + return String.format("size=%d bytes (%d messages); maxSize=%d bytes (%d messages); globalSize=%d bytes (%d messages); globalMaxSize=%d bytes (%d messages);", size.getSize(), size.getElements(), maxSize, maxMessages, pagingManager.getGlobalSize(), pagingManager.getGlobalMessages(), pagingManager.getMaxSize(), pagingManager.getMaxMessages()); + } + @Override public boolean startPaging() { if (!running) { @@ -575,7 +579,7 @@ public class PagingStoreImpl implements PagingStore { } } paging = true; - ActiveMQServerLogger.LOGGER.pageStoreStart(storeName, size.getSize(), maxSize, pagingManager.getGlobalSize()); + ActiveMQServerLogger.LOGGER.pageStoreStart(storeName, getPageInfo()); return true; } finally { @@ -764,7 +768,7 @@ public class PagingStoreImpl implements PagingStore { if (pagingManager.isDiskFull()) { ActiveMQServerLogger.LOGGER.blockingDiskFull(address); } else { - ActiveMQServerLogger.LOGGER.blockingMessageProduction(address, size.getSize(), maxSize, pagingManager.getGlobalSize()); + ActiveMQServerLogger.LOGGER.blockingMessageProduction(address, getPageInfo()); } blocking = true; } @@ -813,7 +817,7 @@ public class PagingStoreImpl implements PagingStore { if (!onMemoryFreedRunnables.isEmpty()) { executor.execute(this::memoryReleased); if (blocking) { - ActiveMQServerLogger.LOGGER.unblockingMessageProduction(address, size.getSize(), maxSize); + ActiveMQServerLogger.LOGGER.unblockingMessageProduction(address, getPageInfo()); blocking = false; return true; } @@ -848,7 +852,7 @@ public class PagingStoreImpl implements PagingStore { // Address is full, we just pretend we are paging, and drop the data if (!printedDropMessagesWarning) { printedDropMessagesWarning = true; - ActiveMQServerLogger.LOGGER.pageStoreDropMessages(storeName, size.getSize(), maxSize, pagingManager.getGlobalSize()); + ActiveMQServerLogger.LOGGER.pageStoreDropMessages(storeName, getPageInfo()); } return true; } else { diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/ActiveMQServerLogger.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/ActiveMQServerLogger.java index cc1b880947..690187d91c 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/ActiveMQServerLogger.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/ActiveMQServerLogger.java @@ -288,8 +288,8 @@ public interface ActiveMQServerLogger extends BasicLogger { void switchingNIO(); @LogMessage(level = Logger.Level.INFO) - @Message(id = 221046, value = "Unblocking message production on address ''{0}''; size is currently: {1} bytes; max-size-bytes: {2}", format = Message.Format.MESSAGE_FORMAT) - void unblockingMessageProduction(SimpleString addressName, long currentSize, long maxSize); + @Message(id = 221046, value = "Unblocking message production on address ''{0}''; {1}", format = Message.Format.MESSAGE_FORMAT) + void unblockingMessageProduction(SimpleString addressName, String sizeInfo); @LogMessage(level = Logger.Level.INFO) @Message(id = 221047, value = "Backup Server has scaled down to live server", format = Message.Format.MESSAGE_FORMAT) @@ -615,12 +615,12 @@ public interface ActiveMQServerLogger extends BasicLogger { void pageStoreStartIOError(@Cause Exception e); @LogMessage(level = Logger.Level.WARN) - @Message(id = 222038, value = "Starting paging on address ''{0}''; size is currently: {1} bytes; max-size-bytes: {2}; global-size-bytes: {3}", format = Message.Format.MESSAGE_FORMAT) - void pageStoreStart(SimpleString storeName, long addressSize, long maxSize, long globalMaxSize); + @Message(id = 222038, value = "Starting paging on address ''{0}''; {1}", format = Message.Format.MESSAGE_FORMAT) + void pageStoreStart(SimpleString storeName, String sizeInfo); @LogMessage(level = Logger.Level.WARN) - @Message(id = 222039, value = "Messages sent to address ''{0}'' are being dropped; size is currently: {1} bytes; max-size-bytes: {2}; global-size-bytes: {3}", format = Message.Format.MESSAGE_FORMAT) - void pageStoreDropMessages(SimpleString storeName, long addressSize, long maxSize, long globalMaxSize); + @Message(id = 222039, value = "Messages sent to address ''{0}'' are being dropped; {1}", format = Message.Format.MESSAGE_FORMAT) + void pageStoreDropMessages(SimpleString storeName, String sizeInfo); @LogMessage(level = Logger.Level.WARN) @Message(id = 222040, value = "Server is stopped", format = Message.Format.MESSAGE_FORMAT) @@ -1218,8 +1218,8 @@ public interface ActiveMQServerLogger extends BasicLogger { void missingClusterConfigForScaleDown(String scaleDownCluster); @LogMessage(level = Logger.Level.WARN) - @Message(id = 222183, value = "Blocking message production on address ''{0}''; size is currently: {1} bytes; max-size-bytes on address: {2}, global-max-size is {3}", format = Message.Format.MESSAGE_FORMAT) - void blockingMessageProduction(SimpleString addressName, long currentSize, long maxSize, long globalMaxSize); + @Message(id = 222183, value = "Blocking message production on address ''{0}''; {1}", format = Message.Format.MESSAGE_FORMAT) + void blockingMessageProduction(SimpleString addressName, String pageInfo); @LogMessage(level = Logger.Level.WARN) @Message(id = 222184, @@ -2181,8 +2181,8 @@ public interface ActiveMQServerLogger extends BasicLogger { void enableTraceForCriticalAnalyzer(); @LogMessage(level = Logger.Level.WARN) - @Message(id = 224108, value = "Stopped paging on address ''{0}''; size is currently: {1} bytes; max-size-bytes: {2}; global-size-bytes: {3}", format = Message.Format.MESSAGE_FORMAT) - void pageStoreStop(SimpleString storeName, long addressSize, long maxSize, long globalMaxSize); + @Message(id = 224108, value = "Stopped paging on address ''{0}''; {1}", format = Message.Format.MESSAGE_FORMAT) + void pageStoreStop(SimpleString storeName, String pageInfo); @LogMessage(level = Logger.Level.WARN) @Message(id = 224109, value = "ConnectionRouter {0} not found", format = Message.Format.MESSAGE_FORMAT) diff --git a/tests/smoke-tests/pom.xml b/tests/smoke-tests/pom.xml index 6d0b433bec..6edcf884ea 100644 --- a/tests/smoke-tests/pom.xml +++ b/tests/smoke-tests/pom.xml @@ -861,7 +861,6 @@ - test-compile create-paging @@ -882,6 +881,46 @@ + + test-compile + create-paging-address-messages + + create + + + + ${basedir}/target/classes/servers/pagingAddressMaxMessages + true + admin + admin + ${basedir}/target/pagingAddressMaxMessages + + + --java-options + -Djava.rmi.server.hostname=localhost + + + + + test-compile + create-paging-global-messages + + create + + + + ${basedir}/target/classes/servers/pagingGlobalMaxMessages + true + admin + admin + ${basedir}/target/pagingGlobalMaxMessages + + + --java-options + -Djava.rmi.server.hostname=localhost + + + test-compile diff --git a/tests/smoke-tests/src/main/resources/servers/pagingAddressMaxMessages/broker.xml b/tests/smoke-tests/src/main/resources/servers/pagingAddressMaxMessages/broker.xml new file mode 100644 index 0000000000..f7e2a48841 --- /dev/null +++ b/tests/smoke-tests/src/main/resources/servers/pagingAddressMaxMessages/broker.xml @@ -0,0 +1,183 @@ + + + + + + + + 0.0.0.0 + + true + + + NIO + + ./data/paging + + ./data/bindings + + ./data/journal + + ./data/large-messages + + true + + 2 + + -1 + + 1000 + + + + + + + + + + + + + + + + + + + + + + 5000 + + + 90 + + + + + + + + + + + + tcp://0.0.0.0:61616?tcpSendBufferSize=1048576;tcpReceiveBufferSize=1048576;protocols=CORE,AMQP,STOMP,HORNETQ,MQTT,OPENWIRE;useEpoll=true;useKQueue;amqpCredits=1000;amqpLowCredits=300;actorThresholdBytes=10000 + + + tcp://0.0.0.0:5672?tcpSendBufferSize=1048576;tcpReceiveBufferSize=1048576;protocols=AMQP;useEpoll=true;useKQueue=true;amqpCredits=1000;amqpLowCredits=300 + + + tcp://0.0.0.0:61613?tcpSendBufferSize=1048576;tcpReceiveBufferSize=1048576;protocols=STOMP;useEpoll=true;useKQueue=true + + + tcp://0.0.0.0:5445?protocols=HORNETQ,STOMP;useEpoll=true;useKQueue=true + + + tcp://0.0.0.0:1883?tcpSendBufferSize=1048576;tcpReceiveBufferSize=1048576;protocols=MQTT;useEpoll=true;useKQueue=true + + + + + + + + + + + + + + + + + + + + + + + + DLQ + ExpiryQueue + 0 + + -1 + 10 + PAGE + true + true + + + + DLQ + ExpiryQueue + 0 + + -1 + 1000 + 10000 + 10 + PAGE + true + true + + + + +
+ + + +
+
+ + + +
+ +
+ +
+
diff --git a/tests/smoke-tests/src/main/resources/servers/pagingGlobalMaxMessages/broker.xml b/tests/smoke-tests/src/main/resources/servers/pagingGlobalMaxMessages/broker.xml new file mode 100644 index 0000000000..4a369b4bf0 --- /dev/null +++ b/tests/smoke-tests/src/main/resources/servers/pagingGlobalMaxMessages/broker.xml @@ -0,0 +1,184 @@ + + + + + + + + 0.0.0.0 + + true + + + NIO + + ./data/paging + + ./data/bindings + + ./data/journal + + ./data/large-messages + + true + + 2 + + -1 + + 1000 + + + + + + + + + + + + + + + + + + + + + + 5000 + + + 90 + + 1000 + + + + + + + + + + + + tcp://0.0.0.0:61616?tcpSendBufferSize=1048576;tcpReceiveBufferSize=1048576;protocols=CORE,AMQP,STOMP,HORNETQ,MQTT,OPENWIRE;useEpoll=true;useKQueue;amqpCredits=1000;amqpLowCredits=300;actorThresholdBytes=10000 + + + tcp://0.0.0.0:5672?tcpSendBufferSize=1048576;tcpReceiveBufferSize=1048576;protocols=AMQP;useEpoll=true;useKQueue=true;amqpCredits=1000;amqpLowCredits=300 + + + tcp://0.0.0.0:61613?tcpSendBufferSize=1048576;tcpReceiveBufferSize=1048576;protocols=STOMP;useEpoll=true;useKQueue=true + + + tcp://0.0.0.0:5445?protocols=HORNETQ,STOMP;useEpoll=true;useKQueue=true + + + tcp://0.0.0.0:1883?tcpSendBufferSize=1048576;tcpReceiveBufferSize=1048576;protocols=MQTT;useEpoll=true;useKQueue=true + + + + + + + + + + + + + + + + + + + + + + + + DLQ + ExpiryQueue + 0 + + -1 + 10 + PAGE + true + true + + + + DLQ + ExpiryQueue + 0 + + -1 + 10000 + 10 + PAGE + true + true + + + + +
+ + + +
+
+ + + +
+ +
+ +
+
diff --git a/tests/smoke-tests/src/test/java/org/apache/activemq/artemis/tests/smoke/paging/SmokeMaxMessagePagingTest.java b/tests/smoke-tests/src/test/java/org/apache/activemq/artemis/tests/smoke/paging/SmokeMaxMessagePagingTest.java new file mode 100644 index 0000000000..6bd04821b0 --- /dev/null +++ b/tests/smoke-tests/src/test/java/org/apache/activemq/artemis/tests/smoke/paging/SmokeMaxMessagePagingTest.java @@ -0,0 +1,94 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.activemq.artemis.tests.smoke.paging; + +import java.io.File; + +import org.apache.activemq.artemis.cli.commands.ActionContext; +import org.apache.activemq.artemis.cli.commands.messages.Producer; +import org.apache.activemq.artemis.tests.smoke.common.SmokeTestBase; +import org.junit.Assert; +import org.junit.Before; +import org.junit.Test; + +public class SmokeMaxMessagePagingTest extends SmokeTestBase { + + public static final String SERVER_NAME_GLOBAL = "pagingGlobalMaxMessages"; + public static final String SERVER_NAME_ADDRESS = "pagingAddressMaxMessages"; + + @Before + public void before() throws Exception { + } + + @Test + public void testGlobalMaxSend() throws Exception { + internalTestSend(SERVER_NAME_GLOBAL); + } + + @Test + public void testAddressMaxSend() throws Exception { + internalTestSend(SERVER_NAME_ADDRESS); + } + + public void internalTestSend(String serverName) throws Exception { + cleanupData(serverName); + startServer(serverName, 0, 30000); + internalSend("core", 2000); + Assert.assertTrue("System did not page", isPaging(serverName)); + } + + boolean isPaging(String serverName) { + File location = new File(getServerLocation(serverName)); + File paging = new File(location, "data/paging"); + File[] pagingContents = paging.listFiles(); + return pagingContents != null && pagingContents.length > 0; + } + + + @Test + public void testGlobalMaxSendRestart() throws Exception { + internalTestSendWithRestart(SERVER_NAME_GLOBAL); + } + + @Test + public void testAddressMaxSendRestart() throws Exception { + internalTestSendWithRestart(SERVER_NAME_ADDRESS); + } + + public void internalTestSendWithRestart(String serverName) throws Exception { + cleanupData(serverName); + Process process = startServer(serverName, 0, 30000); + internalSend("core", 500); + + Assert.assertFalse(isPaging(serverName)); + + process.destroy(); + process = startServer(serverName, 0, 30000); + internalSend("core", 1500); + + Assert.assertTrue(isPaging(serverName)); + } + + private void internalSend(String protocol, int numberOfMessages) throws Exception { + Producer producer = (Producer)new Producer().setMessageSize(1).setMessageCount(numberOfMessages).setTxBatchSize(500); + producer.setProtocol(protocol); + producer.setSilentInput(true); + producer.execute(new ActionContext()); + } + +}