ARTEMIS-4447 Add paging prefetch parameters into address settings
we are adding new attributes to determine how many messages (or bytes) we are reading from paging into Queue memory.
This commit is contained in:
parent
d01445f485
commit
7c9a15e9b4
|
@ -227,8 +227,12 @@ public final class FileConfigurationParser extends XMLConfigurationUtil {
|
|||
|
||||
private static final String MAX_READ_PAGE_BYTES_NODE_NAME = "max-read-page-bytes";
|
||||
|
||||
private static final String PREFETCH_PAGE_BYTES_NODE_NAME = "prefetch-page-bytes";
|
||||
|
||||
private static final String MAX_READ_PAGE_MESSAGES_NODE_NAME = "max-read-page-messages";
|
||||
|
||||
private static final String PREFETCH_PAGE_MESSAGES_NODE_NAME = "prefetch-page-messages";
|
||||
|
||||
private static final String PAGE_SIZE_BYTES_NODE_NAME = "page-size-bytes";
|
||||
|
||||
private static final String PAGE_MAX_CACHE_SIZE_NODE_NAME = "page-max-cache-size";
|
||||
|
@ -1294,14 +1298,22 @@ public final class FileConfigurationParser extends XMLConfigurationUtil {
|
|||
long pageSizeLong = ByteUtil.convertTextBytes(getTrimmedTextContent(child));
|
||||
Validators.POSITIVE_INT.validate(PAGE_SIZE_BYTES_NODE_NAME, pageSizeLong);
|
||||
addressSettings.setPageSizeBytes((int) pageSizeLong);
|
||||
} else if (MAX_READ_PAGE_MESSAGES_NODE_NAME.equalsIgnoreCase(name)) {
|
||||
} else if (MAX_READ_PAGE_MESSAGES_NODE_NAME.equalsIgnoreCase(name)) {
|
||||
long maxReadPageMessages = Long.parseLong(getTrimmedTextContent(child));
|
||||
Validators.MINUS_ONE_OR_POSITIVE_INT.validate(MAX_READ_PAGE_MESSAGES_NODE_NAME, maxReadPageMessages);
|
||||
addressSettings.setMaxReadPageMessages((int)maxReadPageMessages);
|
||||
} else if (MAX_READ_PAGE_BYTES_NODE_NAME.equalsIgnoreCase(name)) {
|
||||
} else if (MAX_READ_PAGE_BYTES_NODE_NAME.equalsIgnoreCase(name)) {
|
||||
long maxReadPageBytes = ByteUtil.convertTextBytes(getTrimmedTextContent(child));
|
||||
Validators.MINUS_ONE_OR_POSITIVE_INT.validate(MAX_READ_PAGE_BYTES_NODE_NAME, maxReadPageBytes);
|
||||
addressSettings.setMaxReadPageBytes((int)maxReadPageBytes);
|
||||
} else if (PREFETCH_PAGE_MESSAGES_NODE_NAME.equalsIgnoreCase(name)) {
|
||||
long prefetchPageMessages = Long.parseLong(getTrimmedTextContent(child));
|
||||
Validators.MINUS_ONE_OR_POSITIVE_INT.validate(PREFETCH_PAGE_MESSAGES_NODE_NAME, prefetchPageMessages);
|
||||
addressSettings.setPrefetchPageMessages((int)prefetchPageMessages);
|
||||
} else if (PREFETCH_PAGE_BYTES_NODE_NAME.equalsIgnoreCase(name)) {
|
||||
long prefetchPageBytes = ByteUtil.convertTextBytes(getTrimmedTextContent(child));
|
||||
Validators.MINUS_ONE_OR_POSITIVE_INT.validate(PREFETCH_PAGE_BYTES_NODE_NAME, prefetchPageBytes);
|
||||
addressSettings.setPrefetchPageBytes((int)prefetchPageBytes);
|
||||
} else if (PAGE_MAX_CACHE_SIZE_NODE_NAME.equalsIgnoreCase(name)) {
|
||||
if (!printPageMaxSizeUsed) {
|
||||
printPageMaxSizeUsed = true;
|
||||
|
|
|
@ -92,6 +92,10 @@ public interface PagingStore extends ActiveMQComponent, RefCountMessageListener
|
|||
|
||||
int getMaxPageReadMessages();
|
||||
|
||||
int getPrefetchPageBytes();
|
||||
|
||||
int getPrefetchPageMessages();
|
||||
|
||||
void applySetting(AddressSettings addressSettings);
|
||||
|
||||
/** This method will look if the current state of paging is not paging,
|
||||
|
|
|
@ -100,6 +100,10 @@ public class PagingStoreImpl implements PagingStore {
|
|||
|
||||
private int maxPageReadMessages = -1;
|
||||
|
||||
private int prefetchPageBytes = -1;
|
||||
|
||||
private int prefetchPageMessages = -1;
|
||||
|
||||
private long maxMessages;
|
||||
|
||||
private volatile boolean pageFull;
|
||||
|
@ -227,8 +231,12 @@ public class PagingStoreImpl implements PagingStore {
|
|||
|
||||
maxPageReadMessages = addressSettings.getMaxReadPageMessages();
|
||||
|
||||
prefetchPageMessages = addressSettings.getPrefetchPageMessages();
|
||||
|
||||
maxPageReadBytes = addressSettings.getMaxReadPageBytes();
|
||||
|
||||
prefetchPageBytes = addressSettings.getPrefetchPageBytes();
|
||||
|
||||
maxMessages = addressSettings.getMaxSizeMessages();
|
||||
|
||||
configureSizeMetric();
|
||||
|
@ -408,11 +416,21 @@ public class PagingStoreImpl implements PagingStore {
|
|||
return maxPageReadBytes;
|
||||
}
|
||||
|
||||
@Override
|
||||
public int getPrefetchPageBytes() {
|
||||
return prefetchPageBytes;
|
||||
}
|
||||
|
||||
@Override
|
||||
public int getMaxPageReadMessages() {
|
||||
return maxPageReadMessages;
|
||||
}
|
||||
|
||||
@Override
|
||||
public int getPrefetchPageMessages() {
|
||||
return prefetchPageMessages;
|
||||
}
|
||||
|
||||
@Override
|
||||
public AddressFullMessagePolicy getAddressFullMessagePolicy() {
|
||||
return addressFullMessagePolicy;
|
||||
|
|
|
@ -3308,16 +3308,21 @@ public class QueueImpl extends CriticalComponentImpl implements Queue {
|
|||
private boolean needsDepage() {
|
||||
final int maxReadMessages = pageSubscription.getPagingStore().getMaxPageReadMessages();
|
||||
final int maxReadBytes = pageSubscription.getPagingStore().getMaxPageReadBytes();
|
||||
final int prefetchMessages = pageSubscription.getPagingStore().getPrefetchPageMessages();
|
||||
final int prefetchBytes = pageSubscription.getPagingStore().getPrefetchPageBytes();
|
||||
|
||||
if (maxReadMessages <= 0 && maxReadBytes <= 0) {
|
||||
// if both maxValues are disabled, we will protect the broker using an older semantic
|
||||
if (maxReadMessages <= 0 && maxReadBytes <= 0 && prefetchBytes <= 0 && prefetchBytes <= 0) {
|
||||
// if all values are disabled, we will protect the broker using an older semantic
|
||||
// where we don't look for deliveringMetrics..
|
||||
// this would give users a chance to switch to older protection mode.
|
||||
return queueMemorySize.getSize() < pageSubscription.getPagingStore().getMaxSize() &&
|
||||
intermediateMessageReferences.size() + messageReferences.size() < MAX_DEPAGE_NUM;
|
||||
} else {
|
||||
boolean needsDepageResult = (maxReadBytes <= 0 || (queueMemorySize.getSize() + deliveringMetrics.getPersistentSize()) < maxReadBytes) &&
|
||||
(maxReadMessages <= 0 || (queueMemorySize.getElements() + deliveringMetrics.getMessageCount()) < maxReadMessages);
|
||||
boolean needsDepageResult =
|
||||
(maxReadBytes <= 0 || (queueMemorySize.getSize() + deliveringMetrics.getPersistentSize()) < maxReadBytes) &&
|
||||
(prefetchBytes <= 0 || (queueMemorySize.getSize() < prefetchBytes)) &&
|
||||
(maxReadMessages <= 0 || (queueMemorySize.getElements() + deliveringMetrics.getMessageCount()) < maxReadMessages) &&
|
||||
(prefetchMessages <= 0 || (queueMemorySize.getElements() < prefetchMessages));
|
||||
|
||||
if (!needsDepageResult) {
|
||||
if (!pageFlowControlled && (maxReadBytes > 0 && deliveringMetrics.getPersistentSize() >= maxReadBytes || maxReadMessages > 0 && deliveringMetrics.getMessageCount() >= maxReadMessages)) {
|
||||
|
|
|
@ -151,6 +151,10 @@ public class AddressSettings implements Mergeable<AddressSettings>, Serializable
|
|||
|
||||
private Integer maxReadPageMessages = null;
|
||||
|
||||
private Integer prefetchPageBytes = null;
|
||||
|
||||
private Integer prefetchPageMessages = null;
|
||||
|
||||
private Long pageLimitBytes = null;
|
||||
|
||||
private Long pageLimitMessages = null;
|
||||
|
@ -684,6 +688,16 @@ public class AddressSettings implements Mergeable<AddressSettings>, Serializable
|
|||
return this;
|
||||
}
|
||||
|
||||
|
||||
public int getPrefetchPageMessages() {
|
||||
return prefetchPageMessages != null ? prefetchPageMessages : getMaxReadPageMessages();
|
||||
}
|
||||
|
||||
public AddressSettings setPrefetchPageMessages(final int prefetchPageMessages) {
|
||||
this.prefetchPageMessages = prefetchPageMessages <= 0 ? null : prefetchPageMessages;
|
||||
return this;
|
||||
}
|
||||
|
||||
public Long getPageLimitBytes() {
|
||||
return pageLimitBytes;
|
||||
}
|
||||
|
@ -720,6 +734,15 @@ public class AddressSettings implements Mergeable<AddressSettings>, Serializable
|
|||
return this;
|
||||
}
|
||||
|
||||
public int getPrefetchPageBytes() {
|
||||
return prefetchPageBytes != null ? prefetchPageBytes : getMaxReadPageBytes();
|
||||
}
|
||||
|
||||
public AddressSettings setPrefetchPageBytes(final int prefetchPageBytes) {
|
||||
this.prefetchPageBytes = prefetchPageBytes <= 0 ? null : prefetchPageBytes;
|
||||
return this;
|
||||
}
|
||||
|
||||
public int getMaxDeliveryAttempts() {
|
||||
return maxDeliveryAttempts != null ? maxDeliveryAttempts : AddressSettings.DEFAULT_MAX_DELIVERY_ATTEMPTS;
|
||||
}
|
||||
|
@ -1317,6 +1340,12 @@ public class AddressSettings implements Mergeable<AddressSettings>, Serializable
|
|||
if (idCacheSize == null) {
|
||||
idCacheSize = merged.idCacheSize;
|
||||
}
|
||||
if (prefetchPageMessages == null) {
|
||||
prefetchPageMessages = merged.prefetchPageMessages;
|
||||
}
|
||||
if (prefetchPageBytes == null) {
|
||||
prefetchPageBytes = merged.prefetchPageBytes;
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
|
@ -1596,6 +1625,14 @@ public class AddressSettings implements Mergeable<AddressSettings>, Serializable
|
|||
if (buffer.readableBytes() > 0) {
|
||||
idCacheSize = BufferHelper.readNullableInteger(buffer);
|
||||
}
|
||||
|
||||
if (buffer.readableBytes() > 0) {
|
||||
prefetchPageBytes = BufferHelper.readNullableInteger(buffer);
|
||||
}
|
||||
|
||||
if (buffer.readableBytes() > 0) {
|
||||
prefetchPageMessages = BufferHelper.readNullableInteger(buffer);
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
|
@ -1672,7 +1709,9 @@ public class AddressSettings implements Mergeable<AddressSettings>, Serializable
|
|||
BufferHelper.sizeOfNullableLong(pageLimitBytes) +
|
||||
BufferHelper.sizeOfNullableLong(pageLimitMessages) +
|
||||
BufferHelper.sizeOfNullableInteger(idCacheSize) +
|
||||
BufferHelper.sizeOfNullableSimpleString(pageFullMessagePolicy != null ? pageFullMessagePolicy.toString() : null);
|
||||
BufferHelper.sizeOfNullableSimpleString(pageFullMessagePolicy != null ? pageFullMessagePolicy.toString() : null) +
|
||||
BufferHelper.sizeOfNullableInteger(prefetchPageBytes) +
|
||||
BufferHelper.sizeOfNullableInteger(prefetchPageMessages);
|
||||
}
|
||||
|
||||
@Override
|
||||
|
@ -1824,6 +1863,10 @@ public class AddressSettings implements Mergeable<AddressSettings>, Serializable
|
|||
BufferHelper.writeNullableBoolean(buffer, autoDeleteAddressesSkipUsageCheck);
|
||||
|
||||
BufferHelper.writeNullableInteger(buffer, idCacheSize);
|
||||
|
||||
BufferHelper.writeNullableInteger(buffer, prefetchPageBytes);
|
||||
|
||||
BufferHelper.writeNullableInteger(buffer, prefetchPageMessages);
|
||||
}
|
||||
|
||||
/* (non-Javadoc)
|
||||
|
@ -1906,6 +1949,8 @@ public class AddressSettings implements Mergeable<AddressSettings>, Serializable
|
|||
result = prime * result + ((pageLimitMessages == null) ? 0 : pageLimitMessages.hashCode());
|
||||
result = prime * result + ((pageFullMessagePolicy == null) ? 0 : pageFullMessagePolicy.hashCode());
|
||||
result = prime * result + ((idCacheSize == null) ? 0 : idCacheSize.hashCode());
|
||||
result = prime * result + ((prefetchPageBytes == null) ? 0 : prefetchPageBytes.hashCode());
|
||||
result = prime * result + ((prefetchPageMessages == null) ? 0 : prefetchPageMessages.hashCode());
|
||||
|
||||
return result;
|
||||
}
|
||||
|
@ -2321,6 +2366,22 @@ public class AddressSettings implements Mergeable<AddressSettings>, Serializable
|
|||
return false;
|
||||
}
|
||||
|
||||
if (prefetchPageMessages == null) {
|
||||
if (other.prefetchPageMessages != null) {
|
||||
return false;
|
||||
}
|
||||
} else if (!prefetchPageMessages.equals(other.prefetchPageMessages)) {
|
||||
return false;
|
||||
}
|
||||
|
||||
if (prefetchPageBytes == null) {
|
||||
if (other.prefetchPageBytes != null) {
|
||||
return false;
|
||||
}
|
||||
} else if (!prefetchPageBytes.equals(other.prefetchPageBytes)) {
|
||||
return false;
|
||||
}
|
||||
|
||||
return true;
|
||||
}
|
||||
|
||||
|
@ -2470,6 +2531,10 @@ public class AddressSettings implements Mergeable<AddressSettings>, Serializable
|
|||
pageFullMessagePolicy +
|
||||
", idCacheSize=" +
|
||||
idCacheSize +
|
||||
", prefetchPageMessages=" +
|
||||
prefetchPageMessages +
|
||||
", prefetchPageBytes=" +
|
||||
prefetchPageBytes +
|
||||
"]";
|
||||
}
|
||||
}
|
||||
|
|
|
@ -3923,7 +3923,7 @@
|
|||
<xsd:element name="page-max-cache-size" default="5" type="xsd:int" maxOccurs="1" minOccurs="0">
|
||||
<xsd:annotation>
|
||||
<xsd:documentation>
|
||||
Number of paging files to cache in memory to avoid IO during paging navigation
|
||||
Number of paging files to cache in memory to avoid IO during paging navigation. This is not used any more. and it will be ignored.
|
||||
</xsd:documentation>
|
||||
</xsd:annotation>
|
||||
</xsd:element>
|
||||
|
@ -3931,8 +3931,14 @@
|
|||
<xsd:element name="max-read-page-messages" type="xsd:int" default="-1" maxOccurs="1" minOccurs="0">
|
||||
<xsd:annotation>
|
||||
<xsd:documentation>
|
||||
How many messages are we allowed to read from page into the Queue each time. We should read more data from pages as messages are acknowledged until it fills up the size.
|
||||
Between this and max-read-page-bytes the system will stop reading based on whatever hits the mark first.
|
||||
Maximum number of paged messages that the broker can read into memory per-queue. The default value is -1, which means that no limit applies. </xsd:documentation>
|
||||
</xsd:annotation>
|
||||
</xsd:element>
|
||||
|
||||
<xsd:element name="prefetch-page-messages" type="xsd:int" default="-1" maxOccurs="1" minOccurs="0">
|
||||
<xsd:annotation>
|
||||
<xsd:documentation>
|
||||
How many messages we are reading from paging and keeping ready in queues ready to deliver to consumers. Default: If not defined max-read-page-messages is used.
|
||||
</xsd:documentation>
|
||||
</xsd:annotation>
|
||||
</xsd:element>
|
||||
|
@ -3940,9 +3946,16 @@
|
|||
<xsd:element name="max-read-page-bytes" type="xsd:string" default="20M" maxOccurs="1" minOccurs="0">
|
||||
<xsd:annotation>
|
||||
<xsd:documentation>
|
||||
How many bytes are we allowed to read from page into the Queue each time. We should read more data from pages as messages are acknowledged until it fills up the size.
|
||||
Between this and max-read-page-messages the system will stop reading based on whatever hits the mark first.
|
||||
</xsd:documentation>
|
||||
Maximum memory, in bytes, that can be used to read paged messages into memory per-queue.
|
||||
When applying this limit, the broker takes into account both messages that are currently delivering and messages that are ready to be delivered to consumers.
|
||||
The default value is 2 * page-size (usually being 20 MB). If consumers are slow to acknowledge messages, you can increase the default value to ensure that the memory is not consumed by messages pending acknowledgment, which can starve the broker of messages. </xsd:documentation>
|
||||
</xsd:annotation>
|
||||
</xsd:element>
|
||||
|
||||
<xsd:element name="prefetch-page-bytes" type="xsd:string" default="-1" maxOccurs="1" minOccurs="0">
|
||||
<xsd:annotation>
|
||||
<xsd:documentation>
|
||||
Number of paged messages that the broker can read from disk into memory per-queue. The default value is taken from max-read-page-messages, usually at -1, which means that no limit applies. </xsd:documentation>
|
||||
</xsd:annotation>
|
||||
</xsd:element>
|
||||
|
||||
|
|
|
@ -1623,6 +1623,59 @@ public class ConfigurationImplTest extends ActiveMQTestBase {
|
|||
Assert.assertEquals(null, storeImpl.getPageFullMessagePolicy());
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testPagePrefetch() throws Throwable {
|
||||
ConfigurationImpl configuration = new ConfigurationImpl();
|
||||
|
||||
Properties properties = new Properties();
|
||||
|
||||
String randomString = RandomUtil.randomString();
|
||||
|
||||
properties.put("addressSettings.#.expiryAddress", randomString);
|
||||
properties.put("addressSettings.#.prefetchPageMessages", "333");
|
||||
properties.put("addressSettings.#.prefetchPageBytes", "777");
|
||||
|
||||
configuration.parsePrefixedProperties(properties, null);
|
||||
|
||||
Assert.assertEquals(1, configuration.getAddressSettings().size());
|
||||
Assert.assertEquals(SimpleString.toSimpleString(randomString), configuration.getAddressSettings().get("#").getExpiryAddress());
|
||||
Assert.assertEquals(333, configuration.getAddressSettings().get("#").getPrefetchPageMessages());
|
||||
Assert.assertEquals(777, configuration.getAddressSettings().get("#").getPrefetchPageBytes());
|
||||
|
||||
PagingStore storeImpl = new PagingStoreImpl(new SimpleString("Test"), (ScheduledExecutorService) null, 100L, Mockito.mock(PagingManager.class), Mockito.mock(StorageManager.class), Mockito.mock(SequentialFileFactory.class), Mockito.mock(PagingStoreFactory.class), new SimpleString("Test"), configuration.getAddressSettings().get("#"), null, null, true);
|
||||
|
||||
Assert.assertEquals(333, storeImpl.getPrefetchPageMessages());
|
||||
Assert.assertEquals(777, storeImpl.getPrefetchPageBytes());
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testPagePrefetchDefault() throws Throwable {
|
||||
ConfigurationImpl configuration = new ConfigurationImpl();
|
||||
|
||||
Properties properties = new Properties();
|
||||
|
||||
String randomString = RandomUtil.randomString();
|
||||
|
||||
properties.put("addressSettings.#.maxReadPageMessages", "333");
|
||||
properties.put("addressSettings.#.maxReadPageBytes", "777");
|
||||
|
||||
configuration.parsePrefixedProperties(properties, null);
|
||||
|
||||
Assert.assertEquals(1, configuration.getAddressSettings().size());
|
||||
Assert.assertEquals(333, configuration.getAddressSettings().get("#").getPrefetchPageMessages());
|
||||
Assert.assertEquals(777, configuration.getAddressSettings().get("#").getPrefetchPageBytes());
|
||||
Assert.assertEquals(333, configuration.getAddressSettings().get("#").getMaxReadPageMessages());
|
||||
Assert.assertEquals(777, configuration.getAddressSettings().get("#").getMaxReadPageBytes());
|
||||
|
||||
PagingStore storeImpl = new PagingStoreImpl(new SimpleString("Test"), (ScheduledExecutorService) null, 100L, Mockito.mock(PagingManager.class), Mockito.mock(StorageManager.class), Mockito.mock(SequentialFileFactory.class), Mockito.mock(PagingStoreFactory.class), new SimpleString("Test"), configuration.getAddressSettings().get("#"), null, null, true);
|
||||
|
||||
Assert.assertEquals(333, storeImpl.getPrefetchPageMessages());
|
||||
Assert.assertEquals(777, storeImpl.getPrefetchPageBytes());
|
||||
Assert.assertEquals(333, storeImpl.getMaxPageReadMessages());
|
||||
Assert.assertEquals(777, storeImpl.getMaxPageReadBytes());
|
||||
}
|
||||
|
||||
|
||||
@Test
|
||||
public void testDivertViaProperties() throws Exception {
|
||||
ConfigurationImpl configuration = new ConfigurationImpl();
|
||||
|
|
|
@ -414,7 +414,7 @@ public class FileConfigurationParserTest extends ActiveMQTestBase {
|
|||
|
||||
@Test
|
||||
public void testParsePageLimitSettings() throws Exception {
|
||||
String configStr = "<configuration><address-settings>" + "\n" + "<address-setting match=\"foo\">" + "\n" + "<max-read-page-bytes>1k</max-read-page-bytes><page-limit-bytes>10G</page-limit-bytes><page-limit-messages>3221225472</page-limit-messages><page-full-policy>FAIL</page-full-policy><max-read-page-messages>33</max-read-page-messages>.\n" + "</address-setting>" + "\n" + "</address-settings></configuration>" + "\n";
|
||||
String configStr = "<configuration><address-settings>" + "\n" + "<address-setting match=\"foo\">" + "\n" + "<max-read-page-bytes>1k</max-read-page-bytes><prefetch-page-bytes>100M</prefetch-page-bytes><prefetch-page-messages>777</prefetch-page-messages><page-limit-bytes>10G</page-limit-bytes><page-limit-messages>3221225472</page-limit-messages><page-full-policy>FAIL</page-full-policy><max-read-page-messages>33</max-read-page-messages>.\n" + "</address-setting>" + "\n" + "</address-settings></configuration>" + "\n";
|
||||
|
||||
FileConfigurationParser parser = new FileConfigurationParser();
|
||||
ByteArrayInputStream input = new ByteArrayInputStream(configStr.getBytes(StandardCharsets.UTF_8));
|
||||
|
@ -424,6 +424,8 @@ public class FileConfigurationParserTest extends ActiveMQTestBase {
|
|||
Assert.assertEquals(1024, settings.getMaxReadPageBytes());
|
||||
Assert.assertEquals(33, settings.getMaxReadPageMessages());
|
||||
Assert.assertEquals(10L * 1024 * 1024 * 1024, settings.getPageLimitBytes().longValue());
|
||||
Assert.assertEquals(100 * 1024 * 1024, settings.getPrefetchPageBytes());
|
||||
Assert.assertEquals(777, settings.getPrefetchPageMessages());
|
||||
Assert.assertEquals(3L * 1024 * 1024 * 1024, settings.getPageLimitMessages().longValue());
|
||||
Assert.assertEquals("FAIL", settings.getPageFullMessagePolicy().toString());
|
||||
}
|
||||
|
|
|
@ -99,15 +99,21 @@ If the value is `BLOCK` then client message producers will block when they try a
|
|||
| `PAGE`
|
||||
|
||||
| `max-read-page-messages`
|
||||
| how many message can be read from paging into the Queue whenever more messages are needed.
|
||||
The system wtill stop reading if `max-read-page-bytes hits the limit first.
|
||||
| -1
|
||||
| Maximum number of paged messages that the broker can read into memory per-queue. The default value is -1, which means that no limit applies.
|
||||
| -1 (disabled)
|
||||
|
||||
| `max-read-page-bytes`
|
||||
| how much memory the messages read from paging can take on the Queue whenever more messages are needed.
|
||||
The system will stop reading if `max-read-page-messages` hits the limit first.
|
||||
|Maximum memory, in bytes, that can be used to read paged messages into memory per-queue. When applying this limit, the broker takes into account both messages that are currently delivering and messages that are ready to be delivered to consumers. The default value is 2 * page-size (usually being 20 MB). If consumers are slow to acknowledge messages, you can increase the default value to ensure that the memory is not consumed by messages pending acknowledgment, which can starve the broker of messages.
|
||||
| 2 * page-size-bytes
|
||||
|
||||
|prefetch-page-messages
|
||||
|Number of paged messages that the broker can read from disk into memory per-queue. The default value is taken from max-read-page-messages, usually at -1, which means that no limit applies.
|
||||
|`max-read-page-messages`
|
||||
|
||||
|prefetch-page-bytes
|
||||
|Number of paged messages that the broker can read from disk into memory per-queue. The default value is taken from max-read-page-messages, usually at -1, which means that no limit applies.
|
||||
|if not defined, `max-read-page-bytes`
|
||||
|
||||
| `page-limit-bytes`
|
||||
| After entering page mode, how much data would the system allow incoming.
|
||||
Notice this will be internally converted as number of pages.
|
||||
|
@ -135,10 +141,10 @@ The configured policy will start based on the first value to reach its mark.
|
|||
|
||||
==== Maximum read from page
|
||||
|
||||
`max-read-page-messages` and `max-read-page-bytes` are used to control messaging reading from paged file into the Queue.
|
||||
The broker will add messages on the Queue until either `max-read-page-meessages` or `max-read-page-bytes` reaches the limit.
|
||||
`max-read-page-messages`, `max-read-page-bytes`, `prefetch-page-messages` and `prefetch-page-bytes` are used to control reading from paged file into the Queue.
|
||||
The broker will add messages as long as all these limits are satisfied.
|
||||
|
||||
If both values are set to -1 the broker will keep reading messages as long as the consumer is reaching for more messages.
|
||||
If all these values are set to -1 the broker will keep reading messages as long as the consumer is reaching for more messages.
|
||||
However this would keep the broker unprotected from consumers allocating huge transactions or consumers that don't have flow control enabled.
|
||||
|
||||
== Global Max Size
|
||||
|
|
|
@ -421,6 +421,16 @@ public class PersistMultiThreadTest extends ActiveMQTestBase {
|
|||
return 0;
|
||||
}
|
||||
|
||||
@Override
|
||||
public int getPrefetchPageBytes() {
|
||||
return 0;
|
||||
}
|
||||
|
||||
@Override
|
||||
public int getPrefetchPageMessages() {
|
||||
return 0;
|
||||
}
|
||||
|
||||
@Override
|
||||
public boolean page(Message message,
|
||||
Transaction tx,
|
||||
|
|
|
@ -216,21 +216,12 @@ under the License.
|
|||
<dead-letter-address>DLQ</dead-letter-address>
|
||||
<expiry-address>ExpiryQueue</expiry-address>
|
||||
<redelivery-delay>0</redelivery-delay>
|
||||
|
||||
|
||||
<!-- if max-size-bytes and max-size-messages were both enabled, the system will enter into paging
|
||||
based on the first attribute to hits the maximum value -->
|
||||
<!-- limit for the address in bytes, -1 means unlimited -->
|
||||
<max-size-bytes>0</max-size-bytes>
|
||||
<!-- limit for the address in messages, -1 means unlimited -->
|
||||
<max-size-messages>0</max-size-messages>
|
||||
<!-- the size of each file on paging. Notice we keep files in memory while they are in use.
|
||||
Lower this setting if you have too many queues in memory. -->
|
||||
<page-size-bytes>5M</page-size-bytes>
|
||||
<max-read-page-bytes>100M</max-read-page-bytes>
|
||||
<max-read-page-messages>-1</max-read-page-messages>
|
||||
<!-- how many bytes equivalent of messages are kept in memory from paging (based on memory estimate). The system will stop reading whenever this or max-read-page-messages hits the max first. -->
|
||||
<max-read-page-bytes>1M</max-read-page-bytes>
|
||||
|
||||
<prefetch-page-bytes>1M</prefetch-page-bytes>
|
||||
<message-counter-history-day-limit>10</message-counter-history-day-limit>
|
||||
<address-full-policy>PAGE</address-full-policy>
|
||||
<auto-create-queues>true</auto-create-queues>
|
||||
|
|
|
@ -30,11 +30,11 @@ export TEST_HORIZONTAL_TIMEOUT_MINUTES=120
|
|||
export TEST_HORIZONTAL_PROTOCOL_LIST=OPENWIRE,CORE,AMQP
|
||||
|
||||
export TEST_HORIZONTAL_CORE_DESTINATIONS=20
|
||||
export TEST_HORIZONTAL_CORE_MESSAGES=1000
|
||||
export TEST_HORIZONTAL_CORE_COMMIT_INTERVAL=100
|
||||
export TEST_HORIZONTAL_CORE_MESSAGES=5000
|
||||
export TEST_HORIZONTAL_CORE_COMMIT_INTERVAL=1000
|
||||
export TEST_HORIZONTAL_CORE_RECEIVE_COMMIT_INTERVAL=0
|
||||
export TEST_HORIZONTAL_CORE_MESSAGE_SIZE=20000
|
||||
export TEST_HORIZONTAL_CORE_PARALLEL_SENDS=10
|
||||
export TEST_HORIZONTAL_CORE_PARALLEL_SENDS=20
|
||||
|
||||
export TEST_HORIZONTAL_AMQP_DESTINATIONS=20
|
||||
export TEST_HORIZONTAL_AMQP_MESSAGES=1000
|
||||
|
|
Loading…
Reference in New Issue