ARTEMIS-2578 clarify storage capacity messages

This is a surprisingly large change just to fix some log messages, but
the changes were necessary in order to get the relevant data to where it
was being logged. The fact that the data wasn't readily available is
probably why it wasn't logged in the first place.
This commit is contained in:
Justin Bertram 2019-12-16 13:44:06 -06:00 committed by Clebert Suconic
parent d9a4d9238f
commit 2a452bd7a3
11 changed files with 141 additions and 65 deletions

View File

@ -42,6 +42,14 @@ public class ByteUtil {
private static final Pattern KILO = Pattern.compile(prefix + "k" + suffix, Pattern.CASE_INSENSITIVE);
private static final Pattern MEGA = Pattern.compile(prefix + "m" + suffix, Pattern.CASE_INSENSITIVE);
private static final Pattern GIGA = Pattern.compile(prefix + "g" + suffix, Pattern.CASE_INSENSITIVE);
private static final String[] BYTE_SUFFIXES = new String[] {"E", "P", "T", "G", "M", "K", ""};
private static final double[] BYTE_MAGNITUDES = new double[7];
static {
for (int i = 18, j = 0; i >= 0; i -= 3, j++) {
BYTE_MAGNITUDES[j] = Math.pow(10, i);
}
}
public static void debugFrame(Logger logger, String message, ByteBuf byteIn) {
if (logger.isTraceEnabled()) {
@ -442,4 +450,12 @@ public class ByteUtil {
}
}
public static String getHumanReadableByteCount(long bytes) {
int i = 0;
while (i < BYTE_MAGNITUDES.length && BYTE_MAGNITUDES[i] > bytes) {
i++;
}
return String.format("%.1f%sB", bytes / BYTE_MAGNITUDES[i], BYTE_SUFFIXES[i]);
}
}

View File

@ -0,0 +1,43 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.activemq.artemis.utils;
import org.junit.Test;
import static org.junit.Assert.assertEquals;
public class HumanReadableByteCountTest {
@Test
public void test() {
String[] suffixes = new String[] {"K", "M", "G", "T", "P", "E"};
assertEquals("999.0B", ByteUtil.getHumanReadableByteCount(999));
assertEquals("500.0B", ByteUtil.getHumanReadableByteCount(500));
for (int i = 0, j = 3; i < 6; i++, j += 3) {
final long magnitude = (long) Math.pow(10, j);
assertEquals("1.0" + suffixes[i] + "B", ByteUtil.getHumanReadableByteCount(magnitude));
assertEquals("1.3" + suffixes[i] + "B", ByteUtil.getHumanReadableByteCount(magnitude + (long) (.25 * magnitude)));
assertEquals("1.5" + suffixes[i] + "B", ByteUtil.getHumanReadableByteCount(magnitude + (long) (.5 * magnitude)));
assertEquals("1.9" + suffixes[i] + "B", ByteUtil.getHumanReadableByteCount(magnitude + (long) (.9 * magnitude)));
assertEquals("4.2" + suffixes[i] + "B", ByteUtil.getHumanReadableByteCount(magnitude + (long) (3.2 * magnitude)));
}
}
}

View File

@ -107,6 +107,10 @@ public interface PagingManager extends ActiveMQComponent, HierarchicalRepository
boolean isDiskFull();
long getDiskUsableSpace();
long getDiskTotalSpace();
default long getGlobalSize() {
return 0;
}

View File

@ -16,7 +16,6 @@
*/
package org.apache.activemq.artemis.core.paging.impl;
import java.nio.file.FileStore;
import java.util.List;
import java.util.Map;
import java.util.Queue;
@ -39,6 +38,7 @@ import org.apache.activemq.artemis.core.server.ActiveMQServerLogger;
import org.apache.activemq.artemis.core.server.files.FileStoreMonitor;
import org.apache.activemq.artemis.core.settings.HierarchicalRepository;
import org.apache.activemq.artemis.core.settings.impl.AddressSettings;
import org.apache.activemq.artemis.utils.ByteUtil;
import org.apache.activemq.artemis.utils.collections.ConcurrentHashSet;
import org.apache.activemq.artemis.utils.runnables.AtomicRunnable;
import org.jboss.logging.Logger;
@ -77,6 +77,10 @@ public final class PagingManagerImpl implements PagingManager {
private volatile boolean diskFull = false;
private volatile long diskUsableSpace = 0;
private volatile long diskTotalSpace = 0;
private final Executor memoryExecutor;
private final Queue<Runnable> memoryCallback = new ConcurrentLinkedQueue<>();
@ -194,24 +198,26 @@ public final class PagingManagerImpl implements PagingManager {
private final Logger logger = Logger.getLogger(LocalMonitor.class);
@Override
public void tick(FileStore store, double usage) {
logger.tracef("Tick from store:: %s, usage at %f", store, usage);
public void tick(long usableSpace, long totalSpace) {
diskUsableSpace = usableSpace;
diskTotalSpace = totalSpace;
logger.tracef("Tick:: usable space at %f, total space at %f", ByteUtil.getHumanReadableByteCount(usableSpace), ByteUtil.getHumanReadableByteCount(totalSpace));
}
@Override
public void over(FileStore store, double usage) {
public void over(long usableSpace, long totalSpace) {
if (!diskFull) {
ActiveMQServerLogger.LOGGER.diskBeyondCapacity();
ActiveMQServerLogger.LOGGER.diskBeyondCapacity(ByteUtil.getHumanReadableByteCount(usableSpace), ByteUtil.getHumanReadableByteCount(totalSpace), String.format("%.1f%%", FileStoreMonitor.calculateUsage(usableSpace, totalSpace) * 100));
diskFull = true;
}
}
@Override
public void under(FileStore store, double usage) {
public void under(long usableSpace, long totalSpace) {
final boolean diskFull = PagingManagerImpl.this.diskFull;
if (diskFull || !blockedStored.isEmpty() || !memoryCallback.isEmpty()) {
if (diskFull) {
ActiveMQServerLogger.LOGGER.diskCapacityRestored();
ActiveMQServerLogger.LOGGER.diskCapacityRestored(ByteUtil.getHumanReadableByteCount(usableSpace), ByteUtil.getHumanReadableByteCount(totalSpace), String.format("%.1f%%", FileStoreMonitor.calculateUsage(usableSpace, totalSpace) * 100));
PagingManagerImpl.this.diskFull = false;
}
checkMemoryRelease();
@ -224,6 +230,16 @@ public final class PagingManagerImpl implements PagingManager {
return diskFull;
}
@Override
public long getDiskUsableSpace() {
return diskUsableSpace;
}
@Override
public long getDiskTotalSpace() {
return diskTotalSpace;
}
@Override
public boolean isUsingGlobalSize() {
return maxSize > 0;

View File

@ -385,8 +385,8 @@ public interface ActiveMQMessageBundle {
@Message(id = 229118, value = "Management method not applicable for current server configuration")
IllegalStateException methodNotApplicable();
@Message(id = 229119, value = "Disk Capacity is Low, cannot produce more messages.")
ActiveMQIOErrorException diskBeyondLimit();
@Message(id = 229119, value = "Free storage space is at {0} of {1} total. Usage rate is {2} which is beyond the configured <max-disk-usage>. System will start blocking producers.", format = Message.Format.MESSAGE_FORMAT)
ActiveMQIOErrorException diskBeyondLimit(String usableSpace, String totalSpace, String usage);
@Message(id = 229120, value = "connection with ID {0} closed by management", format = Message.Format.MESSAGE_FORMAT)
ActiveMQInternalErrorException connectionWithIDClosedByManagement(String ID);

View File

@ -1341,14 +1341,14 @@ public interface ActiveMQServerLogger extends BasicLogger {
void impossibleToRouteGrouped();
@LogMessage(level = Logger.Level.WARN)
@Message(id = 222210, value = "Storage usage is beyond max-disk-usage. System will start blocking producers.",
@Message(id = 222210, value = "Free storage space is at {0} of {1} total. Usage rate is {2} which is beyond the configured <max-disk-usage>. System will start blocking producers.",
format = Message.Format.MESSAGE_FORMAT)
void diskBeyondCapacity();
void diskBeyondCapacity(String usableSpace, String totalSpace, String usage);
@LogMessage(level = Logger.Level.INFO)
@Message(id = 222211, value = "Storage is back to stable now, under max-disk-usage.",
@Message(id = 222211, value = "Free storage space is at {0} of {1} total. Usage rate is {2} which is below the configured <max-disk-usage>.",
format = Message.Format.MESSAGE_FORMAT)
void diskCapacityRestored();
void diskCapacityRestored(String usableSpace, String totalSpace, String usage);
@LogMessage(level = Logger.Level.WARN)
@Message(id = 222212, value = "Disk Full! Blocking message production on address ''{0}''. Clients will report blocked.", format = Message.Format.MESSAGE_FORMAT)

View File

@ -96,15 +96,14 @@ public class FileStoreMonitor extends ActiveMQScheduledComponent {
public void tick() {
synchronized (monitorLock) {
boolean over = false;
FileStore lastStore = null;
double usage = 0;
long usableSpace = 0;
long totalSpace = 0;
for (FileStore store : stores) {
try {
lastStore = store;
usage = calculateUsage(store);
over = usage > maxUsage;
usableSpace = store.getUsableSpace();
totalSpace = getTotalSpace(store);
over = calculateUsage(usableSpace, totalSpace) > maxUsage;
if (over) {
break;
}
@ -116,12 +115,12 @@ public class FileStoreMonitor extends ActiveMQScheduledComponent {
}
for (Callback callback : callbackList) {
callback.tick(lastStore, usage);
callback.tick(usableSpace, totalSpace);
if (over) {
callback.over(lastStore, usage);
callback.over(usableSpace, totalSpace);
} else {
callback.under(lastStore, usage);
callback.under(usableSpace, totalSpace);
}
}
}
@ -136,12 +135,12 @@ public class FileStoreMonitor extends ActiveMQScheduledComponent {
return this;
}
protected double calculateUsage(FileStore store) throws IOException {
return 1.0 - (double) store.getUsableSpace() / getTotalSpace(store);
public static double calculateUsage(long usableSpace, long totalSpace) {
return 1.0 - (double) usableSpace / (double) totalSpace;
}
private double getTotalSpace(FileStore store) throws IOException {
double totalSpace = (double) store.getTotalSpace();
private long getTotalSpace(FileStore store) throws IOException {
long totalSpace = store.getTotalSpace();
if (totalSpace < 0) {
totalSpace = Long.MAX_VALUE;
}
@ -150,11 +149,10 @@ public class FileStoreMonitor extends ActiveMQScheduledComponent {
public interface Callback {
void tick(FileStore store, double usage);
void tick(long usableSpace, long totalSpace);
void over(FileStore store, double usage);
void over(long usableSpace, long totalSpace);
void under(FileStore store, double usage);
void under(long usableSpace, long totalSpace);
}
}

View File

@ -80,6 +80,7 @@ import org.apache.activemq.artemis.core.server.ServerConsumer;
import org.apache.activemq.artemis.core.server.ServerProducer;
import org.apache.activemq.artemis.core.server.ServerSession;
import org.apache.activemq.artemis.core.server.TempQueueObserver;
import org.apache.activemq.artemis.core.server.files.FileStoreMonitor;
import org.apache.activemq.artemis.core.server.management.ManagementService;
import org.apache.activemq.artemis.core.server.management.Notification;
import org.apache.activemq.artemis.core.settings.impl.AddressSettings;
@ -92,6 +93,7 @@ import org.apache.activemq.artemis.core.transaction.impl.TransactionImpl;
import org.apache.activemq.artemis.logs.AuditLogger;
import org.apache.activemq.artemis.spi.core.protocol.RemotingConnection;
import org.apache.activemq.artemis.spi.core.protocol.SessionCallback;
import org.apache.activemq.artemis.utils.ByteUtil;
import org.apache.activemq.artemis.utils.CompositeAddress;
import org.apache.activemq.artemis.utils.JsonLoader;
import org.apache.activemq.artemis.utils.PrefixUtil;
@ -1700,7 +1702,9 @@ public class ServerSessionImpl implements ServerSession, FailureListener {
try {
// If the protocol doesn't support flow control, we have no choice other than fail the communication
if (!this.getRemotingConnection().isSupportsFlowControl() && pagingManager.isDiskFull()) {
ActiveMQIOErrorException exception = ActiveMQMessageBundle.BUNDLE.diskBeyondLimit();
long usableSpace = pagingManager.getDiskUsableSpace();
long totalSpace = pagingManager.getDiskTotalSpace();
ActiveMQIOErrorException exception = ActiveMQMessageBundle.BUNDLE.diskBeyondLimit(ByteUtil.getHumanReadableByteCount(usableSpace), ByteUtil.getHumanReadableByteCount(totalSpace), String.format("%.1f%%", FileStoreMonitor.calculateUsage(usableSpace, totalSpace) * 100));
this.getRemotingConnection().fail(exception);
throw exception;
}

View File

@ -20,15 +20,12 @@ package org.apache.activemq.artemis.core.server.files;
import java.io.BufferedOutputStream;
import java.io.File;
import java.io.FileOutputStream;
import java.io.IOException;
import java.io.PrintStream;
import java.nio.file.FileStore;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import org.apache.activemq.artemis.tests.util.ActiveMQTestBase;
@ -77,35 +74,24 @@ public class FileStoreMonitorTest extends ActiveMQTestBase {
FileStoreMonitor.Callback callback = new FileStoreMonitor.Callback() {
@Override
public void tick(FileStore store, double usage) {
public void tick(long usableSpace, long totalSpace) {
tick.incrementAndGet();
System.out.println("tick:: " + store + " usage::" + usage);
System.out.println("tick:: usableSpace: " + usableSpace + ", totalSpace:" + totalSpace);
}
@Override
public void over(FileStore store, double usage) {
public void over(long usableSpace, long totalSpace) {
over.incrementAndGet();
System.out.println("over:: " + store + " usage::" + usage);
System.out.println("over:: usableSpace: " + usableSpace + ", totalSpace:" + totalSpace);
}
@Override
public void under(FileStore store, double usage) {
public void under(long usableSpace, long totalSpace) {
under.incrementAndGet();
System.out.println("under:: " + store + " usage::" + usage);
}
};
final AtomicBoolean fakeReturn = new AtomicBoolean(false);
FileStoreMonitor storeMonitor = new FileStoreMonitor(scheduledExecutorService, executorService, 100, TimeUnit.MILLISECONDS, 0.999, null) {
@Override
protected double calculateUsage(FileStore store) throws IOException {
if (fakeReturn.get()) {
return 1f;
} else {
return super.calculateUsage(store);
}
System.out.println("under:: usableSpace: " + usableSpace + ", totalSpace:" + totalSpace);
}
};
FileStoreMonitor storeMonitor = new FileStoreMonitor(scheduledExecutorService, executorService, 100, TimeUnit.MILLISECONDS, 0.999, null);
storeMonitor.addCallback(callback);
storeMonitor.addStore(getTestDirfile());
@ -115,7 +101,7 @@ public class FileStoreMonitorTest extends ActiveMQTestBase {
Assert.assertEquals(1, tick.get());
Assert.assertEquals(1, under.get());
fakeReturn.set(true);
storeMonitor.setMaxUsage(0);
storeMonitor.tick();
@ -133,18 +119,18 @@ public class FileStoreMonitorTest extends ActiveMQTestBase {
storeMonitor.addStore(getTestDirfile());
storeMonitor.addCallback(new FileStoreMonitor.Callback() {
@Override
public void tick(FileStore store, double usage) {
System.out.println("TickS::" + usage);
public void tick(long usableSpace, long totalSpace) {
System.out.println("Tick");
latch.countDown();
}
@Override
public void over(FileStore store, double usage) {
public void over(long usableSpace, long totalSpace) {
}
@Override
public void under(FileStore store, double usage) {
public void under(long usableSpace, long totalSpace) {
}
});

View File

@ -16,6 +16,10 @@
*/
package org.apache.activemq.artemis.tests.integration.amqp;
import java.net.URI;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import org.apache.activemq.artemis.core.config.Configuration;
import org.apache.activemq.artemis.core.server.ActiveMQServer;
import org.apache.activemq.artemis.core.server.files.FileStoreMonitor;
@ -28,11 +32,6 @@ import org.apache.activemq.transport.amqp.client.AmqpSession;
import org.junit.Assert;
import org.junit.Test;
import java.net.URI;
import java.nio.file.FileStore;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
public class GlobalDiskFullTest extends AmqpClientTestSupport {
@Override
@ -48,15 +47,15 @@ public class GlobalDiskFullTest extends AmqpClientTestSupport {
monitor.addCallback(new FileStoreMonitor.Callback() {
@Override
public void tick(FileStore store, double usage) {
public void tick(long usableSpace, long totalSpace) {
}
@Override
public void over(FileStore store, double usage) {
public void over(long usableSpace, long totalSpace) {
latch.countDown();
}
@Override
public void under(FileStore store, double usage) {
public void under(long usableSpace, long totalSpace) {
}
});

View File

@ -120,6 +120,16 @@ public final class FakePagingManager implements PagingManager {
return false;
}
@Override
public long getDiskUsableSpace() {
return 0;
}
@Override
public long getDiskTotalSpace() {
return 0;
}
/*
* (non-Javadoc)
* @see org.apache.activemq.artemis.core.paging.PagingManager#isGlobalFull()