From 764db34e9ba453b23b7c8b7154ab9ba710099874 Mon Sep 17 00:00:00 2001 From: Clebert Suconic Date: Wed, 25 Jan 2023 09:59:06 -0500 Subject: [PATCH] ARTEMIS-3178 Page Limitting (max messages and max bytes) I am adding three attributes to Address-settings: * page-limit-bytes: Number of bytes. We will convert this metric into max number of pages internally by dividing max-bytes / page-size. It will allow a max based on an estimate. * page-limit-messages: Number of messages * page-full-message-policy: fail or drop We will now allow paging, until these max values and then fail or drop messages. Once these values are retracted, the address will remain full until a period where cleanup is kicked in by paging. So these values may have a certain delay on being applied, but they should always be cleared once cleanup happened. --- .../settings/impl/PageFullMessagePolicy.java | 21 + .../artemis/core/config/impl/Validators.java | 13 + .../impl/FileConfigurationParser.java | 20 + .../artemis/core/paging/PagingStore.java | 15 + .../paging/cursor/PageCursorProvider.java | 3 +- .../cursor/impl/PageCursorProviderImpl.java | 37 +- .../impl/PageSubscriptionCounterImpl.java | 20 +- .../core/paging/impl/PagingStoreImpl.java | 143 ++++++ .../core/server/ActiveMQMessageBundle.java | 3 + .../core/server/ActiveMQServerLogger.java | 19 + .../core/settings/impl/AddressSettings.java | 110 ++++- .../schema/artemis-configuration.xsd | 35 +- .../config/impl/ConfigurationImplTest.java | 128 ++++++ .../impl/FileConfigurationParserTest.java | 19 + .../artemis/tests/util/ActiveMQTestBase.java | 35 ++ docs/user-manual/en/paging.md | 14 + .../artemis/jms/example/PagingExample.java | 207 ++++++--- .../resources/activemq/server0/broker.xml | 26 ++ .../integration/paging/PagingLimitTest.java | 416 ++++++++++++++++++ .../storage/PersistMultiThreadTest.java | 32 ++ 20 files changed, 1235 insertions(+), 81 deletions(-) create mode 100644 artemis-core-client/src/main/java/org/apache/activemq/artemis/core/settings/impl/PageFullMessagePolicy.java create mode 100644 tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/paging/PagingLimitTest.java diff --git a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/settings/impl/PageFullMessagePolicy.java b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/settings/impl/PageFullMessagePolicy.java new file mode 100644 index 0000000000..8da0dd38fd --- /dev/null +++ b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/settings/impl/PageFullMessagePolicy.java @@ -0,0 +1,21 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.activemq.artemis.core.settings.impl; + +public enum PageFullMessagePolicy { + DROP, FAIL +} diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/config/impl/Validators.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/config/impl/Validators.java index 3aa5af5ef7..a09f75d616 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/config/impl/Validators.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/config/impl/Validators.java @@ -26,6 +26,7 @@ import org.apache.activemq.artemis.api.core.RoutingType; import org.apache.activemq.artemis.core.server.cluster.impl.MessageLoadBalancingType; import org.apache.activemq.artemis.core.settings.impl.AddressFullMessagePolicy; import org.apache.activemq.artemis.core.settings.impl.DeletionPolicy; +import org.apache.activemq.artemis.core.settings.impl.PageFullMessagePolicy; import org.apache.activemq.artemis.core.settings.impl.SlowConsumerPolicy; import org.apache.activemq.artemis.core.settings.impl.SlowConsumerThresholdMeasurementUnit; @@ -195,6 +196,18 @@ public final class Validators { } }; + public static final Validator PAGE_FULL_MESSAGE_POLICY_TYPE = new Validator() { + @Override + public void validate(final String name, final Object value) { + String val = (String) value; + if (val == null || + !val.equals(PageFullMessagePolicy.DROP.toString()) && + !val.equals(PageFullMessagePolicy.FAIL.toString())) { + throw ActiveMQMessageBundle.BUNDLE.invalidAddressFullPolicyType(val); + } + } + }; + public static final Validator SLOW_CONSUMER_THRESHOLD_MEASUREMENT_UNIT = new Validator() { @Override public void validate(final String name, final Object value) { diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/deployers/impl/FileConfigurationParser.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/deployers/impl/FileConfigurationParser.java index 3bdb0ca706..8461db483c 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/deployers/impl/FileConfigurationParser.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/deployers/impl/FileConfigurationParser.java @@ -98,6 +98,7 @@ import org.apache.activemq.artemis.core.server.plugin.ActiveMQServerPlugin; import org.apache.activemq.artemis.core.settings.impl.AddressFullMessagePolicy; import org.apache.activemq.artemis.core.settings.impl.AddressSettings; import org.apache.activemq.artemis.core.settings.impl.DeletionPolicy; +import org.apache.activemq.artemis.core.settings.impl.PageFullMessagePolicy; import org.apache.activemq.artemis.core.settings.impl.ResourceLimitSettings; import org.apache.activemq.artemis.core.settings.impl.SlowConsumerPolicy; import org.apache.activemq.artemis.core.settings.impl.SlowConsumerThresholdMeasurementUnit; @@ -218,6 +219,8 @@ public final class FileConfigurationParser extends XMLConfigurationUtil { private static final String ADDRESS_FULL_MESSAGE_POLICY_NODE_NAME = "address-full-policy"; + private static final String PAGE_FULL_MESSAGE_POLICY_NODE_NAME = "page-full-policy"; + private static final String MAX_READ_PAGE_BYTES_NODE_NAME = "max-read-page-bytes"; private static final String MAX_READ_PAGE_MESSAGES_NODE_NAME = "max-read-page-messages"; @@ -226,6 +229,10 @@ public final class FileConfigurationParser extends XMLConfigurationUtil { private static final String PAGE_MAX_CACHE_SIZE_NODE_NAME = "page-max-cache-size"; + private static final String PAGE_LIMIT_BYTES_NODE_NAME = "page-limit-bytes"; + + private static final String PAGE_LIMIT_MESSAGES_NODE_NAME = "page-limit-messages"; + private static final String MESSAGE_COUNTER_HISTORY_DAY_LIMIT_NODE_NAME = "message-counter-history-day-limit"; private static final String LVQ_NODE_NAME = "last-value-queue"; @@ -1281,6 +1288,14 @@ public final class FileConfigurationParser extends XMLConfigurationUtil { ActiveMQServerLogger.LOGGER.pageMaxSizeUsed(); } addressSettings.setPageCacheMaxSize(XMLUtil.parseInt(child)); + } else if (PAGE_LIMIT_BYTES_NODE_NAME.equalsIgnoreCase(name)) { + long pageLimitBytes = ByteUtil.convertTextBytes(getTrimmedTextContent(child)); + Validators.MINUS_ONE_OR_POSITIVE_INT.validate(PAGE_LIMIT_BYTES_NODE_NAME, pageLimitBytes); + addressSettings.setPageLimitBytes(pageLimitBytes); + } else if (PAGE_LIMIT_MESSAGES_NODE_NAME.equalsIgnoreCase(name)) { + long pageLimitMessages = ByteUtil.convertTextBytes(getTrimmedTextContent(child)); + Validators.MINUS_ONE_OR_POSITIVE_INT.validate(PAGE_LIMIT_MESSAGES_NODE_NAME, pageLimitMessages); + addressSettings.setPageLimitMessages(pageLimitMessages); } else if (MESSAGE_COUNTER_HISTORY_DAY_LIMIT_NODE_NAME.equalsIgnoreCase(name)) { addressSettings.setMessageCounterHistoryDayLimit(XMLUtil.parseInt(child)); } else if (ADDRESS_FULL_MESSAGE_POLICY_NODE_NAME.equalsIgnoreCase(name)) { @@ -1288,6 +1303,11 @@ public final class FileConfigurationParser extends XMLConfigurationUtil { Validators.ADDRESS_FULL_MESSAGE_POLICY_TYPE.validate(ADDRESS_FULL_MESSAGE_POLICY_NODE_NAME, value); AddressFullMessagePolicy policy = Enum.valueOf(AddressFullMessagePolicy.class, value); addressSettings.setAddressFullMessagePolicy(policy); + } else if (PAGE_FULL_MESSAGE_POLICY_NODE_NAME.equalsIgnoreCase(name)) { + String value = getTrimmedTextContent(child); + Validators.PAGE_FULL_MESSAGE_POLICY_TYPE.validate(PAGE_FULL_MESSAGE_POLICY_NODE_NAME, value); + PageFullMessagePolicy policy = Enum.valueOf(PageFullMessagePolicy.class, value); + addressSettings.setPageFullMessagePolicy(policy); } else if (LVQ_NODE_NAME.equalsIgnoreCase(name) || DEFAULT_LVQ_NODE_NAME.equalsIgnoreCase(name)) { addressSettings.setDefaultLastValueQueue(XMLUtil.parseBoolean(child)); } else if (DEFAULT_LVQ_KEY_NODE_NAME.equalsIgnoreCase(name)) { diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/PagingStore.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/PagingStore.java index da680fce6b..d582b0d712 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/PagingStore.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/PagingStore.java @@ -23,6 +23,7 @@ import org.apache.activemq.artemis.api.core.Message; import org.apache.activemq.artemis.api.core.RefCountMessageListener; import org.apache.activemq.artemis.api.core.SimpleString; import org.apache.activemq.artemis.core.paging.cursor.PageCursorProvider; +import org.apache.activemq.artemis.core.paging.cursor.PageSubscription; import org.apache.activemq.artemis.core.paging.impl.Page; import org.apache.activemq.artemis.core.persistence.StorageManager; import org.apache.activemq.artemis.core.replication.ReplicationManager; @@ -30,6 +31,7 @@ import org.apache.activemq.artemis.core.server.ActiveMQComponent; import org.apache.activemq.artemis.core.server.RouteContextList; import org.apache.activemq.artemis.core.settings.impl.AddressFullMessagePolicy; import org.apache.activemq.artemis.core.settings.impl.AddressSettings; +import org.apache.activemq.artemis.core.settings.impl.PageFullMessagePolicy; import org.apache.activemq.artemis.core.transaction.Transaction; import org.apache.activemq.artemis.utils.actors.ArtemisExecutor; @@ -60,6 +62,19 @@ public interface PagingStore extends ActiveMQComponent, RefCountMessageListener AddressFullMessagePolicy getAddressFullMessagePolicy(); + PageFullMessagePolicy getPageFullMessagePolicy(); + + Long getPageLimitMessages(); + + Long getPageLimitBytes(); + + /** Callback to be used by a counter when the Page is full for that counter */ + void pageFull(PageSubscription subscription); + + boolean isPageFull(); + + void checkPageLimit(long numberOfMessages); + long getFirstPage(); int getPageSizeBytes(); diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/cursor/PageCursorProvider.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/cursor/PageCursorProvider.java index d3f25e21b5..b42049fe55 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/cursor/PageCursorProvider.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/cursor/PageCursorProvider.java @@ -16,6 +16,7 @@ */ package org.apache.activemq.artemis.core.paging.cursor; +import java.util.concurrent.Future; import java.util.function.Consumer; import org.apache.activemq.artemis.core.filter.Filter; @@ -46,7 +47,7 @@ public interface PageCursorProvider { void flushExecutors(); - void scheduleCleanup(); + Future scheduleCleanup(); void disableCleanup(); diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/cursor/impl/PageCursorProviderImpl.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/cursor/impl/PageCursorProviderImpl.java index 24d65970f8..dea4cd5fbc 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/cursor/impl/PageCursorProviderImpl.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/cursor/impl/PageCursorProviderImpl.java @@ -19,6 +19,7 @@ package org.apache.activemq.artemis.core.paging.cursor.impl; import java.util.ArrayList; import java.util.Collection; import java.util.List; +import java.util.concurrent.Future; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicInteger; @@ -37,12 +38,14 @@ import org.apache.activemq.artemis.core.server.ActiveMQServerLogger; import org.apache.activemq.artemis.core.transaction.Transaction; import org.apache.activemq.artemis.core.transaction.impl.TransactionImpl; import org.apache.activemq.artemis.utils.ArtemisCloseable; +import org.apache.activemq.artemis.utils.SimpleFutureImpl; import org.apache.activemq.artemis.utils.collections.ConcurrentLongHashMap; import org.apache.activemq.artemis.utils.collections.LinkedList; import org.apache.activemq.artemis.utils.collections.LongHashSet; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import java.lang.invoke.MethodHandles; +import java.util.concurrent.atomic.AtomicLong; import java.util.function.Consumer; public class PageCursorProviderImpl implements PageCursorProvider { @@ -179,11 +182,14 @@ public class PageCursorProviderImpl implements PageCursorProvider { } @Override - public void scheduleCleanup() { + public Future scheduleCleanup() { + final SimpleFutureImpl future = new SimpleFutureImpl<>(); if (!cleanupEnabled || scheduledCleanup.intValue() > 2) { - // Scheduled cleanup was already scheduled before.. never mind! - // or we have cleanup disabled - return; + // Scheduled cleanup was already scheduled before. + // On that case just flush the executor returning the future.set(true) + // after any previous scheduled cleanup is finished. + pagingStore.execute(() -> future.set(true)); + return future; } scheduledCleanup.incrementAndGet(); @@ -199,9 +205,12 @@ public class PageCursorProviderImpl implements PageCursorProvider { } finally { storageManager.clearContext(); scheduledCleanup.decrementAndGet(); + future.set(true); } } }); + + return future; } /** @@ -241,6 +250,22 @@ public class PageCursorProviderImpl implements PageCursorProvider { scheduleCleanup(); } + private long getNumberOfMessagesOnSubscriptions() { + AtomicLong largerCounter = new AtomicLong(); + activeCursors.forEach((id, sub) -> { + long value = sub.getCounter().getValue(); + if (value > largerCounter.get()) { + largerCounter.set(value); + } + }); + + return largerCounter.get(); + } + + void checkClearPageLimit() { + pagingStore.checkPageLimit(getNumberOfMessagesOnSubscriptions()); + } + protected void cleanup() { if (!countersRebuilt) { @@ -299,6 +324,10 @@ public class PageCursorProviderImpl implements PageCursorProvider { // Then we do some check on eventual pages that can be already removed but they are away from the streaming cleanupMiddleStream(depagedPages, depagedPagesSet, cursorList, minPage, firstPage); + if (pagingStore.isPageFull()) { + checkClearPageLimit(); + } + assert pagingStore.getNumberOfPages() >= 0; if (pagingStore.getNumberOfPages() == 0 || pagingStore.getNumberOfPages() == 1 && (pagingStore.getCurrentPage() == null || pagingStore.getCurrentPage().getNumberOfMessages() == 0)) { diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/cursor/impl/PageSubscriptionCounterImpl.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/cursor/impl/PageSubscriptionCounterImpl.java index f580be47f5..41d0255a21 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/cursor/impl/PageSubscriptionCounterImpl.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/cursor/impl/PageSubscriptionCounterImpl.java @@ -20,6 +20,7 @@ import java.util.LinkedList; import java.util.concurrent.atomic.AtomicIntegerFieldUpdater; import java.util.concurrent.atomic.AtomicLongFieldUpdater; +import org.apache.activemq.artemis.core.paging.PagingStore; import org.apache.activemq.artemis.core.paging.cursor.PageSubscription; import org.apache.activemq.artemis.core.paging.cursor.PageSubscriptionCounter; import org.apache.activemq.artemis.core.persistence.StorageManager; @@ -56,6 +57,8 @@ public class PageSubscriptionCounterImpl extends BasePagingCounter { private PageSubscription subscription; + private PagingStore pagingStore; + private final StorageManager storage; private volatile long value; @@ -187,11 +190,16 @@ public class PageSubscriptionCounterImpl extends BasePagingCounter { if (logger.isTraceEnabled()) { logger.trace("process subscription={} add={}, size={}", subscriptionID, add, size); } - valueUpdater.addAndGet(this, add); + long value = valueUpdater.addAndGet(this, add); persistentSizeUpdater.addAndGet(this, size); if (add > 0) { addedUpdater.addAndGet(this, add); addedPersistentSizeUpdater.addAndGet(this, size); + + /// we could have pagingStore null on tests, so we need to validate if pagingStore != null before anything... + if (pagingStore != null && pagingStore.getPageFullMessagePolicy() != null && !pagingStore.isPageFull()) { + checkAdd(value); + } } if (isRebuilding()) { @@ -200,6 +208,15 @@ public class PageSubscriptionCounterImpl extends BasePagingCounter { } } + private void checkAdd(long numberOfMessages) { + Long pageLimitMessages = pagingStore.getPageLimitMessages(); + if (pageLimitMessages != null) { + if (numberOfMessages >= pageLimitMessages.longValue()) { + pagingStore.pageFull(this.subscription); + } + } + } + @Override public void delete() throws Exception { Transaction tx = new TransactionImpl(storage); @@ -420,6 +437,7 @@ public class PageSubscriptionCounterImpl extends BasePagingCounter { @Override public PageSubscriptionCounter setSubscription(PageSubscription subscription) { this.subscription = subscription; + this.pagingStore = subscription.getPagingStore(); return this; } } \ No newline at end of file diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/impl/PagingStoreImpl.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/impl/PagingStoreImpl.java index 81d10b9691..137d18a21b 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/impl/PagingStoreImpl.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/impl/PagingStoreImpl.java @@ -42,6 +42,7 @@ import org.apache.activemq.artemis.core.paging.PagingManager; import org.apache.activemq.artemis.core.paging.PagingStore; import org.apache.activemq.artemis.core.paging.PagingStoreFactory; import org.apache.activemq.artemis.core.paging.cursor.PageCursorProvider; +import org.apache.activemq.artemis.core.paging.cursor.PageSubscription; import org.apache.activemq.artemis.core.persistence.StorageManager; import org.apache.activemq.artemis.core.replication.ReplicationManager; import org.apache.activemq.artemis.core.server.ActiveMQMessageBundle; @@ -52,6 +53,7 @@ import org.apache.activemq.artemis.core.server.RouteContextList; import org.apache.activemq.artemis.core.server.impl.MessageReferenceImpl; import org.apache.activemq.artemis.core.settings.impl.AddressFullMessagePolicy; import org.apache.activemq.artemis.core.settings.impl.AddressSettings; +import org.apache.activemq.artemis.core.settings.impl.PageFullMessagePolicy; import org.apache.activemq.artemis.core.transaction.Transaction; import org.apache.activemq.artemis.core.transaction.TransactionOperation; import org.apache.activemq.artemis.core.transaction.TransactionPropertyIndexes; @@ -99,6 +101,16 @@ public class PagingStoreImpl implements PagingStore { private long maxMessages; + private volatile boolean pageFull; + + private Long pageLimitBytes; + + private Long estimatedMaxPages; + + private Long pageLimitMessages; + + private PageFullMessagePolicy pageFullMessagePolicy; + private int pageSize; private volatile AddressFullMessagePolicy addressFullMessagePolicy; @@ -225,6 +237,40 @@ public class PagingStoreImpl implements PagingStore { addressFullMessagePolicy = addressSettings.getAddressFullMessagePolicy(); rejectThreshold = addressSettings.getMaxSizeBytesRejectThreshold(); + + pageFullMessagePolicy = addressSettings.getPageFullMessagePolicy(); + + pageLimitBytes = addressSettings.getPageLimitBytes(); + + if (pageLimitBytes != null && pageLimitBytes.longValue() < 0) { + logger.debug("address {} had pageLimitBytes<0, setting it as null", address); + pageLimitBytes = null; + } + + pageLimitMessages = addressSettings.getPageLimitMessages(); + + if (pageLimitMessages != null && pageLimitMessages.longValue() < 0) { + logger.debug("address {} had pageLimitMessages<0, setting it as null", address); + pageLimitMessages = null; + } + + + if (pageLimitBytes == null && pageLimitMessages == null && pageFullMessagePolicy != null) { + ActiveMQServerLogger.LOGGER.noPageLimitsSet(address, pageFullMessagePolicy); + this.pageFullMessagePolicy = null; + } + + if (pageLimitBytes != null && pageLimitMessages != null && pageFullMessagePolicy == null) { + ActiveMQServerLogger.LOGGER.noPagefullPolicySet(address, pageLimitBytes, pageLimitMessages); + this.pageFullMessagePolicy = null; + this.pageLimitMessages = null; + this.pageLimitBytes = null; + } + + if (pageLimitBytes != null && pageSize > 0) { + estimatedMaxPages = pageLimitBytes / pageSize; + logger.debug("Address {} should not allow more than {} pages", storeName, estimatedMaxPages); + } } @Override @@ -232,6 +278,79 @@ public class PagingStoreImpl implements PagingStore { return "PagingStoreImpl(" + this.address + ")"; } + @Override + public PageFullMessagePolicy getPageFullMessagePolicy() { + return pageFullMessagePolicy; + } + + @Override + public Long getPageLimitMessages() { + return pageLimitMessages; + } + + @Override + public Long getPageLimitBytes() { + return pageLimitBytes; + } + + @Override + public void pageFull(PageSubscription subscription) { + this.pageFull = true; + try { + ActiveMQServerLogger.LOGGER.pageFull(subscription.getQueue().getName(), subscription.getQueue().getAddress(), pageLimitMessages, subscription.getCounter().getValue()); + } catch (Throwable e) { + // I don't think subscription would ever have a null queue. I'm being cautious here for tests + logger.warn(e.getMessage(), e); + } + } + + @Override + public boolean isPageFull() { + return pageFull; + } + + private boolean isBelowPageLimitBytes() { + if (estimatedMaxPages != null) { + return (numberOfPages <= estimatedMaxPages.longValue()); + } else { + return true; + } + } + + private void checkNumberOfPages() { + if (!isBelowPageLimitBytes()) { + this.pageFull = true; + ActiveMQServerLogger.LOGGER.pageFullMaxBytes(storeName, numberOfPages, estimatedMaxPages, pageLimitBytes, pageSize); + } + } + + @Override + public void checkPageLimit(long numberOfMessages) { + boolean pageMessageMessagesClear = true; + Long pageLimitMessages = getPageLimitMessages(); + + if (pageLimitMessages != null) { + if (logger.isDebugEnabled()) { // gate to avoid boxing of numberOfMessages + logger.debug("Address {} has {} messages on the larger queue", storeName, numberOfMessages); + } + + pageMessageMessagesClear = (numberOfMessages < pageLimitMessages.longValue()); + } + + boolean pageMessageBytesClear = isBelowPageLimitBytes(); + + if (pageMessageBytesClear && pageMessageMessagesClear) { + pageLimitReleased(); + } + } + + private void pageLimitReleased() { + if (pageFull) { + ActiveMQServerLogger.LOGGER.pageFree(getAddress()); + this.pageFull = false; + } + } + @Override public boolean lock(long timeout) { if (timeout == -1) { @@ -480,6 +599,8 @@ public class PagingStoreImpl implements PagingStore { numberOfPages = files.size(); + checkNumberOfPages(); + for (String fileName : files) { final int fileId = PagingStoreImpl.getPageIdFromFileName(fileName); @@ -556,6 +677,7 @@ public class PagingStoreImpl implements PagingStore { if (isPaging) { paging = false; ActiveMQServerLogger.LOGGER.pageStoreStop(storeName, getPageInfo()); + pageLimitReleased(); } this.cursorProvider.onPageModeCleared(); } finally { @@ -1029,6 +1151,25 @@ public class PagingStoreImpl implements PagingStore { lock.readLock().unlock(); } + if (pageFull) { + if (message.isLargeMessage()) { + ((LargeServerMessage) message).deleteFile(); + } + + if (pageFullMessagePolicy == PageFullMessagePolicy.FAIL) { + throw ActiveMQMessageBundle.BUNDLE.addressIsFull(address.toString()); + } + + if (!printedDropMessagesWarning) { + printedDropMessagesWarning = true; + ActiveMQServerLogger.LOGGER.pageStoreDropMessages(storeName, getPageInfo()); + } + + // we are in page mode, if we got to this point, we are dropping the message while still paging + // this needs to return true as it is paging + return true; + } + return writePage(message, tx, listCtx); } @@ -1282,6 +1423,8 @@ public class PagingStoreImpl implements PagingStore { try { numberOfPages++; + checkNumberOfPages(); + final long newPageId = currentPageId + 1; if (logger.isTraceEnabled()) { diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/ActiveMQMessageBundle.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/ActiveMQMessageBundle.java index 2c2f632d93..05625fd3af 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/ActiveMQMessageBundle.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/ActiveMQMessageBundle.java @@ -527,4 +527,7 @@ public interface ActiveMQMessageBundle { @Message(id = 229245, value = "Management controller is busy with another task. Please try again") ActiveMQTimeoutException managementBusy(); + + @Message(id = 229246, value = "Invalid page full message policy type {}") + IllegalArgumentException invalidPageFullPolicyType(String val); } diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/ActiveMQServerLogger.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/ActiveMQServerLogger.java index eaec474036..03426d92b8 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/ActiveMQServerLogger.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/ActiveMQServerLogger.java @@ -1563,4 +1563,23 @@ public interface ActiveMQServerLogger { @LogMessage(id = 224119, value = "Unable to refresh security settings: {}", level = LogMessage.Level.WARN) void unableToRefreshSecuritySettings(String exceptionMessage); + + @LogMessage(id = 224120, value = "Queue {} on Address {} has more messages than configured page limit. PageLimitMesages={} while currentValue={}", level = LogMessage.Level.WARN) + void pageFull(SimpleString queue, SimpleString address, Object pageLImitMessage, Object currentValue); + + @LogMessage(id = 224121, value = "Queue {} on Address {} is out of page limit now. We will issue a cleanup to check other queues.", level = LogMessage.Level.WARN) + void pageFree(SimpleString queue, SimpleString address); + + @LogMessage(id = 224122, value = "Address {} number of messages is under page limit again, and it should be allowed to page again.", level = LogMessage.Level.INFO) + void pageFree(SimpleString address); + + @LogMessage(id = 224123, value = "Address {} has more pages than allowed. System currently has {} pages, while the estimated max number of pages is {}, based on the limitPageBytes ({}) / page-size ({})", level = LogMessage.Level.WARN) + void pageFullMaxBytes(SimpleString address, long pages, long maxPages, long limitBytes, long bytes); + + @LogMessage(id = 224124, value = "Address {} has a pageFullPolicy set as {} but there are not page-limit-bytes or page-limit-messages set. Page full configuration being ignored on this address.", level = LogMessage.Level.WARN) + void noPageLimitsSet(Object address, Object policy); + + @LogMessage(id = 224125, value = "Address {} has page-limit-bytes={}, page-limit-messages={} and no page-full-policy set. Page full configuration being ignored on this address", level = LogMessage.Level.WARN) + void noPagefullPolicySet(Object address, Object limitBytes, Object limitMessages); + } diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/settings/impl/AddressSettings.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/settings/impl/AddressSettings.java index 01ede20416..c06ff5bcf0 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/settings/impl/AddressSettings.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/settings/impl/AddressSettings.java @@ -147,6 +147,12 @@ public class AddressSettings implements Mergeable, Serializable private Integer maxReadPageMessages = null; + private Long pageLimitBytes = null; + + private Long pageLimitMessages = null; + + private PageFullMessagePolicy pageFullMessagePolicy = null; + private Long maxSizeMessages = null; private Integer pageSizeBytes = null; @@ -289,6 +295,9 @@ public class AddressSettings implements Mergeable, Serializable this.maxSizeMessages = other.maxSizeMessages; this.maxReadPageMessages = other.maxReadPageMessages; this.maxReadPageBytes = other.maxReadPageBytes; + this.pageLimitBytes = other.pageLimitBytes; + this.pageLimitMessages = other.pageLimitMessages; + this.pageFullMessagePolicy = other.pageFullMessagePolicy; this.pageSizeBytes = other.pageSizeBytes; this.pageMaxCache = other.pageMaxCache; this.dropMessagesWhenFull = other.dropMessagesWhenFull; @@ -644,6 +653,33 @@ public class AddressSettings implements Mergeable, Serializable return this; } + public Long getPageLimitBytes() { + return pageLimitBytes; + } + + public AddressSettings setPageLimitBytes(Long pageLimitBytes) { + this.pageLimitBytes = pageLimitBytes; + return this; + } + + public Long getPageLimitMessages() { + return pageLimitMessages; + } + + public AddressSettings setPageLimitMessages(Long pageLimitMessages) { + this.pageLimitMessages = pageLimitMessages; + return this; + } + + public PageFullMessagePolicy getPageFullMessagePolicy() { + return this.pageFullMessagePolicy; + } + + public AddressSettings setPageFullMessagePolicy(PageFullMessagePolicy policy) { + this.pageFullMessagePolicy = policy; + return this; + } + public int getMaxReadPageBytes() { return maxReadPageBytes != null ? maxReadPageBytes : 2 * getPageSizeBytes(); } @@ -1223,6 +1259,15 @@ public class AddressSettings implements Mergeable, Serializable if (enableIngressTimestamp == null) { enableIngressTimestamp = merged.enableIngressTimestamp; } + if (pageFullMessagePolicy == null) { + pageFullMessagePolicy = merged.pageFullMessagePolicy; + } + if (pageLimitBytes == null) { + pageLimitBytes = merged.pageLimitBytes; + } + if (pageLimitMessages == null) { + pageLimitMessages = merged.pageLimitMessages; + } } @Override @@ -1472,6 +1517,24 @@ public class AddressSettings implements Mergeable, Serializable if (buffer.readableBytes() > 0) { maxReadPageMessages = BufferHelper.readNullableInteger(buffer); } + + if (buffer.readableBytes() > 0) { + pageLimitBytes = BufferHelper.readNullableLong(buffer); + } + + if (buffer.readableBytes() > 0) { + pageLimitMessages = BufferHelper.readNullableLong(buffer); + } + + if (buffer.readableBytes() > 0) { + policyStr = buffer.readNullableSimpleString(); + + if (policyStr != null) { + pageFullMessagePolicy = PageFullMessagePolicy.valueOf(policyStr.toString()); + } else { + pageFullMessagePolicy = null; + } + } } @Override @@ -1542,7 +1605,10 @@ public class AddressSettings implements Mergeable, Serializable BufferHelper.sizeOfNullableBoolean(enableIngressTimestamp) + BufferHelper.sizeOfNullableLong(maxSizeMessages) + BufferHelper.sizeOfNullableInteger(maxReadPageMessages) + - BufferHelper.sizeOfNullableInteger(maxReadPageBytes); + BufferHelper.sizeOfNullableInteger(maxReadPageBytes) + + BufferHelper.sizeOfNullableLong(pageLimitBytes) + + BufferHelper.sizeOfNullableLong(pageLimitMessages) + + BufferHelper.sizeOfNullableSimpleString(pageFullMessagePolicy != null ? pageFullMessagePolicy.toString() : null); } @Override @@ -1682,6 +1748,13 @@ public class AddressSettings implements Mergeable, Serializable BufferHelper.writeNullableInteger(buffer, maxReadPageBytes); BufferHelper.writeNullableInteger(buffer, maxReadPageMessages); + + BufferHelper.writeNullableLong(buffer, pageLimitBytes); + + BufferHelper.writeNullableLong(buffer, pageLimitMessages); + + buffer.writeNullableSimpleString(pageFullMessagePolicy != null ? new SimpleString(pageFullMessagePolicy.toString()) : null); + } /* (non-Javadoc) @@ -1758,6 +1831,10 @@ public class AddressSettings implements Mergeable, Serializable result = prime * result + ((slowConsumerThresholdMeasurementUnit == null) ? 0 : slowConsumerThresholdMeasurementUnit.hashCode()); result = prime * result + ((enableIngressTimestamp == null) ? 0 : enableIngressTimestamp.hashCode()); result = prime * result + ((maxSizeMessages == null) ? 0 : maxSizeMessages.hashCode()); + result = prime * result + ((pageLimitBytes == null) ? 0 : pageLimitBytes.hashCode()); + result = prime * result + ((pageLimitMessages == null) ? 0 : pageLimitMessages.hashCode()); + result = prime * result + ((pageFullMessagePolicy == null) ? 0 : pageFullMessagePolicy.hashCode()); + return result; } @@ -2130,6 +2207,31 @@ public class AddressSettings implements Mergeable, Serializable } else if (!maxSizeMessages.equals(other.maxSizeMessages)) return false; + if (pageLimitBytes == null) { + if (other.pageLimitBytes != null) { + return false; + } + } else if (!pageLimitBytes.equals(other.pageLimitBytes)) { + return false; + } + + if (pageLimitMessages == null) { + if (other.pageLimitMessages != null) { + return false; + } + } else if (!pageLimitMessages.equals(other.pageLimitMessages)) { + return false; + } + + if (pageFullMessagePolicy == null) { + if (other.pageFullMessagePolicy != null) { + return false; + } + } else if (!pageFullMessagePolicy.equals(other.pageFullMessagePolicy)) { + return false; + } + + return true; } @@ -2267,6 +2369,12 @@ public class AddressSettings implements Mergeable, Serializable enableMetrics + ", enableIngressTime=" + enableIngressTimestamp + + ", pageLimitBytes=" + + pageLimitBytes + + ", pageLimitMessages=" + + pageLimitMessages + + ", pageFullMessagePolicy=" + + pageFullMessagePolicy + "]"; } } diff --git a/artemis-server/src/main/resources/schema/artemis-configuration.xsd b/artemis-server/src/main/resources/schema/artemis-configuration.xsd index fdb30c7c24..a6953148c1 100644 --- a/artemis-server/src/main/resources/schema/artemis-configuration.xsd +++ b/artemis-server/src/main/resources/schema/artemis-configuration.xsd @@ -3757,8 +3757,25 @@ + + + + After the address enters into page mode, this attribute will configure how many pages can be written into page before activating the page-full-policy. + Supports byte notation like "K", "Mb", "MiB", "GB", etc. + + + + + + + + After the address enters into page mode, this attribute will configure how many messages can be written into page before activating the page-full-policy. + + + + + minOccurs="0"> used with the address full BLOCK policy, the maximum size (in bytes) an address can reach before @@ -3819,6 +3836,20 @@ + + + + After entering page mode, a second limit will be set by page-limit-bytes and/or page-limit-messages. The page-full-policy will configure what to do when that limit is reached. + + + + + + + + + + @@ -4315,7 +4346,7 @@ - + diff --git a/artemis-server/src/test/java/org/apache/activemq/artemis/core/config/impl/ConfigurationImplTest.java b/artemis-server/src/test/java/org/apache/activemq/artemis/core/config/impl/ConfigurationImplTest.java index 581cc78ea8..1f846cc1eb 100644 --- a/artemis-server/src/test/java/org/apache/activemq/artemis/core/config/impl/ConfigurationImplTest.java +++ b/artemis-server/src/test/java/org/apache/activemq/artemis/core/config/impl/ConfigurationImplTest.java @@ -30,6 +30,7 @@ import java.util.Properties; import java.util.Random; import java.util.Set; import java.util.concurrent.CountDownLatch; +import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicReference; import java.util.function.Predicate; @@ -50,6 +51,12 @@ import org.apache.activemq.artemis.core.config.federation.FederationPolicySet; import org.apache.activemq.artemis.core.config.federation.FederationQueuePolicyConfiguration; import org.apache.activemq.artemis.core.config.ha.LiveOnlyPolicyConfiguration; import org.apache.activemq.artemis.core.deployers.impl.FileConfigurationParser; +import org.apache.activemq.artemis.core.io.SequentialFileFactory; +import org.apache.activemq.artemis.core.paging.PagingManager; +import org.apache.activemq.artemis.core.paging.PagingStore; +import org.apache.activemq.artemis.core.paging.PagingStoreFactory; +import org.apache.activemq.artemis.core.paging.impl.PagingStoreImpl; +import org.apache.activemq.artemis.core.persistence.StorageManager; import org.apache.activemq.artemis.core.security.Role; import org.apache.activemq.artemis.core.server.ComponentConfigurationRoutingType; import org.apache.activemq.artemis.core.server.JournalType; @@ -58,10 +65,12 @@ import org.apache.activemq.artemis.core.server.plugin.impl.LoggingActiveMQServer import org.apache.activemq.artemis.core.server.routing.KeyType; import org.apache.activemq.artemis.core.server.routing.policies.ConsistentHashModuloPolicy; import org.apache.activemq.artemis.core.settings.impl.ResourceLimitSettings; +import org.apache.activemq.artemis.logs.AssertionLoggerHandler; import org.apache.activemq.artemis.tests.util.ActiveMQTestBase; import org.apache.activemq.artemis.utils.RandomUtil; import org.apache.activemq.artemis.utils.critical.CriticalAnalyzerPolicy; import org.apache.commons.lang3.ClassUtils; +import org.mockito.Mockito; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import java.lang.invoke.MethodHandles; @@ -1004,6 +1013,125 @@ public class ConfigurationImplTest extends ActiveMQTestBase { Assert.assertEquals(SimpleString.toSimpleString("moreImportant"), configuration.getAddressSettings().get("Name.With.Dots").getExpiryAddress()); } + @Test + public void testAddressSettingsPageLimit() throws Throwable { + ConfigurationImpl configuration = new ConfigurationImpl(); + + Properties properties = new Properties(); + + String randomString = RandomUtil.randomString(); + + properties.put("addressSettings.#.expiryAddress", randomString); + properties.put("addressSettings.#.pageLimitMessages", "300"); + properties.put("addressSettings.#.pageLimitBytes", "300000"); + properties.put("addressSettings.#.pageFullMessagePolicy", "DROP"); + + configuration.parsePrefixedProperties(properties, null); + + Assert.assertEquals(1, configuration.getAddressSettings().size()); + Assert.assertEquals(SimpleString.toSimpleString(randomString), configuration.getAddressSettings().get("#").getExpiryAddress()); + Assert.assertEquals(300L, configuration.getAddressSettings().get("#").getPageLimitMessages().longValue()); + Assert.assertEquals(300000L, configuration.getAddressSettings().get("#").getPageLimitBytes().longValue()); + Assert.assertEquals("DROP", configuration.getAddressSettings().get("#").getPageFullMessagePolicy().toString()); + + PagingStore storeImpl = new PagingStoreImpl(new SimpleString("Test"), (ScheduledExecutorService) null, 100L, Mockito.mock(PagingManager.class), Mockito.mock(StorageManager.class), Mockito.mock(SequentialFileFactory.class), Mockito.mock(PagingStoreFactory.class), new SimpleString("Test"), configuration.getAddressSettings().get("#"), null, null, true); + + Assert.assertEquals(300L, storeImpl.getPageLimitMessages().longValue()); + Assert.assertEquals(300000L, storeImpl.getPageLimitBytes().longValue()); + Assert.assertEquals("DROP", storeImpl.getPageFullMessagePolicy().toString()); + } + + @Test + public void testAddressSettingsPageLimitInvalidConfiguration1() throws Throwable { + AssertionLoggerHandler.startCapture(); + runAfter(AssertionLoggerHandler::stopCapture); + ConfigurationImpl configuration = new ConfigurationImpl(); + + Properties properties = new Properties(); + + String randomString = RandomUtil.randomString(); + + properties.put("addressSettings.#.expiryAddress", randomString); + properties.put("addressSettings.#.pageLimitMessages", "300"); + properties.put("addressSettings.#.pageLimitBytes", "300000"); + //properties.put("addressSettings.#.pageFullMessagePolicy", "DROP"); // removing the pageFull on purpose + + configuration.parsePrefixedProperties(properties, null); + + Assert.assertEquals(1, configuration.getAddressSettings().size()); + Assert.assertEquals(SimpleString.toSimpleString(randomString), configuration.getAddressSettings().get("#").getExpiryAddress()); + Assert.assertEquals(300L, configuration.getAddressSettings().get("#").getPageLimitMessages().longValue()); + Assert.assertEquals(300000L, configuration.getAddressSettings().get("#").getPageLimitBytes().longValue()); + Assert.assertEquals(null, configuration.getAddressSettings().get("#").getPageFullMessagePolicy()); + + PagingStore storeImpl = new PagingStoreImpl(new SimpleString("Test"), (ScheduledExecutorService) null, 100L, Mockito.mock(PagingManager.class), Mockito.mock(StorageManager.class), Mockito.mock(SequentialFileFactory.class), Mockito.mock(PagingStoreFactory.class), new SimpleString("Test"), configuration.getAddressSettings().get("#"), null, null, true); + Assert.assertTrue(AssertionLoggerHandler.findText("AMQ224125")); + + Assert.assertEquals(null, storeImpl.getPageLimitMessages()); + Assert.assertEquals(null, storeImpl.getPageLimitBytes()); + Assert.assertEquals(null, storeImpl.getPageFullMessagePolicy()); + } + + @Test + public void testAddressSettingsPageLimitInvalidConfiguration2() throws Throwable { + AssertionLoggerHandler.startCapture(); + runAfter(AssertionLoggerHandler::stopCapture); + ConfigurationImpl configuration = new ConfigurationImpl(); + + Properties properties = new Properties(); + + String randomString = RandomUtil.randomString(); + + properties.put("addressSettings.#.expiryAddress", randomString); + //properties.put("addressSettings.#.pageLimitMessages", "300"); // removing this on purpose + //properties.put("addressSettings.#.pageLimitBytes", "300000"); // removing this on purpose + properties.put("addressSettings.#.pageFullMessagePolicy", "DROP"); // keeping this on purpose + + configuration.parsePrefixedProperties(properties, null); + + Assert.assertEquals(1, configuration.getAddressSettings().size()); + Assert.assertEquals(SimpleString.toSimpleString(randomString), configuration.getAddressSettings().get("#").getExpiryAddress()); + Assert.assertEquals(null, configuration.getAddressSettings().get("#").getPageLimitMessages()); + Assert.assertEquals(null, configuration.getAddressSettings().get("#").getPageLimitBytes()); + Assert.assertEquals("DROP", configuration.getAddressSettings().get("#").getPageFullMessagePolicy().toString()); + + PagingStore storeImpl = new PagingStoreImpl(new SimpleString("Test"), (ScheduledExecutorService) null, 100L, Mockito.mock(PagingManager.class), Mockito.mock(StorageManager.class), Mockito.mock(SequentialFileFactory.class), Mockito.mock(PagingStoreFactory.class), new SimpleString("Test"), configuration.getAddressSettings().get("#"), null, null, true); + Assert.assertTrue(AssertionLoggerHandler.findText("AMQ224124")); + + Assert.assertEquals(null, storeImpl.getPageLimitMessages()); + Assert.assertEquals(null, storeImpl.getPageLimitBytes()); + Assert.assertEquals(null, storeImpl.getPageFullMessagePolicy()); + } + + + @Test + public void testAddressSettingsPageLimitInvalidConfiguration3() throws Throwable { + ConfigurationImpl configuration = new ConfigurationImpl(); + + Properties properties = new Properties(); + + String randomString = RandomUtil.randomString(); + + properties.put("addressSettings.#.expiryAddress", randomString); + properties.put("addressSettings.#.pageLimitMessages", "-1"); // -1 on purpose, to make it null on final parsing + properties.put("addressSettings.#.pageLimitBytes", "-1"); // -1 on purpose, to make it null on final parsing + properties.put("addressSettings.#.pageFullMessagePolicy", "DROP"); // keeping this on purpose + + configuration.parsePrefixedProperties(properties, null); + + Assert.assertEquals(1, configuration.getAddressSettings().size()); + Assert.assertEquals(SimpleString.toSimpleString(randomString), configuration.getAddressSettings().get("#").getExpiryAddress()); + Assert.assertEquals(-1L, configuration.getAddressSettings().get("#").getPageLimitMessages().longValue()); + Assert.assertEquals(-1L, configuration.getAddressSettings().get("#").getPageLimitBytes().longValue()); + Assert.assertEquals("DROP", configuration.getAddressSettings().get("#").getPageFullMessagePolicy().toString()); + + PagingStore storeImpl = new PagingStoreImpl(new SimpleString("Test"), (ScheduledExecutorService) null, 100L, Mockito.mock(PagingManager.class), Mockito.mock(StorageManager.class), Mockito.mock(SequentialFileFactory.class), Mockito.mock(PagingStoreFactory.class), new SimpleString("Test"), configuration.getAddressSettings().get("#"), null, null, true); + + Assert.assertEquals(null, storeImpl.getPageLimitMessages()); + Assert.assertEquals(null, storeImpl.getPageLimitBytes()); + Assert.assertEquals(null, storeImpl.getPageFullMessagePolicy()); + } + @Test public void testDivertViaProperties() throws Exception { ConfigurationImpl configuration = new ConfigurationImpl(); diff --git a/artemis-server/src/test/java/org/apache/activemq/artemis/core/config/impl/FileConfigurationParserTest.java b/artemis-server/src/test/java/org/apache/activemq/artemis/core/config/impl/FileConfigurationParserTest.java index 6cf7fda451..1cf36fca22 100644 --- a/artemis-server/src/test/java/org/apache/activemq/artemis/core/config/impl/FileConfigurationParserTest.java +++ b/artemis-server/src/test/java/org/apache/activemq/artemis/core/config/impl/FileConfigurationParserTest.java @@ -393,6 +393,25 @@ public class FileConfigurationParserTest extends ActiveMQTestBase { AddressSettings settings = configuration.getAddressSettings().get("foo"); Assert.assertEquals(1024, settings.getMaxReadPageBytes()); Assert.assertEquals(33, settings.getMaxReadPageMessages()); + Assert.assertNull(settings.getPageLimitBytes()); + Assert.assertNull(settings.getPageLimitMessages()); + Assert.assertNull(settings.getPageFullMessagePolicy()); + } + + @Test + public void testParsePageLimitSettings() throws Exception { + String configStr = "" + "\n" + "" + "\n" + "1k2k337FAIL33.\n" + "" + "\n" + "" + "\n"; + + FileConfigurationParser parser = new FileConfigurationParser(); + ByteArrayInputStream input = new ByteArrayInputStream(configStr.getBytes(StandardCharsets.UTF_8)); + + Configuration configuration = parser.parseMainConfig(input); + AddressSettings settings = configuration.getAddressSettings().get("foo"); + Assert.assertEquals(1024, settings.getMaxReadPageBytes()); + Assert.assertEquals(33, settings.getMaxReadPageMessages()); + Assert.assertEquals(2048L, settings.getPageLimitBytes().longValue()); + Assert.assertEquals(337L, settings.getPageLimitMessages().longValue()); + Assert.assertEquals("FAIL", settings.getPageFullMessagePolicy().toString()); } @Test diff --git a/artemis-server/src/test/java/org/apache/activemq/artemis/tests/util/ActiveMQTestBase.java b/artemis-server/src/test/java/org/apache/activemq/artemis/tests/util/ActiveMQTestBase.java index db80b028ab..3a19d098dc 100644 --- a/artemis-server/src/test/java/org/apache/activemq/artemis/tests/util/ActiveMQTestBase.java +++ b/artemis-server/src/test/java/org/apache/activemq/artemis/tests/util/ActiveMQTestBase.java @@ -134,6 +134,7 @@ import org.apache.activemq.artemis.core.server.impl.ReplicationBackupActivation; import org.apache.activemq.artemis.core.server.impl.SharedNothingBackupActivation; import org.apache.activemq.artemis.core.settings.impl.AddressFullMessagePolicy; import org.apache.activemq.artemis.core.settings.impl.AddressSettings; +import org.apache.activemq.artemis.core.settings.impl.PageFullMessagePolicy; import org.apache.activemq.artemis.core.transaction.impl.XidImpl; import org.apache.activemq.artemis.jdbc.store.drivers.JDBCUtils; import org.apache.activemq.artemis.jdbc.store.sql.SQLProvider; @@ -1505,12 +1506,28 @@ public abstract class ActiveMQTestBase extends Assert { return createServer(realFiles, configuration, pageSize, maxAddressSize, null, null, settings); } + + + protected final ActiveMQServer createServer(final boolean realFiles, + final Configuration configuration, + final int pageSize, + final long maxAddressSize, + final Integer maxReadPageMessages, + final Integer maxReadPageBytes, + final Map settings) { + return createServer(realFiles, configuration, pageSize, maxAddressSize, maxReadPageMessages, maxReadPageBytes, null, null, null, settings); + + } + protected final ActiveMQServer createServer(final boolean realFiles, final Configuration configuration, final int pageSize, final long maxAddressSize, final Integer maxReadPageMessages, final Integer maxReadPageBytes, + final Long pageLimitBytes, + final Long pageLimitMessages, + final String pageLimitPolicy, final Map settings) { ActiveMQServer server = addServer(ActiveMQServers.newActiveMQServer(configuration, realFiles)); @@ -1522,6 +1539,15 @@ public abstract class ActiveMQTestBase extends Assert { if (maxReadPageMessages != null) { setting.getValue().setMaxReadPageMessages(maxReadPageMessages.intValue()); } + if (pageLimitBytes != null) { + setting.getValue().setPageLimitBytes(pageLimitBytes); + } + if (pageLimitMessages != null) { + setting.getValue().setPageLimitMessages(pageLimitMessages); + } + if (pageLimitPolicy != null) { + setting.getValue().setPageFullMessagePolicy(PageFullMessagePolicy.valueOf(pageLimitPolicy)); + } server.getAddressSettingsRepository().addMatch(setting.getKey(), setting.getValue()); } } @@ -1533,6 +1559,15 @@ public abstract class ActiveMQTestBase extends Assert { if (maxReadPageMessages != null) { defaultSetting.setMaxReadPageMessages(maxReadPageMessages.intValue()); } + if (pageLimitBytes != null) { + defaultSetting.setPageLimitBytes(pageLimitBytes); + } + if (pageLimitMessages != null) { + defaultSetting.setPageLimitMessages(pageLimitMessages); + } + if (pageLimitPolicy != null) { + defaultSetting.setPageFullMessagePolicy(PageFullMessagePolicy.valueOf(pageLimitPolicy)); + } server.getAddressSettingsRepository().addMatch("#", defaultSetting); diff --git a/docs/user-manual/en/paging.md b/docs/user-manual/en/paging.md index 8b33e26958..b1fe0dcc38 100644 --- a/docs/user-manual/en/paging.md +++ b/docs/user-manual/en/paging.md @@ -74,6 +74,9 @@ Configuration is done at the address settings in `broker.xml`. 1000 10485760 PAGE + 10G + 1000000 + FAIL ``` @@ -98,6 +101,9 @@ Property Name|Description|Default `address-full-policy`|This must be set to `PAGE` for paging to enable. If the value is `PAGE` then further messages will be paged to disk. If the value is `DROP` then further messages will be silently dropped. If the value is `FAIL` then the messages will be dropped and the client message producers will receive an exception. If the value is `BLOCK` then client message producers will block when they try and send further messages.|`PAGE` `max-read-page-messages` | how many message can be read from paging into the Queue whenever more messages are needed. The system wtill stop reading if `max-read-page-bytes hits the limit first. | -1 `max-read-page-bytes` | how much memory the messages read from paging can take on the Queue whenever more messages are needed. The system will stop reading if `max-read-page-messages` hits the limit first. | 2 * page-size-bytes +`page-limit-bytes` | After entering page mode, how much data would the system allow incoming. Notice this will be internally converted as number of pages. | +`page-limit-messages` | After entering page mode, how many messages would the system allow incoming on paging. | +`page-full-policy` | Valid results are DROP or FAIL. This tells what to do if the system is reaching `page-limit-bytes` or `page-limit-messages` after paging | ### max-size-bytes and max-size-messages simultaneous usage @@ -205,6 +211,14 @@ The system should keep at least one paged file in memory caching ahead reading m Also every active subscription could keep one paged file in memory. So, if your system has too many queues it is recommended to minimize the page-size. +## Page Limits and Page Full Policy + +Since version `2.28.0` is possible to configure limits on how much data is paged. This is to avoid a single destination using the entire disk in case their consumers are gone. + +You can configure either `page-limit-bytes` or `page-limit-messages`, along with `page-full-policy` on the address settings limiting how much data will be recorded in paging. + +If you configure `page-full-policy` as DROP, messages will be simplify dropped while the clients will not get any exceptions, while if you configured FAIL the producers will receive a JMS Exception for the error condition. + ## Example See the [Paging Example](examples.md#paging) which shows how to use paging with diff --git a/examples/features/standard/paging/src/main/java/org/apache/activemq/artemis/jms/example/PagingExample.java b/examples/features/standard/paging/src/main/java/org/apache/activemq/artemis/jms/example/PagingExample.java index 165fae2739..ad97020b94 100644 --- a/examples/features/standard/paging/src/main/java/org/apache/activemq/artemis/jms/example/PagingExample.java +++ b/examples/features/standard/paging/src/main/java/org/apache/activemq/artemis/jms/example/PagingExample.java @@ -20,6 +20,7 @@ import javax.jms.BytesMessage; import javax.jms.Connection; import javax.jms.ConnectionFactory; import javax.jms.DeliveryMode; +import javax.jms.JMSException; import javax.jms.MessageConsumer; import javax.jms.MessageProducer; import javax.jms.Queue; @@ -32,91 +33,56 @@ import javax.naming.InitialContext; public class PagingExample { public static void main(final String[] args) throws Exception { - Connection connection = null; + // simple routing showing how paging should work + simplePaging(); + // simple routing showing what happens when paging enters into page-full + pageFullLimit(); + } + + public static void pageFullLimit() throws Exception { InitialContext initialContext = null; try { - // Step 1. Create an initial context to perform the JNDI lookup. + // Create an initial context to perform the JNDI lookup. initialContext = new InitialContext(); - // Step 2. Perform a lookup on the Connection Factory + // Perform a lookup on the Connection Factory ConnectionFactory cf = (ConnectionFactory) initialContext.lookup("ConnectionFactory"); - // Step 3. We look-up the JMS queue object from JNDI. pagingQueue is configured to hold a very limited number - // of bytes in memory - Queue pageQueue = (Queue) initialContext.lookup("queue/pagingQueue"); + // Create a JMS Connection + try (Connection connection = cf.createConnection()) { - // Step 4. Lookup for a JMS Queue - Queue queue = (Queue) initialContext.lookup("queue/exampleQueue"); + // Create a JMS Session + Session session = connection.createSession(true, Session.SESSION_TRANSACTED); - // Step 5. Create a JMS Connection - connection = cf.createConnection(); + // lookup the queue + Queue queue = session.createQueue("pagingQueueLimited"); - // Step 6. Create a JMS Session - Session session = connection.createSession(false, Session.CLIENT_ACKNOWLEDGE); + // Create a JMS Message Producer for pageQueueAddress + MessageProducer pageMessageProducer = session.createProducer(queue); - // Step 7. Create a JMS Message Producer for pageQueueAddress - MessageProducer pageMessageProducer = session.createProducer(pageQueue); + // We don't need persistent messages in order to use paging. (This step is optional) + pageMessageProducer.setDeliveryMode(DeliveryMode.PERSISTENT); - // Step 8. We don't need persistent messages in order to use paging. (This step is optional) - pageMessageProducer.setDeliveryMode(DeliveryMode.NON_PERSISTENT); + // Create a Binary Bytes Message with 10K arbitrary bytes + BytesMessage message = session.createBytesMessage(); + message.writeBytes(new byte[10 * 1024]); - // Step 9. Create a Binary Bytes Message with 10K arbitrary bytes - BytesMessage message = session.createBytesMessage(); - message.writeBytes(new byte[10 * 1024]); + try { + // Send messages to the queue until the address is full + for (int i = 0; i < 2000; i++) { + pageMessageProducer.send(message); + if (i > 0 && i % 100 == 0) { + // batch commit on the sends + session.commit(); + } + } - // Step 10. Send only 20 messages to the Queue. This will be already enough for pagingQueue. Look at - // ./paging/config/activemq-queues.xml for the config. - for (int i = 0; i < 20; i++) { - pageMessageProducer.send(message); - } - - // Step 11. Create a JMS Message Producer - MessageProducer messageProducer = session.createProducer(queue); - - // Step 12. We don't need persistent messages in order to use paging. (This step is optional) - messageProducer.setDeliveryMode(DeliveryMode.NON_PERSISTENT); - - // Step 13. Send the message for about 1K, which should be over the memory limit imposed by the server - for (int i = 0; i < 1000; i++) { - messageProducer.send(message); - } - - // Step 14. if you pause this example here, you will see several files under ./build/data/paging - // Thread.sleep(30000); // if you want to just our of curiosity, you can sleep here and inspect the created - // files just for - - // Step 15. Create a JMS Message Consumer - MessageConsumer messageConsumer = session.createConsumer(queue); - - // Step 16. Start the JMS Connection. This step will activate the subscribers to receive messages. - connection.start(); - - // Step 17. Receive the messages. It's important to ACK for messages as ActiveMQ Artemis will not read messages from - // paging - // until messages are ACKed - - for (int i = 0; i < 1000; i++) { - message = (BytesMessage) messageConsumer.receive(3000); - - if (i % 100 == 0) { - System.out.println("Received " + i + " messages"); - message.acknowledge(); + throw new RuntimeException("Example was supposed to get a page full exception. Check your example configuration or report a bug"); + } catch (JMSException e) { + System.out.println("The producer has thrown an expected exception " + e); } - } - - message.acknowledge(); - - // Step 18. Receive the messages from the Queue names pageQueue. Create the proper consumer for that - messageConsumer.close(); - messageConsumer = session.createConsumer(pageQueue); - - for (int i = 0; i < 20; i++) { - message = (BytesMessage) messageConsumer.receive(1000); - - System.out.println("Received message " + i + " from pageQueue"); - - message.acknowledge(); + session.commit(); } } finally { // And finally, always remember to close your JMS connections after use, in a finally block. Closing a JMS @@ -126,9 +92,106 @@ public class PagingExample { initialContext.close(); } - if (connection != null) { - connection.close(); + } + } + + + public static void simplePaging() throws Exception { + + InitialContext initialContext = null; + try { + // Create an initial context to perform the JNDI lookup. + initialContext = new InitialContext(); + + // Perform a lookup on the Connection Factory + ConnectionFactory cf = (ConnectionFactory) initialContext.lookup("ConnectionFactory"); + + // We look-up the JMS queue object from JNDI. pagingQueue is configured to hold a very limited number + // of bytes in memory + Queue pageQueue = (Queue) initialContext.lookup("queue/pagingQueue"); + + // Lookup for a JMS Queue + Queue queue = (Queue) initialContext.lookup("queue/exampleQueue"); + + // Create a JMS Connection + try (Connection connection = cf.createConnection()) { + + // Create a JMS Session + Session session = connection.createSession(false, Session.CLIENT_ACKNOWLEDGE); + + // Create a JMS Message Producer for pageQueueAddress + MessageProducer pageMessageProducer = session.createProducer(pageQueue); + + // We don't need persistent messages in order to use paging. (This step is optional) + pageMessageProducer.setDeliveryMode(DeliveryMode.NON_PERSISTENT); + + // Create a Binary Bytes Message with 10K arbitrary bytes + BytesMessage message = session.createBytesMessage(); + message.writeBytes(new byte[10 * 1024]); + + // Send only 20 messages to the Queue. This will be already enough for pagingQueue. Look at + // ./paging/config/activemq-queues.xml for the config. + for (int i = 0; i < 20; i++) { + pageMessageProducer.send(message); + } + + // Create a JMS Message Producer + MessageProducer messageProducer = session.createProducer(queue); + + // We don't need persistent messages in order to use paging. (This step is optional) + messageProducer.setDeliveryMode(DeliveryMode.NON_PERSISTENT); + + // Send the message for about 1K, which should be over the memory limit imposed by the server + for (int i = 0; i < 1000; i++) { + messageProducer.send(message); + } + + // if you pause this example here, you will see several files under ./build/data/paging + // Thread.sleep(30000); // if you want to just our of curiosity, you can sleep here and inspect the created + // files just for + + // Create a JMS Message Consumer + MessageConsumer messageConsumer = session.createConsumer(queue); + + // Start the JMS Connection. This step will activate the subscribers to receive messages. + connection.start(); + + // Receive the messages. It's important to ACK for messages as ActiveMQ Artemis will not read messages from + // paging + // until messages are ACKed + + for (int i = 0; i < 1000; i++) { + message = (BytesMessage) messageConsumer.receive(3000); + + if (i % 100 == 0) { + System.out.println("Received " + i + " messages"); + message.acknowledge(); + } + } + + message.acknowledge(); + + // Receive the messages from the Queue names pageQueue. Create the proper consumer for that + messageConsumer.close(); + messageConsumer = session.createConsumer(pageQueue); + + for (int i = 0; i < 20; i++) { + message = (BytesMessage) messageConsumer.receive(1000); + + System.out.println("Received message " + i + " from pageQueue"); + + message.acknowledge(); + } } + + } finally { + // And finally, always remember to close your JMS connections after use, in a finally block. Closing a JMS + // connection will automatically close all of its sessions, consumers, producer and browser objects + + if (initialContext != null) { + initialContext.close(); + } + } } } diff --git a/examples/features/standard/paging/src/main/resources/activemq/server0/broker.xml b/examples/features/standard/paging/src/main/resources/activemq/server0/broker.xml index 9144c901f2..a4dfcd3d0e 100644 --- a/examples/features/standard/paging/src/main/resources/activemq/server0/broker.xml +++ b/examples/features/standard/paging/src/main/resources/activemq/server0/broker.xml @@ -52,6 +52,16 @@ under the License. + + + + + + + + + + @@ -68,6 +78,17 @@ under the License. 20000 + + + PAGE + + 100 + + 1000 + + FAIL + + 10Mb 1Mb @@ -84,6 +105,11 @@ under the License. +
+ + + +
diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/paging/PagingLimitTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/paging/PagingLimitTest.java new file mode 100644 index 0000000000..a3e1bf04d1 --- /dev/null +++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/paging/PagingLimitTest.java @@ -0,0 +1,416 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.activemq.artemis.tests.integration.paging; + +import javax.jms.Connection; +import javax.jms.ConnectionFactory; +import javax.jms.JMSException; +import javax.jms.MessageConsumer; +import javax.jms.MessageProducer; +import javax.jms.Queue; +import javax.jms.Session; +import javax.jms.TextMessage; +import java.lang.invoke.MethodHandles; +import java.util.concurrent.Future; +import java.util.concurrent.TimeUnit; + +import org.apache.activemq.artemis.api.core.QueueConfiguration; +import org.apache.activemq.artemis.api.core.RoutingType; +import org.apache.activemq.artemis.core.config.Configuration; +import org.apache.activemq.artemis.core.server.ActiveMQServer; +import org.apache.activemq.artemis.core.server.impl.AddressInfo; +import org.apache.activemq.artemis.logs.AssertionLoggerHandler; +import org.apache.activemq.artemis.tests.util.ActiveMQTestBase; +import org.apache.activemq.artemis.tests.util.CFUtil; +import org.apache.activemq.artemis.tests.util.Wait; +import org.junit.Assert; +import org.junit.Test; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +public class PagingLimitTest extends ActiveMQTestBase { + + private static final Logger logger = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass()); + + ActiveMQServer server; + + @Test + public void testPageLimitMessageCoreFail() throws Exception { + testPageLimitMessage("CORE", false); + } + + @Test + public void testPageLimitAMQPFail() throws Exception { + testPageLimitMessage("AMQP", false); + } + + @Test + public void testPageLimitMessagesOpenWireFail() throws Exception { + testPageLimitMessage("OPENWIRE", false); + } + + @Test + public void testPageLimitMessageCoreDrop() throws Exception { + testPageLimitMessage("CORE", false); + } + + @Test + public void testPageLimitAMQPDrop() throws Exception { + testPageLimitMessage("AMQP", false); + } + + @Test + public void testPageLimitMessagesOpenWireDrop() throws Exception { + testPageLimitMessage("OPENWIRE", false); + } + + public void testPageLimitMessage(String protocol, boolean drop) throws Exception { + + String queueNameTX = getName() + "_TX"; + String queueNameNonTX = getName() + "_NONTX"; + + Configuration config = createDefaultConfig(true); + config.setJournalSyncTransactional(false).setJournalSyncTransactional(false); + + final int PAGE_MAX = 20 * 1024; + + final int PAGE_SIZE = 10 * 1024; + + server = createServer(true, config, PAGE_SIZE, PAGE_MAX, -1, -1, null, 300L, drop ? "DROP" : "FAIL", null); + server.start(); + + server.addAddressInfo(new AddressInfo(queueNameTX).addRoutingType(RoutingType.ANYCAST)); + server.createQueue(new QueueConfiguration(queueNameTX).setRoutingType(RoutingType.ANYCAST)); + server.addAddressInfo(new AddressInfo(queueNameNonTX).addRoutingType(RoutingType.ANYCAST)); + server.createQueue(new QueueConfiguration(queueNameNonTX).setRoutingType(RoutingType.ANYCAST)); + + Wait.assertTrue(() -> server.locateQueue(queueNameNonTX) != null); + Wait.assertTrue(() -> server.locateQueue(queueNameTX) != null); + + testPageLimitMessageFailInternal(queueNameTX, protocol, true, drop); + testPageLimitMessageFailInternal(queueNameNonTX, protocol, false, drop); + + } + + private void testPageLimitMessageFailInternal(String queueName, + String protocol, + boolean transacted, + boolean drop) throws Exception { + AssertionLoggerHandler.startCapture(); + runAfter(AssertionLoggerHandler::stopCapture); + org.apache.activemq.artemis.core.server.Queue serverQueue = server.locateQueue(queueName); + Assert.assertNotNull(serverQueue); + + ConnectionFactory factory = CFUtil.createConnectionFactory(protocol, "tcp://localhost:61616"); + try (Connection connection = factory.createConnection()) { + Session session = connection.createSession(transacted, transacted ? Session.SESSION_TRANSACTED : Session.AUTO_ACKNOWLEDGE); + Queue queue = session.createQueue(queueName); + MessageProducer producer = session.createProducer(queue); + connection.start(); + + for (int i = 0; i < 100; i++) { + TextMessage message = session.createTextMessage("initial " + i); + message.setIntProperty("i", i); + producer.send(message); + } + if (transacted) { + session.commit(); + Assert.assertTrue(serverQueue.getPagingStore().isPaging()); + } + + for (int i = 0; i < 300; i++) { + if (i == 200) { + // the initial sent has to be consumed on transaction as we need a sync on the consumer for AMQP + try (MessageConsumer consumer = session.createConsumer(queue)) { + for (int initI = 0; initI < 100; initI++) { + TextMessage recMessage = (TextMessage) consumer.receive(1000); + Assert.assertEquals("initial " + initI, recMessage.getText()); + } + } + if (transacted) { + session.commit(); + } + Wait.assertEquals(200L, serverQueue::getMessageCount); + } + + try { + TextMessage message = session.createTextMessage("hello world " + i); + message.setIntProperty("i", i); + producer.send(message); + if (i % 100 == 0) { + logger.info("sent " + i); + } + if (transacted) { + if (i % 100 == 0 && i > 0) { + session.commit(); + } + } + } catch (Exception e) { + logger.warn(e.getMessage(), e); + Assert.fail("Exception happened at " + i); + } + } + if (transacted) { + session.commit(); + } + + try { + producer.send(session.createTextMessage("should not complete")); + if (transacted) { + session.commit(); + } + if (!drop) { + Assert.fail("an Exception was expected"); + } + Assert.assertTrue(AssertionLoggerHandler.findText("AMQ224120")); + } catch (JMSException e) { + logger.debug("Expected exception, ok!", e); + } + + + Assert.assertTrue(serverQueue.getPagingStore().isPaging()); + + MessageConsumer consumer = session.createConsumer(queue); + for (int i = 0; i < 150; i++) { // we will consume half of the messages + TextMessage message = (TextMessage) consumer.receive(5000); + Assert.assertNotNull(message); + Assert.assertEquals("hello world " + i, message.getText()); + Assert.assertEquals(i, message.getIntProperty("i")); + if (transacted) { + if (i % 100 == 0 && i > 0) { + session.commit(); + } + } + } + if (transacted) { + session.commit(); + } + Future cleanupDone = serverQueue.getPagingStore().getCursorProvider().scheduleCleanup(); + + Assert.assertTrue(cleanupDone.get(30, TimeUnit.SECONDS)); + + + + for (int i = 300; i < 450; i++) { + try { + TextMessage message = session.createTextMessage("hello world " + i); + message.setIntProperty("i", i); + producer.send(message); + if (i % 100 == 0) { + logger.info("sent " + i); + } + if (transacted) { + if (i % 10 == 0 && i > 0) { + session.commit(); + } + } + } catch (Exception e) { + logger.warn(e.getMessage(), e); + Assert.fail("Exception happened at " + i); + } + } + if (transacted) { + session.commit(); + } + AssertionLoggerHandler.clear(); + Assert.assertFalse(AssertionLoggerHandler.findText("AMQ224120")); + + + try { + producer.send(session.createTextMessage("should not complete")); + if (transacted) { + session.commit(); + } + if (!drop) { + Assert.fail("an Exception was expected"); + } else { + Assert.assertFalse(AssertionLoggerHandler.findText("AMQ224120")); + } + } catch (JMSException e) { + logger.debug("Expected exception, ok!", e); + } + + for (int i = 150; i < 450; i++) { // we will consume half of the messages + TextMessage message = (TextMessage) consumer.receive(5000); + Assert.assertNotNull(message); + Assert.assertEquals("hello world " + i, message.getText()); + Assert.assertEquals(i, message.getIntProperty("i")); + if (transacted) { + if (i % 100 == 0 && i > 0) { + session.commit(); + } + } + } + + Assert.assertNull(consumer.receiveNoWait()); + } + + } + + + @Test + public void testPageLimitBytesAMQP() throws Exception { + testPageLimitBytes("AMQP"); + } + + @Test + public void testPageLimitBytesCore() throws Exception { + testPageLimitBytes("CORE"); + } + + @Test + public void testPageLimitBytesOpenWire() throws Exception { + testPageLimitBytes("OPENWIRE"); + } + + public void testPageLimitBytes(String protocol) throws Exception { + + String queueNameTX = getName() + "_TX"; + String queueNameNonTX = getName() + "_NONTX"; + + Configuration config = createDefaultConfig(true); + config.setJournalSyncTransactional(false).setJournalSyncTransactional(false); + + final int PAGE_MAX = 20 * 1024; + + final int PAGE_SIZE = 10 * 1024; + + server = createServer(true, config, PAGE_SIZE, PAGE_MAX, -1, -1, (long)(PAGE_MAX * 10), null, "FAIL", null); + server.start(); + + server.addAddressInfo(new AddressInfo(queueNameTX).addRoutingType(RoutingType.ANYCAST)); + server.createQueue(new QueueConfiguration(queueNameTX).setRoutingType(RoutingType.ANYCAST)); + server.addAddressInfo(new AddressInfo(queueNameNonTX).addRoutingType(RoutingType.ANYCAST)); + server.createQueue(new QueueConfiguration(queueNameNonTX).setRoutingType(RoutingType.ANYCAST)); + + Wait.assertTrue(() -> server.locateQueue(queueNameNonTX) != null); + Wait.assertTrue(() -> server.locateQueue(queueNameTX) != null); + + testPageLimitBytesFailInternal(queueNameTX, protocol, true); + testPageLimitBytesFailInternal(queueNameNonTX, protocol, false); + + } + + + + private void testPageLimitBytesFailInternal(String queueName, + String protocol, + boolean transacted) throws Exception { + org.apache.activemq.artemis.core.server.Queue serverQueue = server.locateQueue(queueName); + Assert.assertNotNull(serverQueue); + + ConnectionFactory factory = CFUtil.createConnectionFactory(protocol, "tcp://localhost:61616"); + try (Connection connection = factory.createConnection()) { + Session session = connection.createSession(transacted, transacted ? Session.SESSION_TRANSACTED : Session.AUTO_ACKNOWLEDGE); + Queue queue = session.createQueue(queueName); + MessageProducer producer = session.createProducer(queue); + connection.start(); + + int successfullSends = 0; + boolean failed = false; + + for (int i = 0; i < 1000; i++) { + try { + TextMessage message = session.createTextMessage("hello world " + i); + message.setIntProperty("i", i); + producer.send(message); + if (transacted) { + session.commit(); + } + } catch (Exception e) { + logger.debug(e.getMessage(), e); + failed = true; + break; + } + successfullSends++; + } + + Wait.assertEquals(successfullSends, serverQueue::getMessageCount); + Assert.assertTrue(failed); + + int reads = successfullSends / 2; + + connection.start(); + try (MessageConsumer consumer = session.createConsumer(queue)) { + for (int i = 0; i < reads; i++) { // we will consume half of the messages + TextMessage message = (TextMessage) consumer.receive(5000); + Assert.assertNotNull(message); + Assert.assertEquals("hello world " + i, message.getText()); + Assert.assertEquals(i, message.getIntProperty("i")); + if (transacted) { + if (i % 100 == 0 && i > 0) { + session.commit(); + } + } + } + if (transacted) { + session.commit(); + } + } + + failed = false; + + int originalSuccess = successfullSends; + + Future result = serverQueue.getPagingStore().getCursorProvider().scheduleCleanup(); + Assert.assertTrue(result.get(10, TimeUnit.SECONDS)); + + for (int i = successfullSends; i < 1000; i++) { + try { + TextMessage message = session.createTextMessage("hello world " + i); + message.setIntProperty("i", i); + producer.send(message); + if (transacted) { + session.commit(); + } + } catch (Exception e) { + logger.debug(e.getMessage(), e); + failed = true; + break; + } + successfullSends++; + } + + Assert.assertTrue(failed); + Assert.assertTrue(successfullSends > originalSuccess); + + try (MessageConsumer consumer = session.createConsumer(queue)) { + for (int i = reads; i < successfullSends; i++) { + TextMessage message = (TextMessage) consumer.receive(5000); + Assert.assertNotNull(message); + Assert.assertEquals("hello world " + i, message.getText()); + Assert.assertEquals(i, message.getIntProperty("i")); + if (transacted) { + if (i % 100 == 0 && i > 0) { + session.commit(); + } + } + } + if (transacted) { + session.commit(); + } + Assert.assertNull(consumer.receiveNoWait()); + } + + + } + + } + + + +} \ No newline at end of file diff --git a/tests/performance-tests/src/test/java/org/apache/activemq/artemis/tests/performance/storage/PersistMultiThreadTest.java b/tests/performance-tests/src/test/java/org/apache/activemq/artemis/tests/performance/storage/PersistMultiThreadTest.java index 5297e6dcf0..4317ceaea1 100644 --- a/tests/performance-tests/src/test/java/org/apache/activemq/artemis/tests/performance/storage/PersistMultiThreadTest.java +++ b/tests/performance-tests/src/test/java/org/apache/activemq/artemis/tests/performance/storage/PersistMultiThreadTest.java @@ -30,6 +30,7 @@ import org.apache.activemq.artemis.core.message.impl.CoreMessage; import org.apache.activemq.artemis.core.paging.PagingManager; import org.apache.activemq.artemis.core.paging.PagingStore; import org.apache.activemq.artemis.core.paging.cursor.PageCursorProvider; +import org.apache.activemq.artemis.core.paging.cursor.PageSubscription; import org.apache.activemq.artemis.core.paging.impl.Page; import org.apache.activemq.artemis.core.persistence.OperationContext; import org.apache.activemq.artemis.core.persistence.StorageManager; @@ -39,6 +40,7 @@ import org.apache.activemq.artemis.core.server.JournalType; import org.apache.activemq.artemis.core.server.RouteContextList; import org.apache.activemq.artemis.core.settings.impl.AddressFullMessagePolicy; import org.apache.activemq.artemis.core.settings.impl.AddressSettings; +import org.apache.activemq.artemis.core.settings.impl.PageFullMessagePolicy; import org.apache.activemq.artemis.core.transaction.Transaction; import org.apache.activemq.artemis.tests.util.ActiveMQTestBase; import org.apache.activemq.artemis.utils.actors.ArtemisExecutor; @@ -245,6 +247,36 @@ public class PersistMultiThreadTest extends ActiveMQTestBase { class FakePagingStore implements PagingStore { + @Override + public PageFullMessagePolicy getPageFullMessagePolicy() { + return null; + } + + @Override + public Long getPageLimitMessages() { + return null; + } + + @Override + public Long getPageLimitBytes() { + return null; + } + + @Override + public void pageFull(PageSubscription subscription) { + + } + + @Override + public boolean isPageFull() { + return false; + } + + @Override + public void checkPageLimit(long numberOfMessages) { + + } + @Override public void counterSnapshot() { }