diff --git a/artemis-core-client/src/main/java/org/apache/activemq/artemis/api/config/ActiveMQDefaultConfiguration.java b/artemis-core-client/src/main/java/org/apache/activemq/artemis/api/config/ActiveMQDefaultConfiguration.java index 2813154014..b8500f153e 100644 --- a/artemis-core-client/src/main/java/org/apache/activemq/artemis/api/config/ActiveMQDefaultConfiguration.java +++ b/artemis-core-client/src/main/java/org/apache/activemq/artemis/api/config/ActiveMQDefaultConfiguration.java @@ -512,6 +512,8 @@ public final class ActiveMQDefaultConfiguration { public static final long DEFAULT_GLOBAL_MAX_MESSAGES = -1; + public static final int INITIAL_QUEUE_BUFFER_SIZE = 8192; + public static final int DEFAULT_MAX_DISK_USAGE; static { @@ -1967,4 +1969,10 @@ public final class ActiveMQDefaultConfiguration { return DEFAULT_MIRROR_PAGE_TRANSACTION; } + /** + * the initial size of the intermediate message buffer used for queues + */ + public static int getInitialQueueBufferSize() { + return INITIAL_QUEUE_BUFFER_SIZE; + } } diff --git a/artemis-core-client/src/main/java/org/apache/activemq/artemis/api/core/management/AddressSettingsInfo.java b/artemis-core-client/src/main/java/org/apache/activemq/artemis/api/core/management/AddressSettingsInfo.java index 9640abe214..718fdb8551 100644 --- a/artemis-core-client/src/main/java/org/apache/activemq/artemis/api/core/management/AddressSettingsInfo.java +++ b/artemis-core-client/src/main/java/org/apache/activemq/artemis/api/core/management/AddressSettingsInfo.java @@ -309,6 +309,11 @@ public final class AddressSettingsInfo { } private boolean enableMetrics; + static { + META_BEAN.add(Integer.class, "initialQueueBufferSize", (t, p) -> t.initialQueueBufferSize = p, t -> t.initialQueueBufferSize); + } + private int initialQueueBufferSize; + public static AddressSettingsInfo fromJSON(final String jsonString) { AddressSettingsInfo newInfo = new AddressSettingsInfo(); @@ -554,5 +559,9 @@ public final class AddressSettingsInfo { public boolean isEnableMetrics() { return enableMetrics; } + + public int getInitialQueueBufferSize() { + return initialQueueBufferSize; + } } diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/config/impl/Validators.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/config/impl/Validators.java index 4a75a75e36..44c80f192f 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/config/impl/Validators.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/config/impl/Validators.java @@ -107,6 +107,14 @@ public final class Validators { } }; + public static final Validator POSITIVE_POWER_OF_TWO = (name, value) -> { + if ((value.longValue() & (value.longValue() - 1)) == 0 && value.longValue() > 0) { + return value; + } else { + throw ActiveMQMessageBundle.BUNDLE.positivePowerOfTwo(name, value); + } + }; + public static final Validator MINUS_ONE_OR_POSITIVE_INT = (name, value) -> { if (value.longValue() == -1 || (value.longValue() > 0 && value.longValue() <= Integer.MAX_VALUE)) { return value; diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/deployers/impl/FileConfigurationParser.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/deployers/impl/FileConfigurationParser.java index e12dc13fce..0d8d2080e3 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/deployers/impl/FileConfigurationParser.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/deployers/impl/FileConfigurationParser.java @@ -141,6 +141,7 @@ import static org.apache.activemq.artemis.core.config.impl.Validators.PAGE_FULL_ import static org.apache.activemq.artemis.core.config.impl.Validators.PERCENTAGE; import static org.apache.activemq.artemis.core.config.impl.Validators.PERCENTAGE_OR_MINUS_ONE; import static org.apache.activemq.artemis.core.config.impl.Validators.POSITIVE_INT; +import static org.apache.activemq.artemis.core.config.impl.Validators.POSITIVE_POWER_OF_TWO; import static org.apache.activemq.artemis.core.config.impl.Validators.ROUTING_TYPE; import static org.apache.activemq.artemis.core.config.impl.Validators.SLOW_CONSUMER_POLICY_TYPE; import static org.apache.activemq.artemis.core.config.impl.Validators.SLOW_CONSUMER_THRESHOLD_MEASUREMENT_UNIT; @@ -388,6 +389,8 @@ public final class FileConfigurationParser extends XMLConfigurationUtil { private static final String MIRROR_PAGE_TRANSACTION = "mirror-page-transaction"; + private static final String INITIAL_QUEUE_BUFFER_SIZE = "initial-queue-buffer-size"; + private boolean validateAIO = false; private boolean printPageMaxSizeUsed = false; @@ -1472,6 +1475,8 @@ public final class FileConfigurationParser extends XMLConfigurationUtil { addressSettings.setEnableIngressTimestamp(XMLUtil.parseBoolean(child)); } else if (ID_CACHE_SIZE.equalsIgnoreCase(name)) { addressSettings.setIDCacheSize(GE_ZERO.validate(ID_CACHE_SIZE, XMLUtil.parseInt(child)).intValue()); + } else if (INITIAL_QUEUE_BUFFER_SIZE.equalsIgnoreCase(name)) { + addressSettings.setInitialQueueBufferSize(POSITIVE_POWER_OF_TWO.validate(INITIAL_QUEUE_BUFFER_SIZE, XMLUtil.parseInt(child)).intValue()); } } return setting; diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/ActiveMQMessageBundle.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/ActiveMQMessageBundle.java index 0064bac1d1..1cbdcd33a9 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/ActiveMQMessageBundle.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/ActiveMQMessageBundle.java @@ -559,4 +559,6 @@ public interface ActiveMQMessageBundle { @Message(id = 229255, value = "Bridge {} cannot be {}. Current state: {}") ActiveMQIllegalStateException bridgeOperationCannotBeExecuted(String bridgeName, String failedOp, BridgeImpl.State currentState); + @Message(id = 229256, value = "{} must be a positive power of 2 (actual value: {})") + IllegalArgumentException positivePowerOfTwo(String name, Number val); } diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/Queue.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/Queue.java index dac6b00aea..4a6b8a280c 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/Queue.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/Queue.java @@ -178,6 +178,8 @@ public interface Queue extends Bindable,CriticalComponent { long getRingSize(); + int getInitialQueueBufferSize(); + default boolean isMirrorController() { return false; } diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/QueueImpl.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/QueueImpl.java index 720ebd1f06..021d691191 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/QueueImpl.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/QueueImpl.java @@ -213,7 +213,7 @@ public class QueueImpl extends CriticalComponentImpl implements Queue { // Messages will first enter intermediateMessageReferences // Before they are added to messageReferences // This is to avoid locking the queue on the producer - private final MpscUnboundedArrayQueue intermediateMessageReferences = new MpscUnboundedArrayQueue<>(8192); + private final MpscUnboundedArrayQueue intermediateMessageReferences; // This is where messages are stored protected final PriorityLinkedList messageReferences = new PriorityLinkedListImpl<>(QueueImpl.NUM_PRIORITIES, MessageReferenceImpl.getSequenceComparator()); @@ -365,6 +365,8 @@ public class QueueImpl extends CriticalComponentImpl implements Queue { private volatile long createdTimestamp = -1; + private final int initialQueueBufferSize; + @Override public boolean isSwept() { return swept; @@ -753,6 +755,11 @@ public class QueueImpl extends CriticalComponentImpl implements Queue { } this.ringSize = queueConfiguration.getRingSize() == null ? ActiveMQDefaultConfiguration.getDefaultRingSize() : queueConfiguration.getRingSize(); + + this.initialQueueBufferSize = this.addressSettings.getInitialQueueBufferSize() == null + ? ActiveMQDefaultConfiguration.INITIAL_QUEUE_BUFFER_SIZE + : this.addressSettings.getInitialQueueBufferSize(); + this.intermediateMessageReferences = new MpscUnboundedArrayQueue<>(initialQueueBufferSize); } // Bindable implementation ------------------------------------------------------------------------------------- @@ -1092,6 +1099,11 @@ public class QueueImpl extends CriticalComponentImpl implements Queue { return routingType; } + @Override + public int getInitialQueueBufferSize() { + return this.initialQueueBufferSize; + } + @Override public void setRoutingType(RoutingType routingType) { if (addressInfo.getRoutingTypes().contains(routingType)) { diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/settings/impl/AddressSettings.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/settings/impl/AddressSettings.java index ac74b99aa8..6056a239c8 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/settings/impl/AddressSettings.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/settings/impl/AddressSettings.java @@ -533,6 +533,12 @@ public class AddressSettings implements Mergeable, Serializable static { metaBean.add(Integer.class, "queuePrefetch", (t, p) -> t.queuePrefetch = p, t -> t.queuePrefetch); } + + static { + metaBean.add(Integer.class, "initialQueueBufferSize", (t, p) -> t.initialQueueBufferSize = p, t -> t.initialQueueBufferSize); + } + private Integer initialQueueBufferSize = null; + //from amq5 //make it transient @Deprecated @@ -1283,6 +1289,15 @@ public class AddressSettings implements Mergeable, Serializable return this; } + public Integer getInitialQueueBufferSize() { + return initialQueueBufferSize; + } + + public AddressSettings setInitialQueueBufferSize(int initialQueueBufferSize) { + this.initialQueueBufferSize = initialQueueBufferSize; + return this; + } + /** * Merge two AddressSettings instances in one instance * @@ -1783,6 +1798,9 @@ public class AddressSettings implements Mergeable, Serializable return false; if (!Objects.equals(idCacheSize, that.idCacheSize)) return false; + if (!Objects.equals(initialQueueBufferSize, that.initialQueueBufferSize)) { + return false; + } return Objects.equals(queuePrefetch, that.queuePrefetch); } @@ -1865,11 +1883,13 @@ public class AddressSettings implements Mergeable, Serializable result = 31 * result + (enableIngressTimestamp != null ? enableIngressTimestamp.hashCode() : 0); result = 31 * result + (idCacheSize != null ? idCacheSize.hashCode() : 0); result = 31 * result + (queuePrefetch != null ? queuePrefetch.hashCode() : 0); + result = 31 * result + (initialQueueBufferSize != null ? initialQueueBufferSize.hashCode() : 0); return result; } @Override public String toString() { - return "AddressSettings{" + "addressFullMessagePolicy=" + addressFullMessagePolicy + ", maxSizeBytes=" + maxSizeBytes + ", maxReadPageBytes=" + maxReadPageBytes + ", maxReadPageMessages=" + maxReadPageMessages + ", prefetchPageBytes=" + prefetchPageBytes + ", prefetchPageMessages=" + prefetchPageMessages + ", pageLimitBytes=" + pageLimitBytes + ", pageLimitMessages=" + pageLimitMessages + ", pageFullMessagePolicy=" + pageFullMessagePolicy + ", maxSizeMessages=" + maxSizeMessages + ", pageSizeBytes=" + pageSizeBytes + ", pageMaxCache=" + pageCacheMaxSize + ", dropMessagesWhenFull=" + dropMessagesWhenFull + ", maxDeliveryAttempts=" + maxDeliveryAttempts + ", messageCounterHistoryDayLimit=" + messageCounterHistoryDayLimit + ", redeliveryDelay=" + redeliveryDelay + ", redeliveryMultiplier=" + redeliveryMultiplier + ", redeliveryCollisionAvoidanceFactor=" + redeliveryCollisionAvoidanceFactor + ", maxRedeliveryDelay=" + maxRedeliveryDelay + ", deadLetterAddress=" + deadLetterAddress + ", expiryAddress=" + expiryAddress + ", expiryDelay=" + expiryDelay + ", minExpiryDelay=" + minExpiryDelay + ", maxExpiryDelay=" + maxExpiryDelay + ", defaultLastValueQueue=" + defaultLastValueQueue + ", defaultLastValueKey=" + defaultLastValueKey + ", defaultNonDestructive=" + defaultNonDestructive + ", defaultExclusiveQueue=" + defaultExclusiveQueue + ", defaultGroupRebalance=" + defaultGroupRebalance + ", defaultGroupRebalancePauseDispatch=" + defaultGroupRebalancePauseDispatch + ", defaultGroupBuckets=" + defaultGroupBuckets + ", defaultGroupFirstKey=" + defaultGroupFirstKey + ", redistributionDelay=" + redistributionDelay + ", sendToDLAOnNoRoute=" + sendToDLAOnNoRoute + ", slowConsumerThreshold=" + slowConsumerThreshold + ", slowConsumerThresholdMeasurementUnit=" + slowConsumerThresholdMeasurementUnit + ", slowConsumerCheckPeriod=" + slowConsumerCheckPeriod + ", slowConsumerPolicy=" + slowConsumerPolicy + ", autoCreateJmsQueues=" + autoCreateJmsQueues + ", autoDeleteJmsQueues=" + autoDeleteJmsQueues + ", autoCreateJmsTopics=" + autoCreateJmsTopics + ", autoDeleteJmsTopics=" + autoDeleteJmsTopics + ", autoCreateQueues=" + autoCreateQueues + ", autoDeleteQueues=" + autoDeleteQueues + ", autoDeleteCreatedQueues=" + autoDeleteCreatedQueues + ", autoDeleteQueuesDelay=" + autoDeleteQueuesDelay + ", autoDeleteQueuesSkipUsageCheck=" + autoDeleteQueuesSkipUsageCheck + ", autoDeleteQueuesMessageCount=" + autoDeleteQueuesMessageCount + ", defaultRingSize=" + defaultRingSize + ", retroactiveMessageCount=" + retroactiveMessageCount + ", configDeleteQueues=" + configDeleteQueues + ", autoCreateAddresses=" + autoCreateAddresses + ", autoDeleteAddresses=" + autoDeleteAddresses + ", autoDeleteAddressesDelay=" + autoDeleteAddressesDelay + ", autoDeleteAddressesSkipUsageCheck=" + autoDeleteAddressesSkipUsageCheck + ", configDeleteAddresses=" + configDeleteAddresses + ", configDeleteDiverts=" + configDeleteDiverts + ", managementBrowsePageSize=" + managementBrowsePageSize + ", maxSizeBytesRejectThreshold=" + maxSizeBytesRejectThreshold + ", defaultMaxConsumers=" + defaultMaxConsumers + ", defaultPurgeOnNoConsumers=" + defaultPurgeOnNoConsumers + ", defaultConsumersBeforeDispatch=" + defaultConsumersBeforeDispatch + ", defaultDelayBeforeDispatch=" + defaultDelayBeforeDispatch + ", defaultQueueRoutingType=" + defaultQueueRoutingType + ", defaultAddressRoutingType=" + defaultAddressRoutingType + ", defaultConsumerWindowSize=" + defaultConsumerWindowSize + ", autoCreateDeadLetterResources=" + autoCreateDeadLetterResources + ", deadLetterQueuePrefix=" + deadLetterQueuePrefix + ", deadLetterQueueSuffix=" + deadLetterQueueSuffix + ", autoCreateExpiryResources=" + autoCreateExpiryResources + ", expiryQueuePrefix=" + expiryQueuePrefix + ", expiryQueueSuffix=" + expiryQueueSuffix + ", enableMetrics=" + enableMetrics + ", managementMessageAttributeSizeLimit=" + managementMessageAttributeSizeLimit + ", enableIngressTimestamp=" + enableIngressTimestamp + ", idCacheSize=" + idCacheSize + ", queuePrefetch=" + queuePrefetch + '}'; + return "AddressSettings{" + "addressFullMessagePolicy=" + addressFullMessagePolicy + ", maxSizeBytes=" + maxSizeBytes + ", maxReadPageBytes=" + maxReadPageBytes + ", maxReadPageMessages=" + maxReadPageMessages + ", prefetchPageBytes=" + prefetchPageBytes + ", prefetchPageMessages=" + prefetchPageMessages + ", pageLimitBytes=" + pageLimitBytes + ", pageLimitMessages=" + pageLimitMessages + ", pageFullMessagePolicy=" + pageFullMessagePolicy + ", maxSizeMessages=" + maxSizeMessages + ", pageSizeBytes=" + pageSizeBytes + ", pageMaxCache=" + pageCacheMaxSize + ", dropMessagesWhenFull=" + dropMessagesWhenFull + ", maxDeliveryAttempts=" + maxDeliveryAttempts + ", messageCounterHistoryDayLimit=" + messageCounterHistoryDayLimit + ", redeliveryDelay=" + redeliveryDelay + ", redeliveryMultiplier=" + redeliveryMultiplier + ", redeliveryCollisionAvoidanceFactor=" + redeliveryCollisionAvoidanceFactor + ", maxRedeliveryDelay=" + maxRedeliveryDelay + ", deadLetterAddress=" + deadLetterAddress + ", expiryAddress=" + expiryAddress + ", expiryDelay=" + expiryDelay + ", minExpiryDelay=" + minExpiryDelay + ", maxExpiryDelay=" + maxExpiryDelay + ", defaultLastValueQueue=" + defaultLastValueQueue + ", defaultLastValueKey=" + defaultLastValueKey + ", defaultNonDestructive=" + defaultNonDestructive + ", defaultExclusiveQueue=" + defaultExclusiveQueue + ", defaultGroupRebalance=" + defaultGroupRebalance + ", defaultGroupRebalancePauseDispatch=" + defaultGroupRebalancePauseDispatch + ", defaultGroupBuckets=" + defaultGroupBuckets + ", defaultGroupFirstKey=" + defaultGroupFirstKey + ", redistributionDelay=" + redistributionDelay + ", sendToDLAOnNoRoute=" + sendToDLAOnNoRoute + ", slowConsumerThreshold=" + slowConsumerThreshold + ", slowConsumerThresholdMeasurementUnit=" + slowConsumerThresholdMeasurementUnit + ", slowConsumerCheckPeriod=" + slowConsumerCheckPeriod + ", slowConsumerPolicy=" + slowConsumerPolicy + ", autoCreateJmsQueues=" + autoCreateJmsQueues + ", autoDeleteJmsQueues=" + autoDeleteJmsQueues + ", autoCreateJmsTopics=" + autoCreateJmsTopics + ", autoDeleteJmsTopics=" + autoDeleteJmsTopics + ", autoCreateQueues=" + autoCreateQueues + ", autoDeleteQueues=" + autoDeleteQueues + ", autoDeleteCreatedQueues=" + autoDeleteCreatedQueues + ", autoDeleteQueuesDelay=" + autoDeleteQueuesDelay + ", autoDeleteQueuesSkipUsageCheck=" + autoDeleteQueuesSkipUsageCheck + ", autoDeleteQueuesMessageCount=" + autoDeleteQueuesMessageCount + ", defaultRingSize=" + defaultRingSize + ", retroactiveMessageCount=" + retroactiveMessageCount + ", configDeleteQueues=" + configDeleteQueues + ", autoCreateAddresses=" + autoCreateAddresses + ", autoDeleteAddresses=" + autoDeleteAddresses + ", autoDeleteAddressesDelay=" + autoDeleteAddressesDelay + ", autoDeleteAddressesSkipUsageCheck=" + autoDeleteAddressesSkipUsageCheck + ", configDeleteAddresses=" + configDeleteAddresses + ", configDeleteDiverts=" + configDeleteDiverts + ", managementBrowsePageSize=" + managementBrowsePageSize + ", maxSizeBytesRejectThreshold=" + maxSizeBytesRejectThreshold + ", defaultMaxConsumers=" + defaultMaxConsumers + ", defaultPurgeOnNoConsumers=" + defaultPurgeOnNoConsumers + ", defaultConsumersBeforeDispatch=" + defaultConsumersBeforeDispatch + ", defaultDelayBeforeDispatch=" + defaultDelayBeforeDispatch + ", defaultQueueRoutingType=" + defaultQueueRoutingType + ", defaultAddressRoutingType=" + defaultAddressRoutingType + ", defaultConsumerWindowSize=" + defaultConsumerWindowSize + ", autoCreateDeadLetterResources=" + autoCreateDeadLetterResources + ", deadLetterQueuePrefix=" + deadLetterQueuePrefix + ", deadLetterQueueSuffix=" + deadLetterQueueSuffix + ", autoCreateExpiryResources=" + autoCreateExpiryResources + ", expiryQueuePrefix=" + expiryQueuePrefix + ", expiryQueueSuffix=" + expiryQueueSuffix + ", enableMetrics=" + enableMetrics + ", managementMessageAttributeSizeLimit=" + managementMessageAttributeSizeLimit + ", enableIngressTimestamp=" + enableIngressTimestamp + ", idCacheSize=" + idCacheSize + ", queuePrefetch=" + queuePrefetch + ", initialQueueBufferSize=" + initialQueueBufferSize + + '}'; } } diff --git a/artemis-server/src/main/resources/schema/artemis-configuration.xsd b/artemis-server/src/main/resources/schema/artemis-configuration.xsd index 9827ba7fac..2eeb1e0d61 100644 --- a/artemis-server/src/main/resources/schema/artemis-configuration.xsd +++ b/artemis-server/src/main/resources/schema/artemis-configuration.xsd @@ -4465,6 +4465,15 @@ + + + + This will set the initial intermediate message reference buffer size for all queues on the matching address. Will use the + default initial size if not configured. + + + + diff --git a/artemis-server/src/test/java/org/apache/activemq/artemis/core/config/impl/FileConfigurationTest.java b/artemis-server/src/test/java/org/apache/activemq/artemis/core/config/impl/FileConfigurationTest.java index 39df6e03de..331b0fa187 100644 --- a/artemis-server/src/test/java/org/apache/activemq/artemis/core/config/impl/FileConfigurationTest.java +++ b/artemis-server/src/test/java/org/apache/activemq/artemis/core/config/impl/FileConfigurationTest.java @@ -534,6 +534,7 @@ public class FileConfigurationTest extends AbstractConfigurationTestBase { assertTrue(conf.getAddressSettings().get("a1").isEnableMetrics()); assertTrue(conf.getAddressSettings().get("a1").isEnableIngressTimestamp()); assertNull(conf.getAddressSettings().get("a1").getIDCacheSize()); + assertNull(conf.getAddressSettings().get("a1").getInitialQueueBufferSize()); assertEquals("a2.1", conf.getAddressSettings().get("a2").getDeadLetterAddress().toString()); assertTrue(conf.getAddressSettings().get("a2").isAutoCreateDeadLetterResources()); @@ -575,6 +576,7 @@ public class FileConfigurationTest extends AbstractConfigurationTestBase { assertFalse(conf.getAddressSettings().get("a2").isEnableMetrics()); assertFalse(conf.getAddressSettings().get("a2").isEnableIngressTimestamp()); assertEquals(Integer.valueOf(500), conf.getAddressSettings().get("a2").getIDCacheSize()); + assertEquals(Integer.valueOf(128), conf.getAddressSettings().get("a2").getInitialQueueBufferSize()); assertEquals(111, conf.getMirrorAckManagerQueueAttempts()); assertEquals(222, conf.getMirrorAckManagerPageAttempts()); diff --git a/artemis-server/src/test/java/org/apache/activemq/artemis/core/config/impl/ValidatorsTest.java b/artemis-server/src/test/java/org/apache/activemq/artemis/core/config/impl/ValidatorsTest.java index cc39e4ae56..853766bda8 100644 --- a/artemis-server/src/test/java/org/apache/activemq/artemis/core/config/impl/ValidatorsTest.java +++ b/artemis-server/src/test/java/org/apache/activemq/artemis/core/config/impl/ValidatorsTest.java @@ -152,4 +152,16 @@ public class ValidatorsTest { ValidatorsTest.success(Validators.NULL_OR_TWO_CHARACTERS, null); } + @Test + public void testPOSITIVE_POWER_OF_TWO() { + ValidatorsTest.failure(Validators.POSITIVE_POWER_OF_TWO, 0); + ValidatorsTest.failure(Validators.POSITIVE_POWER_OF_TWO, -10); + ValidatorsTest.failure(Validators.POSITIVE_POWER_OF_TWO, 127); + + ValidatorsTest.success(Validators.POSITIVE_POWER_OF_TWO, 2); + ValidatorsTest.success(Validators.POSITIVE_POWER_OF_TWO, 64); + ValidatorsTest.success(Validators.POSITIVE_POWER_OF_TWO, 1024); + ValidatorsTest.success(Validators.POSITIVE_POWER_OF_TWO, 16777216); + } + } diff --git a/artemis-server/src/test/java/org/apache/activemq/artemis/core/server/impl/RoutingContextTest.java b/artemis-server/src/test/java/org/apache/activemq/artemis/core/server/impl/RoutingContextTest.java index 31ee0db827..6f145b46b3 100644 --- a/artemis-server/src/test/java/org/apache/activemq/artemis/core/server/impl/RoutingContextTest.java +++ b/artemis-server/src/test/java/org/apache/activemq/artemis/core/server/impl/RoutingContextTest.java @@ -378,6 +378,11 @@ public class RoutingContextTest { return 0; } + @Override + public int getInitialQueueBufferSize() { + return 0; + } + @Override public ReferenceCounter getConsumersRefCount() { return null; diff --git a/artemis-server/src/test/java/org/apache/activemq/artemis/core/server/impl/ScheduledDeliveryHandlerTest.java b/artemis-server/src/test/java/org/apache/activemq/artemis/core/server/impl/ScheduledDeliveryHandlerTest.java index b16380efee..95de62e1a2 100644 --- a/artemis-server/src/test/java/org/apache/activemq/artemis/core/server/impl/ScheduledDeliveryHandlerTest.java +++ b/artemis-server/src/test/java/org/apache/activemq/artemis/core/server/impl/ScheduledDeliveryHandlerTest.java @@ -1717,5 +1717,10 @@ public class ScheduledDeliveryHandlerTest { public void setExclusive(boolean exclusive) { } + + @Override + public int getInitialQueueBufferSize() { + return 0; + } } } diff --git a/artemis-server/src/test/java/org/apache/activemq/artemis/core/settings/AddressSettingsTest.java b/artemis-server/src/test/java/org/apache/activemq/artemis/core/settings/AddressSettingsTest.java index f3af43b96b..02e08e3078 100644 --- a/artemis-server/src/test/java/org/apache/activemq/artemis/core/settings/AddressSettingsTest.java +++ b/artemis-server/src/test/java/org/apache/activemq/artemis/core/settings/AddressSettingsTest.java @@ -92,6 +92,7 @@ public class AddressSettingsTest extends ServerTestBase { addressSettingsToMerge.setMinExpiryDelay(888L); addressSettingsToMerge.setMaxExpiryDelay(777L); addressSettingsToMerge.setIDCacheSize(5); + addressSettingsToMerge.setInitialQueueBufferSize(256); if (copy) { addressSettings = addressSettings.mergeCopy(addressSettingsToMerge); @@ -113,6 +114,7 @@ public class AddressSettingsTest extends ServerTestBase { assertEquals(Long.valueOf(888), addressSettings.getMinExpiryDelay()); assertEquals(Long.valueOf(777), addressSettings.getMaxExpiryDelay()); assertEquals(Integer.valueOf(5), addressSettings.getIDCacheSize()); + assertEquals(Integer.valueOf(256), addressSettings.getInitialQueueBufferSize()); } @Test diff --git a/artemis-server/src/test/resources/ConfigurationTest-full-config.xml b/artemis-server/src/test/resources/ConfigurationTest-full-config.xml index 7afb7cc824..00d56e3292 100644 --- a/artemis-server/src/test/resources/ConfigurationTest-full-config.xml +++ b/artemis-server/src/test/resources/ConfigurationTest-full-config.xml @@ -639,6 +639,7 @@ 400 265 500 + 128 diff --git a/artemis-server/src/test/resources/ConfigurationTest-xinclude-config-address-settings.xml b/artemis-server/src/test/resources/ConfigurationTest-xinclude-config-address-settings.xml index 091a8f4f17..4750e7a899 100644 --- a/artemis-server/src/test/resources/ConfigurationTest-xinclude-config-address-settings.xml +++ b/artemis-server/src/test/resources/ConfigurationTest-xinclude-config-address-settings.xml @@ -84,5 +84,6 @@ 10 false 500 + 128 \ No newline at end of file diff --git a/artemis-server/src/test/resources/ConfigurationTest-xinclude-schema-config-address-settings.xml b/artemis-server/src/test/resources/ConfigurationTest-xinclude-schema-config-address-settings.xml index 091a8f4f17..4750e7a899 100644 --- a/artemis-server/src/test/resources/ConfigurationTest-xinclude-schema-config-address-settings.xml +++ b/artemis-server/src/test/resources/ConfigurationTest-xinclude-schema-config-address-settings.xml @@ -84,5 +84,6 @@ 10 false 500 + 128 \ No newline at end of file diff --git a/docs/user-manual/address-settings.adoc b/docs/user-manual/address-settings.adoc index a558776977..305f051208 100644 --- a/docs/user-manual/address-settings.adoc +++ b/docs/user-manual/address-settings.adoc @@ -80,6 +80,7 @@ Here an example of an `address-setting` entry that might be found in the `broker true false 20000 + 8192 ---- @@ -394,6 +395,11 @@ that helps to detect and prevent the processing of duplicate messages based on t By default, the `id-cache-size` setting inherits from the global `id-cache-size`, with a default of `20000` elements if not explicitly configured. Read more about xref:duplicate-detection.adoc#configuring-the-duplicate-id-cache[duplicate id cache sizes]. +initial-queue-buffer-size:: +defines the initial number of elements allocated initially on the JVM heap for the message reference buffer. This is allocated for each queue. +If there are many queues that are created but unlikely to be used, this can be configured to a smaller value to prevent large initial allocation. +By default, this value is `8192` if not explicitly configured. This must be a positive power of 2 (i.e. `0` is not an option). + ## Literal Matches A _literal_ match is a match that contains wildcards but should be applied _without regard_ to those wildcards. In other words, the wildcards should be ignored and the address settings should only be applied to the literal (i.e. exact) match. diff --git a/docs/user-manual/configuration-index.adoc b/docs/user-manual/configuration-index.adoc index c4810c91d1..c3083bde44 100644 --- a/docs/user-manual/configuration-index.adoc +++ b/docs/user-manual/configuration-index.adoc @@ -798,6 +798,11 @@ see `auto-create-queues` & `auto-create-addresses` | Number of messages a management resource can browse | 200 + +| xref:address-settings.adoc#address-settings[initial-queue-buffer-size] +| The number of elements in the intermediate message buffer allocated for each queue +| 8192 + | xref:address-model.adoc#non-durable-subscription-queue[default-purge-on-no-consumers] | `purge-on-no-consumers` value if none is set on the queue | `false` diff --git a/tests/artemis-test-support/src/main/java/org/apache/activemq/artemis/tests/unit/core/postoffice/impl/fakes/FakeQueue.java b/tests/artemis-test-support/src/main/java/org/apache/activemq/artemis/tests/unit/core/postoffice/impl/fakes/FakeQueue.java index 580d9e8ed9..7d1ffd5e29 100644 --- a/tests/artemis-test-support/src/main/java/org/apache/activemq/artemis/tests/unit/core/postoffice/impl/fakes/FakeQueue.java +++ b/tests/artemis-test-support/src/main/java/org/apache/activemq/artemis/tests/unit/core/postoffice/impl/fakes/FakeQueue.java @@ -1029,4 +1029,8 @@ public class FakeQueue extends CriticalComponentImpl implements Queue { return 0; } + @Override + public int getInitialQueueBufferSize() { + return 0; + } } diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/client/UpdateQueueTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/client/UpdateQueueTest.java index 2be8b4bc56..ed714bee14 100644 --- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/client/UpdateQueueTest.java +++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/client/UpdateQueueTest.java @@ -174,6 +174,7 @@ public class UpdateQueueTest extends ActiveMQTestBase { assertEquals(10L, queue.getDelayBeforeDispatch()); assertEquals("newUser", queue.getUser().toString()); assertEquals(180L, queue.getRingSize()); + assertEquals(8192, queue.getInitialQueueBufferSize()); factory = new ActiveMQConnectionFactory("vm://0"); diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/management/ActiveMQServerControlTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/management/ActiveMQServerControlTest.java index 62b6e67a19..aaa50c7837 100644 --- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/management/ActiveMQServerControlTest.java +++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/management/ActiveMQServerControlTest.java @@ -1247,6 +1247,14 @@ public class ActiveMQServerControlTest extends ManagementTestBase { assertEquals(addressSettings.getDeadLetterAddress(), info.getDeadLetterAddress()); assertEquals(addressSettings.getExpiryAddress(), info.getExpiryAddress()); assertEquals(addressSettings.getRedeliveryDelay(), info.getRedeliveryDelay()); + + addressSettings.setInitialQueueBufferSize(64); + returnedSettings = serverControl.addAddressSettings("foo", addressSettings.toJSON()); + info = AddressSettings.fromJSON(returnedSettings); + assertEquals(addressSettings.getDeadLetterAddress(), info.getDeadLetterAddress()); + assertEquals(addressSettings.getExpiryAddress(), info.getExpiryAddress()); + assertEquals(addressSettings.getRedeliveryDelay(), info.getRedeliveryDelay()); + assertEquals(addressSettings.getInitialQueueBufferSize(), info.getInitialQueueBufferSize()); } @TestTemplate public void emptyAddressSettings() throws Exception { @@ -1308,6 +1316,7 @@ public class ActiveMQServerControlTest extends ManagementTestBase { assertEquals(addressSettings.getExpiryQueuePrefix(), info.getExpiryQueuePrefix()); assertEquals(addressSettings.getExpiryQueueSuffix(), info.getExpiryQueueSuffix()); assertEquals(addressSettings.isEnableMetrics(), info.isEnableMetrics()); + assertEquals(addressSettings.getInitialQueueBufferSize(), info.getInitialQueueBufferSize()); } @TestTemplate public void testAddressSettings() throws Exception { @@ -1368,6 +1377,7 @@ public class ActiveMQServerControlTest extends ManagementTestBase { String expiryQueuePrefix = RandomUtil.randomString(); String expiryQueueSuffix = RandomUtil.randomString(); boolean enableMetrics = RandomUtil.randomBoolean(); + int initialQueueBufferSize = (int) Math.pow(2, 14); AddressSettings addressSettings = new AddressSettings(); addressSettings.setDeadLetterAddress(SimpleString.of(DLA)) @@ -1422,7 +1432,8 @@ public class ActiveMQServerControlTest extends ManagementTestBase { .setExpiryQueueSuffix(SimpleString.of(expiryQueueSuffix)) .setMinExpiryDelay(minExpiryDelay) .setMaxExpiryDelay(maxExpiryDelay) - .setEnableMetrics(enableMetrics); + .setEnableMetrics(enableMetrics) + .setInitialQueueBufferSize(initialQueueBufferSize); serverControl.addAddressSettings(addressMatch, addressSettings.toJSON()); @@ -1496,6 +1507,7 @@ public class ActiveMQServerControlTest extends ManagementTestBase { assertEquals(expiryQueuePrefix, info.getExpiryQueuePrefix()); assertEquals(expiryQueueSuffix, info.getExpiryQueueSuffix()); assertEquals(enableMetrics, info.isEnableMetrics()); + assertEquals(initialQueueBufferSize, info.getInitialQueueBufferSize()); addressSettings.setMaxSizeBytes(-1).setPageSizeBytes(1000); @@ -1557,6 +1569,7 @@ public class ActiveMQServerControlTest extends ManagementTestBase { assertEquals(expiryQueuePrefix, info.getExpiryQueuePrefix()); assertEquals(expiryQueueSuffix, info.getExpiryQueueSuffix()); assertEquals(enableMetrics, info.isEnableMetrics()); + assertEquals(initialQueueBufferSize, info.getInitialQueueBufferSize()); addressSettings.setMaxSizeBytes(-2).setPageSizeBytes(1000);