ARTEMIS-1732 I simplified some of the changes performed at the previous commit.

Also I changed GlobalDiskFullTest to actually block the senders.
I moved the Runnables from PagingManager into the Util as AtomicRunnable.
This commit is contained in:
Clebert Suconic 2018-07-31 21:08:46 -04:00
parent 53e1d60160
commit 0e36e072bd
12 changed files with 232 additions and 146 deletions

View File

@ -0,0 +1,47 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.activemq.artemis.utils.runnables;
import java.util.concurrent.atomic.AtomicIntegerFieldUpdater;
public abstract class AtomicRunnable implements Runnable {
public static Runnable checkAtomic(Runnable run) {
if (run instanceof AtomicRunnable) {
return run;
} else {
return new AtomicRunnableWithDelegate(run);
}
}
private volatile int ran;
private static final AtomicIntegerFieldUpdater<AtomicRunnable> RAN_UPDATE =
AtomicIntegerFieldUpdater.newUpdater(AtomicRunnable.class, "ran");
@Override
public void run() {
if (RAN_UPDATE.compareAndSet(this, 0, 1)) {
atomicRun();
}
}
public abstract void atomicRun();
}

View File

@ -0,0 +1,32 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.activemq.artemis.utils.runnables;
public class AtomicRunnableWithDelegate extends AtomicRunnable {
private final Runnable runnable;
public AtomicRunnableWithDelegate(Runnable runnable) {
this.runnable = runnable;
}
@Override
public void atomicRun() {
runnable.run();
}
}

View File

@ -581,9 +581,7 @@ public class AMQPSessionCallback implements SessionCallback {
Runnable creditRunnable = () -> {
connection.lock();
try {
if (receiver.getRemoteCredit() <= threshold) {
receiver.flow(credits);
}
receiver.flow(credits);
} finally {
connection.unlock();
}
@ -592,10 +590,10 @@ public class AMQPSessionCallback implements SessionCallback {
if (address == null) {
pagingManager.checkMemory(creditRunnable);
return;
} else {
final PagingStore store = manager.getServer().getPagingManager().getPageStore(address);
store.checkMemory(creditRunnable);
}
final PagingStore store = manager.getServer().getPagingManager().getPageStore(address);
store.checkMemory(creditRunnable);
} catch (Exception e) {
throw new RuntimeException(e);
}

View File

@ -17,9 +17,6 @@
package org.apache.activemq.artemis.core.paging;
import java.util.Map;
import java.util.Queue;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.atomic.AtomicBoolean;
import org.apache.activemq.artemis.api.core.SimpleString;
import org.apache.activemq.artemis.core.server.ActiveMQComponent;
@ -82,7 +79,7 @@ public interface PagingManager extends ActiveMQComponent, HierarchicalRepository
void resumeCleanup();
void addBlockedStore(Blockable store);
void addBlockedStore(PagingStore store);
void injectMonitor(FileStoreMonitor monitor) throws Exception;
@ -114,54 +111,10 @@ public interface PagingManager extends ActiveMQComponent, HierarchicalRepository
return 0;
}
boolean checkMemory(Runnable runnable);
// To be used when the memory is oversized either by local settings or global settings on blocking addresses
final class OverSizedRunnable implements Runnable {
private final AtomicBoolean ran = new AtomicBoolean(false);
private final Runnable runnable;
public OverSizedRunnable(final Runnable runnable) {
this.runnable = runnable;
}
@Override
public void run() {
if (ran.compareAndSet(false, true)) {
runnable.run();
}
}
}
interface Blockable {
/**
* It will return true if the destination is leaving blocking.
*/
boolean checkReleasedMemory();
}
final class MemoryFreedRunnablesExecutor implements Runnable {
private final Queue<OverSizedRunnable> onMemoryFreedRunnables = new ConcurrentLinkedQueue<>();
public void addRunnable(PagingManager.OverSizedRunnable runnable) {
onMemoryFreedRunnables.add(runnable);
}
@Override
public void run() {
Runnable runnable;
while ((runnable = onMemoryFreedRunnables.poll()) != null) {
runnable.run();
}
}
public boolean isEmpty() {
return onMemoryFreedRunnables.isEmpty();
}
}
/**
* Use this when you have no refernce of an address. (anonymous AMQP Producers for example)
* @param runWhenAvailable
*/
void checkMemory(Runnable runWhenAvailable);
}

