ARTEMIS-1732 AMQP anonymous producer not blocked on max-disk-usage

Anonymous senders (those created without a target address) are not
blocked when max-disk-usage is reached. The cause is that when such
a sender is created on the broker, the broker doesn't check the
disk/memory usage and gives out the credit immediately.
This commit is contained in:
Howard Gao 2018-03-12 10:33:09 +08:00 committed by Clebert Suconic
parent 168a7cfea5
commit 53e1d60160
10 changed files with 224 additions and 81 deletions

View File

@ -31,6 +31,7 @@ import org.apache.activemq.artemis.api.core.SimpleString;
import org.apache.activemq.artemis.api.core.client.ActiveMQClient;
import org.apache.activemq.artemis.core.io.IOCallback;
import org.apache.activemq.artemis.core.message.impl.CoreMessageObjectPools;
import org.apache.activemq.artemis.core.paging.PagingManager;
import org.apache.activemq.artemis.core.paging.PagingStore;
import org.apache.activemq.artemis.core.persistence.OperationContext;
import org.apache.activemq.artemis.core.persistence.StorageManager;
@ -576,31 +577,25 @@ public class AMQPSessionCallback implements SessionCallback {
final int threshold,
final Receiver receiver) {
try {
if (address == null) {
PagingManager pagingManager = manager.getServer().getPagingManager();
Runnable creditRunnable = () -> {
connection.lock();
try {
receiver.flow(credits);
if (receiver.getRemoteCredit() <= threshold) {
receiver.flow(credits);
}
} finally {
connection.unlock();
}
connection.flush();
};
if (address == null) {
pagingManager.checkMemory(creditRunnable);
return;
}
final PagingStore store = manager.getServer().getPagingManager().getPageStore(address);
store.checkMemory(new Runnable() {
@Override
public void run() {
connection.lock();
try {
if (receiver.getRemoteCredit() <= threshold) {
receiver.flow(credits);
}
} finally {
connection.unlock();
}
connection.flush();
}
});
store.checkMemory(creditRunnable);
} catch (Exception e) {
throw new RuntimeException(e);
}

View File

@ -17,6 +17,9 @@
package org.apache.activemq.artemis.core.paging;
import java.util.Map;
import java.util.Queue;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.atomic.AtomicBoolean;
import org.apache.activemq.artemis.api.core.SimpleString;
import org.apache.activemq.artemis.core.server.ActiveMQComponent;
@ -79,7 +82,7 @@ public interface PagingManager extends ActiveMQComponent, HierarchicalRepository
void resumeCleanup();
void addBlockedStore(PagingStore store);
void addBlockedStore(Blockable store);
void injectMonitor(FileStoreMonitor monitor) throws Exception;
@ -111,4 +114,54 @@ public interface PagingManager extends ActiveMQComponent, HierarchicalRepository
return 0;
}
boolean checkMemory(Runnable runnable);
// To be used when the memory is oversized either by local settings or global settings on blocking addresses
final class OverSizedRunnable implements Runnable {
private final AtomicBoolean ran = new AtomicBoolean(false);
private final Runnable runnable;
public OverSizedRunnable(final Runnable runnable) {
this.runnable = runnable;
}
@Override
public void run() {
if (ran.compareAndSet(false, true)) {
runnable.run();
}
}
}
interface Blockable {
/**
* It will return true if the destination is leaving blocking.
*/
boolean checkReleasedMemory();
}
final class MemoryFreedRunnablesExecutor implements Runnable {
private final Queue<OverSizedRunnable> onMemoryFreedRunnables = new ConcurrentLinkedQueue<>();
public void addRunnable(PagingManager.OverSizedRunnable runnable) {
onMemoryFreedRunnables.add(runnable);
}
@Override
public void run() {
Runnable runnable;
while ((runnable = onMemoryFreedRunnables.poll()) != null) {
runnable.run();
}
}
public boolean isEmpty() {
return onMemoryFreedRunnables.isEmpty();
}
}
}

