From 6ec2131e32d67ae9d8ab6ab15e2bda1777f82ca4 Mon Sep 17 00:00:00 2001 From: haanhvu Date: Sun, 14 Nov 2021 00:13:19 +0700 Subject: [PATCH] ARTEMIS-3057 Add min-disk-free feature To check if the remaining disk is enough. Alternative to max-disk-usage. --- .../config/ActiveMQDefaultConfiguration.java | 6 + .../artemis/core/config/Configuration.java | 4 + .../core/config/impl/ConfigurationImpl.java | 16 ++ .../impl/FileConfigurationParser.java | 6 +- .../core/paging/impl/PagingManagerImpl.java | 46 +++--- .../core/server/ActiveMQServerLogger.java | 15 +- .../core/server/files/FileStoreMonitor.java | 55 ++++--- .../core/server/impl/ActiveMQServerImpl.java | 35 ++-- .../schema/artemis-configuration.xsd | 12 +- .../server/files/FileStoreMonitorTest.java | 152 ++++++++++++------ ...gurationTest-full-config-wrong-address.xml | 2 + .../ConfigurationTest-full-config.xml | 1 + docs/user-manual/paging.adoc | 26 ++- .../integration/amqp/GlobalDiskFullTest.java | 15 +- 14 files changed, 264 insertions(+), 127 deletions(-) diff --git a/artemis-core-client/src/main/java/org/apache/activemq/artemis/api/config/ActiveMQDefaultConfiguration.java b/artemis-core-client/src/main/java/org/apache/activemq/artemis/api/config/ActiveMQDefaultConfiguration.java index b7193d1988..b64b654d23 100644 --- a/artemis-core-client/src/main/java/org/apache/activemq/artemis/api/config/ActiveMQDefaultConfiguration.java +++ b/artemis-core-client/src/main/java/org/apache/activemq/artemis/api/config/ActiveMQDefaultConfiguration.java @@ -524,6 +524,8 @@ public final class ActiveMQDefaultConfiguration { DEFAULT_MAX_DISK_USAGE = maxDisk; } + public static final long DEFAULT_MIN_DISK_FREE = -1; + public static final int DEFAULT_DISK_SCAN = 5000; public static final int DEFAULT_MAX_QUEUE_CONSUMERS = -1; @@ -1545,6 +1547,10 @@ public final class ActiveMQDefaultConfiguration { return DEFAULT_MAX_DISK_USAGE; } + public static long getDefaultMinDiskFree() { + return DEFAULT_MIN_DISK_FREE; + } + public static int getDefaultDiskScanPeriod() { return DEFAULT_DISK_SCAN; } diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/config/Configuration.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/config/Configuration.java index cc045f2f0d..686764a66c 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/config/Configuration.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/config/Configuration.java @@ -1274,6 +1274,10 @@ public interface Configuration { Configuration setMaxDiskUsage(int maxDiskUsage); + long getMinDiskFree(); + + Configuration setMinDiskFree(long minDiskFree); + ConfigurationImpl setInternalNamingPrefix(String internalNamingPrefix); Configuration setDiskScanPeriod(int diskScanPeriod); diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/config/impl/ConfigurationImpl.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/config/impl/ConfigurationImpl.java index 8e91149e02..dedf624054 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/config/impl/ConfigurationImpl.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/config/impl/ConfigurationImpl.java @@ -379,6 +379,8 @@ public class ConfigurationImpl implements Configuration, Serializable { private int maxDiskUsage = ActiveMQDefaultConfiguration.getDefaultMaxDiskUsage(); + private long minDiskFree = ActiveMQDefaultConfiguration.getDefaultMinDiskFree(); + private int diskScanPeriod = ActiveMQDefaultConfiguration.getDefaultDiskScanPeriod(); private String systemPropertyPrefix = ActiveMQDefaultConfiguration.getDefaultSystemPropertyPrefix(); @@ -906,6 +908,17 @@ public class ConfigurationImpl implements Configuration, Serializable { return this; } + @Override + public long getMinDiskFree() { + return minDiskFree; + } + + @Override + public ConfigurationImpl setMinDiskFree(long minDiskFree) { + this.minDiskFree = minDiskFree; + return this; + } + @Override public ConfigurationImpl setGlobalMaxSize(long maxSize) { this.globalMaxSize = maxSize; @@ -2827,6 +2840,9 @@ public class ConfigurationImpl implements Configuration, Serializable { if (maxDiskUsage != other.maxDiskUsage) { return false; } + if (minDiskFree != other.minDiskFree) { + return false; + } if (diskScanPeriod != other.diskScanPeriod) { return false; } diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/deployers/impl/FileConfigurationParser.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/deployers/impl/FileConfigurationParser.java index b4c05b6edc..702d22cafd 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/deployers/impl/FileConfigurationParser.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/deployers/impl/FileConfigurationParser.java @@ -327,7 +327,9 @@ public final class FileConfigurationParser extends XMLConfigurationUtil { private static final String GLOBAL_MAX_MESSAGES = "global-max-messages"; - private static final String MAX_DISK_USAGE = "max-disk-usage"; + public static final String MAX_DISK_USAGE = "max-disk-usage"; + + public static final String MIN_DISK_FREE = "min-disk-free"; private static final String DISK_SCAN_PERIOD = "disk-scan-period"; @@ -474,6 +476,8 @@ public final class FileConfigurationParser extends XMLConfigurationUtil { config.setGlobalMaxMessages(globalMaxMessages); + config.setMinDiskFree(getTextBytesAsLongBytes(e, MIN_DISK_FREE, config.getMinDiskFree(), Validators.MINUS_ONE_OR_GT_ZERO)); + config.setMaxDiskUsage(getInteger(e, MAX_DISK_USAGE, config.getMaxDiskUsage(), Validators.PERCENTAGE_OR_MINUS_ONE)); config.setDiskScanPeriod(getInteger(e, DISK_SCAN_PERIOD, config.getDiskScanPeriod(), Validators.MINUS_ONE_OR_GT_ZERO)); diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/impl/PagingManagerImpl.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/impl/PagingManagerImpl.java index 63b02fe92f..41773760be 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/impl/PagingManagerImpl.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/impl/PagingManagerImpl.java @@ -51,6 +51,9 @@ import org.slf4j.LoggerFactory; import java.lang.invoke.MethodHandles; import java.util.function.BiConsumer; +import static org.apache.activemq.artemis.core.server.files.FileStoreMonitor.FileStoreMonitorType; +import static org.apache.activemq.artemis.core.server.files.FileStoreMonitor.FileStoreMonitorType.MaxDiskUsage; + public final class PagingManagerImpl implements PagingManager { private static final int ARTEMIS_PAGING_COUNTER_SNAPSHOT_INTERVAL = Integer.valueOf(System.getProperty("artemis.paging.counter.snapshot.interval", "60")); @@ -237,31 +240,34 @@ public final class PagingManagerImpl implements PagingManager { private final Logger logger = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass()); @Override - public void tick(long usableSpace, long totalSpace) { + public void tick(long usableSpace, long totalSpace, boolean withinLimit, FileStoreMonitorType type) { diskUsableSpace = usableSpace; diskTotalSpace = totalSpace; if (logger.isTraceEnabled()) { logger.trace("Tick:: usable space at {}, total space at {}", ByteUtil.getHumanReadableByteCount(usableSpace), ByteUtil.getHumanReadableByteCount(totalSpace)); } - } - - @Override - public void over(long usableSpace, long totalSpace) { - if (!diskFull) { - ActiveMQServerLogger.LOGGER.diskBeyondCapacity(ByteUtil.getHumanReadableByteCount(usableSpace), ByteUtil.getHumanReadableByteCount(totalSpace), String.format("%.1f%%", FileStoreMonitor.calculateUsage(usableSpace, totalSpace) * 100)); - diskFull = true; - } - } - - @Override - public void under(long usableSpace, long totalSpace) { - final boolean diskFull = PagingManagerImpl.this.diskFull; - if (diskFull || !blockedStored.isEmpty() || !memoryCallback.isEmpty()) { - if (diskFull) { - ActiveMQServerLogger.LOGGER.diskCapacityRestored(ByteUtil.getHumanReadableByteCount(usableSpace), ByteUtil.getHumanReadableByteCount(totalSpace), String.format("%.1f%%", FileStoreMonitor.calculateUsage(usableSpace, totalSpace) * 100)); - PagingManagerImpl.this.diskFull = false; + if (withinLimit) { + final boolean diskFull = PagingManagerImpl.this.diskFull; + if (diskFull || !blockedStored.isEmpty() || !memoryCallback.isEmpty()) { + if (diskFull) { + if (type == MaxDiskUsage) { + ActiveMQServerLogger.LOGGER.maxDiskUsageRestored(ByteUtil.getHumanReadableByteCount(usableSpace), ByteUtil.getHumanReadableByteCount(totalSpace), String.format("%.1f%%", FileStoreMonitor.calculateUsage(usableSpace, totalSpace) * 100)); + } else { + ActiveMQServerLogger.LOGGER.minDiskFreeRestored(ByteUtil.getHumanReadableByteCount(usableSpace)); + } + PagingManagerImpl.this.diskFull = false; + } + checkMemoryRelease(); + } + } else { + if (!diskFull) { + if (type == MaxDiskUsage) { + ActiveMQServerLogger.LOGGER.maxDiskUsageReached(ByteUtil.getHumanReadableByteCount(usableSpace), ByteUtil.getHumanReadableByteCount(totalSpace), String.format("%.1f%%", FileStoreMonitor.calculateUsage(usableSpace, totalSpace) * 100)); + } else { + ActiveMQServerLogger.LOGGER.minDiskFreeReached(ByteUtil.getHumanReadableByteCount(usableSpace)); + } + diskFull = true; } - checkMemoryRelease(); } } } @@ -295,7 +301,6 @@ public final class PagingManagerImpl implements PagingManager { @Override public void checkMemory(final Runnable runWhenAvailable) { - if (isGlobalFull()) { memoryCallback.add(AtomicRunnable.checkAtomic(runWhenAvailable)); return; @@ -321,7 +326,6 @@ public final class PagingManagerImpl implements PagingManager { } - @Override public boolean isGlobalFull() { return diskFull || maxSize > 0 && globalFull; diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/ActiveMQServerLogger.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/ActiveMQServerLogger.java index 2d72e4222f..51b1580dac 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/ActiveMQServerLogger.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/ActiveMQServerLogger.java @@ -932,10 +932,10 @@ public interface ActiveMQServerLogger { void impossibleToRouteGrouped(); @LogMessage(id = 222210, value = "Free storage space is at {} of {} total. Usage rate is {} which is beyond the configured . System will start blocking producers.", level = LogMessage.Level.WARN) - void diskBeyondCapacity(String usableSpace, String totalSpace, String usage); + void maxDiskUsageReached(String usableSpace, String totalSpace, String usage); - @LogMessage(id = 222211, value = "Free storage space is at {} of {} total. Usage rate is {} which is below the configured .", level = LogMessage.Level.INFO) - void diskCapacityRestored(String usableSpace, String totalSpace, String usage); + @LogMessage(id = 222211, value = "Free storage space is at {} of {} total. Usage rate is {} which is below the configured . System will unblock producers.", level = LogMessage.Level.INFO) + void maxDiskUsageRestored(String usableSpace, String totalSpace, String usage); @LogMessage(id = 222212, value = "Disk Full! Blocking message production on address '{}'. Clients will report blocked.", level = LogMessage.Level.WARN) void blockingDiskFull(SimpleString addressName); @@ -1584,4 +1584,13 @@ public interface ActiveMQServerLogger { void failureDuringProtocolHandshake(SocketAddress localAddress, SocketAddress remoteAddress, Throwable e); // notice loggerID=224127 is reserved as it's been used at ActiveMQQueueLogger + + @LogMessage(id = 224128, value = "Free storage space is at {} which is below the configured . System will start blocking producers.", level = LogMessage.Level.WARN) + void minDiskFreeReached(String usableSpace); + + @LogMessage(id = 224129, value = "Free storage space is at {} which is above the configured . System will unblock producers.", level = LogMessage.Level.WARN) + void minDiskFreeRestored(String usableSpace); + + @LogMessage(id = 224130, value = "The {} value {} will override the {} value {} since both are set.", level = LogMessage.Level.INFO) + void configParamOverride(String overridingParam, Object overridingParamValue, String overriddenParam, Object overriddenParamValue); } diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/files/FileStoreMonitor.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/files/FileStoreMonitor.java index 9a58dbed67..273304f3bd 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/files/FileStoreMonitor.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/files/FileStoreMonitor.java @@ -18,6 +18,7 @@ package org.apache.activemq.artemis.core.server.files; import java.io.File; import java.io.IOException; +import java.lang.invoke.MethodHandles; import java.nio.file.FileStore; import java.nio.file.Files; import java.util.HashSet; @@ -30,7 +31,6 @@ import org.apache.activemq.artemis.core.io.IOCriticalErrorListener; import org.apache.activemq.artemis.core.server.ActiveMQScheduledComponent; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import java.lang.invoke.MethodHandles; /** * This will keep a list of fileStores. It will make a comparison on all file stores registered. if any is over the limit, @@ -45,18 +45,26 @@ public class FileStoreMonitor extends ActiveMQScheduledComponent { private final Set callbackList = new HashSet<>(); private final Set stores = new HashSet<>(); - private double maxUsage; + private Double maxUsage; + private Long minDiskFree; private final Object monitorLock = new Object(); private final IOCriticalErrorListener ioCriticalErrorListener; + private final FileStoreMonitorType type; public FileStoreMonitor(ScheduledExecutorService scheduledExecutorService, Executor executor, long checkPeriod, TimeUnit timeUnit, - double maxUsage, - IOCriticalErrorListener ioCriticalErrorListener) { + Number referenceValue, + IOCriticalErrorListener ioCriticalErrorListener, + FileStoreMonitorType type) { super(scheduledExecutorService, executor, checkPeriod, timeUnit, false); - this.maxUsage = maxUsage; + this.type = type; + if (type == FileStoreMonitorType.MaxDiskUsage) { + this.maxUsage = referenceValue.doubleValue(); + } else { + this.minDiskFree = referenceValue.longValue(); + } this.ioCriticalErrorListener = ioCriticalErrorListener; } @@ -96,7 +104,7 @@ public class FileStoreMonitor extends ActiveMQScheduledComponent { public void tick() { synchronized (monitorLock) { - boolean over = false; + boolean ok = true; long usableSpace = 0; long totalSpace = 0; @@ -104,8 +112,12 @@ public class FileStoreMonitor extends ActiveMQScheduledComponent { try { usableSpace = store.getUsableSpace(); totalSpace = getTotalSpace(store); - over = calculateUsage(usableSpace, totalSpace) > maxUsage; - if (over) { + if (type == FileStoreMonitorType.MinDiskFree) { + ok = usableSpace > minDiskFree; + } else { + ok = calculateUsage(usableSpace, totalSpace) < maxUsage; + } + if (!ok) { break; } } catch (IOException ioe) { @@ -116,13 +128,7 @@ public class FileStoreMonitor extends ActiveMQScheduledComponent { } for (Callback callback : callbackList) { - callback.tick(usableSpace, totalSpace); - - if (over) { - callback.over(usableSpace, totalSpace); - } else { - callback.under(usableSpace, totalSpace); - } + callback.tick(usableSpace, totalSpace, ok, type); } } } @@ -131,13 +137,21 @@ public class FileStoreMonitor extends ActiveMQScheduledComponent { return maxUsage; } + public long getMinDiskFree() { + return minDiskFree; + } + public FileStoreMonitor setMaxUsage(double maxUsage) { this.maxUsage = maxUsage; return this; } - public static double calculateUsage(long usableSpace, long totalSpace) { + public FileStoreMonitor setMinDiskFree(long minDiskFree) { + this.minDiskFree = minDiskFree; + return this; + } + public static double calculateUsage(long usableSpace, long totalSpace) { if (totalSpace == 0) { return 0.0; } @@ -153,11 +167,10 @@ public class FileStoreMonitor extends ActiveMQScheduledComponent { } public interface Callback { + void tick(long usableSpace, long totalSpace, boolean withinLimit, FileStoreMonitorType type); + } - void tick(long usableSpace, long totalSpace); - - void over(long usableSpace, long totalSpace); - - void under(long usableSpace, long totalSpace); + public enum FileStoreMonitorType { + MaxDiskUsage, MinDiskFree; } } diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ActiveMQServerImpl.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ActiveMQServerImpl.java index efc3fe2d32..91a2754de3 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ActiveMQServerImpl.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ActiveMQServerImpl.java @@ -3510,12 +3510,25 @@ public class ActiveMQServerImpl implements ActiveMQServer { recoverStoredBridges(); } - if (configuration.getMaxDiskUsage() != -1) { - try { - injectMonitor(new FileStoreMonitor(getScheduledPool(), executorFactory.getExecutor(), configuration.getDiskScanPeriod(), TimeUnit.MILLISECONDS, configuration.getMaxDiskUsage() / 100f, ioCriticalErrorListener)); - } catch (Exception e) { - ActiveMQServerLogger.LOGGER.unableToInjectMonitor(e); + deployFileStoreMonitor(); + } + + private void deployFileStoreMonitor() throws Exception { + FileStoreMonitor.FileStoreMonitorType fileStoreMonitorType = null; + Number referenceValue = null; + if (configuration.getMinDiskFree() != -1) { + if (configuration.getMaxDiskUsage() != -1) { + ActiveMQServerLogger.LOGGER.configParamOverride(FileConfigurationParser.MIN_DISK_FREE, configuration.getMinDiskFree(), FileConfigurationParser.MAX_DISK_USAGE, configuration.getMaxDiskUsage()); } + fileStoreMonitorType = FileStoreMonitor.FileStoreMonitorType.MinDiskFree; + referenceValue = configuration.getMinDiskFree(); + } else if (configuration.getMaxDiskUsage() != -1) { + fileStoreMonitorType = FileStoreMonitor.FileStoreMonitorType.MaxDiskUsage; + referenceValue = configuration.getMaxDiskUsage() / 100d; + } + + if (fileStoreMonitorType != null) { + injectMonitor(new FileStoreMonitor(getScheduledPool(), executorFactory.getExecutor(), configuration.getDiskScanPeriod(), TimeUnit.MILLISECONDS, referenceValue, ioCriticalErrorListener, fileStoreMonitorType)); } } @@ -3536,10 +3549,14 @@ public class ActiveMQServerImpl implements ActiveMQServer { * This method exists for a possibility of test cases replacing the FileStoreMonitor for an extension that would for instance pretend a disk full on certain tests. */ public void injectMonitor(FileStoreMonitor storeMonitor) throws Exception { - this.fileStoreMonitor = storeMonitor; - pagingManager.injectMonitor(storeMonitor); - storageManager.injectMonitor(storeMonitor); - fileStoreMonitor.start(); + try { + this.fileStoreMonitor = storeMonitor; + pagingManager.injectMonitor(storeMonitor); + storageManager.injectMonitor(storeMonitor); + fileStoreMonitor.start(); + } catch (Exception e) { + ActiveMQServerLogger.LOGGER.unableToInjectMonitor(e); + } } public FileStoreMonitor getMonitor() { diff --git a/artemis-server/src/main/resources/schema/artemis-configuration.xsd b/artemis-server/src/main/resources/schema/artemis-configuration.xsd index 58274341dc..0dc0a4cae3 100644 --- a/artemis-server/src/main/resources/schema/artemis-configuration.xsd +++ b/artemis-server/src/main/resources/schema/artemis-configuration.xsd @@ -801,7 +801,17 @@ - Max percentage of disk usage before the system blocks or fails clients. + Max percentage of disk usage before the system blocks or fails clients. Will be overrided by + min-disk-free if both are set. + + + + + + + + Min free bytes on disk below which the system blocks or fails clients. Supports byte notation like + "K", "Mb", "GB", etc. Will override max-disk-usage if both are set. diff --git a/artemis-server/src/test/java/org/apache/activemq/artemis/core/server/files/FileStoreMonitorTest.java b/artemis-server/src/test/java/org/apache/activemq/artemis/core/server/files/FileStoreMonitorTest.java index a272d43c51..aceb624fae 100644 --- a/artemis-server/src/test/java/org/apache/activemq/artemis/core/server/files/FileStoreMonitorTest.java +++ b/artemis-server/src/test/java/org/apache/activemq/artemis/core/server/files/FileStoreMonitorTest.java @@ -37,6 +37,8 @@ import org.junit.Assert; import org.junit.Before; import org.junit.Test; +import static org.apache.activemq.artemis.core.server.files.FileStoreMonitor.FileStoreMonitorType.MaxDiskUsage; + public class FileStoreMonitorTest extends ActiveMQTestBase { private static final Logger logger = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass()); @@ -58,85 +60,108 @@ public class FileStoreMonitorTest extends ActiveMQTestBase { } @Test - public void testSimpleTick() throws Exception { + public void testSimpleTickForMaxDiskUsage() throws Exception { File garbageFile = new File(getTestDirfile(), "garbage.bin"); - FileOutputStream garbage = new FileOutputStream(garbageFile); - BufferedOutputStream bufferedOutputStream = new BufferedOutputStream(garbage); - PrintStream out = new PrintStream(bufferedOutputStream); - // This is just to make sure there is at least something on the device. - // If the testsuite is running with an empty tempFS, it would return 0 and the assertion would fail. - for (int i = 0; i < 100; i++) { - out.println("Garbage " + i); + try (FileOutputStream garbage = new FileOutputStream(garbageFile); + BufferedOutputStream bufferedOutputStream = new BufferedOutputStream(garbage); + PrintStream out = new PrintStream(bufferedOutputStream)) { + + // This is just to make sure there is at least something on the device. + // If the testsuite is running with an empty tempFS, it would return 0 and the assertion would fail. + for (int i = 0; i < 100; i++) { + out.println("Garbage " + i); + } } - bufferedOutputStream.close(); - - final AtomicInteger over = new AtomicInteger(0); - final AtomicInteger under = new AtomicInteger(0); + final AtomicInteger overMaxUsage = new AtomicInteger(0); + final AtomicInteger underMaxUsage = new AtomicInteger(0); final AtomicInteger tick = new AtomicInteger(0); - FileStoreMonitor.Callback callback = new FileStoreMonitor.Callback() { - @Override - public void tick(long usableSpace, long totalSpace) { - tick.incrementAndGet(); - logger.debug("tick:: usableSpace: {}, totalSpace:{}", usableSpace, totalSpace); - } - - @Override - public void over(long usableSpace, long totalSpace) { - over.incrementAndGet(); - logger.debug("over:: usableSpace: {}, totalSpace:{}", usableSpace, totalSpace); - } - - @Override - public void under(long usableSpace, long totalSpace) { - under.incrementAndGet(); - logger.debug("under:: usableSpace: {}, totalSpace: {}", usableSpace, totalSpace); + FileStoreMonitor.Callback callback = (usableSpace, totalSpace, ok, type) -> { + tick.incrementAndGet(); + if (ok) { + underMaxUsage.incrementAndGet(); + } else { + overMaxUsage.incrementAndGet(); } + logger.debug("tick:: usableSpace: " + usableSpace + ", totalSpace:" + totalSpace); }; - FileStoreMonitor storeMonitor = new FileStoreMonitor(scheduledExecutorService, executorService, 100, TimeUnit.MILLISECONDS, 0.999, null); + FileStoreMonitor storeMonitor = new FileStoreMonitor(scheduledExecutorService, executorService, 100L, TimeUnit.MILLISECONDS, 0.999, null, MaxDiskUsage); storeMonitor.addCallback(callback); storeMonitor.addStore(getTestDirfile()); storeMonitor.tick(); - Assert.assertEquals(0, over.get()); + Assert.assertEquals(0, overMaxUsage.get()); Assert.assertEquals(1, tick.get()); - Assert.assertEquals(1, under.get()); + Assert.assertEquals(1, underMaxUsage.get()); storeMonitor.setMaxUsage(0); storeMonitor.tick(); - Assert.assertEquals(1, over.get()); + Assert.assertEquals(1, overMaxUsage.get()); Assert.assertEquals(2, tick.get()); - Assert.assertEquals(1, under.get()); + Assert.assertEquals(1, underMaxUsage.get()); } @Test - public void testScheduler() throws Exception { + public void testSimpleTickForMinDiskFree() throws Exception { + File garbageFile = new File(getTestDirfile(), "garbage.bin"); - FileStoreMonitor storeMonitor = new FileStoreMonitor(scheduledExecutorService, executorService, 20, TimeUnit.MILLISECONDS, 0.9, null); + try (FileOutputStream garbage = new FileOutputStream(garbageFile); + BufferedOutputStream bufferedOutputStream = new BufferedOutputStream(garbage); + PrintStream out = new PrintStream(bufferedOutputStream)) { + + // This is just to make sure there is at least something on the device. + // If the testsuite is running with an empty tempFS, it would return 0 and the assertion would fail. + for (int i = 0; i < 100; i++) { + out.println("Garbage " + i); + } + } + + final AtomicInteger underMinDiskFree = new AtomicInteger(0); + final AtomicInteger overMinDiskFree = new AtomicInteger(0); + final AtomicInteger tick = new AtomicInteger(0); + + FileStoreMonitor.Callback callback = (usableSpace, totalSpace, ok, type) -> { + tick.incrementAndGet(); + if (ok) { + overMinDiskFree.incrementAndGet(); + } else { + underMinDiskFree.incrementAndGet(); + } + }; + FileStoreMonitor storeMonitor = new FileStoreMonitor(scheduledExecutorService, executorService, 100L, TimeUnit.MILLISECONDS, 0, null, FileStoreMonitor.FileStoreMonitorType.MinDiskFree); + storeMonitor.addCallback(callback); + storeMonitor.addStore(getTestDirfile()); + + storeMonitor.tick(); + + Assert.assertEquals(0, underMinDiskFree.get()); + Assert.assertEquals(1, tick.get()); + Assert.assertEquals(1, overMinDiskFree.get()); + + storeMonitor.setMinDiskFree(Long.MAX_VALUE); + + storeMonitor.tick(); + + Assert.assertEquals(1, underMinDiskFree.get()); + Assert.assertEquals(2, tick.get()); + Assert.assertEquals(1, overMinDiskFree.get()); + } + + @Test + public void testSchedulerForMaxDiskUsage() throws Exception { + + FileStoreMonitor storeMonitor = new FileStoreMonitor(scheduledExecutorService, executorService, 20, TimeUnit.MILLISECONDS, 0.9, null, MaxDiskUsage); final ReusableLatch latch = new ReusableLatch(5); storeMonitor.addStore(getTestDirfile()); - storeMonitor.addCallback(new FileStoreMonitor.Callback() { - @Override - public void tick(long usableSpace, long totalSpace) { - logger.debug("Tick"); - latch.countDown(); - } - - @Override - public void over(long usableSpace, long totalSpace) { - - } - - @Override - public void under(long usableSpace, long totalSpace) { - - } + storeMonitor.addCallback((usableSpace, totalSpace, ok, type) -> { + logger.debug("Tick"); + latch.countDown(); }); storeMonitor.start(); @@ -148,7 +173,28 @@ public class FileStoreMonitorTest extends ActiveMQTestBase { Assert.assertFalse(latch.await(100, TimeUnit.MILLISECONDS)); - // FileStoreMonitor monitor = new FileStoreMonitor() + } + + @Test + public void testSchedulerForMinDiskFree() throws Exception { + + FileStoreMonitor storeMonitor = new FileStoreMonitor(scheduledExecutorService, executorService, 20, TimeUnit.MILLISECONDS, 500000000, null, FileStoreMonitor.FileStoreMonitorType.MinDiskFree); + + final ReusableLatch latch = new ReusableLatch(5); + storeMonitor.addStore(getTestDirfile()); + storeMonitor.addCallback((usableSpace, totalSpace, ok, type) -> { + logger.debug("Tick"); + latch.countDown(); + }); + storeMonitor.start(); + + Assert.assertTrue(latch.await(1, TimeUnit.SECONDS)); + + storeMonitor.stop(); + + latch.setCount(1); + + Assert.assertFalse(latch.await(100, TimeUnit.MILLISECONDS)); } } diff --git a/artemis-server/src/test/resources/ConfigurationTest-full-config-wrong-address.xml b/artemis-server/src/test/resources/ConfigurationTest-full-config-wrong-address.xml index 54e1f3ba65..408f10b0c4 100644 --- a/artemis-server/src/test/resources/ConfigurationTest-full-config-wrong-address.xml +++ b/artemis-server/src/test/resources/ConfigurationTest-full-config-wrong-address.xml @@ -103,6 +103,8 @@ under the License. 90 + + 1GB true diff --git a/artemis-server/src/test/resources/ConfigurationTest-full-config.xml b/artemis-server/src/test/resources/ConfigurationTest-full-config.xml index 3fc1e98c26..70145ae647 100644 --- a/artemis-server/src/test/resources/ConfigurationTest-full-config.xml +++ b/artemis-server/src/test/resources/ConfigurationTest-full-config.xml @@ -62,6 +62,7 @@ TEMP 1234567 37 + 500Mb 123 HALT 333 diff --git a/docs/user-manual/paging.adoc b/docs/user-manual/paging.adoc index ac817ec6e5..74e2ee53b3 100644 --- a/docs/user-manual/paging.adoc +++ b/docs/user-manual/paging.adoc @@ -196,12 +196,28 @@ For example: In this example all the other 9 queues will be consuming messages from the page system. This may cause performance issues if this is an undesirable state. -== Max Disk Usage +== Monitoring Disk + +The broker can be configured to perform scans on the disk to determine if disk is beyond a configured limit. +Since the disk is a critical piece of infrastructure for data integrity the broker will automatically shut itself down if it runs out of disk space. +Configuring a limit allows the broker to enforce flow control on clients sending messages to the broker so that the disk never fills up completely. + +WARNING: If the protocol used to send the messages doesn't support flow control (e.g. STOMP) then an exception will be thrown and the connection for the client will be dropped so that it can no longer send messages and consume disk space. + +=== Max Disk Usage + +A limit on the _maximum_ disk space used can be configured through `max-disk-usage` +This is the *percentage* of disk used. +For example, if the disk's capacity was 500GiB and `max-disk-usage` was `50` then the broker would start blocking producers once 250GiB of disk space was used. + +=== Minimum Disk Free + +A limit on the _minimum_ disk space free can be configured through `min-disk-free` +This is specific amount and not a percentage like with `max-disk-usage`. +For example, if the disk's capacity was 500GiB and `min-disk-free` was `100GiB` then the broker would start blocking producers once 400GiB of disk space was used. + +NOTE: If _both_ `max-disk-usage` and `min-disk-free` are configured then `min-disk-free` will take priority. -The System will perform scans on the disk to determine if the disk is beyond a configured limit. -These are configured through `max-disk-usage` in percentage. -Once that limit is reached any message will be blocked. -(unless the protocol doesn't support flow control on which case there will be an exception thrown and the connection for those clients dropped). == Page Sync Timeout diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/GlobalDiskFullTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/GlobalDiskFullTest.java index 0eab768d6e..5c7631e6c5 100644 --- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/GlobalDiskFullTest.java +++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/GlobalDiskFullTest.java @@ -44,19 +44,8 @@ public class GlobalDiskFullTest extends AmqpClientTestSupport { public void testProducerOnDiskFull() throws Exception { FileStoreMonitor monitor = ((ActiveMQServerImpl)server).getMonitor().setMaxUsage(0.0); final CountDownLatch latch = new CountDownLatch(1); - monitor.addCallback(new FileStoreMonitor.Callback() { - - @Override - public void tick(long usableSpace, long totalSpace) { - } - - @Override - public void over(long usableSpace, long totalSpace) { - latch.countDown(); - } - @Override - public void under(long usableSpace, long totalSpace) { - } + monitor.addCallback((usableSpace, totalSpace, ok, type) -> { + latch.countDown(); }); Assert.assertTrue(latch.await(1, TimeUnit.MINUTES));