View File

@ -42,7 +42,7 @@ import org.apache.activemq.artemis.core.transaction.Transaction;
*
* @see PagingManager
*/
public interface PagingStore extends ActiveMQComponent, RefCountMessageListener, PagingManager.Blockable {
public interface PagingStore extends ActiveMQComponent, RefCountMessageListener {
SimpleString getAddress();
@ -131,6 +131,11 @@ public interface PagingStore extends ActiveMQComponent, RefCountMessageListener,
boolean isRejectingMessages();
/**
* It will return true if the destination is leaving blocking.
*/
boolean checkReleasedMemory();
/**
* Write lock the PagingStore.
*

View File

@ -20,8 +20,10 @@ import java.nio.file.FileStore;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Queue;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.Executor;
import java.util.concurrent.TimeUnit;
@ -39,6 +41,7 @@ import org.apache.activemq.artemis.core.server.files.FileStoreMonitor;
import org.apache.activemq.artemis.core.settings.HierarchicalRepository;
import org.apache.activemq.artemis.core.settings.impl.AddressSettings;
import org.apache.activemq.artemis.utils.collections.ConcurrentHashSet;
import org.apache.activemq.artemis.utils.runnables.AtomicRunnable;
import org.jboss.logging.Logger;
public final class PagingManagerImpl implements PagingManager {
@ -57,7 +60,7 @@ public final class PagingManagerImpl implements PagingManager {
*/
private final ReentrantReadWriteLock syncLock = new ReentrantReadWriteLock();
private final Set<Blockable> blockedStored = new ConcurrentHashSet<>();
private final Set<PagingStore> blockedStored = new ConcurrentHashSet<>();
private final ConcurrentMap<SimpleString, PagingStore> stores = new ConcurrentHashMap<>();
@ -75,13 +78,14 @@ public final class PagingManagerImpl implements PagingManager {
private volatile boolean diskFull = false;
private final Executor memoryExecutor;
private final Queue<Runnable> memoryCallback = new ConcurrentLinkedQueue<>();
private final ConcurrentMap</*TransactionID*/Long, PageTransactionInfo> transactions = new ConcurrentHashMap<>();
private ActiveMQScheduledComponent scheduledComponent = null;
private final PagingManager.MemoryFreedRunnablesExecutor memoryFreedRunnablesExecutor = new PagingManager.MemoryFreedRunnablesExecutor();
private final Executor executor;
// Static
// --------------------------------------------------------------------------------------------------------------------------
@ -106,7 +110,7 @@ public final class PagingManagerImpl implements PagingManager {
this.addressSettingsRepository = addressSettingsRepository;
addressSettingsRepository.registerListener(this);
this.maxSize = maxSize;
executor = pagingStoreFactory.newExecutor();
this.memoryExecutor = pagingSPI.newExecutor();
}
public PagingManagerImpl(final PagingStoreFactory pagingSPI,
@ -115,7 +119,7 @@ public final class PagingManagerImpl implements PagingManager {
}
@Override
public void addBlockedStore(Blockable store) {
public void addBlockedStore(PagingStore store) {
blockedStored.add(store);
}
@ -157,42 +161,18 @@ public final class PagingManagerImpl implements PagingManager {
return globalSizeBytes.get();
}
@Override
public boolean checkMemory(final Runnable runWhenAvailable) {
if (isGlobalFull()) {
OverSizedRunnable ourRunnable = new OverSizedRunnable(runWhenAvailable);
memoryFreedRunnablesExecutor.addRunnable(ourRunnable);
addBlockedStore(() -> {
if (!isGlobalFull()) {
if (!memoryFreedRunnablesExecutor.isEmpty()) {
executor.execute(memoryFreedRunnablesExecutor);
ActiveMQServerLogger.LOGGER.unblockingGlobalMessageProduction(getGlobalSize());
return true;
}
}
return false;
});
if (isDiskFull()) {
ActiveMQServerLogger.LOGGER.blockingGlobalDiskFull();
} else {
ActiveMQServerLogger.LOGGER.blockingGlobalMessageProduction(getGlobalSize());
}
return true;
}
runWhenAvailable.run();
return true;
}
protected void checkMemoryRelease() {
if (!diskFull && (maxSize < 0 || globalSizeBytes.get() < maxSize) && !blockedStored.isEmpty()) {
Iterator<Blockable> storeIterator = blockedStored.iterator();
if (!memoryCallback.isEmpty()) {
if (memoryExecutor != null) {
memoryExecutor.execute(this::memoryReleased);
} else {
memoryReleased();
}
}
Iterator<PagingStore> storeIterator = blockedStored.iterator();
while (storeIterator.hasNext()) {
Blockable store = storeIterator.next();
PagingStore store = storeIterator.next();
if (store.checkReleasedMemory()) {
storeIterator.remove();
}
@ -223,7 +203,7 @@ public final class PagingManagerImpl implements PagingManager {
@Override
public void under(FileStore store, double usage) {
if (diskFull) {
if (diskFull || !blockedStored.isEmpty() || !memoryCallback.isEmpty()) {
ActiveMQServerLogger.LOGGER.diskCapacityRestored();
diskFull = false;
checkMemoryRelease();
@ -241,6 +221,27 @@ public final class PagingManagerImpl implements PagingManager {
return maxSize > 0;
}
@Override
public void checkMemory(final Runnable runWhenAvailable) {
if (isGlobalFull()) {
memoryCallback.add(AtomicRunnable.checkAtomic(runWhenAvailable));
return;
}
runWhenAvailable.run();
}
private void memoryReleased() {
Runnable runnable;
while ((runnable = memoryCallback.poll()) != null) {
runnable.run();
}
}
@Override
public boolean isGlobalFull() {
return diskFull || maxSize > 0 && globalSizeBytes.get() > maxSize;

View File

@ -23,7 +23,9 @@ import java.util.Collection;
import java.util.Collections;
import java.util.HashSet;
import java.util.List;
import java.util.Queue;
import java.util.Set;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
@ -60,6 +62,7 @@ import org.apache.activemq.artemis.core.transaction.TransactionOperation;
import org.apache.activemq.artemis.core.transaction.TransactionPropertyIndexes;
import org.apache.activemq.artemis.utils.FutureLatch;
import org.apache.activemq.artemis.utils.actors.ArtemisExecutor;
import org.apache.activemq.artemis.utils.runnables.AtomicRunnable;
import org.jboss.logging.Logger;
/**
@ -639,7 +642,16 @@ public class PagingStoreImpl implements PagingStore {
}
private final PagingManager.MemoryFreedRunnablesExecutor memoryFreedRunnablesExecutor = new PagingManager.MemoryFreedRunnablesExecutor();
private final Queue<Runnable> onMemoryFreedRunnables = new ConcurrentLinkedQueue<>();
private void memoryReleased() {
Runnable runnable;
while ((runnable = onMemoryFreedRunnables.poll()) != null) {
runnable.run();
}
}
@Override
public boolean checkMemory(final Runnable runWhenAvailable) {
@ -650,9 +662,8 @@ public class PagingStoreImpl implements PagingStore {
}
} else if (pagingManager.isDiskFull() || addressFullMessagePolicy == AddressFullMessagePolicy.BLOCK && (maxSize != -1 || usingGlobalMaxSize)) {
if (pagingManager.isDiskFull() || maxSize > 0 && sizeInBytes.get() > maxSize || pagingManager.isGlobalFull()) {
PagingManager.OverSizedRunnable ourRunnable = new PagingManager.OverSizedRunnable(runWhenAvailable);
memoryFreedRunnablesExecutor.addRunnable(ourRunnable);
onMemoryFreedRunnables.add(AtomicRunnable.checkAtomic(runWhenAvailable));
// We check again to avoid a race condition where the size can come down just after the element
// has been added, but the check to execute was done before the element was added
@ -660,7 +671,7 @@ public class PagingStoreImpl implements PagingStore {
// MUCH better performance in a highly concurrent environment
if (!pagingManager.isGlobalFull() && (sizeInBytes.get() <= maxSize || maxSize < 0)) {
// run it now
ourRunnable.run();
runWhenAvailable.run();
} else {
if (usingGlobalMaxSize || pagingManager.isDiskFull()) {
pagingManager.addBlockedStore(this);
@ -719,8 +730,8 @@ public class PagingStoreImpl implements PagingStore {
public boolean checkReleaseMemory(boolean globalOversized, long newSize) {
if (!globalOversized && (newSize <= maxSize || maxSize < 0)) {
if (!memoryFreedRunnablesExecutor.isEmpty()) {
executor.execute(memoryFreedRunnablesExecutor);
if (!onMemoryFreedRunnables.isEmpty()) {
executor.execute(this::memoryReleased);
if (blocking.get()) {
ActiveMQServerLogger.LOGGER.unblockingMessageProduction(address, sizeInBytes.get(), maxSize);
blocking.set(false);

View File

@ -1950,17 +1950,4 @@ public interface ActiveMQServerLogger extends BasicLogger {
@LogMessage(level = Logger.Level.ERROR)
@Message(id = 224095, value = "Error updating Consumer Count: {0}", format = Message.Format.MESSAGE_FORMAT)
void consumerCountError(String reason);
@LogMessage(level = Logger.Level.WARN)
@Message(id = 224096, value = "Disk Full! Blocking message production. Clients will report blocked.", format = Message.Format.MESSAGE_FORMAT)
void blockingGlobalDiskFull();
@LogMessage(level = Logger.Level.WARN)
@Message(id = 224097, value = "Blocking message production; size is currently: {0} bytes;", format = Message.Format.MESSAGE_FORMAT)
void blockingGlobalMessageProduction(long globalSize);
@LogMessage(level = Logger.Level.INFO)
@Message(id = 224098, value = "Unblocking message production; size is currently: {0} bytes;", format = Message.Format.MESSAGE_FORMAT)
void unblockingGlobalMessageProduction(long globalSize);
}

View File

@ -150,14 +150,11 @@ public class FileStoreMonitor extends ActiveMQScheduledComponent {
public interface Callback {
default void tick(FileStore store, double usage) {
}
void tick(FileStore store, double usage);
default void over(FileStore store, double usage) {
}
void over(FileStore store, double usage);
default void under(FileStore store, double usage) {
}
void under(FileStore store, double usage);
}
}

View File

@ -137,6 +137,16 @@ public class FileStoreMonitorTest extends ActiveMQTestBase {
System.out.println("TickS::" + usage);
latch.countDown();
}
@Override
public void over(FileStore store, double usage) {
}
@Override
public void under(FileStore store, double usage) {
}
});
storeMonitor.start();

View File

@ -25,6 +25,7 @@ import org.apache.activemq.transport.amqp.client.AmqpConnection;
import org.apache.activemq.transport.amqp.client.AmqpMessage;
import org.apache.activemq.transport.amqp.client.AmqpSender;
import org.apache.activemq.transport.amqp.client.AmqpSession;
import org.junit.Assert;
import org.junit.Test;
import java.net.URI;
@ -45,6 +46,11 @@ public class GlobalDiskFullTest extends AmqpClientTestSupport {
FileStoreMonitor monitor = ((ActiveMQServerImpl)server).getMonitor().setMaxUsage(0.0);
final CountDownLatch latch = new CountDownLatch(1);
monitor.addCallback(new FileStoreMonitor.Callback() {
@Override
public void tick(FileStore store, double usage) {
}
@Override
public void over(FileStore store, double usage) {
latch.countDown();
@ -53,7 +59,8 @@ public class GlobalDiskFullTest extends AmqpClientTestSupport {
public void under(FileStore store, double usage) {
}
});
latch.await(2, TimeUnit.SECONDS);
Assert.assertTrue(latch.await(1, TimeUnit.MINUTES));
AmqpClient client = createAmqpClient(new URI("tcp://localhost:" + AMQP_PORT));
AmqpConnection connection = addConnection(client.connect());
@ -61,27 +68,65 @@ public class GlobalDiskFullTest extends AmqpClientTestSupport {
try {
AmqpSession session = connection.createSession();
AmqpSender sender = session.createSender(getQueueName());
final AmqpMessage message = new AmqpMessage();
byte[] payload = new byte[1000];
message.setBytes(payload);
sender.setSendTimeout(1000);
sender.send(message);
org.apache.activemq.artemis.core.server.Queue queueView = getProxyToQueue(getQueueName());
assertEquals("shouldn't receive any messages", 0, queueView.getMessageCount());
AmqpSender anonSender = session.createSender();
final AmqpMessage message1 = new AmqpMessage();
message1.setBytes(payload);
message1.setAddress(getQueueName());
anonSender.setSendTimeout(1000);
anonSender.send(message1);
CountDownLatch sentWithName = new CountDownLatch(1);
CountDownLatch sentAnon = new CountDownLatch(1);
queueView = getProxyToQueue(getQueueName());
assertEquals("shouldn't receive any messages", 0, queueView.getMessageCount());
Thread threadWithName = new Thread() {
@Override
public void run() {
try {
final AmqpMessage message = new AmqpMessage();
message.setBytes(payload);
sender.setSendTimeout(-1);
sender.send(message);
} catch (Exception e) {
e.printStackTrace();
} finally {
sentWithName.countDown();
}
}
};
threadWithName.start();
Thread threadWithAnon = new Thread() {
@Override
public void run() {
try {
final AmqpMessage message = new AmqpMessage();
message.setBytes(payload);
anonSender.setSendTimeout(-1);
message.setAddress(getQueueName());
anonSender.send(message);
} catch (Exception e) {
e.printStackTrace();
} finally {
sentAnon.countDown();
}
}
};
threadWithAnon.start();
Assert.assertFalse("Thread sender should be blocked", sentWithName.await(500, TimeUnit.MILLISECONDS));
Assert.assertFalse("Thread sender anonymous should be blocked", sentAnon.await(500, TimeUnit.MILLISECONDS));
monitor.setMaxUsage(100.0);
Assert.assertTrue("Thread sender should be released", sentWithName.await(30, TimeUnit.SECONDS));
Assert.assertTrue("Thread sender anonymous should be released", sentAnon.await(30, TimeUnit.SECONDS));
threadWithName.join(TimeUnit.SECONDS.toMillis(30));
threadWithAnon.join(TimeUnit.SECONDS.toMillis(30));
Assert.assertFalse(threadWithName.isAlive());
Assert.assertFalse(threadWithAnon.isAlive());
} finally {
connection.close();
}

View File

@ -30,7 +30,12 @@ import org.apache.activemq.artemis.core.server.files.FileStoreMonitor;
public final class FakePagingManager implements PagingManager {
@Override
public void addBlockedStore(Blockable store) {
public void addBlockedStore(PagingStore store) {
}
@Override
public void checkMemory(Runnable runWhenAvailable) {
}
@ -115,11 +120,6 @@ public final class FakePagingManager implements PagingManager {
return false;
}
@Override
public boolean checkMemory(Runnable runnable) {
return false;
}
/*
* (non-Javadoc)
* @see org.apache.activemq.artemis.core.paging.PagingManager#isGlobalFull()