ARTEMIS-3057 Add min-disk-free feature
To check if the remaining disk is enough. Alternative to max-disk-usage.
This commit is contained in:
parent
85b8db94a2
commit
6ec2131e32
|
@ -524,6 +524,8 @@ public final class ActiveMQDefaultConfiguration {
|
|||
DEFAULT_MAX_DISK_USAGE = maxDisk;
|
||||
}
|
||||
|
||||
public static final long DEFAULT_MIN_DISK_FREE = -1;
|
||||
|
||||
public static final int DEFAULT_DISK_SCAN = 5000;
|
||||
|
||||
public static final int DEFAULT_MAX_QUEUE_CONSUMERS = -1;
|
||||
|
@ -1545,6 +1547,10 @@ public final class ActiveMQDefaultConfiguration {
|
|||
return DEFAULT_MAX_DISK_USAGE;
|
||||
}
|
||||
|
||||
public static long getDefaultMinDiskFree() {
|
||||
return DEFAULT_MIN_DISK_FREE;
|
||||
}
|
||||
|
||||
public static int getDefaultDiskScanPeriod() {
|
||||
return DEFAULT_DISK_SCAN;
|
||||
}
|
||||
|
|
|
@ -1274,6 +1274,10 @@ public interface Configuration {
|
|||
|
||||
Configuration setMaxDiskUsage(int maxDiskUsage);
|
||||
|
||||
long getMinDiskFree();
|
||||
|
||||
Configuration setMinDiskFree(long minDiskFree);
|
||||
|
||||
ConfigurationImpl setInternalNamingPrefix(String internalNamingPrefix);
|
||||
|
||||
Configuration setDiskScanPeriod(int diskScanPeriod);
|
||||
|
|
|
@ -379,6 +379,8 @@ public class ConfigurationImpl implements Configuration, Serializable {
|
|||
|
||||
private int maxDiskUsage = ActiveMQDefaultConfiguration.getDefaultMaxDiskUsage();
|
||||
|
||||
private long minDiskFree = ActiveMQDefaultConfiguration.getDefaultMinDiskFree();
|
||||
|
||||
private int diskScanPeriod = ActiveMQDefaultConfiguration.getDefaultDiskScanPeriod();
|
||||
|
||||
private String systemPropertyPrefix = ActiveMQDefaultConfiguration.getDefaultSystemPropertyPrefix();
|
||||
|
@ -906,6 +908,17 @@ public class ConfigurationImpl implements Configuration, Serializable {
|
|||
return this;
|
||||
}
|
||||
|
||||
@Override
|
||||
public long getMinDiskFree() {
|
||||
return minDiskFree;
|
||||
}
|
||||
|
||||
@Override
|
||||
public ConfigurationImpl setMinDiskFree(long minDiskFree) {
|
||||
this.minDiskFree = minDiskFree;
|
||||
return this;
|
||||
}
|
||||
|
||||
@Override
|
||||
public ConfigurationImpl setGlobalMaxSize(long maxSize) {
|
||||
this.globalMaxSize = maxSize;
|
||||
|
@ -2827,6 +2840,9 @@ public class ConfigurationImpl implements Configuration, Serializable {
|
|||
if (maxDiskUsage != other.maxDiskUsage) {
|
||||
return false;
|
||||
}
|
||||
if (minDiskFree != other.minDiskFree) {
|
||||
return false;
|
||||
}
|
||||
if (diskScanPeriod != other.diskScanPeriod) {
|
||||
return false;
|
||||
}
|
||||
|
|
|
@ -327,7 +327,9 @@ public final class FileConfigurationParser extends XMLConfigurationUtil {
|
|||
|
||||
private static final String GLOBAL_MAX_MESSAGES = "global-max-messages";
|
||||
|
||||
private static final String MAX_DISK_USAGE = "max-disk-usage";
|
||||
public static final String MAX_DISK_USAGE = "max-disk-usage";
|
||||
|
||||
public static final String MIN_DISK_FREE = "min-disk-free";
|
||||
|
||||
private static final String DISK_SCAN_PERIOD = "disk-scan-period";
|
||||
|
||||
|
@ -474,6 +476,8 @@ public final class FileConfigurationParser extends XMLConfigurationUtil {
|
|||
|
||||
config.setGlobalMaxMessages(globalMaxMessages);
|
||||
|
||||
config.setMinDiskFree(getTextBytesAsLongBytes(e, MIN_DISK_FREE, config.getMinDiskFree(), Validators.MINUS_ONE_OR_GT_ZERO));
|
||||
|
||||
config.setMaxDiskUsage(getInteger(e, MAX_DISK_USAGE, config.getMaxDiskUsage(), Validators.PERCENTAGE_OR_MINUS_ONE));
|
||||
|
||||
config.setDiskScanPeriod(getInteger(e, DISK_SCAN_PERIOD, config.getDiskScanPeriod(), Validators.MINUS_ONE_OR_GT_ZERO));
|
||||
|
|
|
@ -51,6 +51,9 @@ import org.slf4j.LoggerFactory;
|
|||
import java.lang.invoke.MethodHandles;
|
||||
import java.util.function.BiConsumer;
|
||||
|
||||
import static org.apache.activemq.artemis.core.server.files.FileStoreMonitor.FileStoreMonitorType;
|
||||
import static org.apache.activemq.artemis.core.server.files.FileStoreMonitor.FileStoreMonitorType.MaxDiskUsage;
|
||||
|
||||
public final class PagingManagerImpl implements PagingManager {
|
||||
|
||||
private static final int ARTEMIS_PAGING_COUNTER_SNAPSHOT_INTERVAL = Integer.valueOf(System.getProperty("artemis.paging.counter.snapshot.interval", "60"));
|
||||
|
@ -237,31 +240,34 @@ public final class PagingManagerImpl implements PagingManager {
|
|||
private final Logger logger = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
|
||||
|
||||
@Override
|
||||
public void tick(long usableSpace, long totalSpace) {
|
||||
public void tick(long usableSpace, long totalSpace, boolean withinLimit, FileStoreMonitorType type) {
|
||||
diskUsableSpace = usableSpace;
|
||||
diskTotalSpace = totalSpace;
|
||||
if (logger.isTraceEnabled()) {
|
||||
logger.trace("Tick:: usable space at {}, total space at {}", ByteUtil.getHumanReadableByteCount(usableSpace), ByteUtil.getHumanReadableByteCount(totalSpace));
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public void over(long usableSpace, long totalSpace) {
|
||||
if (!diskFull) {
|
||||
ActiveMQServerLogger.LOGGER.diskBeyondCapacity(ByteUtil.getHumanReadableByteCount(usableSpace), ByteUtil.getHumanReadableByteCount(totalSpace), String.format("%.1f%%", FileStoreMonitor.calculateUsage(usableSpace, totalSpace) * 100));
|
||||
diskFull = true;
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public void under(long usableSpace, long totalSpace) {
|
||||
final boolean diskFull = PagingManagerImpl.this.diskFull;
|
||||
if (diskFull || !blockedStored.isEmpty() || !memoryCallback.isEmpty()) {
|
||||
if (diskFull) {
|
||||
ActiveMQServerLogger.LOGGER.diskCapacityRestored(ByteUtil.getHumanReadableByteCount(usableSpace), ByteUtil.getHumanReadableByteCount(totalSpace), String.format("%.1f%%", FileStoreMonitor.calculateUsage(usableSpace, totalSpace) * 100));
|
||||
PagingManagerImpl.this.diskFull = false;
|
||||
if (withinLimit) {
|
||||
final boolean diskFull = PagingManagerImpl.this.diskFull;
|
||||
if (diskFull || !blockedStored.isEmpty() || !memoryCallback.isEmpty()) {
|
||||
if (diskFull) {
|
||||
if (type == MaxDiskUsage) {
|
||||
ActiveMQServerLogger.LOGGER.maxDiskUsageRestored(ByteUtil.getHumanReadableByteCount(usableSpace), ByteUtil.getHumanReadableByteCount(totalSpace), String.format("%.1f%%", FileStoreMonitor.calculateUsage(usableSpace, totalSpace) * 100));
|
||||
} else {
|
||||
ActiveMQServerLogger.LOGGER.minDiskFreeRestored(ByteUtil.getHumanReadableByteCount(usableSpace));
|
||||
}
|
||||
PagingManagerImpl.this.diskFull = false;
|
||||
}
|
||||
checkMemoryRelease();
|
||||
}
|
||||
} else {
|
||||
if (!diskFull) {
|
||||
if (type == MaxDiskUsage) {
|
||||
ActiveMQServerLogger.LOGGER.maxDiskUsageReached(ByteUtil.getHumanReadableByteCount(usableSpace), ByteUtil.getHumanReadableByteCount(totalSpace), String.format("%.1f%%", FileStoreMonitor.calculateUsage(usableSpace, totalSpace) * 100));
|
||||
} else {
|
||||
ActiveMQServerLogger.LOGGER.minDiskFreeReached(ByteUtil.getHumanReadableByteCount(usableSpace));
|
||||
}
|
||||
diskFull = true;
|
||||
}
|
||||
checkMemoryRelease();
|
||||
}
|
||||
}
|
||||
}
|
||||
|
@ -295,7 +301,6 @@ public final class PagingManagerImpl implements PagingManager {
|
|||
|
||||
@Override
|
||||
public void checkMemory(final Runnable runWhenAvailable) {
|
||||
|
||||
if (isGlobalFull()) {
|
||||
memoryCallback.add(AtomicRunnable.checkAtomic(runWhenAvailable));
|
||||
return;
|
||||
|
@ -321,7 +326,6 @@ public final class PagingManagerImpl implements PagingManager {
|
|||
}
|
||||
|
||||
|
||||
|
||||
@Override
|
||||
public boolean isGlobalFull() {
|
||||
return diskFull || maxSize > 0 && globalFull;
|
||||
|
|
|
@ -932,10 +932,10 @@ public interface ActiveMQServerLogger {
|
|||
void impossibleToRouteGrouped();
|
||||
|
||||
@LogMessage(id = 222210, value = "Free storage space is at {} of {} total. Usage rate is {} which is beyond the configured <max-disk-usage>. System will start blocking producers.", level = LogMessage.Level.WARN)
|
||||
void diskBeyondCapacity(String usableSpace, String totalSpace, String usage);
|
||||
void maxDiskUsageReached(String usableSpace, String totalSpace, String usage);
|
||||
|
||||
@LogMessage(id = 222211, value = "Free storage space is at {} of {} total. Usage rate is {} which is below the configured <max-disk-usage>.", level = LogMessage.Level.INFO)
|
||||
void diskCapacityRestored(String usableSpace, String totalSpace, String usage);
|
||||
@LogMessage(id = 222211, value = "Free storage space is at {} of {} total. Usage rate is {} which is below the configured <max-disk-usage>. System will unblock producers.", level = LogMessage.Level.INFO)
|
||||
void maxDiskUsageRestored(String usableSpace, String totalSpace, String usage);
|
||||
|
||||
@LogMessage(id = 222212, value = "Disk Full! Blocking message production on address '{}'. Clients will report blocked.", level = LogMessage.Level.WARN)
|
||||
void blockingDiskFull(SimpleString addressName);
|
||||
|
@ -1584,4 +1584,13 @@ public interface ActiveMQServerLogger {
|
|||
void failureDuringProtocolHandshake(SocketAddress localAddress, SocketAddress remoteAddress, Throwable e);
|
||||
|
||||
// notice loggerID=224127 is reserved as it's been used at ActiveMQQueueLogger
|
||||
|
||||
@LogMessage(id = 224128, value = "Free storage space is at {} which is below the configured <min-disk-free>. System will start blocking producers.", level = LogMessage.Level.WARN)
|
||||
void minDiskFreeReached(String usableSpace);
|
||||
|
||||
@LogMessage(id = 224129, value = "Free storage space is at {} which is above the configured <min-disk-free>. System will unblock producers.", level = LogMessage.Level.WARN)
|
||||
void minDiskFreeRestored(String usableSpace);
|
||||
|
||||
@LogMessage(id = 224130, value = "The {} value {} will override the {} value {} since both are set.", level = LogMessage.Level.INFO)
|
||||
void configParamOverride(String overridingParam, Object overridingParamValue, String overriddenParam, Object overriddenParamValue);
|
||||
}
|
||||
|
|
|
@ -18,6 +18,7 @@ package org.apache.activemq.artemis.core.server.files;
|
|||
|
||||
import java.io.File;
|
||||
import java.io.IOException;
|
||||
import java.lang.invoke.MethodHandles;
|
||||
import java.nio.file.FileStore;
|
||||
import java.nio.file.Files;
|
||||
import java.util.HashSet;
|
||||
|
@ -30,7 +31,6 @@ import org.apache.activemq.artemis.core.io.IOCriticalErrorListener;
|
|||
import org.apache.activemq.artemis.core.server.ActiveMQScheduledComponent;
|
||||
import org.slf4j.Logger;
|
||||
import org.slf4j.LoggerFactory;
|
||||
import java.lang.invoke.MethodHandles;
|
||||
|
||||
/**
|
||||
* This will keep a list of fileStores. It will make a comparison on all file stores registered. if any is over the limit,
|
||||
|
@ -45,18 +45,26 @@ public class FileStoreMonitor extends ActiveMQScheduledComponent {
|
|||
|
||||
private final Set<Callback> callbackList = new HashSet<>();
|
||||
private final Set<FileStore> stores = new HashSet<>();
|
||||
private double maxUsage;
|
||||
private Double maxUsage;
|
||||
private Long minDiskFree;
|
||||
private final Object monitorLock = new Object();
|
||||
private final IOCriticalErrorListener ioCriticalErrorListener;
|
||||
private final FileStoreMonitorType type;
|
||||
|
||||
public FileStoreMonitor(ScheduledExecutorService scheduledExecutorService,
|
||||
Executor executor,
|
||||
long checkPeriod,
|
||||
TimeUnit timeUnit,
|
||||
double maxUsage,
|
||||
IOCriticalErrorListener ioCriticalErrorListener) {
|
||||
Number referenceValue,
|
||||
IOCriticalErrorListener ioCriticalErrorListener,
|
||||
FileStoreMonitorType type) {
|
||||
super(scheduledExecutorService, executor, checkPeriod, timeUnit, false);
|
||||
this.maxUsage = maxUsage;
|
||||
this.type = type;
|
||||
if (type == FileStoreMonitorType.MaxDiskUsage) {
|
||||
this.maxUsage = referenceValue.doubleValue();
|
||||
} else {
|
||||
this.minDiskFree = referenceValue.longValue();
|
||||
}
|
||||
this.ioCriticalErrorListener = ioCriticalErrorListener;
|
||||
}
|
||||
|
||||
|
@ -96,7 +104,7 @@ public class FileStoreMonitor extends ActiveMQScheduledComponent {
|
|||
|
||||
public void tick() {
|
||||
synchronized (monitorLock) {
|
||||
boolean over = false;
|
||||
boolean ok = true;
|
||||
long usableSpace = 0;
|
||||
long totalSpace = 0;
|
||||
|
||||
|
@ -104,8 +112,12 @@ public class FileStoreMonitor extends ActiveMQScheduledComponent {
|
|||
try {
|
||||
usableSpace = store.getUsableSpace();
|
||||
totalSpace = getTotalSpace(store);
|
||||
over = calculateUsage(usableSpace, totalSpace) > maxUsage;
|
||||
if (over) {
|
||||
if (type == FileStoreMonitorType.MinDiskFree) {
|
||||
ok = usableSpace > minDiskFree;
|
||||
} else {
|
||||
ok = calculateUsage(usableSpace, totalSpace) < maxUsage;
|
||||
}
|
||||
if (!ok) {
|
||||
break;
|
||||
}
|
||||
} catch (IOException ioe) {
|
||||
|
@ -116,13 +128,7 @@ public class FileStoreMonitor extends ActiveMQScheduledComponent {
|
|||
}
|
||||
|
||||
for (Callback callback : callbackList) {
|
||||
callback.tick(usableSpace, totalSpace);
|
||||
|
||||
if (over) {
|
||||
callback.over(usableSpace, totalSpace);
|
||||
} else {
|
||||
callback.under(usableSpace, totalSpace);
|
||||
}
|
||||
callback.tick(usableSpace, totalSpace, ok, type);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
@ -131,13 +137,21 @@ public class FileStoreMonitor extends ActiveMQScheduledComponent {
|
|||
return maxUsage;
|
||||
}
|
||||
|
||||
public long getMinDiskFree() {
|
||||
return minDiskFree;
|
||||
}
|
||||
|
||||
public FileStoreMonitor setMaxUsage(double maxUsage) {
|
||||
this.maxUsage = maxUsage;
|
||||
return this;
|
||||
}
|
||||
|
||||
public static double calculateUsage(long usableSpace, long totalSpace) {
|
||||
public FileStoreMonitor setMinDiskFree(long minDiskFree) {
|
||||
this.minDiskFree = minDiskFree;
|
||||
return this;
|
||||
}
|
||||
|
||||
public static double calculateUsage(long usableSpace, long totalSpace) {
|
||||
if (totalSpace == 0) {
|
||||
return 0.0;
|
||||
}
|
||||
|
@ -153,11 +167,10 @@ public class FileStoreMonitor extends ActiveMQScheduledComponent {
|
|||
}
|
||||
|
||||
public interface Callback {
|
||||
void tick(long usableSpace, long totalSpace, boolean withinLimit, FileStoreMonitorType type);
|
||||
}
|
||||
|
||||
void tick(long usableSpace, long totalSpace);
|
||||
|
||||
void over(long usableSpace, long totalSpace);
|
||||
|
||||
void under(long usableSpace, long totalSpace);
|
||||
public enum FileStoreMonitorType {
|
||||
MaxDiskUsage, MinDiskFree;
|
||||
}
|
||||
}
|
||||
|
|
|
@ -3510,12 +3510,25 @@ public class ActiveMQServerImpl implements ActiveMQServer {
|
|||
recoverStoredBridges();
|
||||
}
|
||||
|
||||
if (configuration.getMaxDiskUsage() != -1) {
|
||||
try {
|
||||
injectMonitor(new FileStoreMonitor(getScheduledPool(), executorFactory.getExecutor(), configuration.getDiskScanPeriod(), TimeUnit.MILLISECONDS, configuration.getMaxDiskUsage() / 100f, ioCriticalErrorListener));
|
||||
} catch (Exception e) {
|
||||
ActiveMQServerLogger.LOGGER.unableToInjectMonitor(e);
|
||||
deployFileStoreMonitor();
|
||||
}
|
||||
|
||||
private void deployFileStoreMonitor() throws Exception {
|
||||
FileStoreMonitor.FileStoreMonitorType fileStoreMonitorType = null;
|
||||
Number referenceValue = null;
|
||||
if (configuration.getMinDiskFree() != -1) {
|
||||
if (configuration.getMaxDiskUsage() != -1) {
|
||||
ActiveMQServerLogger.LOGGER.configParamOverride(FileConfigurationParser.MIN_DISK_FREE, configuration.getMinDiskFree(), FileConfigurationParser.MAX_DISK_USAGE, configuration.getMaxDiskUsage());
|
||||
}
|
||||
fileStoreMonitorType = FileStoreMonitor.FileStoreMonitorType.MinDiskFree;
|
||||
referenceValue = configuration.getMinDiskFree();
|
||||
} else if (configuration.getMaxDiskUsage() != -1) {
|
||||
fileStoreMonitorType = FileStoreMonitor.FileStoreMonitorType.MaxDiskUsage;
|
||||
referenceValue = configuration.getMaxDiskUsage() / 100d;
|
||||
}
|
||||
|
||||
if (fileStoreMonitorType != null) {
|
||||
injectMonitor(new FileStoreMonitor(getScheduledPool(), executorFactory.getExecutor(), configuration.getDiskScanPeriod(), TimeUnit.MILLISECONDS, referenceValue, ioCriticalErrorListener, fileStoreMonitorType));
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -3536,10 +3549,14 @@ public class ActiveMQServerImpl implements ActiveMQServer {
|
|||
* This method exists for a possibility of test cases replacing the FileStoreMonitor for an extension that would for instance pretend a disk full on certain tests.
|
||||
*/
|
||||
public void injectMonitor(FileStoreMonitor storeMonitor) throws Exception {
|
||||
this.fileStoreMonitor = storeMonitor;
|
||||
pagingManager.injectMonitor(storeMonitor);
|
||||
storageManager.injectMonitor(storeMonitor);
|
||||
fileStoreMonitor.start();
|
||||
try {
|
||||
this.fileStoreMonitor = storeMonitor;
|
||||
pagingManager.injectMonitor(storeMonitor);
|
||||
storageManager.injectMonitor(storeMonitor);
|
||||
fileStoreMonitor.start();
|
||||
} catch (Exception e) {
|
||||
ActiveMQServerLogger.LOGGER.unableToInjectMonitor(e);
|
||||
}
|
||||
}
|
||||
|
||||
public FileStoreMonitor getMonitor() {
|
||||
|
|
|
@ -801,7 +801,17 @@
|
|||
<xsd:element name="max-disk-usage" type="xsd:int" default="90" maxOccurs="1" minOccurs="0">
|
||||
<xsd:annotation>
|
||||
<xsd:documentation>
|
||||
Max percentage of disk usage before the system blocks or fails clients.
|
||||
Max percentage of disk usage before the system blocks or fails clients. Will be overrided by
|
||||
min-disk-free if both are set.
|
||||
</xsd:documentation>
|
||||
</xsd:annotation>
|
||||
</xsd:element>
|
||||
|
||||
<xsd:element name="min-disk-free" type="xsd:string" default="-1" maxOccurs="1" minOccurs="0">
|
||||
<xsd:annotation>
|
||||
<xsd:documentation>
|
||||
Min free bytes on disk below which the system blocks or fails clients. Supports byte notation like
|
||||
"K", "Mb", "GB", etc. Will override max-disk-usage if both are set.
|
||||
</xsd:documentation>
|
||||
</xsd:annotation>
|
||||
</xsd:element>
|
||||
|
|
|
@ -37,6 +37,8 @@ import org.junit.Assert;
|
|||
import org.junit.Before;
|
||||
import org.junit.Test;
|
||||
|
||||
import static org.apache.activemq.artemis.core.server.files.FileStoreMonitor.FileStoreMonitorType.MaxDiskUsage;
|
||||
|
||||
public class FileStoreMonitorTest extends ActiveMQTestBase {
|
||||
|
||||
private static final Logger logger = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
|
||||
|
@ -58,85 +60,108 @@ public class FileStoreMonitorTest extends ActiveMQTestBase {
|
|||
}
|
||||
|
||||
@Test
|
||||
public void testSimpleTick() throws Exception {
|
||||
public void testSimpleTickForMaxDiskUsage() throws Exception {
|
||||
File garbageFile = new File(getTestDirfile(), "garbage.bin");
|
||||
FileOutputStream garbage = new FileOutputStream(garbageFile);
|
||||
BufferedOutputStream bufferedOutputStream = new BufferedOutputStream(garbage);
|
||||
PrintStream out = new PrintStream(bufferedOutputStream);
|
||||
|
||||
// This is just to make sure there is at least something on the device.
|
||||
// If the testsuite is running with an empty tempFS, it would return 0 and the assertion would fail.
|
||||
for (int i = 0; i < 100; i++) {
|
||||
out.println("Garbage " + i);
|
||||
try (FileOutputStream garbage = new FileOutputStream(garbageFile);
|
||||
BufferedOutputStream bufferedOutputStream = new BufferedOutputStream(garbage);
|
||||
PrintStream out = new PrintStream(bufferedOutputStream)) {
|
||||
|
||||
// This is just to make sure there is at least something on the device.
|
||||
// If the testsuite is running with an empty tempFS, it would return 0 and the assertion would fail.
|
||||
for (int i = 0; i < 100; i++) {
|
||||
out.println("Garbage " + i);
|
||||
}
|
||||
}
|
||||
|
||||
bufferedOutputStream.close();
|
||||
|
||||
final AtomicInteger over = new AtomicInteger(0);
|
||||
final AtomicInteger under = new AtomicInteger(0);
|
||||
final AtomicInteger overMaxUsage = new AtomicInteger(0);
|
||||
final AtomicInteger underMaxUsage = new AtomicInteger(0);
|
||||
final AtomicInteger tick = new AtomicInteger(0);
|
||||
|
||||
FileStoreMonitor.Callback callback = new FileStoreMonitor.Callback() {
|
||||
@Override
|
||||
public void tick(long usableSpace, long totalSpace) {
|
||||
tick.incrementAndGet();
|
||||
logger.debug("tick:: usableSpace: {}, totalSpace:{}", usableSpace, totalSpace);
|
||||
}
|
||||
|
||||
@Override
|
||||
public void over(long usableSpace, long totalSpace) {
|
||||
over.incrementAndGet();
|
||||
logger.debug("over:: usableSpace: {}, totalSpace:{}", usableSpace, totalSpace);
|
||||
}
|
||||
|
||||
@Override
|
||||
public void under(long usableSpace, long totalSpace) {
|
||||
under.incrementAndGet();
|
||||
logger.debug("under:: usableSpace: {}, totalSpace: {}", usableSpace, totalSpace);
|
||||
FileStoreMonitor.Callback callback = (usableSpace, totalSpace, ok, type) -> {
|
||||
tick.incrementAndGet();
|
||||
if (ok) {
|
||||
underMaxUsage.incrementAndGet();
|
||||
} else {
|
||||
overMaxUsage.incrementAndGet();
|
||||
}
|
||||
logger.debug("tick:: usableSpace: " + usableSpace + ", totalSpace:" + totalSpace);
|
||||
};
|
||||
FileStoreMonitor storeMonitor = new FileStoreMonitor(scheduledExecutorService, executorService, 100, TimeUnit.MILLISECONDS, 0.999, null);
|
||||
FileStoreMonitor storeMonitor = new FileStoreMonitor(scheduledExecutorService, executorService, 100L, TimeUnit.MILLISECONDS, 0.999, null, MaxDiskUsage);
|
||||
storeMonitor.addCallback(callback);
|
||||
storeMonitor.addStore(getTestDirfile());
|
||||
|
||||
storeMonitor.tick();
|
||||
|
||||
Assert.assertEquals(0, over.get());
|
||||
Assert.assertEquals(0, overMaxUsage.get());
|
||||
Assert.assertEquals(1, tick.get());
|
||||
Assert.assertEquals(1, under.get());
|
||||
Assert.assertEquals(1, underMaxUsage.get());
|
||||
|
||||
storeMonitor.setMaxUsage(0);
|
||||
|
||||
storeMonitor.tick();
|
||||
|
||||
Assert.assertEquals(1, over.get());
|
||||
Assert.assertEquals(1, overMaxUsage.get());
|
||||
Assert.assertEquals(2, tick.get());
|
||||
Assert.assertEquals(1, under.get());
|
||||
Assert.assertEquals(1, underMaxUsage.get());
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testScheduler() throws Exception {
|
||||
public void testSimpleTickForMinDiskFree() throws Exception {
|
||||
File garbageFile = new File(getTestDirfile(), "garbage.bin");
|
||||
|
||||
FileStoreMonitor storeMonitor = new FileStoreMonitor(scheduledExecutorService, executorService, 20, TimeUnit.MILLISECONDS, 0.9, null);
|
||||
try (FileOutputStream garbage = new FileOutputStream(garbageFile);
|
||||
BufferedOutputStream bufferedOutputStream = new BufferedOutputStream(garbage);
|
||||
PrintStream out = new PrintStream(bufferedOutputStream)) {
|
||||
|
||||
// This is just to make sure there is at least something on the device.
|
||||
// If the testsuite is running with an empty tempFS, it would return 0 and the assertion would fail.
|
||||
for (int i = 0; i < 100; i++) {
|
||||
out.println("Garbage " + i);
|
||||
}
|
||||
}
|
||||
|
||||
final AtomicInteger underMinDiskFree = new AtomicInteger(0);
|
||||
final AtomicInteger overMinDiskFree = new AtomicInteger(0);
|
||||
final AtomicInteger tick = new AtomicInteger(0);
|
||||
|
||||
FileStoreMonitor.Callback callback = (usableSpace, totalSpace, ok, type) -> {
|
||||
tick.incrementAndGet();
|
||||
if (ok) {
|
||||
overMinDiskFree.incrementAndGet();
|
||||
} else {
|
||||
underMinDiskFree.incrementAndGet();
|
||||
}
|
||||
};
|
||||
FileStoreMonitor storeMonitor = new FileStoreMonitor(scheduledExecutorService, executorService, 100L, TimeUnit.MILLISECONDS, 0, null, FileStoreMonitor.FileStoreMonitorType.MinDiskFree);
|
||||
storeMonitor.addCallback(callback);
|
||||
storeMonitor.addStore(getTestDirfile());
|
||||
|
||||
storeMonitor.tick();
|
||||
|
||||
Assert.assertEquals(0, underMinDiskFree.get());
|
||||
Assert.assertEquals(1, tick.get());
|
||||
Assert.assertEquals(1, overMinDiskFree.get());
|
||||
|
||||
storeMonitor.setMinDiskFree(Long.MAX_VALUE);
|
||||
|
||||
storeMonitor.tick();
|
||||
|
||||
Assert.assertEquals(1, underMinDiskFree.get());
|
||||
Assert.assertEquals(2, tick.get());
|
||||
Assert.assertEquals(1, overMinDiskFree.get());
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testSchedulerForMaxDiskUsage() throws Exception {
|
||||
|
||||
FileStoreMonitor storeMonitor = new FileStoreMonitor(scheduledExecutorService, executorService, 20, TimeUnit.MILLISECONDS, 0.9, null, MaxDiskUsage);
|
||||
|
||||
final ReusableLatch latch = new ReusableLatch(5);
|
||||
storeMonitor.addStore(getTestDirfile());
|
||||
storeMonitor.addCallback(new FileStoreMonitor.Callback() {
|
||||
@Override
|
||||
public void tick(long usableSpace, long totalSpace) {
|
||||
logger.debug("Tick");
|
||||
latch.countDown();
|
||||
}
|
||||
|
||||
@Override
|
||||
public void over(long usableSpace, long totalSpace) {
|
||||
|
||||
}
|
||||
|
||||
@Override
|
||||
public void under(long usableSpace, long totalSpace) {
|
||||
|
||||
}
|
||||
storeMonitor.addCallback((usableSpace, totalSpace, ok, type) -> {
|
||||
logger.debug("Tick");
|
||||
latch.countDown();
|
||||
});
|
||||
storeMonitor.start();
|
||||
|
||||
|
@ -148,7 +173,28 @@ public class FileStoreMonitorTest extends ActiveMQTestBase {
|
|||
|
||||
Assert.assertFalse(latch.await(100, TimeUnit.MILLISECONDS));
|
||||
|
||||
// FileStoreMonitor monitor = new FileStoreMonitor()
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testSchedulerForMinDiskFree() throws Exception {
|
||||
|
||||
FileStoreMonitor storeMonitor = new FileStoreMonitor(scheduledExecutorService, executorService, 20, TimeUnit.MILLISECONDS, 500000000, null, FileStoreMonitor.FileStoreMonitorType.MinDiskFree);
|
||||
|
||||
final ReusableLatch latch = new ReusableLatch(5);
|
||||
storeMonitor.addStore(getTestDirfile());
|
||||
storeMonitor.addCallback((usableSpace, totalSpace, ok, type) -> {
|
||||
logger.debug("Tick");
|
||||
latch.countDown();
|
||||
});
|
||||
storeMonitor.start();
|
||||
|
||||
Assert.assertTrue(latch.await(1, TimeUnit.SECONDS));
|
||||
|
||||
storeMonitor.stop();
|
||||
|
||||
latch.setCount(1);
|
||||
|
||||
Assert.assertFalse(latch.await(100, TimeUnit.MILLISECONDS));
|
||||
|
||||
}
|
||||
}
|
||||
|
|
|
@ -103,6 +103,8 @@ under the License.
|
|||
<!-- once the disk hits this limit the system will block, or close the connection in certain protocols
|
||||
that won't support flow control. -->
|
||||
<max-disk-usage>90</max-disk-usage>
|
||||
<!-- when min-disk-free is set, it will override max-disk-usage -->
|
||||
<min-disk-free>1GB</min-disk-free>
|
||||
|
||||
<!-- should the broker detect dead locks and other issues -->
|
||||
<critical-analyzer>true</critical-analyzer>
|
||||
|
|
|
@ -62,6 +62,7 @@
|
|||
<temporary-queue-namespace>TEMP</temporary-queue-namespace>
|
||||
<global-max-size>1234567</global-max-size>
|
||||
<max-disk-usage>37</max-disk-usage>
|
||||
<min-disk-free>500Mb</min-disk-free>
|
||||
<disk-scan-period>123</disk-scan-period>
|
||||
<critical-analyzer-policy>HALT</critical-analyzer-policy>
|
||||
<critical-analyzer-check-period>333</critical-analyzer-check-period>
|
||||
|
|
|
@ -196,12 +196,28 @@ For example:
|
|||
In this example all the other 9 queues will be consuming messages from the page system.
|
||||
This may cause performance issues if this is an undesirable state.
|
||||
|
||||
== Max Disk Usage
|
||||
== Monitoring Disk
|
||||
|
||||
The broker can be configured to perform scans on the disk to determine if disk is beyond a configured limit.
|
||||
Since the disk is a critical piece of infrastructure for data integrity the broker will automatically shut itself down if it runs out of disk space.
|
||||
Configuring a limit allows the broker to enforce flow control on clients sending messages to the broker so that the disk never fills up completely.
|
||||
|
||||
WARNING: If the protocol used to send the messages doesn't support flow control (e.g. STOMP) then an exception will be thrown and the connection for the client will be dropped so that it can no longer send messages and consume disk space.
|
||||
|
||||
=== Max Disk Usage
|
||||
|
||||
A limit on the _maximum_ disk space used can be configured through `max-disk-usage`
|
||||
This is the *percentage* of disk used.
|
||||
For example, if the disk's capacity was 500GiB and `max-disk-usage` was `50` then the broker would start blocking producers once 250GiB of disk space was used.
|
||||
|
||||
=== Minimum Disk Free
|
||||
|
||||
A limit on the _minimum_ disk space free can be configured through `min-disk-free`
|
||||
This is specific amount and not a percentage like with `max-disk-usage`.
|
||||
For example, if the disk's capacity was 500GiB and `min-disk-free` was `100GiB` then the broker would start blocking producers once 400GiB of disk space was used.
|
||||
|
||||
NOTE: If _both_ `max-disk-usage` and `min-disk-free` are configured then `min-disk-free` will take priority.
|
||||
|
||||
The System will perform scans on the disk to determine if the disk is beyond a configured limit.
|
||||
These are configured through `max-disk-usage` in percentage.
|
||||
Once that limit is reached any message will be blocked.
|
||||
(unless the protocol doesn't support flow control on which case there will be an exception thrown and the connection for those clients dropped).
|
||||
|
||||
== Page Sync Timeout
|
||||
|
||||
|
|
|
@ -44,19 +44,8 @@ public class GlobalDiskFullTest extends AmqpClientTestSupport {
|
|||
public void testProducerOnDiskFull() throws Exception {
|
||||
FileStoreMonitor monitor = ((ActiveMQServerImpl)server).getMonitor().setMaxUsage(0.0);
|
||||
final CountDownLatch latch = new CountDownLatch(1);
|
||||
monitor.addCallback(new FileStoreMonitor.Callback() {
|
||||
|
||||
@Override
|
||||
public void tick(long usableSpace, long totalSpace) {
|
||||
}
|
||||
|
||||
@Override
|
||||
public void over(long usableSpace, long totalSpace) {
|
||||
latch.countDown();
|
||||
}
|
||||
@Override
|
||||
public void under(long usableSpace, long totalSpace) {
|
||||
}
|
||||
monitor.addCallback((usableSpace, totalSpace, ok, type) -> {
|
||||
latch.countDown();
|
||||
});
|
||||
|
||||
Assert.assertTrue(latch.await(1, TimeUnit.MINUTES));
|
||||
|
|
Loading…
Reference in New Issue