ARTEMIS-4973 pageSizeBytes/pageLimitBytes combination can cause Address full

Update docs/user-manual/paging.adoc

Co-authored-by: Robbie Gemmell <robbie@apache.org>
This commit is contained in:
Howard Gao 2024-08-05 20:44:29 +08:00 committed by clebertsuconic
parent 583af58e43
commit 1f79341c05
7 changed files with 493 additions and 13 deletions

View File

@ -63,6 +63,8 @@ public interface PageCursorProvider {
*/
void close(PageSubscription pageCursorImpl);
void checkClearPageLimit();
void counterRebuildStarted();
void counterRebuildDone();

View File

@ -247,7 +247,7 @@ public class PageCursorProviderImpl implements PageCursorProvider {
scheduleCleanup();
}
private long getNumberOfMessagesOnSubscriptions() {
protected long getNumberOfMessagesOnSubscriptions() {
AtomicLong largerCounter = new AtomicLong();
activeCursors.forEach((id, sub) -> {
long value = sub.getCounter().getValue();
@ -259,7 +259,8 @@ public class PageCursorProviderImpl implements PageCursorProvider {
return largerCounter.get();
}
void checkClearPageLimit() {
@Override
public void checkClearPageLimit() {
pagingStore.checkPageLimit(getNumberOfMessagesOnSubscriptions());
}

View File

@ -23,6 +23,7 @@ import java.util.Collection;
import java.util.Collections;
import java.util.HashSet;
import java.util.List;
import java.util.Objects;
import java.util.Queue;
import java.util.Set;
import java.util.concurrent.ConcurrentLinkedQueue;
@ -192,7 +193,7 @@ public class PagingStoreImpl implements PagingStore {
setUnderCallback(this::underSized).setOverCallback(this::overSized).
setOnSizeCallback(pagingManager::addSize);
applySetting(addressSettings);
applySetting(addressSettings, true);
this.executor = executor;
@ -233,6 +234,10 @@ public class PagingStoreImpl implements PagingStore {
*/
@Override
public void applySetting(final AddressSettings addressSettings) {
applySetting(addressSettings, false);
}
private void applySetting(final AddressSettings addressSettings, final boolean firstTime) {
maxSize = addressSettings.getMaxSizeBytes();
maxPageReadMessages = addressSettings.getMaxReadPageMessages();
@ -263,14 +268,15 @@ public class PagingStoreImpl implements PagingStore {
pageLimitBytes = addressSettings.getPageLimitBytes();
if (pageLimitBytes != null && pageLimitBytes.longValue() < 0) {
if (pageLimitBytes != null && pageLimitBytes < 0) {
logger.debug("address {} had pageLimitBytes<0, setting it as null", address);
pageLimitBytes = null;
}
pageLimitMessages = addressSettings.getPageLimitMessages();
Long originalLimitMessages = this.pageLimitMessages;
this.pageLimitMessages = addressSettings.getPageLimitMessages();
if (pageLimitMessages != null && pageLimitMessages.longValue() < 0) {
if (pageLimitMessages != null && pageLimitMessages < 0) {
logger.debug("address {} had pageLimitMessages<0, setting it as null", address);
pageLimitMessages = null;
}
@ -289,9 +295,21 @@ public class PagingStoreImpl implements PagingStore {
this.pageLimitBytes = null;
}
boolean pageLimitMessagesChanged = !Objects.equals(this.pageLimitMessages, originalLimitMessages);
boolean estimatedMaxPagesChanged = false;
if (pageLimitBytes != null && pageSize > 0) {
Long originalEstimatedMaxPages = this.estimatedMaxPages;
estimatedMaxPages = pageLimitBytes / pageSize;
logger.debug("Address {} should not allow more than {} pages", storeName, estimatedMaxPages);
estimatedMaxPagesChanged = !Objects.equals(estimatedMaxPages, originalEstimatedMaxPages);
}
if (!firstTime && (estimatedMaxPagesChanged || pageLimitMessagesChanged)) {
if (estimatedMaxPagesChanged) {
checkNumberOfPages();
}
cursorProvider.checkClearPageLimit();
}
}

View File

@ -244,7 +244,13 @@ This is to avoid a single destination using the entire disk in case their consum
You can configure either `page-limit-bytes` or `page-limit-messages`, along with `page-full-policy` on the address settings limiting how much data will be recorded in paging.
If you configure `page-full-policy` as DROP, messages will be simplify dropped while the clients will not get any exceptions, while if you configured FAIL the producers will receive a JMS Exception for the error condition.
If you configure `page-full-policy` as DROP, messages will be simply dropped while the clients will not get any exceptions, while if you configured FAIL the producers will receive a JMS Exception for the error condition.
[NOTE]
The `page-limit-bytes` is used to identify a maximum number of page files internally (i.e. `page-limit-bytes` / `page-size-bytes`) which is then compared against the current number of page files.
If configured, `page-limit-bytes` must be equal or greater than `page-size-bytes` or it will cause immediate block.
If the limit determined from `page-limit-bytes`, once converted to a number of pages, is less than the current number of page files in the store then paging will be blocked based on `page-full-policy` until the number of current page files drops to less than or equal to the calculated file limit. It will become blocked again once the number of page files is greater than the value determined by `page-limit-bytes` (`page-limit-bytes` / `page-size-bytes`).
== Example

View File

@ -24,4 +24,8 @@ public class PageCursorProviderTestAccessor {
public static void cleanup(PageCursorProvider provider) {
((PageCursorProviderImpl)provider).cleanup();
}
public static Long getNumberOfMessagesOnSubscriptions(PageCursorProvider provider) {
return ((PageCursorProviderImpl)provider).getNumberOfMessagesOnSubscriptions();
}
}

View File

@ -1695,15 +1695,29 @@ public abstract class ActiveMQTestBase extends ArtemisTestCase {
public List<String> sendMessageBatch(int batchSize,
ClientSession session,
SimpleString queueAddr) throws ActiveMQException {
return sendMessageBatch(batchSize, 1024, session, queueAddr);
}
public ClientMessage createMessage(ClientSession session, int messageSize, int seq, List<String> collectIds) {
ClientMessage message = session.createMessage(true);
message.getBodyBuffer().writeBytes(new byte[messageSize]);
String id = UUID.randomUUID().toString();
message.putStringProperty("id", id);
message.putIntProperty("seq", seq); // this is to make the print-data easier to debug
if (collectIds != null) {
collectIds.add(id);
}
return message;
}
public List<String> sendMessageBatch(int batchSize,
int messageSize,
ClientSession session,
SimpleString queueAddr) throws ActiveMQException {
List<String> messageIds = new ArrayList<>();
ClientProducer producer = session.createProducer(queueAddr);
for (int i = 0; i < batchSize; i++) {
ClientMessage message = session.createMessage(true);
message.getBodyBuffer().writeBytes(new byte[1024]);
String id = UUID.randomUUID().toString();
message.putStringProperty("id", id);
message.putIntProperty("seq", i); // this is to make the print-data easier to debug
messageIds.add(id);
ClientMessage message = createMessage(session, messageSize, i, messageIds);
producer.send(message);
}
session.commit();

View File

@ -17,8 +17,10 @@
package org.apache.activemq.artemis.tests.integration.paging;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertFalse;
import static org.junit.jupiter.api.Assertions.assertNotNull;
import static org.junit.jupiter.api.Assertions.assertTrue;
import static org.junit.jupiter.api.Assertions.fail;
import java.util.HashSet;
import java.util.Set;
@ -26,6 +28,7 @@ import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import org.apache.activemq.artemis.api.core.ActiveMQAddressFullException;
import org.apache.activemq.artemis.api.core.QueueConfiguration;
import org.apache.activemq.artemis.api.core.SimpleString;
import org.apache.activemq.artemis.api.core.client.ClientConsumer;
@ -34,10 +37,15 @@ import org.apache.activemq.artemis.api.core.client.ClientProducer;
import org.apache.activemq.artemis.api.core.client.ClientSession;
import org.apache.activemq.artemis.api.core.client.ClientSessionFactory;
import org.apache.activemq.artemis.api.core.client.ServerLocator;
import org.apache.activemq.artemis.core.paging.PagingStore;
import org.apache.activemq.artemis.core.paging.cursor.PageCursorProvider;
import org.apache.activemq.artemis.core.paging.cursor.impl.PageCursorProviderTestAccessor;
import org.apache.activemq.artemis.core.server.ActiveMQServer;
import org.apache.activemq.artemis.core.server.MessageReference;
import org.apache.activemq.artemis.core.server.Queue;
import org.apache.activemq.artemis.core.settings.impl.AddressFullMessagePolicy;
import org.apache.activemq.artemis.core.settings.impl.AddressSettings;
import org.apache.activemq.artemis.core.settings.impl.PageFullMessagePolicy;
import org.apache.activemq.artemis.tests.util.ActiveMQTestBase;
import org.apache.activemq.artemis.utils.collections.LinkedListIterator;
import org.junit.jupiter.api.BeforeEach;
@ -288,6 +296,433 @@ public class PagingSendTest extends ActiveMQTestBase {
}
}
@Test
public void testPageLimitBytesValidation() throws Exception {
final String addressName = getTestMethodName();
try (ClientSessionFactory sf = createSessionFactory(locator)) {
ClientSession session = sf.createSession(false, false);
SimpleString queueAddr = SimpleString.of(addressName);
session.createQueue(QueueConfiguration.of(queueAddr));
int size = 1048576;
AddressSettings addressSettings = new AddressSettings();
addressSettings.setAddressFullMessagePolicy(AddressFullMessagePolicy.PAGE);
addressSettings.setPageFullMessagePolicy(PageFullMessagePolicy.FAIL);
addressSettings.setPageSizeBytes(size);
addressSettings.setPageLimitBytes((long) size);
addressSettings.setMaxSizeBytes(size);
server.getAddressSettingsRepository().addMatch(addressName, addressSettings);
int totalMessages = 15;
int messageSize = 90000;
sendMessageBatch(totalMessages, messageSize, session, queueAddr);
Queue queue = server.locateQueue(queueAddr);
// Give time Queue.deliverAsync to deliver messages
assertTrue(waitForMessages(queue, totalMessages, 10000));
PagingStore queuePagingStore = queue.getPagingStore();
assertTrue(queuePagingStore != null && queuePagingStore.isPaging());
assertFalse(queuePagingStore.isPageFull());
// set pageLimitBytes to be smaller than pageSizeBytes
addressSettings.setPageLimitBytes((long) (size - 1));
server.getAddressSettingsRepository().addMatch(addressName, addressSettings);
// check the settings applied
assertEquals(size - 1, queuePagingStore.getPageLimitBytes());
// check pageFull is true
assertTrue(queuePagingStore.isPageFull());
// send a messages should be immediately blocked (in our case FAIL)
try {
sendMessageBatch(1, messageSize, session, queueAddr);
fail("should be immediate blocked on paging");
} catch (ActiveMQAddressFullException ex) {
//ok
}
assertTrue(queuePagingStore.isPageFull());
// set pageLimitBytes to bigger value to unblock paging again
addressSettings.setPageLimitBytes((long) (size * 2));
server.getAddressSettingsRepository().addMatch(addressName, addressSettings);
// now page is enabled again
assertFalse(queuePagingStore.isPageFull());
sendMessageBatch(1, messageSize, session, queueAddr);
assertTrue(waitForMessages(queue, totalMessages + 1, 10000));
}
}
@Test
public void testPageLimitBytesAndPageLimitMessagesValidationBlockOnLimitMessagesFirst() throws Exception {
final String queueName = getTestMethodName();
try (ClientSessionFactory sf = createSessionFactory(locator)) {
ClientSession session = sf.createSession(true, true);
SimpleString queueAddr = SimpleString.of(queueName);
session.createQueue(QueueConfiguration.of(queueAddr));
final int size = 1024 * 50;
final Long maxMessages = 10L;
final int initPageSizeBytes = size;
final Long initPageLimitBytes = (long) (initPageSizeBytes * 10);
AddressSettings addressSettings = new AddressSettings();
addressSettings.setAddressFullMessagePolicy(AddressFullMessagePolicy.PAGE);
addressSettings.setPageFullMessagePolicy(PageFullMessagePolicy.FAIL);
addressSettings.setPageSizeBytes(initPageSizeBytes);
addressSettings.setPageLimitBytes(initPageLimitBytes);
addressSettings.setMaxSizeBytes(size);
addressSettings.setPageLimitMessages(maxMessages);
server.getAddressSettingsRepository().addMatch(queueName, addressSettings);
int totalMessages = 0;
int messageSize = 1024;
ClientProducer producer = session.createProducer(queueAddr);
boolean stop = false;
while (!stop) {
try {
ClientMessage message = createMessage(session, messageSize, totalMessages, null);
producer.send(message);
totalMessages++;
session.commit();
} catch (ActiveMQAddressFullException ex) {
stop = true;
}
}
Queue queue = server.locateQueue(queueAddr);
assertTrue(waitForMessages(queue, totalMessages, 10000));
PagingStore queuePagingStore = queue.getPagingStore();
assertTrue(queuePagingStore != null && queuePagingStore.isPageFull());
// the messages reach the limit
PageCursorProvider cursorProvider = queuePagingStore.getCursorProvider();
assertEquals(PageCursorProviderTestAccessor.getNumberOfMessagesOnSubscriptions(cursorProvider), maxMessages);
// but pages still under limit
assertEquals(initPageLimitBytes, queuePagingStore.getPageLimitBytes());
assertEquals(initPageSizeBytes, queuePagingStore.getPageSizeBytes());
long existingPages = queuePagingStore.getNumberOfPages();
assertTrue(existingPages <= initPageLimitBytes / initPageSizeBytes);
server.stop(true);
waitForServerToStop(server);
// restart the server the pageFull is still true
try {
server.start();
waitForServerToStart(server);
queue = server.locateQueue(queueAddr);
queuePagingStore = queue.getPagingStore();
assertTrue(queuePagingStore != null && queuePagingStore.isPageFull());
// but current pages still under limit
long currentPages = queuePagingStore.getNumberOfPages();
assertEquals(initPageLimitBytes, queuePagingStore.getPageLimitBytes());
assertEquals(initPageSizeBytes, queuePagingStore.getPageSizeBytes());
long maxPages = initPageLimitBytes / initPageSizeBytes;
assertTrue(currentPages <= maxPages);
// now increase the max messages to unblock producer
final Long bigLimitMessages = 1000L;
addressSettings.setPageLimitMessages(bigLimitMessages);
server.getAddressSettingsRepository().addMatch(queueName, addressSettings);
// no longer page full
assertFalse(queuePagingStore.isPageFull());
// now send more messages until pagefull
locator = createFactory(isNetty());
try (ClientSessionFactory csf = createSessionFactory(locator)) {
stop = false;
session = csf.createSession(true, true);
producer = session.createProducer(queueAddr);
while (!stop) {
try {
ClientMessage message = createMessage(session, messageSize, totalMessages, null);
producer.send(message);
totalMessages++;
assertTrue(totalMessages <= bigLimitMessages, "test is broken");
} catch (ActiveMQAddressFullException ex) {
stop = true;
}
}
}
// check the page full
assertTrue(queuePagingStore.isPageFull());
// because it reaches pageLimitBytes
currentPages = queuePagingStore.getNumberOfPages();
assertEquals(initPageLimitBytes, queuePagingStore.getPageLimitBytes());
assertEquals(initPageSizeBytes, queuePagingStore.getPageSizeBytes());
maxPages = initPageLimitBytes / initPageSizeBytes;
assertTrue(currentPages > maxPages);
// and messages still below limit messages
cursorProvider = queuePagingStore.getCursorProvider();
assertEquals(bigLimitMessages, queuePagingStore.getPageLimitMessages());
assertTrue(PageCursorProviderTestAccessor.getNumberOfMessagesOnSubscriptions(cursorProvider) < bigLimitMessages);
} finally {
server.stop(true);
}
}
}
@Test
public void testPageLimitBytesAndPageLimitMessagesValidationBlockOnLimitBytesFirst() throws Exception {
final String queueName = getTestMethodName();
try (ClientSessionFactory sf = createSessionFactory(locator)) {
ClientSession session = sf.createSession(true, true);
SimpleString queueAddr = SimpleString.of(queueName);
session.createQueue(QueueConfiguration.of(queueAddr));
// the numbers should make sure page files reach the pageLimitBytes before pageLimitMessages
final int size = 1024 * 50;
final Long maxMessages = 200L;
final int initPageSizeBytes = size;
final Long initPageLimitBytes = (long) (initPageSizeBytes * 5);
AddressSettings addressSettings = new AddressSettings();
addressSettings.setAddressFullMessagePolicy(AddressFullMessagePolicy.PAGE);
addressSettings.setPageFullMessagePolicy(PageFullMessagePolicy.FAIL);
addressSettings.setPageSizeBytes(initPageSizeBytes);
addressSettings.setPageLimitBytes(initPageLimitBytes);
addressSettings.setMaxSizeBytes(size);
addressSettings.setPageLimitMessages(maxMessages);
server.getAddressSettingsRepository().addMatch(queueName, addressSettings);
int totalMessages = 0;
int messageSize = 1024;
ClientProducer producer = session.createProducer(queueAddr);
boolean stop = false;
while (!stop) {
try {
ClientMessage message = createMessage(session, messageSize, totalMessages, null);
producer.send(message);
totalMessages++;
session.commit();
} catch (ActiveMQAddressFullException ex) {
stop = true;
}
}
Queue queue = server.locateQueue(queueAddr);
assertTrue(waitForMessages(queue, totalMessages, 10000));
PagingStore queuePagingStore = queue.getPagingStore();
assertTrue(queuePagingStore != null && queuePagingStore.isPageFull());
// the pages reach the limit
long existingPages = queuePagingStore.getNumberOfPages();
assertEquals(initPageLimitBytes, queuePagingStore.getPageLimitBytes());
assertEquals(initPageSizeBytes, queuePagingStore.getPageSizeBytes());
long maxPages = initPageLimitBytes / initPageSizeBytes;
assertTrue(existingPages > maxPages);
// but messages still under limit
PageCursorProvider cursorProvider = queuePagingStore.getCursorProvider();
Long existingMessages = PageCursorProviderTestAccessor.getNumberOfMessagesOnSubscriptions(cursorProvider);
assertTrue(existingMessages < maxMessages, "existing " + existingMessages + " should be less than max " + maxMessages);
server.stop(true);
waitForServerToStop(server);
// restart the server the pageFull is still true
try {
server.start();
waitForServerToStart(server);
queue = server.locateQueue(queueAddr);
queuePagingStore = queue.getPagingStore();
assertNotNull(queuePagingStore);
assertTrue(queuePagingStore.isPageFull());
// but messages still under limit
cursorProvider = queuePagingStore.getCursorProvider();
existingMessages = PageCursorProviderTestAccessor.getNumberOfMessagesOnSubscriptions(cursorProvider);
assertTrue(existingMessages < maxMessages, "existing " + existingMessages + " should be still less than max " + maxMessages);
// now increase the pageLimitBytes to unblock producer
final Long newPageLimitBytes = (long) (size * 20);
addressSettings.setPageLimitBytes(newPageLimitBytes);
server.getAddressSettingsRepository().addMatch(queueName, addressSettings);
// no longer page full
assertFalse(queuePagingStore.isPageFull());
// now send more messages until pagefull
locator = createFactory(isNetty());
try (ClientSessionFactory csf = createSessionFactory(locator)) {
stop = false;
session = csf.createSession(true, true);
producer = session.createProducer(queueAddr);
while (!stop) {
try {
ClientMessage message = createMessage(session, messageSize, totalMessages, null);
producer.send(message);
totalMessages++;
} catch (ActiveMQAddressFullException ex) {
stop = true;
}
}
}
// check the page full
assertTrue(queuePagingStore.isPageFull());
// current pages not exceeds the max pages
Long currentPages = queuePagingStore.getNumberOfPages();
assertEquals(newPageLimitBytes, queuePagingStore.getPageLimitBytes());
assertEquals(initPageSizeBytes, queuePagingStore.getPageSizeBytes());
maxPages = newPageLimitBytes / initPageSizeBytes;
assertTrue(currentPages <= maxPages);
// however messages reaches limit messages
cursorProvider = queuePagingStore.getCursorProvider();
assertEquals(maxMessages, queuePagingStore.getPageLimitMessages());
assertEquals(PageCursorProviderTestAccessor.getNumberOfMessagesOnSubscriptions(cursorProvider), maxMessages);
} finally {
server.stop(true);
}
}
}
@Test
public void testPageLimitBytesValidationOnRestart() throws Exception {
final String queueName = getTestMethodName();
try (ClientSessionFactory sf = createSessionFactory(locator)) {
ClientSession session = sf.createSession(false, false);
SimpleString queueAddr = SimpleString.of(queueName);
session.createQueue(QueueConfiguration.of(queueAddr));
final int size = 1024 * 50;
final int initPageSizeBytes = size;
final Long initPageLimitBytes = (long) (size * 10);
AddressSettings addressSettings = new AddressSettings();
addressSettings.setAddressFullMessagePolicy(AddressFullMessagePolicy.PAGE);
addressSettings.setPageFullMessagePolicy(PageFullMessagePolicy.FAIL);
addressSettings.setPageSizeBytes(initPageSizeBytes);
addressSettings.setPageLimitBytes(initPageLimitBytes);
addressSettings.setMaxSizeBytes(size);
server.getAddressSettingsRepository().addMatch(queueName, addressSettings);
int totalMessages = 30;
int messageSize = 1024 * 10;
sendMessageBatch(totalMessages, messageSize, session, queueAddr);
Queue queue = server.locateQueue(queueAddr);
assertTrue(waitForMessages(queue, totalMessages, 10000));
PagingStore queuePagingStore = queue.getPagingStore();
assertNotNull(queuePagingStore);
assertTrue(queuePagingStore.isPaging());
assertFalse(queuePagingStore.isPageFull());
long existingPages = queuePagingStore.getNumberOfPages();
assertTrue(existingPages > 4);
// restart the server with a smaller pageLimitSize < existing pages.
server.stop(true);
waitForServerToStop(server);
final Long newPageLimitBytes = (long) (size * 4);
addressSettings.setPageLimitBytes(newPageLimitBytes);
server.getAddressSettingsRepository().addMatch(queueName, addressSettings);
// server will start regardless of current page count > (pageLimitBytes / pageSizeBytes)
try {
server.start();
waitForServerToStart(server);
// verify current situation
queue = server.locateQueue(queueAddr);
assertTrue(waitForMessages(queue, totalMessages, 10000));
queuePagingStore = queue.getPagingStore();
assertNotNull(queuePagingStore);
assertTrue(queuePagingStore.isPaging() && queuePagingStore.isPageFull());
long currentPages = queuePagingStore.getNumberOfPages();
assertEquals(existingPages, currentPages);
assertEquals(newPageLimitBytes, queuePagingStore.getPageLimitBytes());
assertEquals(initPageSizeBytes, queuePagingStore.getPageSizeBytes());
long maxPages = newPageLimitBytes / initPageSizeBytes;
assertTrue(currentPages > maxPages);
//consume messages until current pages goes down to below maxPage
locator = createFactory(isNetty());
final int numMessages = 25;
try (ClientSessionFactory csf = createSessionFactory(locator)) {
session = csf.createSession(false, true);
session.start();
ClientConsumer consumer = session.createConsumer(queueName);
for (int i = 0; i < numMessages; i++) {
ClientMessage message = consumer.receive(5000);
assertNotNull(message);
message.acknowledge();
session.commit();
}
currentPages = queuePagingStore.getNumberOfPages();
// check page store not page full
assertTrue(queuePagingStore.isPaging());
assertFalse(queuePagingStore.isPageFull());
// the current pages should be less or equal to maxPages
assertTrue(currentPages <= maxPages);
//send messages one by one until page full
ClientProducer producer = session.createProducer(queueName);
boolean isFull = false;
try {
for (int i = 0; i < numMessages; i++) {
ClientMessage message = createMessage(session, messageSize, i, null);
producer.send(message);
session.commit();
}
} catch (ActiveMQAddressFullException e) {
isFull = true;
}
assertTrue(isFull);
currentPages = queuePagingStore.getNumberOfPages();
// now current pages should be one more than maxPages
assertTrue(currentPages == maxPages + 1);
// check paging store page full
assertTrue(queuePagingStore.isPageFull());
}
} finally {
server.stop(true);
}
}
}
/**
* checks that there are no message duplicates in the page. Any IDs found in the ignoreIds field will not be tested
* this allows us to test only those messages that have been sent after the address has started paging (ignoring any