This commit is contained in:
Clebert Suconic 2019-12-17 20:27:11 -05:00
commit d2cf238217
11 changed files with 141 additions and 65 deletions

View File

@ -42,6 +42,14 @@ public class ByteUtil {
private static final Pattern KILO = Pattern.compile(prefix + "k" + suffix, Pattern.CASE_INSENSITIVE);
private static final Pattern MEGA = Pattern.compile(prefix + "m" + suffix, Pattern.CASE_INSENSITIVE);
private static final Pattern GIGA = Pattern.compile(prefix + "g" + suffix, Pattern.CASE_INSENSITIVE);
private static final String[] BYTE_SUFFIXES = new String[] {"E", "P", "T", "G", "M", "K", ""};
private static final double[] BYTE_MAGNITUDES = new double[7];
static {
for (int i = 18, j = 0; i >= 0; i -= 3, j++) {
BYTE_MAGNITUDES[j] = Math.pow(10, i);
}
}
public static void debugFrame(Logger logger, String message, ByteBuf byteIn) {
if (logger.isTraceEnabled()) {
@ -442,4 +450,12 @@ public class ByteUtil {
}
}
public static String getHumanReadableByteCount(long bytes) {
int i = 0;
while (i < BYTE_MAGNITUDES.length && BYTE_MAGNITUDES[i] > bytes) {
i++;
}
return String.format("%.1f%sB", bytes / BYTE_MAGNITUDES[i], BYTE_SUFFIXES[i]);
}
}

View File

@ -0,0 +1,43 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.activemq.artemis.utils;
import org.junit.Test;
import static org.junit.Assert.assertEquals;
public class HumanReadableByteCountTest {
@Test
public void test() {
String[] suffixes = new String[] {"K", "M", "G", "T", "P", "E"};
assertEquals("999.0B", ByteUtil.getHumanReadableByteCount(999));
assertEquals("500.0B", ByteUtil.getHumanReadableByteCount(500));
for (int i = 0, j = 3; i < 6; i++, j += 3) {
final long magnitude = (long) Math.pow(10, j);
assertEquals("1.0" + suffixes[i] + "B", ByteUtil.getHumanReadableByteCount(magnitude));
assertEquals("1.3" + suffixes[i] + "B", ByteUtil.getHumanReadableByteCount(magnitude + (long) (.25 * magnitude)));
assertEquals("1.5" + suffixes[i] + "B", ByteUtil.getHumanReadableByteCount(magnitude + (long) (.5 * magnitude)));
assertEquals("1.9" + suffixes[i] + "B", ByteUtil.getHumanReadableByteCount(magnitude + (long) (.9 * magnitude)));
assertEquals("4.2" + suffixes[i] + "B", ByteUtil.getHumanReadableByteCount(magnitude + (long) (3.2 * magnitude)));
}
}
}

View File

@ -107,6 +107,10 @@ public interface PagingManager extends ActiveMQComponent, HierarchicalRepository
boolean isDiskFull();
long getDiskUsableSpace();
long getDiskTotalSpace();
default long getGlobalSize() {
return 0;
}

View File

@ -16,7 +16,6 @@
*/
package org.apache.activemq.artemis.core.paging.impl;
import java.nio.file.FileStore;
import java.util.List;
import java.util.Map;
import java.util.Queue;
@ -39,6 +38,7 @@ import org.apache.activemq.artemis.core.server.ActiveMQServerLogger;
import org.apache.activemq.artemis.core.server.files.FileStoreMonitor;
import org.apache.activemq.artemis.core.settings.HierarchicalRepository;
import org.apache.activemq.artemis.core.settings.impl.AddressSettings;
import org.apache.activemq.artemis.utils.ByteUtil;
import org.apache.activemq.artemis.utils.collections.ConcurrentHashSet;
import org.apache.activemq.artemis.utils.runnables.AtomicRunnable;
import org.jboss.logging.Logger;
@ -77,6 +77,10 @@ public final class PagingManagerImpl implements PagingManager {
private volatile boolean diskFull = false;
private volatile long diskUsableSpace = 0;
private volatile long diskTotalSpace = 0;
private final Executor memoryExecutor;
private final Queue<Runnable> memoryCallback = new ConcurrentLinkedQueue<>();
@ -194,24 +198,26 @@ public final class PagingManagerImpl implements PagingManager {
private final Logger logger = Logger.getLogger(LocalMonitor.class);
@Override
public void tick(FileStore store, double usage) {
logger.tracef("Tick from store:: %s, usage at %f", store, usage);
public void tick(long usableSpace, long totalSpace) {
diskUsableSpace = usableSpace;
diskTotalSpace = totalSpace;
logger.tracef("Tick:: usable space at %f, total space at %f", ByteUtil.getHumanReadableByteCount(usableSpace), ByteUtil.getHumanReadableByteCount(totalSpace));
}
@Override
public void over(FileStore store, double usage) {
public void over(long usableSpace, long totalSpace) {
if (!diskFull) {
ActiveMQServerLogger.LOGGER.diskBeyondCapacity();
ActiveMQServerLogger.LOGGER.diskBeyondCapacity(ByteUtil.getHumanReadableByteCount(usableSpace), ByteUtil.getHumanReadableByteCount(totalSpace), String.format("%.1f%%", FileStoreMonitor.calculateUsage(usableSpace, totalSpace) * 100));
diskFull = true;
}
}
@Override
public void under(FileStore store, double usage) {
public void under(long usableSpace, long totalSpace) {
final boolean diskFull = PagingManagerImpl.this.diskFull;
if (diskFull || !blockedStored.isEmpty() || !memoryCallback.isEmpty()) {
if (diskFull) {
ActiveMQServerLogger.LOGGER.diskCapacityRestored();
ActiveMQServerLogger.LOGGER.diskCapacityRestored(ByteUtil.getHumanReadableByteCount(usableSpace), ByteUtil.getHumanReadableByteCount(totalSpace), String.format("%.1f%%", FileStoreMonitor.calculateUsage(usableSpace, totalSpace) * 100));
PagingManagerImpl.this.diskFull = false;
}
checkMemoryRelease();
@ -224,6 +230,16 @@ public final class PagingManagerImpl implements PagingManager {
return diskFull;
}
@Override
public long getDiskUsableSpace() {
return diskUsableSpace;
}
@Override
public long getDiskTotalSpace() {
return diskTotalSpace;
}
@Override
public boolean isUsingGlobalSize() {
return maxSize > 0;

View File

@ -385,8 +385,8 @@ public interface ActiveMQMessageBundle {
@Message(id = 229118, value = "Management method not applicable for current server configuration")
IllegalStateException methodNotApplicable();
@Message(id = 229119, value = "Disk Capacity is Low, cannot produce more messages.")
ActiveMQIOErrorException diskBeyondLimit();
@Message(id = 229119, value = "Free storage space is at {0} of {1} total. Usage rate is {2} which is beyond the configured <max-disk-usage>. System will start blocking producers.", format = Message.Format.MESSAGE_FORMAT)
ActiveMQIOErrorException diskBeyondLimit(String usableSpace, String totalSpace, String usage);
@Message(id = 229120, value = "connection with ID {0} closed by management", format = Message.Format.MESSAGE_FORMAT)
ActiveMQInternalErrorException connectionWithIDClosedByManagement(String ID);

View File

@ -1341,14 +1341,14 @@ public interface ActiveMQServerLogger extends BasicLogger {
void impossibleToRouteGrouped();
@LogMessage(level = Logger.Level.WARN)
@Message(id = 222210, value = "Storage usage is beyond max-disk-usage. System will start blocking producers.",
@Message(id = 222210, value = "Free storage space is at {0} of {1} total. Usage rate is {2} which is beyond the configured <max-disk-usage>. System will start blocking producers.",
format = Message.Format.MESSAGE_FORMAT)
void diskBeyondCapacity();
void diskBeyondCapacity(String usableSpace, String totalSpace, String usage);
@LogMessage(level = Logger.Level.INFO)
@Message(id = 222211, value = "Storage is back to stable now, under max-disk-usage.",
@Message(id = 222211, value = "Free storage space is at {0} of {1} total. Usage rate is {2} which is below the configured <max-disk-usage>.",
format = Message.Format.MESSAGE_FORMAT)
void diskCapacityRestored();
void diskCapacityRestored(String usableSpace, String totalSpace, String usage);
@LogMessage(level = Logger.Level.WARN)
@Message(id = 222212, value = "Disk Full! Blocking message production on address ''{0}''. Clients will report blocked.", format = Message.Format.MESSAGE_FORMAT)

View File

@ -96,15 +96,14 @@ public class FileStoreMonitor extends ActiveMQScheduledComponent {
public void tick() {
synchronized (monitorLock) {
boolean over = false;
FileStore lastStore = null;
double usage = 0;
long usableSpace = 0;
long totalSpace = 0;
for (FileStore store : stores) {
try {
lastStore = store;
usage = calculateUsage(store);
over = usage > maxUsage;
usableSpace = store.getUsableSpace();
totalSpace = getTotalSpace(store);
over = calculateUsage(usableSpace, totalSpace) > maxUsage;
if (over) {
break;
}
@ -116,12 +115,12 @@ public class FileStoreMonitor extends ActiveMQScheduledComponent {
}
for (Callback callback : callbackList) {
callback.tick(lastStore, usage);
callback.tick(usableSpace, totalSpace);
if (over) {
callback.over(lastStore, usage);
callback.over(usableSpace, totalSpace);
} else {
callback.under(lastStore, usage);
callback.under(usableSpace, totalSpace);
}
}
}
@ -136,12 +135,12 @@ public class FileStoreMonitor extends ActiveMQScheduledComponent {
return this;
}
protected double calculateUsage(FileStore store) throws IOException {
return 1.0 - (double) store.getUsableSpace() / getTotalSpace(store);
public static double calculateUsage(long usableSpace, long totalSpace) {
return 1.0 - (double) usableSpace / (double) totalSpace;
}
private double getTotalSpace(FileStore store) throws IOException {
double totalSpace = (double) store.getTotalSpace();
private long getTotalSpace(FileStore store) throws IOException {
long totalSpace = store.getTotalSpace();
if (totalSpace < 0) {
totalSpace = Long.MAX_VALUE;
}
@ -150,11 +149,10 @@ public class FileStoreMonitor extends ActiveMQScheduledComponent {
public interface Callback {
void tick(FileStore store, double usage);
void tick(long usableSpace, long totalSpace);
void over(FileStore store, double usage);
void over(long usableSpace, long totalSpace);
void under(FileStore store, double usage);
void under(long usableSpace, long totalSpace);
}
}

View File

@ -80,6 +80,7 @@ import org.apache.activemq.artemis.core.server.ServerConsumer;
import org.apache.activemq.artemis.core.server.ServerProducer;
import org.apache.activemq.artemis.core.server.ServerSession;
import org.apache.activemq.artemis.core.server.TempQueueObserver;
import org.apache.activemq.artemis.core.server.files.FileStoreMonitor;
import org.apache.activemq.artemis.core.server.management.ManagementService;
import org.apache.activemq.artemis.core.server.management.Notification;
import org.apache.activemq.artemis.core.settings.impl.AddressSettings;
@ -92,6 +93,7 @@ import org.apache.activemq.artemis.core.transaction.impl.TransactionImpl;
import org.apache.activemq.artemis.logs.AuditLogger;
import org.apache.activemq.artemis.spi.core.protocol.RemotingConnection;
import org.apache.activemq.artemis.spi.core.protocol.SessionCallback;
import org.apache.activemq.artemis.utils.ByteUtil;
import org.apache.activemq.artemis.utils.CompositeAddress;
import org.apache.activemq.artemis.utils.JsonLoader;
import org.apache.activemq.artemis.utils.PrefixUtil;
@ -1700,7 +1702,9 @@ public class ServerSessionImpl implements ServerSession, FailureListener {
try {
// If the protocol doesn't support flow control, we have no choice other than fail the communication
if (!this.getRemotingConnection().isSupportsFlowControl() && pagingManager.isDiskFull()) {
ActiveMQIOErrorException exception = ActiveMQMessageBundle.BUNDLE.diskBeyondLimit();
long usableSpace = pagingManager.getDiskUsableSpace();
long totalSpace = pagingManager.getDiskTotalSpace();
ActiveMQIOErrorException exception = ActiveMQMessageBundle.BUNDLE.diskBeyondLimit(ByteUtil.getHumanReadableByteCount(usableSpace), ByteUtil.getHumanReadableByteCount(totalSpace), String.format("%.1f%%", FileStoreMonitor.calculateUsage(usableSpace, totalSpace) * 100));
this.getRemotingConnection().fail(exception);
throw exception;
}

View File

@ -20,15 +20,12 @@ package org.apache.activemq.artemis.core.server.files;
import java.io.BufferedOutputStream;
import java.io.File;
import java.io.FileOutputStream;
import java.io.IOException;
import java.io.PrintStream;
import java.nio.file.FileStore;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import org.apache.activemq.artemis.tests.util.ActiveMQTestBase;
@ -77,35 +74,24 @@ public class FileStoreMonitorTest extends ActiveMQTestBase {
FileStoreMonitor.Callback callback = new FileStoreMonitor.Callback() {
@Override
public void tick(FileStore store, double usage) {
public void tick(long usableSpace, long totalSpace) {
tick.incrementAndGet();
System.out.println("tick:: " + store + " usage::" + usage);
System.out.println("tick:: usableSpace: " + usableSpace + ", totalSpace:" + totalSpace);
}
@Override
public void over(FileStore store, double usage) {
public void over(long usableSpace, long totalSpace) {
over.incrementAndGet();
System.out.println("over:: " + store + " usage::" + usage);
System.out.println("over:: usableSpace: " + usableSpace + ", totalSpace:" + totalSpace);
}
@Override
public void under(FileStore store, double usage) {
public void under(long usableSpace, long totalSpace) {
under.incrementAndGet();
System.out.println("under:: " + store + " usage::" + usage);
}
};
final AtomicBoolean fakeReturn = new AtomicBoolean(false);
FileStoreMonitor storeMonitor = new FileStoreMonitor(scheduledExecutorService, executorService, 100, TimeUnit.MILLISECONDS, 0.999, null) {
@Override
protected double calculateUsage(FileStore store) throws IOException {
if (fakeReturn.get()) {
return 1f;
} else {
return super.calculateUsage(store);
}
System.out.println("under:: usableSpace: " + usableSpace + ", totalSpace:" + totalSpace);
}
};
FileStoreMonitor storeMonitor = new FileStoreMonitor(scheduledExecutorService, executorService, 100, TimeUnit.MILLISECONDS, 0.999, null);
storeMonitor.addCallback(callback);
storeMonitor.addStore(getTestDirfile());
@ -115,7 +101,7 @@ public class FileStoreMonitorTest extends ActiveMQTestBase {
Assert.assertEquals(1, tick.get());
Assert.assertEquals(1, under.get());
fakeReturn.set(true);
storeMonitor.setMaxUsage(0);
storeMonitor.tick();
@ -133,18 +119,18 @@ public class FileStoreMonitorTest extends ActiveMQTestBase {
storeMonitor.addStore(getTestDirfile());
storeMonitor.addCallback(new FileStoreMonitor.Callback() {
@Override
public void tick(FileStore store, double usage) {
System.out.println("TickS::" + usage);
public void tick(long usableSpace, long totalSpace) {
System.out.println("Tick");
latch.countDown();
}
@Override
public void over(FileStore store, double usage) {
public void over(long usableSpace, long totalSpace) {
}
@Override
public void under(FileStore store, double usage) {
public void under(long usableSpace, long totalSpace) {
}
});

View File

@ -16,6 +16,10 @@
*/
package org.apache.activemq.artemis.tests.integration.amqp;
import java.net.URI;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import org.apache.activemq.artemis.core.config.Configuration;
import org.apache.activemq.artemis.core.server.ActiveMQServer;
import org.apache.activemq.artemis.core.server.files.FileStoreMonitor;
@ -28,11 +32,6 @@ import org.apache.activemq.transport.amqp.client.AmqpSession;
import org.junit.Assert;
import org.junit.Test;
import java.net.URI;
import java.nio.file.FileStore;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
public class GlobalDiskFullTest extends AmqpClientTestSupport {
@Override
@ -48,15 +47,15 @@ public class GlobalDiskFullTest extends AmqpClientTestSupport {
monitor.addCallback(new FileStoreMonitor.Callback() {
@Override
public void tick(FileStore store, double usage) {
public void tick(long usableSpace, long totalSpace) {
}
@Override
public void over(FileStore store, double usage) {
public void over(long usableSpace, long totalSpace) {
latch.countDown();
}
@Override
public void under(FileStore store, double usage) {
public void under(long usableSpace, long totalSpace) {
}
});

View File

@ -120,6 +120,16 @@ public final class FakePagingManager implements PagingManager {
return false;
}
@Override
public long getDiskUsableSpace() {
return 0;
}
@Override
public long getDiskTotalSpace() {
return 0;
}
/*
* (non-Javadoc)
* @see org.apache.activemq.artemis.core.paging.PagingManager#isGlobalFull()