This commit is contained in:
Clebert Suconic 2018-11-01 15:43:53 -04:00
commit 036e24460b
4 changed files with 67 additions and 56 deletions

View File

@ -443,12 +443,7 @@ public class AMQSession implements SessionCallback {
final AtomicInteger count,
final org.apache.activemq.artemis.api.core.Message coreMsg,
final SimpleString address) throws ResourceAllocationException {
if (!store.checkMemory(null)) {
this.connection.getContext().setDontSendReponse(false);
connection.enableTtl();
throw new ResourceAllocationException("Queue is full " + address);
}
if (!store.checkMemory(false, () -> {
Exception exceptionToSend = null;
try {
@ -501,6 +496,11 @@ public class AMQSession implements SessionCallback {
});
}
}
})) {
this.connection.getContext().setDontSendReponse(false);
connection.enableTtl();
throw new ResourceAllocationException("Queue is full " + address);
}
}
private void enableAutoReadAndTtl() {

View File

@ -127,6 +127,8 @@ public interface PagingStore extends ActiveMQComponent, RefCountMessageListener
boolean checkMemory(Runnable runnable);
boolean checkMemory(boolean runOnFailure, Runnable runnable);
boolean isFull();
boolean isRejectingMessages();

View File

@ -652,13 +652,17 @@ public class PagingStoreImpl implements PagingStore {
}
}
@Override
public boolean checkMemory(final Runnable runWhenAvailable) {
return checkMemory(true, runWhenAvailable);
}
@Override
public boolean checkMemory(boolean runOnFailure, final Runnable runWhenAvailable) {
if (addressFullMessagePolicy == AddressFullMessagePolicy.FAIL && (maxSize != -1 || usingGlobalMaxSize || pagingManager.isDiskFull())) {
if (isFull()) {
if (runWhenAvailable != null) {
if (runOnFailure && runWhenAvailable != null) {
onMemoryFreedRunnables.add(AtomicRunnable.checkAtomic(runWhenAvailable));
}
return false;

View File

@ -410,6 +410,11 @@ public class PersistMultiThreadTest extends ActiveMQTestBase {
}
@Override
public boolean checkMemory(boolean runOnFailure, Runnable runnable) {
return false;
}
@Override
public boolean checkMemory(Runnable runnable) {
return false;