View File

@ -42,7 +42,7 @@ import org.apache.activemq.artemis.core.transaction.Transaction;
*
* @see PagingManager
*/
public interface PagingStore extends ActiveMQComponent, RefCountMessageListener {
public interface PagingStore extends ActiveMQComponent, RefCountMessageListener, PagingManager.Blockable {
SimpleString getAddress();
@ -131,11 +131,6 @@ public interface PagingStore extends ActiveMQComponent, RefCountMessageListener
boolean isRejectingMessages();
/**
* It will return true if the destination is leaving blocking.
*/
boolean checkReleasedMemory();
/**
* Write lock the PagingStore.
*

View File

@ -23,6 +23,7 @@ import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.Executor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.locks.ReentrantReadWriteLock;
@ -56,7 +57,7 @@ public final class PagingManagerImpl implements PagingManager {
*/
private final ReentrantReadWriteLock syncLock = new ReentrantReadWriteLock();
private final Set<PagingStore> blockedStored = new ConcurrentHashSet<>();
private final Set<Blockable> blockedStored = new ConcurrentHashSet<>();
private final ConcurrentMap<SimpleString, PagingStore> stores = new ConcurrentHashMap<>();
@ -78,6 +79,9 @@ public final class PagingManagerImpl implements PagingManager {
private ActiveMQScheduledComponent scheduledComponent = null;
private final PagingManager.MemoryFreedRunnablesExecutor memoryFreedRunnablesExecutor = new PagingManager.MemoryFreedRunnablesExecutor();
private final Executor executor;
// Static
// --------------------------------------------------------------------------------------------------------------------------
@ -102,6 +106,7 @@ public final class PagingManagerImpl implements PagingManager {
this.addressSettingsRepository = addressSettingsRepository;
addressSettingsRepository.registerListener(this);
this.maxSize = maxSize;
executor = pagingStoreFactory.newExecutor();
}
public PagingManagerImpl(final PagingStoreFactory pagingSPI,
@ -110,7 +115,7 @@ public final class PagingManagerImpl implements PagingManager {
}
@Override
public void addBlockedStore(PagingStore store) {
public void addBlockedStore(Blockable store) {
blockedStored.add(store);
}
@ -152,11 +157,42 @@ public final class PagingManagerImpl implements PagingManager {
return globalSizeBytes.get();
}
@Override
public boolean checkMemory(final Runnable runWhenAvailable) {
if (isGlobalFull()) {
OverSizedRunnable ourRunnable = new OverSizedRunnable(runWhenAvailable);
memoryFreedRunnablesExecutor.addRunnable(ourRunnable);
addBlockedStore(() -> {
if (!isGlobalFull()) {
if (!memoryFreedRunnablesExecutor.isEmpty()) {
executor.execute(memoryFreedRunnablesExecutor);
ActiveMQServerLogger.LOGGER.unblockingGlobalMessageProduction(getGlobalSize());
return true;
}
}
return false;
});
if (isDiskFull()) {
ActiveMQServerLogger.LOGGER.blockingGlobalDiskFull();
} else {
ActiveMQServerLogger.LOGGER.blockingGlobalMessageProduction(getGlobalSize());
}
return true;
}
runWhenAvailable.run();
return true;
}
protected void checkMemoryRelease() {
if (!diskFull && (maxSize < 0 || globalSizeBytes.get() < maxSize) && !blockedStored.isEmpty()) {
Iterator<PagingStore> storeIterator = blockedStored.iterator();
Iterator<Blockable> storeIterator = blockedStored.iterator();
while (storeIterator.hasNext()) {
PagingStore store = storeIterator.next();
Blockable store = storeIterator.next();
if (store.checkReleasedMemory()) {
storeIterator.remove();
}

View File

@ -23,9 +23,7 @@ import java.util.Collection;
import java.util.Collections;
import java.util.HashSet;
import java.util.List;
import java.util.Queue;
import java.util.Set;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
@ -641,40 +639,7 @@ public class PagingStoreImpl implements PagingStore {
}
private final Queue<OverSizedRunnable> onMemoryFreedRunnables = new ConcurrentLinkedQueue<>();
private class MemoryFreedRunnablesExecutor implements Runnable {
@Override
public void run() {
Runnable runnable;
while ((runnable = onMemoryFreedRunnables.poll()) != null) {
runnable.run();
}
}
}
private final Runnable memoryFreedRunnablesExecutor = new MemoryFreedRunnablesExecutor();
// To be used when the memory is oversized either by local settings or global settings on blocking addresses
private static final class OverSizedRunnable implements Runnable {
private final AtomicBoolean ran = new AtomicBoolean(false);
private final Runnable runnable;
private OverSizedRunnable(final Runnable runnable) {
this.runnable = runnable;
}
@Override
public void run() {
if (ran.compareAndSet(false, true)) {
runnable.run();
}
}
}
private final PagingManager.MemoryFreedRunnablesExecutor memoryFreedRunnablesExecutor = new PagingManager.MemoryFreedRunnablesExecutor();
@Override
public boolean checkMemory(final Runnable runWhenAvailable) {
@ -685,9 +650,9 @@ public class PagingStoreImpl implements PagingStore {
}
} else if (pagingManager.isDiskFull() || addressFullMessagePolicy == AddressFullMessagePolicy.BLOCK && (maxSize != -1 || usingGlobalMaxSize)) {
if (pagingManager.isDiskFull() || maxSize > 0 && sizeInBytes.get() > maxSize || pagingManager.isGlobalFull()) {
OverSizedRunnable ourRunnable = new OverSizedRunnable(runWhenAvailable);
PagingManager.OverSizedRunnable ourRunnable = new PagingManager.OverSizedRunnable(runWhenAvailable);
onMemoryFreedRunnables.add(ourRunnable);
memoryFreedRunnablesExecutor.addRunnable(ourRunnable);
// We check again to avoid a race condition where the size can come down just after the element
// has been added, but the check to execute was done before the element was added
@ -710,7 +675,6 @@ public class PagingStoreImpl implements PagingStore {
blocking.set(true);
}
}
return true;
}
}
@ -755,7 +719,7 @@ public class PagingStoreImpl implements PagingStore {
public boolean checkReleaseMemory(boolean globalOversized, long newSize) {
if (!globalOversized && (newSize <= maxSize || maxSize < 0)) {
if (!onMemoryFreedRunnables.isEmpty()) {
if (!memoryFreedRunnablesExecutor.isEmpty()) {
executor.execute(memoryFreedRunnablesExecutor);
if (blocking.get()) {
ActiveMQServerLogger.LOGGER.unblockingMessageProduction(address, sizeInBytes.get(), maxSize);

View File

@ -1950,4 +1950,17 @@ public interface ActiveMQServerLogger extends BasicLogger {
@LogMessage(level = Logger.Level.ERROR)
@Message(id = 224095, value = "Error updating Consumer Count: {0}", format = Message.Format.MESSAGE_FORMAT)
void consumerCountError(String reason);
@LogMessage(level = Logger.Level.WARN)
@Message(id = 224096, value = "Disk Full! Blocking message production. Clients will report blocked.", format = Message.Format.MESSAGE_FORMAT)
void blockingGlobalDiskFull();
@LogMessage(level = Logger.Level.WARN)
@Message(id = 224097, value = "Blocking message production; size is currently: {0} bytes;", format = Message.Format.MESSAGE_FORMAT)
void blockingGlobalMessageProduction(long globalSize);
@LogMessage(level = Logger.Level.INFO)
@Message(id = 224098, value = "Unblocking message production; size is currently: {0} bytes;", format = Message.Format.MESSAGE_FORMAT)
void unblockingGlobalMessageProduction(long globalSize);
}

View File

@ -150,11 +150,14 @@ public class FileStoreMonitor extends ActiveMQScheduledComponent {
public interface Callback {
void tick(FileStore store, double usage);
default void tick(FileStore store, double usage) {
}
void over(FileStore store, double usage);
default void over(FileStore store, double usage) {
}
void under(FileStore store, double usage);
default void under(FileStore store, double usage) {
}
}
}

View File

@ -137,16 +137,6 @@ public class FileStoreMonitorTest extends ActiveMQTestBase {
System.out.println("TickS::" + usage);
latch.countDown();
}
@Override
public void over(FileStore store, double usage) {
}
@Override
public void under(FileStore store, double usage) {
}
});
storeMonitor.start();

View File

@ -0,0 +1,89 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.activemq.artemis.tests.integration.amqp;
import org.apache.activemq.artemis.core.config.Configuration;
import org.apache.activemq.artemis.core.server.ActiveMQServer;
import org.apache.activemq.artemis.core.server.files.FileStoreMonitor;
import org.apache.activemq.artemis.core.server.impl.ActiveMQServerImpl;
import org.apache.activemq.transport.amqp.client.AmqpClient;
import org.apache.activemq.transport.amqp.client.AmqpConnection;
import org.apache.activemq.transport.amqp.client.AmqpMessage;
import org.apache.activemq.transport.amqp.client.AmqpSender;
import org.apache.activemq.transport.amqp.client.AmqpSession;
import org.junit.Test;
import java.net.URI;
import java.nio.file.FileStore;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
public class GlobalDiskFullTest extends AmqpClientTestSupport {
@Override
protected void addConfiguration(ActiveMQServer server) {
Configuration serverConfig = server.getConfiguration();
serverConfig.setDiskScanPeriod(100);
}
@Test
public void testProducerOnDiskFull() throws Exception {
FileStoreMonitor monitor = ((ActiveMQServerImpl)server).getMonitor().setMaxUsage(0.0);
final CountDownLatch latch = new CountDownLatch(1);
monitor.addCallback(new FileStoreMonitor.Callback() {
@Override
public void over(FileStore store, double usage) {
latch.countDown();
}
@Override
public void under(FileStore store, double usage) {
}
});
latch.await(2, TimeUnit.SECONDS);
AmqpClient client = createAmqpClient(new URI("tcp://localhost:" + AMQP_PORT));
AmqpConnection connection = addConnection(client.connect());
try {
AmqpSession session = connection.createSession();
AmqpSender sender = session.createSender(getQueueName());
final AmqpMessage message = new AmqpMessage();
byte[] payload = new byte[1000];
message.setBytes(payload);
sender.setSendTimeout(1000);
sender.send(message);
org.apache.activemq.artemis.core.server.Queue queueView = getProxyToQueue(getQueueName());
assertEquals("shouldn't receive any messages", 0, queueView.getMessageCount());
AmqpSender anonSender = session.createSender();
final AmqpMessage message1 = new AmqpMessage();
message1.setBytes(payload);
message1.setAddress(getQueueName());
anonSender.setSendTimeout(1000);
anonSender.send(message1);
queueView = getProxyToQueue(getQueueName());
assertEquals("shouldn't receive any messages", 0, queueView.getMessageCount());
} finally {
connection.close();
}
}
}

View File

@ -30,7 +30,7 @@ import org.apache.activemq.artemis.core.server.files.FileStoreMonitor;
public final class FakePagingManager implements PagingManager {
@Override
public void addBlockedStore(PagingStore store) {
public void addBlockedStore(Blockable store) {
}
@ -115,6 +115,11 @@ public final class FakePagingManager implements PagingManager {
return false;
}
@Override
public boolean checkMemory(Runnable runnable) {
return false;
}
/*
* (non-Javadoc)
* @see org.apache.activemq.artemis.core.paging.PagingManager#isGlobalFull()