From 53e1d601601204dc2aa587fcb3046d5c1d6d026d Mon Sep 17 00:00:00 2001 From: Howard Gao Date: Mon, 12 Mar 2018 10:33:09 +0800 Subject: [PATCH] ARTEMIS-1732 AMQP anonymous producer not blocked on max-disk-usage Anonymous senders (those created without a target address) are not blocked when max-disk-usage is reached. The cause is that when such a sender is created on the broker, the broker doesn't check the disk/memory usage and gives out the credit immediately. --- .../amqp/broker/AMQPSessionCallback.java | 27 +++--- .../artemis/core/paging/PagingManager.java | 55 +++++++++++- .../artemis/core/paging/PagingStore.java | 7 +- .../core/paging/impl/PagingManagerImpl.java | 44 ++++++++- .../core/paging/impl/PagingStoreImpl.java | 44 +-------- .../core/server/ActiveMQServerLogger.java | 13 +++ .../core/server/files/FileStoreMonitor.java | 9 +- .../server/files/FileStoreMonitorTest.java | 10 --- .../integration/amqp/GlobalDiskFullTest.java | 89 +++++++++++++++++++ .../tests/unit/util/FakePagingManager.java | 7 +- 10 files changed, 224 insertions(+), 81 deletions(-) create mode 100644 tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/GlobalDiskFullTest.java diff --git a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/broker/AMQPSessionCallback.java b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/broker/AMQPSessionCallback.java index 6461bb2011..1f5ccbc30f 100644 --- a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/broker/AMQPSessionCallback.java +++ b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/broker/AMQPSessionCallback.java @@ -31,6 +31,7 @@ import org.apache.activemq.artemis.api.core.SimpleString; import org.apache.activemq.artemis.api.core.client.ActiveMQClient; import org.apache.activemq.artemis.core.io.IOCallback; import org.apache.activemq.artemis.core.message.impl.CoreMessageObjectPools; +import org.apache.activemq.artemis.core.paging.PagingManager; import org.apache.activemq.artemis.core.paging.PagingStore; import org.apache.activemq.artemis.core.persistence.OperationContext; import org.apache.activemq.artemis.core.persistence.StorageManager; @@ -576,31 +577,25 @@ public class AMQPSessionCallback implements SessionCallback { final int threshold, final Receiver receiver) { try { - if (address == null) { + PagingManager pagingManager = manager.getServer().getPagingManager(); + Runnable creditRunnable = () -> { connection.lock(); try { - receiver.flow(credits); + if (receiver.getRemoteCredit() <= threshold) { + receiver.flow(credits); + } } finally { connection.unlock(); } connection.flush(); + }; + + if (address == null) { + pagingManager.checkMemory(creditRunnable); return; } final PagingStore store = manager.getServer().getPagingManager().getPageStore(address); - store.checkMemory(new Runnable() { - @Override - public void run() { - connection.lock(); - try { - if (receiver.getRemoteCredit() <= threshold) { - receiver.flow(credits); - } - } finally { - connection.unlock(); - } - connection.flush(); - } - }); + store.checkMemory(creditRunnable); } catch (Exception e) { throw new RuntimeException(e); } diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/PagingManager.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/PagingManager.java index 4d472e182c..c8eb2ec5ea 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/PagingManager.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/PagingManager.java @@ -17,6 +17,9 @@ package org.apache.activemq.artemis.core.paging; import java.util.Map; +import java.util.Queue; +import java.util.concurrent.ConcurrentLinkedQueue; +import java.util.concurrent.atomic.AtomicBoolean; import org.apache.activemq.artemis.api.core.SimpleString; import org.apache.activemq.artemis.core.server.ActiveMQComponent; @@ -79,7 +82,7 @@ public interface PagingManager extends ActiveMQComponent, HierarchicalRepository void resumeCleanup(); - void addBlockedStore(PagingStore store); + void addBlockedStore(Blockable store); void injectMonitor(FileStoreMonitor monitor) throws Exception; @@ -111,4 +114,54 @@ public interface PagingManager extends ActiveMQComponent, HierarchicalRepository return 0; } + boolean checkMemory(Runnable runnable); + + // To be used when the memory is oversized either by local settings or global settings on blocking addresses + final class OverSizedRunnable implements Runnable { + + private final AtomicBoolean ran = new AtomicBoolean(false); + + private final Runnable runnable; + + public OverSizedRunnable(final Runnable runnable) { + this.runnable = runnable; + } + + @Override + public void run() { + if (ran.compareAndSet(false, true)) { + runnable.run(); + } + } + } + + interface Blockable { + /** + * It will return true if the destination is leaving blocking. + */ + boolean checkReleasedMemory(); + } + + final class MemoryFreedRunnablesExecutor implements Runnable { + + private final Queue onMemoryFreedRunnables = new ConcurrentLinkedQueue<>(); + + public void addRunnable(PagingManager.OverSizedRunnable runnable) { + onMemoryFreedRunnables.add(runnable); + } + + @Override + public void run() { + Runnable runnable; + + while ((runnable = onMemoryFreedRunnables.poll()) != null) { + runnable.run(); + } + } + + public boolean isEmpty() { + return onMemoryFreedRunnables.isEmpty(); + } + } + } diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/PagingStore.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/PagingStore.java index 4dd8bf832a..27e8c0fe58 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/PagingStore.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/PagingStore.java @@ -42,7 +42,7 @@ import org.apache.activemq.artemis.core.transaction.Transaction; * * @see PagingManager */ -public interface PagingStore extends ActiveMQComponent, RefCountMessageListener { +public interface PagingStore extends ActiveMQComponent, RefCountMessageListener, PagingManager.Blockable { SimpleString getAddress(); @@ -131,11 +131,6 @@ public interface PagingStore extends ActiveMQComponent, RefCountMessageListener boolean isRejectingMessages(); - /** - * It will return true if the destination is leaving blocking. - */ - boolean checkReleasedMemory(); - /** * Write lock the PagingStore. * diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/impl/PagingManagerImpl.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/impl/PagingManagerImpl.java index bca70cf170..878f918760 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/impl/PagingManagerImpl.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/impl/PagingManagerImpl.java @@ -23,6 +23,7 @@ import java.util.Map; import java.util.Set; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentMap; +import java.util.concurrent.Executor; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicLong; import java.util.concurrent.locks.ReentrantReadWriteLock; @@ -56,7 +57,7 @@ public final class PagingManagerImpl implements PagingManager { */ private final ReentrantReadWriteLock syncLock = new ReentrantReadWriteLock(); - private final Set blockedStored = new ConcurrentHashSet<>(); + private final Set blockedStored = new ConcurrentHashSet<>(); private final ConcurrentMap stores = new ConcurrentHashMap<>(); @@ -78,6 +79,9 @@ public final class PagingManagerImpl implements PagingManager { private ActiveMQScheduledComponent scheduledComponent = null; + private final PagingManager.MemoryFreedRunnablesExecutor memoryFreedRunnablesExecutor = new PagingManager.MemoryFreedRunnablesExecutor(); + + private final Executor executor; // Static // -------------------------------------------------------------------------------------------------------------------------- @@ -102,6 +106,7 @@ public final class PagingManagerImpl implements PagingManager { this.addressSettingsRepository = addressSettingsRepository; addressSettingsRepository.registerListener(this); this.maxSize = maxSize; + executor = pagingStoreFactory.newExecutor(); } public PagingManagerImpl(final PagingStoreFactory pagingSPI, @@ -110,7 +115,7 @@ public final class PagingManagerImpl implements PagingManager { } @Override - public void addBlockedStore(PagingStore store) { + public void addBlockedStore(Blockable store) { blockedStored.add(store); } @@ -152,11 +157,42 @@ public final class PagingManagerImpl implements PagingManager { return globalSizeBytes.get(); } + @Override + public boolean checkMemory(final Runnable runWhenAvailable) { + if (isGlobalFull()) { + OverSizedRunnable ourRunnable = new OverSizedRunnable(runWhenAvailable); + + memoryFreedRunnablesExecutor.addRunnable(ourRunnable); + addBlockedStore(() -> { + if (!isGlobalFull()) { + if (!memoryFreedRunnablesExecutor.isEmpty()) { + executor.execute(memoryFreedRunnablesExecutor); + ActiveMQServerLogger.LOGGER.unblockingGlobalMessageProduction(getGlobalSize()); + return true; + } + } + return false; + }); + + if (isDiskFull()) { + ActiveMQServerLogger.LOGGER.blockingGlobalDiskFull(); + } else { + ActiveMQServerLogger.LOGGER.blockingGlobalMessageProduction(getGlobalSize()); + } + + return true; + } + + runWhenAvailable.run(); + + return true; + } + protected void checkMemoryRelease() { if (!diskFull && (maxSize < 0 || globalSizeBytes.get() < maxSize) && !blockedStored.isEmpty()) { - Iterator storeIterator = blockedStored.iterator(); + Iterator storeIterator = blockedStored.iterator(); while (storeIterator.hasNext()) { - PagingStore store = storeIterator.next(); + Blockable store = storeIterator.next(); if (store.checkReleasedMemory()) { storeIterator.remove(); } diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/impl/PagingStoreImpl.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/impl/PagingStoreImpl.java index 39687b0e6c..74212ce9de 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/impl/PagingStoreImpl.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/impl/PagingStoreImpl.java @@ -23,9 +23,7 @@ import java.util.Collection; import java.util.Collections; import java.util.HashSet; import java.util.List; -import java.util.Queue; import java.util.Set; -import java.util.concurrent.ConcurrentLinkedQueue; import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicBoolean; @@ -641,40 +639,7 @@ public class PagingStoreImpl implements PagingStore { } - private final Queue onMemoryFreedRunnables = new ConcurrentLinkedQueue<>(); - - private class MemoryFreedRunnablesExecutor implements Runnable { - - @Override - public void run() { - Runnable runnable; - - while ((runnable = onMemoryFreedRunnables.poll()) != null) { - runnable.run(); - } - } - } - - private final Runnable memoryFreedRunnablesExecutor = new MemoryFreedRunnablesExecutor(); - - // To be used when the memory is oversized either by local settings or global settings on blocking addresses - private static final class OverSizedRunnable implements Runnable { - - private final AtomicBoolean ran = new AtomicBoolean(false); - - private final Runnable runnable; - - private OverSizedRunnable(final Runnable runnable) { - this.runnable = runnable; - } - - @Override - public void run() { - if (ran.compareAndSet(false, true)) { - runnable.run(); - } - } - } + private final PagingManager.MemoryFreedRunnablesExecutor memoryFreedRunnablesExecutor = new PagingManager.MemoryFreedRunnablesExecutor(); @Override public boolean checkMemory(final Runnable runWhenAvailable) { @@ -685,9 +650,9 @@ public class PagingStoreImpl implements PagingStore { } } else if (pagingManager.isDiskFull() || addressFullMessagePolicy == AddressFullMessagePolicy.BLOCK && (maxSize != -1 || usingGlobalMaxSize)) { if (pagingManager.isDiskFull() || maxSize > 0 && sizeInBytes.get() > maxSize || pagingManager.isGlobalFull()) { - OverSizedRunnable ourRunnable = new OverSizedRunnable(runWhenAvailable); + PagingManager.OverSizedRunnable ourRunnable = new PagingManager.OverSizedRunnable(runWhenAvailable); - onMemoryFreedRunnables.add(ourRunnable); + memoryFreedRunnablesExecutor.addRunnable(ourRunnable); // We check again to avoid a race condition where the size can come down just after the element // has been added, but the check to execute was done before the element was added @@ -710,7 +675,6 @@ public class PagingStoreImpl implements PagingStore { blocking.set(true); } } - return true; } } @@ -755,7 +719,7 @@ public class PagingStoreImpl implements PagingStore { public boolean checkReleaseMemory(boolean globalOversized, long newSize) { if (!globalOversized && (newSize <= maxSize || maxSize < 0)) { - if (!onMemoryFreedRunnables.isEmpty()) { + if (!memoryFreedRunnablesExecutor.isEmpty()) { executor.execute(memoryFreedRunnablesExecutor); if (blocking.get()) { ActiveMQServerLogger.LOGGER.unblockingMessageProduction(address, sizeInBytes.get(), maxSize); diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/ActiveMQServerLogger.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/ActiveMQServerLogger.java index b10d652cd3..96fffe594b 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/ActiveMQServerLogger.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/ActiveMQServerLogger.java @@ -1950,4 +1950,17 @@ public interface ActiveMQServerLogger extends BasicLogger { @LogMessage(level = Logger.Level.ERROR) @Message(id = 224095, value = "Error updating Consumer Count: {0}", format = Message.Format.MESSAGE_FORMAT) void consumerCountError(String reason); + + @LogMessage(level = Logger.Level.WARN) + @Message(id = 224096, value = "Disk Full! Blocking message production. Clients will report blocked.", format = Message.Format.MESSAGE_FORMAT) + void blockingGlobalDiskFull(); + + @LogMessage(level = Logger.Level.WARN) + @Message(id = 224097, value = "Blocking message production; size is currently: {0} bytes;", format = Message.Format.MESSAGE_FORMAT) + void blockingGlobalMessageProduction(long globalSize); + + @LogMessage(level = Logger.Level.INFO) + @Message(id = 224098, value = "Unblocking message production; size is currently: {0} bytes;", format = Message.Format.MESSAGE_FORMAT) + void unblockingGlobalMessageProduction(long globalSize); + } diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/files/FileStoreMonitor.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/files/FileStoreMonitor.java index ad591172d4..957661c4b0 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/files/FileStoreMonitor.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/files/FileStoreMonitor.java @@ -150,11 +150,14 @@ public class FileStoreMonitor extends ActiveMQScheduledComponent { public interface Callback { - void tick(FileStore store, double usage); + default void tick(FileStore store, double usage) { + } - void over(FileStore store, double usage); + default void over(FileStore store, double usage) { + } - void under(FileStore store, double usage); + default void under(FileStore store, double usage) { + } } } diff --git a/artemis-server/src/test/java/org/apache/activemq/artemis/core/server/files/FileStoreMonitorTest.java b/artemis-server/src/test/java/org/apache/activemq/artemis/core/server/files/FileStoreMonitorTest.java index b91d3de260..e4f27c32b7 100644 --- a/artemis-server/src/test/java/org/apache/activemq/artemis/core/server/files/FileStoreMonitorTest.java +++ b/artemis-server/src/test/java/org/apache/activemq/artemis/core/server/files/FileStoreMonitorTest.java @@ -137,16 +137,6 @@ public class FileStoreMonitorTest extends ActiveMQTestBase { System.out.println("TickS::" + usage); latch.countDown(); } - - @Override - public void over(FileStore store, double usage) { - - } - - @Override - public void under(FileStore store, double usage) { - - } }); storeMonitor.start(); diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/GlobalDiskFullTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/GlobalDiskFullTest.java new file mode 100644 index 0000000000..d664013d4d --- /dev/null +++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/GlobalDiskFullTest.java @@ -0,0 +1,89 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.activemq.artemis.tests.integration.amqp; + +import org.apache.activemq.artemis.core.config.Configuration; +import org.apache.activemq.artemis.core.server.ActiveMQServer; +import org.apache.activemq.artemis.core.server.files.FileStoreMonitor; +import org.apache.activemq.artemis.core.server.impl.ActiveMQServerImpl; +import org.apache.activemq.transport.amqp.client.AmqpClient; +import org.apache.activemq.transport.amqp.client.AmqpConnection; +import org.apache.activemq.transport.amqp.client.AmqpMessage; +import org.apache.activemq.transport.amqp.client.AmqpSender; +import org.apache.activemq.transport.amqp.client.AmqpSession; +import org.junit.Test; + +import java.net.URI; +import java.nio.file.FileStore; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.TimeUnit; + +public class GlobalDiskFullTest extends AmqpClientTestSupport { + + @Override + protected void addConfiguration(ActiveMQServer server) { + Configuration serverConfig = server.getConfiguration(); + serverConfig.setDiskScanPeriod(100); + } + + @Test + public void testProducerOnDiskFull() throws Exception { + FileStoreMonitor monitor = ((ActiveMQServerImpl)server).getMonitor().setMaxUsage(0.0); + final CountDownLatch latch = new CountDownLatch(1); + monitor.addCallback(new FileStoreMonitor.Callback() { + @Override + public void over(FileStore store, double usage) { + latch.countDown(); + } + @Override + public void under(FileStore store, double usage) { + } + }); + latch.await(2, TimeUnit.SECONDS); + + AmqpClient client = createAmqpClient(new URI("tcp://localhost:" + AMQP_PORT)); + AmqpConnection connection = addConnection(client.connect()); + + try { + AmqpSession session = connection.createSession(); + AmqpSender sender = session.createSender(getQueueName()); + final AmqpMessage message = new AmqpMessage(); + byte[] payload = new byte[1000]; + message.setBytes(payload); + + sender.setSendTimeout(1000); + sender.send(message); + + org.apache.activemq.artemis.core.server.Queue queueView = getProxyToQueue(getQueueName()); + assertEquals("shouldn't receive any messages", 0, queueView.getMessageCount()); + + AmqpSender anonSender = session.createSender(); + final AmqpMessage message1 = new AmqpMessage(); + message1.setBytes(payload); + message1.setAddress(getQueueName()); + + anonSender.setSendTimeout(1000); + anonSender.send(message1); + + queueView = getProxyToQueue(getQueueName()); + assertEquals("shouldn't receive any messages", 0, queueView.getMessageCount()); + + } finally { + connection.close(); + } + } +} diff --git a/tests/unit-tests/src/test/java/org/apache/activemq/artemis/tests/unit/util/FakePagingManager.java b/tests/unit-tests/src/test/java/org/apache/activemq/artemis/tests/unit/util/FakePagingManager.java index d1012a60df..34316557e5 100644 --- a/tests/unit-tests/src/test/java/org/apache/activemq/artemis/tests/unit/util/FakePagingManager.java +++ b/tests/unit-tests/src/test/java/org/apache/activemq/artemis/tests/unit/util/FakePagingManager.java @@ -30,7 +30,7 @@ import org.apache.activemq.artemis.core.server.files.FileStoreMonitor; public final class FakePagingManager implements PagingManager { @Override - public void addBlockedStore(PagingStore store) { + public void addBlockedStore(Blockable store) { } @@ -115,6 +115,11 @@ public final class FakePagingManager implements PagingManager { return false; } + @Override + public boolean checkMemory(Runnable runnable) { + return false; + } + /* * (non-Javadoc) * @see org.apache.activemq.artemis.core.paging.PagingManager#isGlobalFull()