ARTEMIS-3178 Page Limitting (max messages and max bytes)

I am adding three attributes to Address-settings:

* page-limit-bytes: Number of bytes. We will convert this metric into max number of pages internally by dividing max-bytes / page-size. It will allow a max based on an estimate.
* page-limit-messages: Number of messages
* page-full-message-policy: fail or drop

We will now allow paging, until these max values and then fail or drop messages.

Once these values are retracted, the address will remain full until a period where cleanup is kicked in by paging. So these values may have a certain delay on being applied, but they should always be cleared once cleanup happened.
This commit is contained in:
Clebert Suconic 2023-01-25 09:59:06 -05:00 committed by clebertsuconic
parent b5ae95c376
commit 764db34e9b
20 changed files with 1235 additions and 81 deletions

View File

@ -0,0 +1,21 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.activemq.artemis.core.settings.impl;
public enum PageFullMessagePolicy {
DROP, FAIL
}

View File

@ -26,6 +26,7 @@ import org.apache.activemq.artemis.api.core.RoutingType;
import org.apache.activemq.artemis.core.server.cluster.impl.MessageLoadBalancingType; import org.apache.activemq.artemis.core.server.cluster.impl.MessageLoadBalancingType;
import org.apache.activemq.artemis.core.settings.impl.AddressFullMessagePolicy; import org.apache.activemq.artemis.core.settings.impl.AddressFullMessagePolicy;
import org.apache.activemq.artemis.core.settings.impl.DeletionPolicy; import org.apache.activemq.artemis.core.settings.impl.DeletionPolicy;
import org.apache.activemq.artemis.core.settings.impl.PageFullMessagePolicy;
import org.apache.activemq.artemis.core.settings.impl.SlowConsumerPolicy; import org.apache.activemq.artemis.core.settings.impl.SlowConsumerPolicy;
import org.apache.activemq.artemis.core.settings.impl.SlowConsumerThresholdMeasurementUnit; import org.apache.activemq.artemis.core.settings.impl.SlowConsumerThresholdMeasurementUnit;
@ -195,6 +196,18 @@ public final class Validators {
} }
}; };
public static final Validator PAGE_FULL_MESSAGE_POLICY_TYPE = new Validator() {
@Override
public void validate(final String name, final Object value) {
String val = (String) value;
if (val == null ||
!val.equals(PageFullMessagePolicy.DROP.toString()) &&
!val.equals(PageFullMessagePolicy.FAIL.toString())) {
throw ActiveMQMessageBundle.BUNDLE.invalidAddressFullPolicyType(val);
}
}
};
public static final Validator SLOW_CONSUMER_THRESHOLD_MEASUREMENT_UNIT = new Validator() { public static final Validator SLOW_CONSUMER_THRESHOLD_MEASUREMENT_UNIT = new Validator() {
@Override @Override
public void validate(final String name, final Object value) { public void validate(final String name, final Object value) {

View File

@ -98,6 +98,7 @@ import org.apache.activemq.artemis.core.server.plugin.ActiveMQServerPlugin;
import org.apache.activemq.artemis.core.settings.impl.AddressFullMessagePolicy; import org.apache.activemq.artemis.core.settings.impl.AddressFullMessagePolicy;
import org.apache.activemq.artemis.core.settings.impl.AddressSettings; import org.apache.activemq.artemis.core.settings.impl.AddressSettings;
import org.apache.activemq.artemis.core.settings.impl.DeletionPolicy; import org.apache.activemq.artemis.core.settings.impl.DeletionPolicy;
import org.apache.activemq.artemis.core.settings.impl.PageFullMessagePolicy;
import org.apache.activemq.artemis.core.settings.impl.ResourceLimitSettings; import org.apache.activemq.artemis.core.settings.impl.ResourceLimitSettings;
import org.apache.activemq.artemis.core.settings.impl.SlowConsumerPolicy; import org.apache.activemq.artemis.core.settings.impl.SlowConsumerPolicy;
import org.apache.activemq.artemis.core.settings.impl.SlowConsumerThresholdMeasurementUnit; import org.apache.activemq.artemis.core.settings.impl.SlowConsumerThresholdMeasurementUnit;
@ -218,6 +219,8 @@ public final class FileConfigurationParser extends XMLConfigurationUtil {
private static final String ADDRESS_FULL_MESSAGE_POLICY_NODE_NAME = "address-full-policy"; private static final String ADDRESS_FULL_MESSAGE_POLICY_NODE_NAME = "address-full-policy";
private static final String PAGE_FULL_MESSAGE_POLICY_NODE_NAME = "page-full-policy";
private static final String MAX_READ_PAGE_BYTES_NODE_NAME = "max-read-page-bytes"; private static final String MAX_READ_PAGE_BYTES_NODE_NAME = "max-read-page-bytes";
private static final String MAX_READ_PAGE_MESSAGES_NODE_NAME = "max-read-page-messages"; private static final String MAX_READ_PAGE_MESSAGES_NODE_NAME = "max-read-page-messages";
@ -226,6 +229,10 @@ public final class FileConfigurationParser extends XMLConfigurationUtil {
private static final String PAGE_MAX_CACHE_SIZE_NODE_NAME = "page-max-cache-size"; private static final String PAGE_MAX_CACHE_SIZE_NODE_NAME = "page-max-cache-size";
private static final String PAGE_LIMIT_BYTES_NODE_NAME = "page-limit-bytes";
private static final String PAGE_LIMIT_MESSAGES_NODE_NAME = "page-limit-messages";
private static final String MESSAGE_COUNTER_HISTORY_DAY_LIMIT_NODE_NAME = "message-counter-history-day-limit"; private static final String MESSAGE_COUNTER_HISTORY_DAY_LIMIT_NODE_NAME = "message-counter-history-day-limit";
private static final String LVQ_NODE_NAME = "last-value-queue"; private static final String LVQ_NODE_NAME = "last-value-queue";
@ -1281,6 +1288,14 @@ public final class FileConfigurationParser extends XMLConfigurationUtil {
ActiveMQServerLogger.LOGGER.pageMaxSizeUsed(); ActiveMQServerLogger.LOGGER.pageMaxSizeUsed();
} }
addressSettings.setPageCacheMaxSize(XMLUtil.parseInt(child)); addressSettings.setPageCacheMaxSize(XMLUtil.parseInt(child));
} else if (PAGE_LIMIT_BYTES_NODE_NAME.equalsIgnoreCase(name)) {
long pageLimitBytes = ByteUtil.convertTextBytes(getTrimmedTextContent(child));
Validators.MINUS_ONE_OR_POSITIVE_INT.validate(PAGE_LIMIT_BYTES_NODE_NAME, pageLimitBytes);
addressSettings.setPageLimitBytes(pageLimitBytes);
} else if (PAGE_LIMIT_MESSAGES_NODE_NAME.equalsIgnoreCase(name)) {
long pageLimitMessages = ByteUtil.convertTextBytes(getTrimmedTextContent(child));
Validators.MINUS_ONE_OR_POSITIVE_INT.validate(PAGE_LIMIT_MESSAGES_NODE_NAME, pageLimitMessages);
addressSettings.setPageLimitMessages(pageLimitMessages);
} else if (MESSAGE_COUNTER_HISTORY_DAY_LIMIT_NODE_NAME.equalsIgnoreCase(name)) { } else if (MESSAGE_COUNTER_HISTORY_DAY_LIMIT_NODE_NAME.equalsIgnoreCase(name)) {
addressSettings.setMessageCounterHistoryDayLimit(XMLUtil.parseInt(child)); addressSettings.setMessageCounterHistoryDayLimit(XMLUtil.parseInt(child));
} else if (ADDRESS_FULL_MESSAGE_POLICY_NODE_NAME.equalsIgnoreCase(name)) { } else if (ADDRESS_FULL_MESSAGE_POLICY_NODE_NAME.equalsIgnoreCase(name)) {
@ -1288,6 +1303,11 @@ public final class FileConfigurationParser extends XMLConfigurationUtil {
Validators.ADDRESS_FULL_MESSAGE_POLICY_TYPE.validate(ADDRESS_FULL_MESSAGE_POLICY_NODE_NAME, value); Validators.ADDRESS_FULL_MESSAGE_POLICY_TYPE.validate(ADDRESS_FULL_MESSAGE_POLICY_NODE_NAME, value);
AddressFullMessagePolicy policy = Enum.valueOf(AddressFullMessagePolicy.class, value); AddressFullMessagePolicy policy = Enum.valueOf(AddressFullMessagePolicy.class, value);
addressSettings.setAddressFullMessagePolicy(policy); addressSettings.setAddressFullMessagePolicy(policy);
} else if (PAGE_FULL_MESSAGE_POLICY_NODE_NAME.equalsIgnoreCase(name)) {
String value = getTrimmedTextContent(child);
Validators.PAGE_FULL_MESSAGE_POLICY_TYPE.validate(PAGE_FULL_MESSAGE_POLICY_NODE_NAME, value);
PageFullMessagePolicy policy = Enum.valueOf(PageFullMessagePolicy.class, value);
addressSettings.setPageFullMessagePolicy(policy);
} else if (LVQ_NODE_NAME.equalsIgnoreCase(name) || DEFAULT_LVQ_NODE_NAME.equalsIgnoreCase(name)) { } else if (LVQ_NODE_NAME.equalsIgnoreCase(name) || DEFAULT_LVQ_NODE_NAME.equalsIgnoreCase(name)) {
addressSettings.setDefaultLastValueQueue(XMLUtil.parseBoolean(child)); addressSettings.setDefaultLastValueQueue(XMLUtil.parseBoolean(child));
} else if (DEFAULT_LVQ_KEY_NODE_NAME.equalsIgnoreCase(name)) { } else if (DEFAULT_LVQ_KEY_NODE_NAME.equalsIgnoreCase(name)) {

View File

@ -23,6 +23,7 @@ import org.apache.activemq.artemis.api.core.Message;
import org.apache.activemq.artemis.api.core.RefCountMessageListener; import org.apache.activemq.artemis.api.core.RefCountMessageListener;
import org.apache.activemq.artemis.api.core.SimpleString; import org.apache.activemq.artemis.api.core.SimpleString;
import org.apache.activemq.artemis.core.paging.cursor.PageCursorProvider; import org.apache.activemq.artemis.core.paging.cursor.PageCursorProvider;
import org.apache.activemq.artemis.core.paging.cursor.PageSubscription;
import org.apache.activemq.artemis.core.paging.impl.Page; import org.apache.activemq.artemis.core.paging.impl.Page;
import org.apache.activemq.artemis.core.persistence.StorageManager; import org.apache.activemq.artemis.core.persistence.StorageManager;
import org.apache.activemq.artemis.core.replication.ReplicationManager; import org.apache.activemq.artemis.core.replication.ReplicationManager;
@ -30,6 +31,7 @@ import org.apache.activemq.artemis.core.server.ActiveMQComponent;
import org.apache.activemq.artemis.core.server.RouteContextList; import org.apache.activemq.artemis.core.server.RouteContextList;
import org.apache.activemq.artemis.core.settings.impl.AddressFullMessagePolicy; import org.apache.activemq.artemis.core.settings.impl.AddressFullMessagePolicy;
import org.apache.activemq.artemis.core.settings.impl.AddressSettings; import org.apache.activemq.artemis.core.settings.impl.AddressSettings;
import org.apache.activemq.artemis.core.settings.impl.PageFullMessagePolicy;
import org.apache.activemq.artemis.core.transaction.Transaction; import org.apache.activemq.artemis.core.transaction.Transaction;
import org.apache.activemq.artemis.utils.actors.ArtemisExecutor; import org.apache.activemq.artemis.utils.actors.ArtemisExecutor;
@ -60,6 +62,19 @@ public interface PagingStore extends ActiveMQComponent, RefCountMessageListener
AddressFullMessagePolicy getAddressFullMessagePolicy(); AddressFullMessagePolicy getAddressFullMessagePolicy();
PageFullMessagePolicy getPageFullMessagePolicy();
Long getPageLimitMessages();
Long getPageLimitBytes();
/** Callback to be used by a counter when the Page is full for that counter */
void pageFull(PageSubscription subscription);
boolean isPageFull();
void checkPageLimit(long numberOfMessages);
long getFirstPage(); long getFirstPage();
int getPageSizeBytes(); int getPageSizeBytes();

View File

@ -16,6 +16,7 @@
*/ */
package org.apache.activemq.artemis.core.paging.cursor; package org.apache.activemq.artemis.core.paging.cursor;
import java.util.concurrent.Future;
import java.util.function.Consumer; import java.util.function.Consumer;
import org.apache.activemq.artemis.core.filter.Filter; import org.apache.activemq.artemis.core.filter.Filter;
@ -46,7 +47,7 @@ public interface PageCursorProvider {
void flushExecutors(); void flushExecutors();
void scheduleCleanup(); Future<Boolean> scheduleCleanup();
void disableCleanup(); void disableCleanup();

View File

@ -19,6 +19,7 @@ package org.apache.activemq.artemis.core.paging.cursor.impl;
import java.util.ArrayList; import java.util.ArrayList;
import java.util.Collection; import java.util.Collection;
import java.util.List; import java.util.List;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicInteger;
@ -37,12 +38,14 @@ import org.apache.activemq.artemis.core.server.ActiveMQServerLogger;
import org.apache.activemq.artemis.core.transaction.Transaction; import org.apache.activemq.artemis.core.transaction.Transaction;
import org.apache.activemq.artemis.core.transaction.impl.TransactionImpl; import org.apache.activemq.artemis.core.transaction.impl.TransactionImpl;
import org.apache.activemq.artemis.utils.ArtemisCloseable; import org.apache.activemq.artemis.utils.ArtemisCloseable;
import org.apache.activemq.artemis.utils.SimpleFutureImpl;
import org.apache.activemq.artemis.utils.collections.ConcurrentLongHashMap; import org.apache.activemq.artemis.utils.collections.ConcurrentLongHashMap;
import org.apache.activemq.artemis.utils.collections.LinkedList; import org.apache.activemq.artemis.utils.collections.LinkedList;
import org.apache.activemq.artemis.utils.collections.LongHashSet; import org.apache.activemq.artemis.utils.collections.LongHashSet;
import org.slf4j.Logger; import org.slf4j.Logger;
import org.slf4j.LoggerFactory; import org.slf4j.LoggerFactory;
import java.lang.invoke.MethodHandles; import java.lang.invoke.MethodHandles;
import java.util.concurrent.atomic.AtomicLong;
import java.util.function.Consumer; import java.util.function.Consumer;
public class PageCursorProviderImpl implements PageCursorProvider { public class PageCursorProviderImpl implements PageCursorProvider {
@ -179,11 +182,14 @@ public class PageCursorProviderImpl implements PageCursorProvider {
} }
@Override @Override
public void scheduleCleanup() { public Future<Boolean> scheduleCleanup() {
final SimpleFutureImpl<Boolean> future = new SimpleFutureImpl<>();
if (!cleanupEnabled || scheduledCleanup.intValue() > 2) { if (!cleanupEnabled || scheduledCleanup.intValue() > 2) {
// Scheduled cleanup was already scheduled before.. never mind! // Scheduled cleanup was already scheduled before.
// or we have cleanup disabled // On that case just flush the executor returning the future.set(true)
return; // after any previous scheduled cleanup is finished.
pagingStore.execute(() -> future.set(true));
return future;
} }
scheduledCleanup.incrementAndGet(); scheduledCleanup.incrementAndGet();
@ -199,9 +205,12 @@ public class PageCursorProviderImpl implements PageCursorProvider {
} finally { } finally {
storageManager.clearContext(); storageManager.clearContext();
scheduledCleanup.decrementAndGet(); scheduledCleanup.decrementAndGet();
future.set(true);
} }
} }
}); });
return future;
} }
/** /**
@ -241,6 +250,22 @@ public class PageCursorProviderImpl implements PageCursorProvider {
scheduleCleanup(); scheduleCleanup();
} }
private long getNumberOfMessagesOnSubscriptions() {
AtomicLong largerCounter = new AtomicLong();
activeCursors.forEach((id, sub) -> {
long value = sub.getCounter().getValue();
if (value > largerCounter.get()) {
largerCounter.set(value);
}
});
return largerCounter.get();
}
void checkClearPageLimit() {
pagingStore.checkPageLimit(getNumberOfMessagesOnSubscriptions());
}
protected void cleanup() { protected void cleanup() {
if (!countersRebuilt) { if (!countersRebuilt) {
@ -299,6 +324,10 @@ public class PageCursorProviderImpl implements PageCursorProvider {
// Then we do some check on eventual pages that can be already removed but they are away from the streaming // Then we do some check on eventual pages that can be already removed but they are away from the streaming
cleanupMiddleStream(depagedPages, depagedPagesSet, cursorList, minPage, firstPage); cleanupMiddleStream(depagedPages, depagedPagesSet, cursorList, minPage, firstPage);
if (pagingStore.isPageFull()) {
checkClearPageLimit();
}
assert pagingStore.getNumberOfPages() >= 0; assert pagingStore.getNumberOfPages() >= 0;
if (pagingStore.getNumberOfPages() == 0 || pagingStore.getNumberOfPages() == 1 && (pagingStore.getCurrentPage() == null || pagingStore.getCurrentPage().getNumberOfMessages() == 0)) { if (pagingStore.getNumberOfPages() == 0 || pagingStore.getNumberOfPages() == 1 && (pagingStore.getCurrentPage() == null || pagingStore.getCurrentPage().getNumberOfMessages() == 0)) {

View File

@ -20,6 +20,7 @@ import java.util.LinkedList;
import java.util.concurrent.atomic.AtomicIntegerFieldUpdater; import java.util.concurrent.atomic.AtomicIntegerFieldUpdater;
import java.util.concurrent.atomic.AtomicLongFieldUpdater; import java.util.concurrent.atomic.AtomicLongFieldUpdater;
import org.apache.activemq.artemis.core.paging.PagingStore;
import org.apache.activemq.artemis.core.paging.cursor.PageSubscription; import org.apache.activemq.artemis.core.paging.cursor.PageSubscription;
import org.apache.activemq.artemis.core.paging.cursor.PageSubscriptionCounter; import org.apache.activemq.artemis.core.paging.cursor.PageSubscriptionCounter;
import org.apache.activemq.artemis.core.persistence.StorageManager; import org.apache.activemq.artemis.core.persistence.StorageManager;
@ -56,6 +57,8 @@ public class PageSubscriptionCounterImpl extends BasePagingCounter {
private PageSubscription subscription; private PageSubscription subscription;
private PagingStore pagingStore;
private final StorageManager storage; private final StorageManager storage;
private volatile long value; private volatile long value;
@ -187,11 +190,16 @@ public class PageSubscriptionCounterImpl extends BasePagingCounter {
if (logger.isTraceEnabled()) { if (logger.isTraceEnabled()) {
logger.trace("process subscription={} add={}, size={}", subscriptionID, add, size); logger.trace("process subscription={} add={}, size={}", subscriptionID, add, size);
} }
valueUpdater.addAndGet(this, add); long value = valueUpdater.addAndGet(this, add);
persistentSizeUpdater.addAndGet(this, size); persistentSizeUpdater.addAndGet(this, size);
if (add > 0) { if (add > 0) {
addedUpdater.addAndGet(this, add); addedUpdater.addAndGet(this, add);
addedPersistentSizeUpdater.addAndGet(this, size); addedPersistentSizeUpdater.addAndGet(this, size);
/// we could have pagingStore null on tests, so we need to validate if pagingStore != null before anything...
if (pagingStore != null && pagingStore.getPageFullMessagePolicy() != null && !pagingStore.isPageFull()) {
checkAdd(value);
}
} }
if (isRebuilding()) { if (isRebuilding()) {
@ -200,6 +208,15 @@ public class PageSubscriptionCounterImpl extends BasePagingCounter {
} }
} }
private void checkAdd(long numberOfMessages) {
Long pageLimitMessages = pagingStore.getPageLimitMessages();
if (pageLimitMessages != null) {
if (numberOfMessages >= pageLimitMessages.longValue()) {
pagingStore.pageFull(this.subscription);
}
}
}
@Override @Override
public void delete() throws Exception { public void delete() throws Exception {
Transaction tx = new TransactionImpl(storage); Transaction tx = new TransactionImpl(storage);
@ -420,6 +437,7 @@ public class PageSubscriptionCounterImpl extends BasePagingCounter {
@Override @Override
public PageSubscriptionCounter setSubscription(PageSubscription subscription) { public PageSubscriptionCounter setSubscription(PageSubscription subscription) {
this.subscription = subscription; this.subscription = subscription;
this.pagingStore = subscription.getPagingStore();
return this; return this;
} }
} }

View File

@ -42,6 +42,7 @@ import org.apache.activemq.artemis.core.paging.PagingManager;
import org.apache.activemq.artemis.core.paging.PagingStore; import org.apache.activemq.artemis.core.paging.PagingStore;
import org.apache.activemq.artemis.core.paging.PagingStoreFactory; import org.apache.activemq.artemis.core.paging.PagingStoreFactory;
import org.apache.activemq.artemis.core.paging.cursor.PageCursorProvider; import org.apache.activemq.artemis.core.paging.cursor.PageCursorProvider;
import org.apache.activemq.artemis.core.paging.cursor.PageSubscription;
import org.apache.activemq.artemis.core.persistence.StorageManager; import org.apache.activemq.artemis.core.persistence.StorageManager;
import org.apache.activemq.artemis.core.replication.ReplicationManager; import org.apache.activemq.artemis.core.replication.ReplicationManager;
import org.apache.activemq.artemis.core.server.ActiveMQMessageBundle; import org.apache.activemq.artemis.core.server.ActiveMQMessageBundle;
@ -52,6 +53,7 @@ import org.apache.activemq.artemis.core.server.RouteContextList;
import org.apache.activemq.artemis.core.server.impl.MessageReferenceImpl; import org.apache.activemq.artemis.core.server.impl.MessageReferenceImpl;
import org.apache.activemq.artemis.core.settings.impl.AddressFullMessagePolicy; import org.apache.activemq.artemis.core.settings.impl.AddressFullMessagePolicy;
import org.apache.activemq.artemis.core.settings.impl.AddressSettings; import org.apache.activemq.artemis.core.settings.impl.AddressSettings;
import org.apache.activemq.artemis.core.settings.impl.PageFullMessagePolicy;
import org.apache.activemq.artemis.core.transaction.Transaction; import org.apache.activemq.artemis.core.transaction.Transaction;
import org.apache.activemq.artemis.core.transaction.TransactionOperation; import org.apache.activemq.artemis.core.transaction.TransactionOperation;
import org.apache.activemq.artemis.core.transaction.TransactionPropertyIndexes; import org.apache.activemq.artemis.core.transaction.TransactionPropertyIndexes;
@ -99,6 +101,16 @@ public class PagingStoreImpl implements PagingStore {
private long maxMessages; private long maxMessages;
private volatile boolean pageFull;
private Long pageLimitBytes;
private Long estimatedMaxPages;
private Long pageLimitMessages;
private PageFullMessagePolicy pageFullMessagePolicy;
private int pageSize; private int pageSize;
private volatile AddressFullMessagePolicy addressFullMessagePolicy; private volatile AddressFullMessagePolicy addressFullMessagePolicy;
@ -225,6 +237,40 @@ public class PagingStoreImpl implements PagingStore {
addressFullMessagePolicy = addressSettings.getAddressFullMessagePolicy(); addressFullMessagePolicy = addressSettings.getAddressFullMessagePolicy();
rejectThreshold = addressSettings.getMaxSizeBytesRejectThreshold(); rejectThreshold = addressSettings.getMaxSizeBytesRejectThreshold();
pageFullMessagePolicy = addressSettings.getPageFullMessagePolicy();
pageLimitBytes = addressSettings.getPageLimitBytes();
if (pageLimitBytes != null && pageLimitBytes.longValue() < 0) {
logger.debug("address {} had pageLimitBytes<0, setting it as null", address);
pageLimitBytes = null;
}
pageLimitMessages = addressSettings.getPageLimitMessages();
if (pageLimitMessages != null && pageLimitMessages.longValue() < 0) {
logger.debug("address {} had pageLimitMessages<0, setting it as null", address);
pageLimitMessages = null;
}
if (pageLimitBytes == null && pageLimitMessages == null && pageFullMessagePolicy != null) {
ActiveMQServerLogger.LOGGER.noPageLimitsSet(address, pageFullMessagePolicy);
this.pageFullMessagePolicy = null;
}
if (pageLimitBytes != null && pageLimitMessages != null && pageFullMessagePolicy == null) {
ActiveMQServerLogger.LOGGER.noPagefullPolicySet(address, pageLimitBytes, pageLimitMessages);
this.pageFullMessagePolicy = null;
this.pageLimitMessages = null;
this.pageLimitBytes = null;
}
if (pageLimitBytes != null && pageSize > 0) {
estimatedMaxPages = pageLimitBytes / pageSize;
logger.debug("Address {} should not allow more than {} pages", storeName, estimatedMaxPages);
}
} }
@Override @Override
@ -232,6 +278,79 @@ public class PagingStoreImpl implements PagingStore {
return "PagingStoreImpl(" + this.address + ")"; return "PagingStoreImpl(" + this.address + ")";
} }
@Override
public PageFullMessagePolicy getPageFullMessagePolicy() {
return pageFullMessagePolicy;
}
@Override
public Long getPageLimitMessages() {
return pageLimitMessages;
}
@Override
public Long getPageLimitBytes() {
return pageLimitBytes;
}
@Override
public void pageFull(PageSubscription subscription) {
this.pageFull = true;
try {
ActiveMQServerLogger.LOGGER.pageFull(subscription.getQueue().getName(), subscription.getQueue().getAddress(), pageLimitMessages, subscription.getCounter().getValue());
} catch (Throwable e) {
// I don't think subscription would ever have a null queue. I'm being cautious here for tests
logger.warn(e.getMessage(), e);
}
}
@Override
public boolean isPageFull() {
return pageFull;
}
private boolean isBelowPageLimitBytes() {
if (estimatedMaxPages != null) {
return (numberOfPages <= estimatedMaxPages.longValue());
} else {
return true;
}
}
private void checkNumberOfPages() {
if (!isBelowPageLimitBytes()) {
this.pageFull = true;
ActiveMQServerLogger.LOGGER.pageFullMaxBytes(storeName, numberOfPages, estimatedMaxPages, pageLimitBytes, pageSize);
}
}
@Override
public void checkPageLimit(long numberOfMessages) {
boolean pageMessageMessagesClear = true;
Long pageLimitMessages = getPageLimitMessages();
if (pageLimitMessages != null) {
if (logger.isDebugEnabled()) { // gate to avoid boxing of numberOfMessages
logger.debug("Address {} has {} messages on the larger queue", storeName, numberOfMessages);
}
pageMessageMessagesClear = (numberOfMessages < pageLimitMessages.longValue());
}
boolean pageMessageBytesClear = isBelowPageLimitBytes();
if (pageMessageBytesClear && pageMessageMessagesClear) {
pageLimitReleased();
}
}
private void pageLimitReleased() {
if (pageFull) {
ActiveMQServerLogger.LOGGER.pageFree(getAddress());
this.pageFull = false;
}
}
@Override @Override
public boolean lock(long timeout) { public boolean lock(long timeout) {
if (timeout == -1) { if (timeout == -1) {
@ -480,6 +599,8 @@ public class PagingStoreImpl implements PagingStore {
numberOfPages = files.size(); numberOfPages = files.size();
checkNumberOfPages();
for (String fileName : files) { for (String fileName : files) {
final int fileId = PagingStoreImpl.getPageIdFromFileName(fileName); final int fileId = PagingStoreImpl.getPageIdFromFileName(fileName);
@ -556,6 +677,7 @@ public class PagingStoreImpl implements PagingStore {
if (isPaging) { if (isPaging) {
paging = false; paging = false;
ActiveMQServerLogger.LOGGER.pageStoreStop(storeName, getPageInfo()); ActiveMQServerLogger.LOGGER.pageStoreStop(storeName, getPageInfo());
pageLimitReleased();
} }
this.cursorProvider.onPageModeCleared(); this.cursorProvider.onPageModeCleared();
} finally { } finally {
@ -1029,6 +1151,25 @@ public class PagingStoreImpl implements PagingStore {
lock.readLock().unlock(); lock.readLock().unlock();
} }
if (pageFull) {
if (message.isLargeMessage()) {
((LargeServerMessage) message).deleteFile();
}
if (pageFullMessagePolicy == PageFullMessagePolicy.FAIL) {
throw ActiveMQMessageBundle.BUNDLE.addressIsFull(address.toString());
}
if (!printedDropMessagesWarning) {
printedDropMessagesWarning = true;
ActiveMQServerLogger.LOGGER.pageStoreDropMessages(storeName, getPageInfo());
}
// we are in page mode, if we got to this point, we are dropping the message while still paging
// this needs to return true as it is paging
return true;
}
return writePage(message, tx, listCtx); return writePage(message, tx, listCtx);
} }
@ -1282,6 +1423,8 @@ public class PagingStoreImpl implements PagingStore {
try { try {
numberOfPages++; numberOfPages++;
checkNumberOfPages();
final long newPageId = currentPageId + 1; final long newPageId = currentPageId + 1;
if (logger.isTraceEnabled()) { if (logger.isTraceEnabled()) {

View File

@ -527,4 +527,7 @@ public interface ActiveMQMessageBundle {
@Message(id = 229245, value = "Management controller is busy with another task. Please try again") @Message(id = 229245, value = "Management controller is busy with another task. Please try again")
ActiveMQTimeoutException managementBusy(); ActiveMQTimeoutException managementBusy();
@Message(id = 229246, value = "Invalid page full message policy type {}")
IllegalArgumentException invalidPageFullPolicyType(String val);
} }

View File

@ -1563,4 +1563,23 @@ public interface ActiveMQServerLogger {
@LogMessage(id = 224119, value = "Unable to refresh security settings: {}", level = LogMessage.Level.WARN) @LogMessage(id = 224119, value = "Unable to refresh security settings: {}", level = LogMessage.Level.WARN)
void unableToRefreshSecuritySettings(String exceptionMessage); void unableToRefreshSecuritySettings(String exceptionMessage);
@LogMessage(id = 224120, value = "Queue {} on Address {} has more messages than configured page limit. PageLimitMesages={} while currentValue={}", level = LogMessage.Level.WARN)
void pageFull(SimpleString queue, SimpleString address, Object pageLImitMessage, Object currentValue);
@LogMessage(id = 224121, value = "Queue {} on Address {} is out of page limit now. We will issue a cleanup to check other queues.", level = LogMessage.Level.WARN)
void pageFree(SimpleString queue, SimpleString address);
@LogMessage(id = 224122, value = "Address {} number of messages is under page limit again, and it should be allowed to page again.", level = LogMessage.Level.INFO)
void pageFree(SimpleString address);
@LogMessage(id = 224123, value = "Address {} has more pages than allowed. System currently has {} pages, while the estimated max number of pages is {}, based on the limitPageBytes ({}) / page-size ({})", level = LogMessage.Level.WARN)
void pageFullMaxBytes(SimpleString address, long pages, long maxPages, long limitBytes, long bytes);
@LogMessage(id = 224124, value = "Address {} has a pageFullPolicy set as {} but there are not page-limit-bytes or page-limit-messages set. Page full configuration being ignored on this address.", level = LogMessage.Level.WARN)
void noPageLimitsSet(Object address, Object policy);
@LogMessage(id = 224125, value = "Address {} has page-limit-bytes={}, page-limit-messages={} and no page-full-policy set. Page full configuration being ignored on this address", level = LogMessage.Level.WARN)
void noPagefullPolicySet(Object address, Object limitBytes, Object limitMessages);
} }

View File

@ -147,6 +147,12 @@ public class AddressSettings implements Mergeable<AddressSettings>, Serializable
private Integer maxReadPageMessages = null; private Integer maxReadPageMessages = null;
private Long pageLimitBytes = null;
private Long pageLimitMessages = null;
private PageFullMessagePolicy pageFullMessagePolicy = null;
private Long maxSizeMessages = null; private Long maxSizeMessages = null;
private Integer pageSizeBytes = null; private Integer pageSizeBytes = null;
@ -289,6 +295,9 @@ public class AddressSettings implements Mergeable<AddressSettings>, Serializable
this.maxSizeMessages = other.maxSizeMessages; this.maxSizeMessages = other.maxSizeMessages;
this.maxReadPageMessages = other.maxReadPageMessages; this.maxReadPageMessages = other.maxReadPageMessages;
this.maxReadPageBytes = other.maxReadPageBytes; this.maxReadPageBytes = other.maxReadPageBytes;
this.pageLimitBytes = other.pageLimitBytes;
this.pageLimitMessages = other.pageLimitMessages;
this.pageFullMessagePolicy = other.pageFullMessagePolicy;
this.pageSizeBytes = other.pageSizeBytes; this.pageSizeBytes = other.pageSizeBytes;
this.pageMaxCache = other.pageMaxCache; this.pageMaxCache = other.pageMaxCache;
this.dropMessagesWhenFull = other.dropMessagesWhenFull; this.dropMessagesWhenFull = other.dropMessagesWhenFull;
@ -644,6 +653,33 @@ public class AddressSettings implements Mergeable<AddressSettings>, Serializable
return this; return this;
} }
public Long getPageLimitBytes() {
return pageLimitBytes;
}
public AddressSettings setPageLimitBytes(Long pageLimitBytes) {
this.pageLimitBytes = pageLimitBytes;
return this;
}
public Long getPageLimitMessages() {
return pageLimitMessages;
}
public AddressSettings setPageLimitMessages(Long pageLimitMessages) {
this.pageLimitMessages = pageLimitMessages;
return this;
}
public PageFullMessagePolicy getPageFullMessagePolicy() {
return this.pageFullMessagePolicy;
}
public AddressSettings setPageFullMessagePolicy(PageFullMessagePolicy policy) {
this.pageFullMessagePolicy = policy;
return this;
}
public int getMaxReadPageBytes() { public int getMaxReadPageBytes() {
return maxReadPageBytes != null ? maxReadPageBytes : 2 * getPageSizeBytes(); return maxReadPageBytes != null ? maxReadPageBytes : 2 * getPageSizeBytes();
} }
@ -1223,6 +1259,15 @@ public class AddressSettings implements Mergeable<AddressSettings>, Serializable
if (enableIngressTimestamp == null) { if (enableIngressTimestamp == null) {
enableIngressTimestamp = merged.enableIngressTimestamp; enableIngressTimestamp = merged.enableIngressTimestamp;
} }
if (pageFullMessagePolicy == null) {
pageFullMessagePolicy = merged.pageFullMessagePolicy;
}
if (pageLimitBytes == null) {
pageLimitBytes = merged.pageLimitBytes;
}
if (pageLimitMessages == null) {
pageLimitMessages = merged.pageLimitMessages;
}
} }
@Override @Override
@ -1472,6 +1517,24 @@ public class AddressSettings implements Mergeable<AddressSettings>, Serializable
if (buffer.readableBytes() > 0) { if (buffer.readableBytes() > 0) {
maxReadPageMessages = BufferHelper.readNullableInteger(buffer); maxReadPageMessages = BufferHelper.readNullableInteger(buffer);
} }
if (buffer.readableBytes() > 0) {
pageLimitBytes = BufferHelper.readNullableLong(buffer);
}
if (buffer.readableBytes() > 0) {
pageLimitMessages = BufferHelper.readNullableLong(buffer);
}
if (buffer.readableBytes() > 0) {
policyStr = buffer.readNullableSimpleString();
if (policyStr != null) {
pageFullMessagePolicy = PageFullMessagePolicy.valueOf(policyStr.toString());
} else {
pageFullMessagePolicy = null;
}
}
} }
@Override @Override
@ -1542,7 +1605,10 @@ public class AddressSettings implements Mergeable<AddressSettings>, Serializable
BufferHelper.sizeOfNullableBoolean(enableIngressTimestamp) + BufferHelper.sizeOfNullableBoolean(enableIngressTimestamp) +
BufferHelper.sizeOfNullableLong(maxSizeMessages) + BufferHelper.sizeOfNullableLong(maxSizeMessages) +
BufferHelper.sizeOfNullableInteger(maxReadPageMessages) + BufferHelper.sizeOfNullableInteger(maxReadPageMessages) +
BufferHelper.sizeOfNullableInteger(maxReadPageBytes); BufferHelper.sizeOfNullableInteger(maxReadPageBytes) +
BufferHelper.sizeOfNullableLong(pageLimitBytes) +
BufferHelper.sizeOfNullableLong(pageLimitMessages) +
BufferHelper.sizeOfNullableSimpleString(pageFullMessagePolicy != null ? pageFullMessagePolicy.toString() : null);
} }
@Override @Override
@ -1682,6 +1748,13 @@ public class AddressSettings implements Mergeable<AddressSettings>, Serializable
BufferHelper.writeNullableInteger(buffer, maxReadPageBytes); BufferHelper.writeNullableInteger(buffer, maxReadPageBytes);
BufferHelper.writeNullableInteger(buffer, maxReadPageMessages); BufferHelper.writeNullableInteger(buffer, maxReadPageMessages);
BufferHelper.writeNullableLong(buffer, pageLimitBytes);
BufferHelper.writeNullableLong(buffer, pageLimitMessages);
buffer.writeNullableSimpleString(pageFullMessagePolicy != null ? new SimpleString(pageFullMessagePolicy.toString()) : null);
} }
/* (non-Javadoc) /* (non-Javadoc)
@ -1758,6 +1831,10 @@ public class AddressSettings implements Mergeable<AddressSettings>, Serializable
result = prime * result + ((slowConsumerThresholdMeasurementUnit == null) ? 0 : slowConsumerThresholdMeasurementUnit.hashCode()); result = prime * result + ((slowConsumerThresholdMeasurementUnit == null) ? 0 : slowConsumerThresholdMeasurementUnit.hashCode());
result = prime * result + ((enableIngressTimestamp == null) ? 0 : enableIngressTimestamp.hashCode()); result = prime * result + ((enableIngressTimestamp == null) ? 0 : enableIngressTimestamp.hashCode());
result = prime * result + ((maxSizeMessages == null) ? 0 : maxSizeMessages.hashCode()); result = prime * result + ((maxSizeMessages == null) ? 0 : maxSizeMessages.hashCode());
result = prime * result + ((pageLimitBytes == null) ? 0 : pageLimitBytes.hashCode());
result = prime * result + ((pageLimitMessages == null) ? 0 : pageLimitMessages.hashCode());
result = prime * result + ((pageFullMessagePolicy == null) ? 0 : pageFullMessagePolicy.hashCode());
return result; return result;
} }
@ -2130,6 +2207,31 @@ public class AddressSettings implements Mergeable<AddressSettings>, Serializable
} else if (!maxSizeMessages.equals(other.maxSizeMessages)) } else if (!maxSizeMessages.equals(other.maxSizeMessages))
return false; return false;
if (pageLimitBytes == null) {
if (other.pageLimitBytes != null) {
return false;
}
} else if (!pageLimitBytes.equals(other.pageLimitBytes)) {
return false;
}
if (pageLimitMessages == null) {
if (other.pageLimitMessages != null) {
return false;
}
} else if (!pageLimitMessages.equals(other.pageLimitMessages)) {
return false;
}
if (pageFullMessagePolicy == null) {
if (other.pageFullMessagePolicy != null) {
return false;
}
} else if (!pageFullMessagePolicy.equals(other.pageFullMessagePolicy)) {
return false;
}
return true; return true;
} }
@ -2267,6 +2369,12 @@ public class AddressSettings implements Mergeable<AddressSettings>, Serializable
enableMetrics + enableMetrics +
", enableIngressTime=" + ", enableIngressTime=" +
enableIngressTimestamp + enableIngressTimestamp +
", pageLimitBytes=" +
pageLimitBytes +
", pageLimitMessages=" +
pageLimitMessages +
", pageFullMessagePolicy=" +
pageFullMessagePolicy +
"]"; "]";
} }
} }

View File

@ -3757,6 +3757,23 @@
</xsd:annotation> </xsd:annotation>
</xsd:element> </xsd:element>
<xsd:element name="page-limit-bytes" type="xsd:string" maxOccurs="1" minOccurs="0">
<xsd:annotation>
<xsd:documentation>
After the address enters into page mode, this attribute will configure how many pages can be written into page before activating the page-full-policy.
Supports byte notation like "K", "Mb", "MiB", "GB", etc.
</xsd:documentation>
</xsd:annotation>
</xsd:element>
<xsd:element name="page-limit-messages" type="xsd:long" maxOccurs="1" minOccurs="0">
<xsd:annotation>
<xsd:documentation>
After the address enters into page mode, this attribute will configure how many messages can be written into page before activating the page-full-policy.
</xsd:documentation>
</xsd:annotation>
</xsd:element>
<xsd:element name="max-size-bytes-reject-threshold" type="xsd:long" default="-1" maxOccurs="1" <xsd:element name="max-size-bytes-reject-threshold" type="xsd:long" default="-1" maxOccurs="1"
minOccurs="0"> minOccurs="0">
<xsd:annotation> <xsd:annotation>
@ -3819,6 +3836,20 @@
</xsd:simpleType> </xsd:simpleType>
</xsd:element> </xsd:element>
<xsd:element name="page-full-policy" maxOccurs="1" minOccurs="0">
<xsd:annotation>
<xsd:documentation>
After entering page mode, a second limit will be set by page-limit-bytes and/or page-limit-messages. The page-full-policy will configure what to do when that limit is reached.
</xsd:documentation>
</xsd:annotation>
<xsd:simpleType>
<xsd:restriction base="xsd:string">
<xsd:enumeration value="DROP"/>
<xsd:enumeration value="FAIL"/>
</xsd:restriction>
</xsd:simpleType>
</xsd:element>
<xsd:element name="message-counter-history-day-limit" type="xsd:int" default="0" maxOccurs="1" <xsd:element name="message-counter-history-day-limit" type="xsd:int" default="0" maxOccurs="1"
minOccurs="0"> minOccurs="0">
<xsd:annotation> <xsd:annotation>

View File

@ -30,6 +30,7 @@ import java.util.Properties;
import java.util.Random; import java.util.Random;
import java.util.Set; import java.util.Set;
import java.util.concurrent.CountDownLatch; import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicReference; import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Predicate; import java.util.function.Predicate;
@ -50,6 +51,12 @@ import org.apache.activemq.artemis.core.config.federation.FederationPolicySet;
import org.apache.activemq.artemis.core.config.federation.FederationQueuePolicyConfiguration; import org.apache.activemq.artemis.core.config.federation.FederationQueuePolicyConfiguration;
import org.apache.activemq.artemis.core.config.ha.LiveOnlyPolicyConfiguration; import org.apache.activemq.artemis.core.config.ha.LiveOnlyPolicyConfiguration;
import org.apache.activemq.artemis.core.deployers.impl.FileConfigurationParser; import org.apache.activemq.artemis.core.deployers.impl.FileConfigurationParser;
import org.apache.activemq.artemis.core.io.SequentialFileFactory;
import org.apache.activemq.artemis.core.paging.PagingManager;
import org.apache.activemq.artemis.core.paging.PagingStore;
import org.apache.activemq.artemis.core.paging.PagingStoreFactory;
import org.apache.activemq.artemis.core.paging.impl.PagingStoreImpl;
import org.apache.activemq.artemis.core.persistence.StorageManager;
import org.apache.activemq.artemis.core.security.Role; import org.apache.activemq.artemis.core.security.Role;
import org.apache.activemq.artemis.core.server.ComponentConfigurationRoutingType; import org.apache.activemq.artemis.core.server.ComponentConfigurationRoutingType;
import org.apache.activemq.artemis.core.server.JournalType; import org.apache.activemq.artemis.core.server.JournalType;
@ -58,10 +65,12 @@ import org.apache.activemq.artemis.core.server.plugin.impl.LoggingActiveMQServer
import org.apache.activemq.artemis.core.server.routing.KeyType; import org.apache.activemq.artemis.core.server.routing.KeyType;
import org.apache.activemq.artemis.core.server.routing.policies.ConsistentHashModuloPolicy; import org.apache.activemq.artemis.core.server.routing.policies.ConsistentHashModuloPolicy;
import org.apache.activemq.artemis.core.settings.impl.ResourceLimitSettings; import org.apache.activemq.artemis.core.settings.impl.ResourceLimitSettings;
import org.apache.activemq.artemis.logs.AssertionLoggerHandler;
import org.apache.activemq.artemis.tests.util.ActiveMQTestBase; import org.apache.activemq.artemis.tests.util.ActiveMQTestBase;
import org.apache.activemq.artemis.utils.RandomUtil; import org.apache.activemq.artemis.utils.RandomUtil;
import org.apache.activemq.artemis.utils.critical.CriticalAnalyzerPolicy; import org.apache.activemq.artemis.utils.critical.CriticalAnalyzerPolicy;
import org.apache.commons.lang3.ClassUtils; import org.apache.commons.lang3.ClassUtils;
import org.mockito.Mockito;
import org.slf4j.Logger; import org.slf4j.Logger;
import org.slf4j.LoggerFactory; import org.slf4j.LoggerFactory;
import java.lang.invoke.MethodHandles; import java.lang.invoke.MethodHandles;
@ -1004,6 +1013,125 @@ public class ConfigurationImplTest extends ActiveMQTestBase {
Assert.assertEquals(SimpleString.toSimpleString("moreImportant"), configuration.getAddressSettings().get("Name.With.Dots").getExpiryAddress()); Assert.assertEquals(SimpleString.toSimpleString("moreImportant"), configuration.getAddressSettings().get("Name.With.Dots").getExpiryAddress());
} }
@Test
public void testAddressSettingsPageLimit() throws Throwable {
ConfigurationImpl configuration = new ConfigurationImpl();
Properties properties = new Properties();
String randomString = RandomUtil.randomString();
properties.put("addressSettings.#.expiryAddress", randomString);
properties.put("addressSettings.#.pageLimitMessages", "300");
properties.put("addressSettings.#.pageLimitBytes", "300000");
properties.put("addressSettings.#.pageFullMessagePolicy", "DROP");
configuration.parsePrefixedProperties(properties, null);
Assert.assertEquals(1, configuration.getAddressSettings().size());
Assert.assertEquals(SimpleString.toSimpleString(randomString), configuration.getAddressSettings().get("#").getExpiryAddress());
Assert.assertEquals(300L, configuration.getAddressSettings().get("#").getPageLimitMessages().longValue());
Assert.assertEquals(300000L, configuration.getAddressSettings().get("#").getPageLimitBytes().longValue());
Assert.assertEquals("DROP", configuration.getAddressSettings().get("#").getPageFullMessagePolicy().toString());
PagingStore storeImpl = new PagingStoreImpl(new SimpleString("Test"), (ScheduledExecutorService) null, 100L, Mockito.mock(PagingManager.class), Mockito.mock(StorageManager.class), Mockito.mock(SequentialFileFactory.class), Mockito.mock(PagingStoreFactory.class), new SimpleString("Test"), configuration.getAddressSettings().get("#"), null, null, true);
Assert.assertEquals(300L, storeImpl.getPageLimitMessages().longValue());
Assert.assertEquals(300000L, storeImpl.getPageLimitBytes().longValue());
Assert.assertEquals("DROP", storeImpl.getPageFullMessagePolicy().toString());
}
@Test
public void testAddressSettingsPageLimitInvalidConfiguration1() throws Throwable {
AssertionLoggerHandler.startCapture();
runAfter(AssertionLoggerHandler::stopCapture);
ConfigurationImpl configuration = new ConfigurationImpl();
Properties properties = new Properties();
String randomString = RandomUtil.randomString();
properties.put("addressSettings.#.expiryAddress", randomString);
properties.put("addressSettings.#.pageLimitMessages", "300");
properties.put("addressSettings.#.pageLimitBytes", "300000");
//properties.put("addressSettings.#.pageFullMessagePolicy", "DROP"); // removing the pageFull on purpose
configuration.parsePrefixedProperties(properties, null);
Assert.assertEquals(1, configuration.getAddressSettings().size());
Assert.assertEquals(SimpleString.toSimpleString(randomString), configuration.getAddressSettings().get("#").getExpiryAddress());
Assert.assertEquals(300L, configuration.getAddressSettings().get("#").getPageLimitMessages().longValue());
Assert.assertEquals(300000L, configuration.getAddressSettings().get("#").getPageLimitBytes().longValue());
Assert.assertEquals(null, configuration.getAddressSettings().get("#").getPageFullMessagePolicy());
PagingStore storeImpl = new PagingStoreImpl(new SimpleString("Test"), (ScheduledExecutorService) null, 100L, Mockito.mock(PagingManager.class), Mockito.mock(StorageManager.class), Mockito.mock(SequentialFileFactory.class), Mockito.mock(PagingStoreFactory.class), new SimpleString("Test"), configuration.getAddressSettings().get("#"), null, null, true);
Assert.assertTrue(AssertionLoggerHandler.findText("AMQ224125"));
Assert.assertEquals(null, storeImpl.getPageLimitMessages());
Assert.assertEquals(null, storeImpl.getPageLimitBytes());
Assert.assertEquals(null, storeImpl.getPageFullMessagePolicy());
}
@Test
public void testAddressSettingsPageLimitInvalidConfiguration2() throws Throwable {
AssertionLoggerHandler.startCapture();
runAfter(AssertionLoggerHandler::stopCapture);
ConfigurationImpl configuration = new ConfigurationImpl();
Properties properties = new Properties();
String randomString = RandomUtil.randomString();
properties.put("addressSettings.#.expiryAddress", randomString);
//properties.put("addressSettings.#.pageLimitMessages", "300"); // removing this on purpose
//properties.put("addressSettings.#.pageLimitBytes", "300000"); // removing this on purpose
properties.put("addressSettings.#.pageFullMessagePolicy", "DROP"); // keeping this on purpose
configuration.parsePrefixedProperties(properties, null);
Assert.assertEquals(1, configuration.getAddressSettings().size());
Assert.assertEquals(SimpleString.toSimpleString(randomString), configuration.getAddressSettings().get("#").getExpiryAddress());
Assert.assertEquals(null, configuration.getAddressSettings().get("#").getPageLimitMessages());
Assert.assertEquals(null, configuration.getAddressSettings().get("#").getPageLimitBytes());
Assert.assertEquals("DROP", configuration.getAddressSettings().get("#").getPageFullMessagePolicy().toString());
PagingStore storeImpl = new PagingStoreImpl(new SimpleString("Test"), (ScheduledExecutorService) null, 100L, Mockito.mock(PagingManager.class), Mockito.mock(StorageManager.class), Mockito.mock(SequentialFileFactory.class), Mockito.mock(PagingStoreFactory.class), new SimpleString("Test"), configuration.getAddressSettings().get("#"), null, null, true);
Assert.assertTrue(AssertionLoggerHandler.findText("AMQ224124"));
Assert.assertEquals(null, storeImpl.getPageLimitMessages());
Assert.assertEquals(null, storeImpl.getPageLimitBytes());
Assert.assertEquals(null, storeImpl.getPageFullMessagePolicy());
}
@Test
public void testAddressSettingsPageLimitInvalidConfiguration3() throws Throwable {
ConfigurationImpl configuration = new ConfigurationImpl();
Properties properties = new Properties();
String randomString = RandomUtil.randomString();
properties.put("addressSettings.#.expiryAddress", randomString);
properties.put("addressSettings.#.pageLimitMessages", "-1"); // -1 on purpose, to make it null on final parsing
properties.put("addressSettings.#.pageLimitBytes", "-1"); // -1 on purpose, to make it null on final parsing
properties.put("addressSettings.#.pageFullMessagePolicy", "DROP"); // keeping this on purpose
configuration.parsePrefixedProperties(properties, null);
Assert.assertEquals(1, configuration.getAddressSettings().size());
Assert.assertEquals(SimpleString.toSimpleString(randomString), configuration.getAddressSettings().get("#").getExpiryAddress());
Assert.assertEquals(-1L, configuration.getAddressSettings().get("#").getPageLimitMessages().longValue());
Assert.assertEquals(-1L, configuration.getAddressSettings().get("#").getPageLimitBytes().longValue());
Assert.assertEquals("DROP", configuration.getAddressSettings().get("#").getPageFullMessagePolicy().toString());
PagingStore storeImpl = new PagingStoreImpl(new SimpleString("Test"), (ScheduledExecutorService) null, 100L, Mockito.mock(PagingManager.class), Mockito.mock(StorageManager.class), Mockito.mock(SequentialFileFactory.class), Mockito.mock(PagingStoreFactory.class), new SimpleString("Test"), configuration.getAddressSettings().get("#"), null, null, true);
Assert.assertEquals(null, storeImpl.getPageLimitMessages());
Assert.assertEquals(null, storeImpl.getPageLimitBytes());
Assert.assertEquals(null, storeImpl.getPageFullMessagePolicy());
}
@Test @Test
public void testDivertViaProperties() throws Exception { public void testDivertViaProperties() throws Exception {
ConfigurationImpl configuration = new ConfigurationImpl(); ConfigurationImpl configuration = new ConfigurationImpl();

View File

@ -393,6 +393,25 @@ public class FileConfigurationParserTest extends ActiveMQTestBase {
AddressSettings settings = configuration.getAddressSettings().get("foo"); AddressSettings settings = configuration.getAddressSettings().get("foo");
Assert.assertEquals(1024, settings.getMaxReadPageBytes()); Assert.assertEquals(1024, settings.getMaxReadPageBytes());
Assert.assertEquals(33, settings.getMaxReadPageMessages()); Assert.assertEquals(33, settings.getMaxReadPageMessages());
Assert.assertNull(settings.getPageLimitBytes());
Assert.assertNull(settings.getPageLimitMessages());
Assert.assertNull(settings.getPageFullMessagePolicy());
}
@Test
public void testParsePageLimitSettings() throws Exception {
String configStr = "<configuration><address-settings>" + "\n" + "<address-setting match=\"foo\">" + "\n" + "<max-read-page-bytes>1k</max-read-page-bytes><page-limit-bytes>2k</page-limit-bytes><page-limit-messages>337</page-limit-messages><page-full-policy>FAIL</page-full-policy><max-read-page-messages>33</max-read-page-messages>.\n" + "</address-setting>" + "\n" + "</address-settings></configuration>" + "\n";
FileConfigurationParser parser = new FileConfigurationParser();
ByteArrayInputStream input = new ByteArrayInputStream(configStr.getBytes(StandardCharsets.UTF_8));
Configuration configuration = parser.parseMainConfig(input);
AddressSettings settings = configuration.getAddressSettings().get("foo");
Assert.assertEquals(1024, settings.getMaxReadPageBytes());
Assert.assertEquals(33, settings.getMaxReadPageMessages());
Assert.assertEquals(2048L, settings.getPageLimitBytes().longValue());
Assert.assertEquals(337L, settings.getPageLimitMessages().longValue());
Assert.assertEquals("FAIL", settings.getPageFullMessagePolicy().toString());
} }
@Test @Test

View File

@ -134,6 +134,7 @@ import org.apache.activemq.artemis.core.server.impl.ReplicationBackupActivation;
import org.apache.activemq.artemis.core.server.impl.SharedNothingBackupActivation; import org.apache.activemq.artemis.core.server.impl.SharedNothingBackupActivation;
import org.apache.activemq.artemis.core.settings.impl.AddressFullMessagePolicy; import org.apache.activemq.artemis.core.settings.impl.AddressFullMessagePolicy;
import org.apache.activemq.artemis.core.settings.impl.AddressSettings; import org.apache.activemq.artemis.core.settings.impl.AddressSettings;
import org.apache.activemq.artemis.core.settings.impl.PageFullMessagePolicy;
import org.apache.activemq.artemis.core.transaction.impl.XidImpl; import org.apache.activemq.artemis.core.transaction.impl.XidImpl;
import org.apache.activemq.artemis.jdbc.store.drivers.JDBCUtils; import org.apache.activemq.artemis.jdbc.store.drivers.JDBCUtils;
import org.apache.activemq.artemis.jdbc.store.sql.SQLProvider; import org.apache.activemq.artemis.jdbc.store.sql.SQLProvider;
@ -1505,6 +1506,8 @@ public abstract class ActiveMQTestBase extends Assert {
return createServer(realFiles, configuration, pageSize, maxAddressSize, null, null, settings); return createServer(realFiles, configuration, pageSize, maxAddressSize, null, null, settings);
} }
protected final ActiveMQServer createServer(final boolean realFiles, protected final ActiveMQServer createServer(final boolean realFiles,
final Configuration configuration, final Configuration configuration,
final int pageSize, final int pageSize,
@ -1512,6 +1515,20 @@ public abstract class ActiveMQTestBase extends Assert {
final Integer maxReadPageMessages, final Integer maxReadPageMessages,
final Integer maxReadPageBytes, final Integer maxReadPageBytes,
final Map<String, AddressSettings> settings) { final Map<String, AddressSettings> settings) {
return createServer(realFiles, configuration, pageSize, maxAddressSize, maxReadPageMessages, maxReadPageBytes, null, null, null, settings);
}
protected final ActiveMQServer createServer(final boolean realFiles,
final Configuration configuration,
final int pageSize,
final long maxAddressSize,
final Integer maxReadPageMessages,
final Integer maxReadPageBytes,
final Long pageLimitBytes,
final Long pageLimitMessages,
final String pageLimitPolicy,
final Map<String, AddressSettings> settings) {
ActiveMQServer server = addServer(ActiveMQServers.newActiveMQServer(configuration, realFiles)); ActiveMQServer server = addServer(ActiveMQServers.newActiveMQServer(configuration, realFiles));
if (settings != null) { if (settings != null) {
@ -1522,6 +1539,15 @@ public abstract class ActiveMQTestBase extends Assert {
if (maxReadPageMessages != null) { if (maxReadPageMessages != null) {
setting.getValue().setMaxReadPageMessages(maxReadPageMessages.intValue()); setting.getValue().setMaxReadPageMessages(maxReadPageMessages.intValue());
} }
if (pageLimitBytes != null) {
setting.getValue().setPageLimitBytes(pageLimitBytes);
}
if (pageLimitMessages != null) {
setting.getValue().setPageLimitMessages(pageLimitMessages);
}
if (pageLimitPolicy != null) {
setting.getValue().setPageFullMessagePolicy(PageFullMessagePolicy.valueOf(pageLimitPolicy));
}
server.getAddressSettingsRepository().addMatch(setting.getKey(), setting.getValue()); server.getAddressSettingsRepository().addMatch(setting.getKey(), setting.getValue());
} }
} }
@ -1533,6 +1559,15 @@ public abstract class ActiveMQTestBase extends Assert {
if (maxReadPageMessages != null) { if (maxReadPageMessages != null) {
defaultSetting.setMaxReadPageMessages(maxReadPageMessages.intValue()); defaultSetting.setMaxReadPageMessages(maxReadPageMessages.intValue());
} }
if (pageLimitBytes != null) {
defaultSetting.setPageLimitBytes(pageLimitBytes);
}
if (pageLimitMessages != null) {
defaultSetting.setPageLimitMessages(pageLimitMessages);
}
if (pageLimitPolicy != null) {
defaultSetting.setPageFullMessagePolicy(PageFullMessagePolicy.valueOf(pageLimitPolicy));
}
server.getAddressSettingsRepository().addMatch("#", defaultSetting); server.getAddressSettingsRepository().addMatch("#", defaultSetting);

View File

@ -74,6 +74,9 @@ Configuration is done at the address settings in `broker.xml`.
<max-size-messages>1000</max-size-messages> <max-size-messages>1000</max-size-messages>
<page-size-bytes>10485760</page-size-bytes> <page-size-bytes>10485760</page-size-bytes>
<address-full-policy>PAGE</address-full-policy> <address-full-policy>PAGE</address-full-policy>
<page-limit-bytes>10G</page-limit-bytes>
<page-limit-messages>1000000</page-limit-messages>
<page-full-policy>FAIL</page-full-policy>
</address-setting> </address-setting>
</address-settings> </address-settings>
``` ```
@ -98,6 +101,9 @@ Property Name|Description|Default
`address-full-policy`|This must be set to `PAGE` for paging to enable. If the value is `PAGE` then further messages will be paged to disk. If the value is `DROP` then further messages will be silently dropped. If the value is `FAIL` then the messages will be dropped and the client message producers will receive an exception. If the value is `BLOCK` then client message producers will block when they try and send further messages.|`PAGE` `address-full-policy`|This must be set to `PAGE` for paging to enable. If the value is `PAGE` then further messages will be paged to disk. If the value is `DROP` then further messages will be silently dropped. If the value is `FAIL` then the messages will be dropped and the client message producers will receive an exception. If the value is `BLOCK` then client message producers will block when they try and send further messages.|`PAGE`
`max-read-page-messages` | how many message can be read from paging into the Queue whenever more messages are needed. The system wtill stop reading if `max-read-page-bytes hits the limit first. | -1 `max-read-page-messages` | how many message can be read from paging into the Queue whenever more messages are needed. The system wtill stop reading if `max-read-page-bytes hits the limit first. | -1
`max-read-page-bytes` | how much memory the messages read from paging can take on the Queue whenever more messages are needed. The system will stop reading if `max-read-page-messages` hits the limit first. | 2 * page-size-bytes `max-read-page-bytes` | how much memory the messages read from paging can take on the Queue whenever more messages are needed. The system will stop reading if `max-read-page-messages` hits the limit first. | 2 * page-size-bytes
`page-limit-bytes` | After entering page mode, how much data would the system allow incoming. Notice this will be internally converted as number of pages. |
`page-limit-messages` | After entering page mode, how many messages would the system allow incoming on paging. |
`page-full-policy` | Valid results are DROP or FAIL. This tells what to do if the system is reaching `page-limit-bytes` or `page-limit-messages` after paging |
### max-size-bytes and max-size-messages simultaneous usage ### max-size-bytes and max-size-messages simultaneous usage
@ -205,6 +211,14 @@ The system should keep at least one paged file in memory caching ahead reading m
Also every active subscription could keep one paged file in memory. Also every active subscription could keep one paged file in memory.
So, if your system has too many queues it is recommended to minimize the page-size. So, if your system has too many queues it is recommended to minimize the page-size.
## Page Limits and Page Full Policy
Since version `2.28.0` is possible to configure limits on how much data is paged. This is to avoid a single destination using the entire disk in case their consumers are gone.
You can configure either `page-limit-bytes` or `page-limit-messages`, along with `page-full-policy` on the address settings limiting how much data will be recorded in paging.
If you configure `page-full-policy` as DROP, messages will be simplify dropped while the clients will not get any exceptions, while if you configured FAIL the producers will receive a JMS Exception for the error condition.
## Example ## Example
See the [Paging Example](examples.md#paging) which shows how to use paging with See the [Paging Example](examples.md#paging) which shows how to use paging with

View File

@ -20,6 +20,7 @@ import javax.jms.BytesMessage;
import javax.jms.Connection; import javax.jms.Connection;
import javax.jms.ConnectionFactory; import javax.jms.ConnectionFactory;
import javax.jms.DeliveryMode; import javax.jms.DeliveryMode;
import javax.jms.JMSException;
import javax.jms.MessageConsumer; import javax.jms.MessageConsumer;
import javax.jms.MessageProducer; import javax.jms.MessageProducer;
import javax.jms.Queue; import javax.jms.Queue;
@ -32,67 +33,130 @@ import javax.naming.InitialContext;
public class PagingExample { public class PagingExample {
public static void main(final String[] args) throws Exception { public static void main(final String[] args) throws Exception {
Connection connection = null; // simple routing showing how paging should work
simplePaging();
// simple routing showing what happens when paging enters into page-full
pageFullLimit();
}
public static void pageFullLimit() throws Exception {
InitialContext initialContext = null; InitialContext initialContext = null;
try { try {
// Step 1. Create an initial context to perform the JNDI lookup. // Create an initial context to perform the JNDI lookup.
initialContext = new InitialContext(); initialContext = new InitialContext();
// Step 2. Perform a lookup on the Connection Factory // Perform a lookup on the Connection Factory
ConnectionFactory cf = (ConnectionFactory) initialContext.lookup("ConnectionFactory"); ConnectionFactory cf = (ConnectionFactory) initialContext.lookup("ConnectionFactory");
// Step 3. We look-up the JMS queue object from JNDI. pagingQueue is configured to hold a very limited number // Create a JMS Connection
// of bytes in memory try (Connection connection = cf.createConnection()) {
Queue pageQueue = (Queue) initialContext.lookup("queue/pagingQueue");
// Step 4. Lookup for a JMS Queue // Create a JMS Session
Queue queue = (Queue) initialContext.lookup("queue/exampleQueue"); Session session = connection.createSession(true, Session.SESSION_TRANSACTED);
// Step 5. Create a JMS Connection // lookup the queue
connection = cf.createConnection(); Queue queue = session.createQueue("pagingQueueLimited");
// Step 6. Create a JMS Session // Create a JMS Message Producer for pageQueueAddress
Session session = connection.createSession(false, Session.CLIENT_ACKNOWLEDGE); MessageProducer pageMessageProducer = session.createProducer(queue);
// Step 7. Create a JMS Message Producer for pageQueueAddress // We don't need persistent messages in order to use paging. (This step is optional)
MessageProducer pageMessageProducer = session.createProducer(pageQueue); pageMessageProducer.setDeliveryMode(DeliveryMode.PERSISTENT);
// Step 8. We don't need persistent messages in order to use paging. (This step is optional) // Create a Binary Bytes Message with 10K arbitrary bytes
pageMessageProducer.setDeliveryMode(DeliveryMode.NON_PERSISTENT);
// Step 9. Create a Binary Bytes Message with 10K arbitrary bytes
BytesMessage message = session.createBytesMessage(); BytesMessage message = session.createBytesMessage();
message.writeBytes(new byte[10 * 1024]); message.writeBytes(new byte[10 * 1024]);
// Step 10. Send only 20 messages to the Queue. This will be already enough for pagingQueue. Look at try {
// Send messages to the queue until the address is full
for (int i = 0; i < 2000; i++) {
pageMessageProducer.send(message);
if (i > 0 && i % 100 == 0) {
// batch commit on the sends
session.commit();
}
}
throw new RuntimeException("Example was supposed to get a page full exception. Check your example configuration or report a bug");
} catch (JMSException e) {
System.out.println("The producer has thrown an expected exception " + e);
}
session.commit();
}
} finally {
// And finally, always remember to close your JMS connections after use, in a finally block. Closing a JMS
// connection will automatically close all of its sessions, consumers, producer and browser objects
if (initialContext != null) {
initialContext.close();
}
}
}
public static void simplePaging() throws Exception {
InitialContext initialContext = null;
try {
// Create an initial context to perform the JNDI lookup.
initialContext = new InitialContext();
// Perform a lookup on the Connection Factory
ConnectionFactory cf = (ConnectionFactory) initialContext.lookup("ConnectionFactory");
// We look-up the JMS queue object from JNDI. pagingQueue is configured to hold a very limited number
// of bytes in memory
Queue pageQueue = (Queue) initialContext.lookup("queue/pagingQueue");
// Lookup for a JMS Queue
Queue queue = (Queue) initialContext.lookup("queue/exampleQueue");
// Create a JMS Connection
try (Connection connection = cf.createConnection()) {
// Create a JMS Session
Session session = connection.createSession(false, Session.CLIENT_ACKNOWLEDGE);
// Create a JMS Message Producer for pageQueueAddress
MessageProducer pageMessageProducer = session.createProducer(pageQueue);
// We don't need persistent messages in order to use paging. (This step is optional)
pageMessageProducer.setDeliveryMode(DeliveryMode.NON_PERSISTENT);
// Create a Binary Bytes Message with 10K arbitrary bytes
BytesMessage message = session.createBytesMessage();
message.writeBytes(new byte[10 * 1024]);
// Send only 20 messages to the Queue. This will be already enough for pagingQueue. Look at
// ./paging/config/activemq-queues.xml for the config. // ./paging/config/activemq-queues.xml for the config.
for (int i = 0; i < 20; i++) { for (int i = 0; i < 20; i++) {
pageMessageProducer.send(message); pageMessageProducer.send(message);
} }
// Step 11. Create a JMS Message Producer // Create a JMS Message Producer
MessageProducer messageProducer = session.createProducer(queue); MessageProducer messageProducer = session.createProducer(queue);
// Step 12. We don't need persistent messages in order to use paging. (This step is optional) // We don't need persistent messages in order to use paging. (This step is optional)
messageProducer.setDeliveryMode(DeliveryMode.NON_PERSISTENT); messageProducer.setDeliveryMode(DeliveryMode.NON_PERSISTENT);
// Step 13. Send the message for about 1K, which should be over the memory limit imposed by the server // Send the message for about 1K, which should be over the memory limit imposed by the server
for (int i = 0; i < 1000; i++) { for (int i = 0; i < 1000; i++) {
messageProducer.send(message); messageProducer.send(message);
} }
// Step 14. if you pause this example here, you will see several files under ./build/data/paging // if you pause this example here, you will see several files under ./build/data/paging
// Thread.sleep(30000); // if you want to just our of curiosity, you can sleep here and inspect the created // Thread.sleep(30000); // if you want to just our of curiosity, you can sleep here and inspect the created
// files just for // files just for
// Step 15. Create a JMS Message Consumer // Create a JMS Message Consumer
MessageConsumer messageConsumer = session.createConsumer(queue); MessageConsumer messageConsumer = session.createConsumer(queue);
// Step 16. Start the JMS Connection. This step will activate the subscribers to receive messages. // Start the JMS Connection. This step will activate the subscribers to receive messages.
connection.start(); connection.start();
// Step 17. Receive the messages. It's important to ACK for messages as ActiveMQ Artemis will not read messages from // Receive the messages. It's important to ACK for messages as ActiveMQ Artemis will not read messages from
// paging // paging
// until messages are ACKed // until messages are ACKed
@ -107,7 +171,7 @@ public class PagingExample {
message.acknowledge(); message.acknowledge();
// Step 18. Receive the messages from the Queue names pageQueue. Create the proper consumer for that // Receive the messages from the Queue names pageQueue. Create the proper consumer for that
messageConsumer.close(); messageConsumer.close();
messageConsumer = session.createConsumer(pageQueue); messageConsumer = session.createConsumer(pageQueue);
@ -118,6 +182,8 @@ public class PagingExample {
message.acknowledge(); message.acknowledge();
} }
}
} finally { } finally {
// And finally, always remember to close your JMS connections after use, in a finally block. Closing a JMS // And finally, always remember to close your JMS connections after use, in a finally block. Closing a JMS
// connection will automatically close all of its sessions, consumers, producer and browser objects // connection will automatically close all of its sessions, consumers, producer and browser objects
@ -126,9 +192,6 @@ public class PagingExample {
initialContext.close(); initialContext.close();
} }
if (connection != null) {
connection.close();
}
} }
} }
} }

View File

@ -52,6 +52,16 @@ under the License.
<permission roles="guest" type="send"/> <permission roles="guest" type="send"/>
</security-setting> </security-setting>
<!--security for example pagingQueueLimited-->
<security-setting match="pagingQueueLimited">
<permission roles="guest" type="createDurableQueue"/>
<permission roles="guest" type="deleteDurableQueue"/>
<permission roles="guest" type="createNonDurableQueue"/>
<permission roles="guest" type="deleteNonDurableQueue"/>
<permission roles="guest" type="consume"/>
<permission roles="guest" type="send"/>
</security-setting>
<security-setting match="pagingQueue"> <security-setting match="pagingQueue">
<permission roles="guest" type="createDurableQueue"/> <permission roles="guest" type="createDurableQueue"/>
<permission roles="guest" type="deleteDurableQueue"/> <permission roles="guest" type="deleteDurableQueue"/>
@ -68,6 +78,17 @@ under the License.
<page-size-bytes>20000</page-size-bytes> <page-size-bytes>20000</page-size-bytes>
</address-setting> </address-setting>
<address-setting match="pagingQueueLimited">
<!-- what to do after max-size-messages is reached. We start paging at 100 messages -->
<address-full-policy>PAGE</address-full-policy>
<!-- how soon we should enter into paging -->
<max-size-messages>100</max-size-messages>
<!-- how many messages we should accept into paging before failing. We start failing after we recorded 1000 messages on paging. -->
<page-limit-messages>1000</page-limit-messages>
<!-- what to do after page-limit is used -->
<page-full-policy>FAIL</page-full-policy>
</address-setting>
<address-setting match="exampleQueue"> <address-setting match="exampleQueue">
<max-size-bytes>10Mb</max-size-bytes> <max-size-bytes>10Mb</max-size-bytes>
<page-size-bytes>1Mb</page-size-bytes> <page-size-bytes>1Mb</page-size-bytes>
@ -84,6 +105,11 @@ under the License.
<queue name="pagingQueue"/> <queue name="pagingQueue"/>
</anycast> </anycast>
</address> </address>
<address name="pagingQueueLimited">
<anycast>
<queue name="pagingQueueLimited"/>
</anycast>
</address>
<address name="exampleQueue"> <address name="exampleQueue">
<anycast> <anycast>
<queue name="exampleQueue"/> <queue name="exampleQueue"/>

View File

@ -0,0 +1,416 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.activemq.artemis.tests.integration.paging;
import javax.jms.Connection;
import javax.jms.ConnectionFactory;
import javax.jms.JMSException;
import javax.jms.MessageConsumer;
import javax.jms.MessageProducer;
import javax.jms.Queue;
import javax.jms.Session;
import javax.jms.TextMessage;
import java.lang.invoke.MethodHandles;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import org.apache.activemq.artemis.api.core.QueueConfiguration;
import org.apache.activemq.artemis.api.core.RoutingType;
import org.apache.activemq.artemis.core.config.Configuration;
import org.apache.activemq.artemis.core.server.ActiveMQServer;
import org.apache.activemq.artemis.core.server.impl.AddressInfo;
import org.apache.activemq.artemis.logs.AssertionLoggerHandler;
import org.apache.activemq.artemis.tests.util.ActiveMQTestBase;
import org.apache.activemq.artemis.tests.util.CFUtil;
import org.apache.activemq.artemis.tests.util.Wait;
import org.junit.Assert;
import org.junit.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
public class PagingLimitTest extends ActiveMQTestBase {
private static final Logger logger = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
ActiveMQServer server;
@Test
public void testPageLimitMessageCoreFail() throws Exception {
testPageLimitMessage("CORE", false);
}
@Test
public void testPageLimitAMQPFail() throws Exception {
testPageLimitMessage("AMQP", false);
}
@Test
public void testPageLimitMessagesOpenWireFail() throws Exception {
testPageLimitMessage("OPENWIRE", false);
}
@Test
public void testPageLimitMessageCoreDrop() throws Exception {
testPageLimitMessage("CORE", false);
}
@Test
public void testPageLimitAMQPDrop() throws Exception {
testPageLimitMessage("AMQP", false);
}
@Test
public void testPageLimitMessagesOpenWireDrop() throws Exception {
testPageLimitMessage("OPENWIRE", false);
}
public void testPageLimitMessage(String protocol, boolean drop) throws Exception {
String queueNameTX = getName() + "_TX";
String queueNameNonTX = getName() + "_NONTX";
Configuration config = createDefaultConfig(true);
config.setJournalSyncTransactional(false).setJournalSyncTransactional(false);
final int PAGE_MAX = 20 * 1024;
final int PAGE_SIZE = 10 * 1024;
server = createServer(true, config, PAGE_SIZE, PAGE_MAX, -1, -1, null, 300L, drop ? "DROP" : "FAIL", null);
server.start();
server.addAddressInfo(new AddressInfo(queueNameTX).addRoutingType(RoutingType.ANYCAST));
server.createQueue(new QueueConfiguration(queueNameTX).setRoutingType(RoutingType.ANYCAST));
server.addAddressInfo(new AddressInfo(queueNameNonTX).addRoutingType(RoutingType.ANYCAST));
server.createQueue(new QueueConfiguration(queueNameNonTX).setRoutingType(RoutingType.ANYCAST));
Wait.assertTrue(() -> server.locateQueue(queueNameNonTX) != null);
Wait.assertTrue(() -> server.locateQueue(queueNameTX) != null);
testPageLimitMessageFailInternal(queueNameTX, protocol, true, drop);
testPageLimitMessageFailInternal(queueNameNonTX, protocol, false, drop);
}
private void testPageLimitMessageFailInternal(String queueName,
String protocol,
boolean transacted,
boolean drop) throws Exception {
AssertionLoggerHandler.startCapture();
runAfter(AssertionLoggerHandler::stopCapture);
org.apache.activemq.artemis.core.server.Queue serverQueue = server.locateQueue(queueName);
Assert.assertNotNull(serverQueue);
ConnectionFactory factory = CFUtil.createConnectionFactory(protocol, "tcp://localhost:61616");
try (Connection connection = factory.createConnection()) {
Session session = connection.createSession(transacted, transacted ? Session.SESSION_TRANSACTED : Session.AUTO_ACKNOWLEDGE);
Queue queue = session.createQueue(queueName);
MessageProducer producer = session.createProducer(queue);
connection.start();
for (int i = 0; i < 100; i++) {
TextMessage message = session.createTextMessage("initial " + i);
message.setIntProperty("i", i);
producer.send(message);
}
if (transacted) {
session.commit();
Assert.assertTrue(serverQueue.getPagingStore().isPaging());
}
for (int i = 0; i < 300; i++) {
if (i == 200) {
// the initial sent has to be consumed on transaction as we need a sync on the consumer for AMQP
try (MessageConsumer consumer = session.createConsumer(queue)) {
for (int initI = 0; initI < 100; initI++) {
TextMessage recMessage = (TextMessage) consumer.receive(1000);
Assert.assertEquals("initial " + initI, recMessage.getText());
}
}
if (transacted) {
session.commit();
}
Wait.assertEquals(200L, serverQueue::getMessageCount);
}
try {
TextMessage message = session.createTextMessage("hello world " + i);
message.setIntProperty("i", i);
producer.send(message);
if (i % 100 == 0) {
logger.info("sent " + i);
}
if (transacted) {
if (i % 100 == 0 && i > 0) {
session.commit();
}
}
} catch (Exception e) {
logger.warn(e.getMessage(), e);
Assert.fail("Exception happened at " + i);
}
}
if (transacted) {
session.commit();
}
try {
producer.send(session.createTextMessage("should not complete"));
if (transacted) {
session.commit();
}
if (!drop) {
Assert.fail("an Exception was expected");
}
Assert.assertTrue(AssertionLoggerHandler.findText("AMQ224120"));
} catch (JMSException e) {
logger.debug("Expected exception, ok!", e);
}
Assert.assertTrue(serverQueue.getPagingStore().isPaging());
MessageConsumer consumer = session.createConsumer(queue);
for (int i = 0; i < 150; i++) { // we will consume half of the messages
TextMessage message = (TextMessage) consumer.receive(5000);
Assert.assertNotNull(message);
Assert.assertEquals("hello world " + i, message.getText());
Assert.assertEquals(i, message.getIntProperty("i"));
if (transacted) {
if (i % 100 == 0 && i > 0) {
session.commit();
}
}
}
if (transacted) {
session.commit();
}
Future<Boolean> cleanupDone = serverQueue.getPagingStore().getCursorProvider().scheduleCleanup();
Assert.assertTrue(cleanupDone.get(30, TimeUnit.SECONDS));
for (int i = 300; i < 450; i++) {
try {
TextMessage message = session.createTextMessage("hello world " + i);
message.setIntProperty("i", i);
producer.send(message);
if (i % 100 == 0) {
logger.info("sent " + i);
}
if (transacted) {
if (i % 10 == 0 && i > 0) {
session.commit();
}
}
} catch (Exception e) {
logger.warn(e.getMessage(), e);
Assert.fail("Exception happened at " + i);
}
}
if (transacted) {
session.commit();
}
AssertionLoggerHandler.clear();
Assert.assertFalse(AssertionLoggerHandler.findText("AMQ224120"));
try {
producer.send(session.createTextMessage("should not complete"));
if (transacted) {
session.commit();
}
if (!drop) {
Assert.fail("an Exception was expected");
} else {
Assert.assertFalse(AssertionLoggerHandler.findText("AMQ224120"));
}
} catch (JMSException e) {
logger.debug("Expected exception, ok!", e);
}
for (int i = 150; i < 450; i++) { // we will consume half of the messages
TextMessage message = (TextMessage) consumer.receive(5000);
Assert.assertNotNull(message);
Assert.assertEquals("hello world " + i, message.getText());
Assert.assertEquals(i, message.getIntProperty("i"));
if (transacted) {
if (i % 100 == 0 && i > 0) {
session.commit();
}
}
}
Assert.assertNull(consumer.receiveNoWait());
}
}
@Test
public void testPageLimitBytesAMQP() throws Exception {
testPageLimitBytes("AMQP");
}
@Test
public void testPageLimitBytesCore() throws Exception {
testPageLimitBytes("CORE");
}
@Test
public void testPageLimitBytesOpenWire() throws Exception {
testPageLimitBytes("OPENWIRE");
}
public void testPageLimitBytes(String protocol) throws Exception {
String queueNameTX = getName() + "_TX";
String queueNameNonTX = getName() + "_NONTX";
Configuration config = createDefaultConfig(true);
config.setJournalSyncTransactional(false).setJournalSyncTransactional(false);
final int PAGE_MAX = 20 * 1024;
final int PAGE_SIZE = 10 * 1024;
server = createServer(true, config, PAGE_SIZE, PAGE_MAX, -1, -1, (long)(PAGE_MAX * 10), null, "FAIL", null);
server.start();
server.addAddressInfo(new AddressInfo(queueNameTX).addRoutingType(RoutingType.ANYCAST));
server.createQueue(new QueueConfiguration(queueNameTX).setRoutingType(RoutingType.ANYCAST));
server.addAddressInfo(new AddressInfo(queueNameNonTX).addRoutingType(RoutingType.ANYCAST));
server.createQueue(new QueueConfiguration(queueNameNonTX).setRoutingType(RoutingType.ANYCAST));
Wait.assertTrue(() -> server.locateQueue(queueNameNonTX) != null);
Wait.assertTrue(() -> server.locateQueue(queueNameTX) != null);
testPageLimitBytesFailInternal(queueNameTX, protocol, true);
testPageLimitBytesFailInternal(queueNameNonTX, protocol, false);
}
private void testPageLimitBytesFailInternal(String queueName,
String protocol,
boolean transacted) throws Exception {
org.apache.activemq.artemis.core.server.Queue serverQueue = server.locateQueue(queueName);
Assert.assertNotNull(serverQueue);
ConnectionFactory factory = CFUtil.createConnectionFactory(protocol, "tcp://localhost:61616");
try (Connection connection = factory.createConnection()) {
Session session = connection.createSession(transacted, transacted ? Session.SESSION_TRANSACTED : Session.AUTO_ACKNOWLEDGE);
Queue queue = session.createQueue(queueName);
MessageProducer producer = session.createProducer(queue);
connection.start();
int successfullSends = 0;
boolean failed = false;
for (int i = 0; i < 1000; i++) {
try {
TextMessage message = session.createTextMessage("hello world " + i);
message.setIntProperty("i", i);
producer.send(message);
if (transacted) {
session.commit();
}
} catch (Exception e) {
logger.debug(e.getMessage(), e);
failed = true;
break;
}
successfullSends++;
}
Wait.assertEquals(successfullSends, serverQueue::getMessageCount);
Assert.assertTrue(failed);
int reads = successfullSends / 2;
connection.start();
try (MessageConsumer consumer = session.createConsumer(queue)) {
for (int i = 0; i < reads; i++) { // we will consume half of the messages
TextMessage message = (TextMessage) consumer.receive(5000);
Assert.assertNotNull(message);
Assert.assertEquals("hello world " + i, message.getText());
Assert.assertEquals(i, message.getIntProperty("i"));
if (transacted) {
if (i % 100 == 0 && i > 0) {
session.commit();
}
}
}
if (transacted) {
session.commit();
}
}
failed = false;
int originalSuccess = successfullSends;
Future<Boolean> result = serverQueue.getPagingStore().getCursorProvider().scheduleCleanup();
Assert.assertTrue(result.get(10, TimeUnit.SECONDS));
for (int i = successfullSends; i < 1000; i++) {
try {
TextMessage message = session.createTextMessage("hello world " + i);
message.setIntProperty("i", i);
producer.send(message);
if (transacted) {
session.commit();
}
} catch (Exception e) {
logger.debug(e.getMessage(), e);
failed = true;
break;
}
successfullSends++;
}
Assert.assertTrue(failed);
Assert.assertTrue(successfullSends > originalSuccess);
try (MessageConsumer consumer = session.createConsumer(queue)) {
for (int i = reads; i < successfullSends; i++) {
TextMessage message = (TextMessage) consumer.receive(5000);
Assert.assertNotNull(message);
Assert.assertEquals("hello world " + i, message.getText());
Assert.assertEquals(i, message.getIntProperty("i"));
if (transacted) {
if (i % 100 == 0 && i > 0) {
session.commit();
}
}
}
if (transacted) {
session.commit();
}
Assert.assertNull(consumer.receiveNoWait());
}
}
}
}

View File

@ -30,6 +30,7 @@ import org.apache.activemq.artemis.core.message.impl.CoreMessage;
import org.apache.activemq.artemis.core.paging.PagingManager; import org.apache.activemq.artemis.core.paging.PagingManager;
import org.apache.activemq.artemis.core.paging.PagingStore; import org.apache.activemq.artemis.core.paging.PagingStore;
import org.apache.activemq.artemis.core.paging.cursor.PageCursorProvider; import org.apache.activemq.artemis.core.paging.cursor.PageCursorProvider;
import org.apache.activemq.artemis.core.paging.cursor.PageSubscription;
import org.apache.activemq.artemis.core.paging.impl.Page; import org.apache.activemq.artemis.core.paging.impl.Page;
import org.apache.activemq.artemis.core.persistence.OperationContext; import org.apache.activemq.artemis.core.persistence.OperationContext;
import org.apache.activemq.artemis.core.persistence.StorageManager; import org.apache.activemq.artemis.core.persistence.StorageManager;
@ -39,6 +40,7 @@ import org.apache.activemq.artemis.core.server.JournalType;
import org.apache.activemq.artemis.core.server.RouteContextList; import org.apache.activemq.artemis.core.server.RouteContextList;
import org.apache.activemq.artemis.core.settings.impl.AddressFullMessagePolicy; import org.apache.activemq.artemis.core.settings.impl.AddressFullMessagePolicy;
import org.apache.activemq.artemis.core.settings.impl.AddressSettings; import org.apache.activemq.artemis.core.settings.impl.AddressSettings;
import org.apache.activemq.artemis.core.settings.impl.PageFullMessagePolicy;
import org.apache.activemq.artemis.core.transaction.Transaction; import org.apache.activemq.artemis.core.transaction.Transaction;
import org.apache.activemq.artemis.tests.util.ActiveMQTestBase; import org.apache.activemq.artemis.tests.util.ActiveMQTestBase;
import org.apache.activemq.artemis.utils.actors.ArtemisExecutor; import org.apache.activemq.artemis.utils.actors.ArtemisExecutor;
@ -245,6 +247,36 @@ public class PersistMultiThreadTest extends ActiveMQTestBase {
class FakePagingStore implements PagingStore { class FakePagingStore implements PagingStore {
@Override
public PageFullMessagePolicy getPageFullMessagePolicy() {
return null;
}
@Override
public Long getPageLimitMessages() {
return null;
}
@Override
public Long getPageLimitBytes() {
return null;
}
@Override
public void pageFull(PageSubscription subscription) {
}
@Override
public boolean isPageFull() {
return false;
}
@Override
public void checkPageLimit(long numberOfMessages) {
}
@Override @Override
public void counterSnapshot() { public void counterSnapshot() {
} }