ARTEMIS-4809 Allow configuring initial queue buffer size
In some setups, there could be a few hundred thousand queues that are created due to many consumers that are connecting. However, most of these are empty and stay empty for the entire day since there aren't necessarily messages to be sent. The 8K intermediateMessageReferences instantiates an 64KB buffer (Object[]). This means we have large allocation and live heap that ultimately remains empty for almost the entire day. In this commit, we introduce initial-queue-buffer-size, which defaults to the current value of 8192. It can be set programmatically via QueueConfiguration#setInitialQueueBufferSize(int). Note that this must be a positive power of 2.
This commit is contained in:
parent
7cf6b86bc5
commit
8b3874d613
|
@ -512,6 +512,8 @@ public final class ActiveMQDefaultConfiguration {
|
||||||
|
|
||||||
public static final long DEFAULT_GLOBAL_MAX_MESSAGES = -1;
|
public static final long DEFAULT_GLOBAL_MAX_MESSAGES = -1;
|
||||||
|
|
||||||
|
public static final int INITIAL_QUEUE_BUFFER_SIZE = 8192;
|
||||||
|
|
||||||
public static final int DEFAULT_MAX_DISK_USAGE;
|
public static final int DEFAULT_MAX_DISK_USAGE;
|
||||||
|
|
||||||
static {
|
static {
|
||||||
|
@ -1967,4 +1969,10 @@ public final class ActiveMQDefaultConfiguration {
|
||||||
return DEFAULT_MIRROR_PAGE_TRANSACTION;
|
return DEFAULT_MIRROR_PAGE_TRANSACTION;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* the initial size of the intermediate message buffer used for queues
|
||||||
|
*/
|
||||||
|
public static int getInitialQueueBufferSize() {
|
||||||
|
return INITIAL_QUEUE_BUFFER_SIZE;
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -309,6 +309,11 @@ public final class AddressSettingsInfo {
|
||||||
}
|
}
|
||||||
private boolean enableMetrics;
|
private boolean enableMetrics;
|
||||||
|
|
||||||
|
static {
|
||||||
|
META_BEAN.add(Integer.class, "initialQueueBufferSize", (t, p) -> t.initialQueueBufferSize = p, t -> t.initialQueueBufferSize);
|
||||||
|
}
|
||||||
|
private int initialQueueBufferSize;
|
||||||
|
|
||||||
|
|
||||||
public static AddressSettingsInfo fromJSON(final String jsonString) {
|
public static AddressSettingsInfo fromJSON(final String jsonString) {
|
||||||
AddressSettingsInfo newInfo = new AddressSettingsInfo();
|
AddressSettingsInfo newInfo = new AddressSettingsInfo();
|
||||||
|
@ -554,5 +559,9 @@ public final class AddressSettingsInfo {
|
||||||
public boolean isEnableMetrics() {
|
public boolean isEnableMetrics() {
|
||||||
return enableMetrics;
|
return enableMetrics;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
public int getInitialQueueBufferSize() {
|
||||||
|
return initialQueueBufferSize;
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -107,6 +107,14 @@ public final class Validators {
|
||||||
}
|
}
|
||||||
};
|
};
|
||||||
|
|
||||||
|
public static final Validator<Number> POSITIVE_POWER_OF_TWO = (name, value) -> {
|
||||||
|
if ((value.longValue() & (value.longValue() - 1)) == 0 && value.longValue() > 0) {
|
||||||
|
return value;
|
||||||
|
} else {
|
||||||
|
throw ActiveMQMessageBundle.BUNDLE.positivePowerOfTwo(name, value);
|
||||||
|
}
|
||||||
|
};
|
||||||
|
|
||||||
public static final Validator<Number> MINUS_ONE_OR_POSITIVE_INT = (name, value) -> {
|
public static final Validator<Number> MINUS_ONE_OR_POSITIVE_INT = (name, value) -> {
|
||||||
if (value.longValue() == -1 || (value.longValue() > 0 && value.longValue() <= Integer.MAX_VALUE)) {
|
if (value.longValue() == -1 || (value.longValue() > 0 && value.longValue() <= Integer.MAX_VALUE)) {
|
||||||
return value;
|
return value;
|
||||||
|
|
|
@ -141,6 +141,7 @@ import static org.apache.activemq.artemis.core.config.impl.Validators.PAGE_FULL_
|
||||||
import static org.apache.activemq.artemis.core.config.impl.Validators.PERCENTAGE;
|
import static org.apache.activemq.artemis.core.config.impl.Validators.PERCENTAGE;
|
||||||
import static org.apache.activemq.artemis.core.config.impl.Validators.PERCENTAGE_OR_MINUS_ONE;
|
import static org.apache.activemq.artemis.core.config.impl.Validators.PERCENTAGE_OR_MINUS_ONE;
|
||||||
import static org.apache.activemq.artemis.core.config.impl.Validators.POSITIVE_INT;
|
import static org.apache.activemq.artemis.core.config.impl.Validators.POSITIVE_INT;
|
||||||
|
import static org.apache.activemq.artemis.core.config.impl.Validators.POSITIVE_POWER_OF_TWO;
|
||||||
import static org.apache.activemq.artemis.core.config.impl.Validators.ROUTING_TYPE;
|
import static org.apache.activemq.artemis.core.config.impl.Validators.ROUTING_TYPE;
|
||||||
import static org.apache.activemq.artemis.core.config.impl.Validators.SLOW_CONSUMER_POLICY_TYPE;
|
import static org.apache.activemq.artemis.core.config.impl.Validators.SLOW_CONSUMER_POLICY_TYPE;
|
||||||
import static org.apache.activemq.artemis.core.config.impl.Validators.SLOW_CONSUMER_THRESHOLD_MEASUREMENT_UNIT;
|
import static org.apache.activemq.artemis.core.config.impl.Validators.SLOW_CONSUMER_THRESHOLD_MEASUREMENT_UNIT;
|
||||||
|
@ -388,6 +389,8 @@ public final class FileConfigurationParser extends XMLConfigurationUtil {
|
||||||
|
|
||||||
private static final String MIRROR_PAGE_TRANSACTION = "mirror-page-transaction";
|
private static final String MIRROR_PAGE_TRANSACTION = "mirror-page-transaction";
|
||||||
|
|
||||||
|
private static final String INITIAL_QUEUE_BUFFER_SIZE = "initial-queue-buffer-size";
|
||||||
|
|
||||||
private boolean validateAIO = false;
|
private boolean validateAIO = false;
|
||||||
|
|
||||||
private boolean printPageMaxSizeUsed = false;
|
private boolean printPageMaxSizeUsed = false;
|
||||||
|
@ -1472,6 +1475,8 @@ public final class FileConfigurationParser extends XMLConfigurationUtil {
|
||||||
addressSettings.setEnableIngressTimestamp(XMLUtil.parseBoolean(child));
|
addressSettings.setEnableIngressTimestamp(XMLUtil.parseBoolean(child));
|
||||||
} else if (ID_CACHE_SIZE.equalsIgnoreCase(name)) {
|
} else if (ID_CACHE_SIZE.equalsIgnoreCase(name)) {
|
||||||
addressSettings.setIDCacheSize(GE_ZERO.validate(ID_CACHE_SIZE, XMLUtil.parseInt(child)).intValue());
|
addressSettings.setIDCacheSize(GE_ZERO.validate(ID_CACHE_SIZE, XMLUtil.parseInt(child)).intValue());
|
||||||
|
} else if (INITIAL_QUEUE_BUFFER_SIZE.equalsIgnoreCase(name)) {
|
||||||
|
addressSettings.setInitialQueueBufferSize(POSITIVE_POWER_OF_TWO.validate(INITIAL_QUEUE_BUFFER_SIZE, XMLUtil.parseInt(child)).intValue());
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
return setting;
|
return setting;
|
||||||
|
|
|
@ -559,4 +559,6 @@ public interface ActiveMQMessageBundle {
|
||||||
@Message(id = 229255, value = "Bridge {} cannot be {}. Current state: {}")
|
@Message(id = 229255, value = "Bridge {} cannot be {}. Current state: {}")
|
||||||
ActiveMQIllegalStateException bridgeOperationCannotBeExecuted(String bridgeName, String failedOp, BridgeImpl.State currentState);
|
ActiveMQIllegalStateException bridgeOperationCannotBeExecuted(String bridgeName, String failedOp, BridgeImpl.State currentState);
|
||||||
|
|
||||||
|
@Message(id = 229256, value = "{} must be a positive power of 2 (actual value: {})")
|
||||||
|
IllegalArgumentException positivePowerOfTwo(String name, Number val);
|
||||||
}
|
}
|
||||||
|
|
|
@ -178,6 +178,8 @@ public interface Queue extends Bindable,CriticalComponent {
|
||||||
|
|
||||||
long getRingSize();
|
long getRingSize();
|
||||||
|
|
||||||
|
int getInitialQueueBufferSize();
|
||||||
|
|
||||||
default boolean isMirrorController() {
|
default boolean isMirrorController() {
|
||||||
return false;
|
return false;
|
||||||
}
|
}
|
||||||
|
|
|
@ -213,7 +213,7 @@ public class QueueImpl extends CriticalComponentImpl implements Queue {
|
||||||
// Messages will first enter intermediateMessageReferences
|
// Messages will first enter intermediateMessageReferences
|
||||||
// Before they are added to messageReferences
|
// Before they are added to messageReferences
|
||||||
// This is to avoid locking the queue on the producer
|
// This is to avoid locking the queue on the producer
|
||||||
private final MpscUnboundedArrayQueue<MessageReference> intermediateMessageReferences = new MpscUnboundedArrayQueue<>(8192);
|
private final MpscUnboundedArrayQueue<MessageReference> intermediateMessageReferences;
|
||||||
|
|
||||||
// This is where messages are stored
|
// This is where messages are stored
|
||||||
protected final PriorityLinkedList<MessageReference> messageReferences = new PriorityLinkedListImpl<>(QueueImpl.NUM_PRIORITIES, MessageReferenceImpl.getSequenceComparator());
|
protected final PriorityLinkedList<MessageReference> messageReferences = new PriorityLinkedListImpl<>(QueueImpl.NUM_PRIORITIES, MessageReferenceImpl.getSequenceComparator());
|
||||||
|
@ -365,6 +365,8 @@ public class QueueImpl extends CriticalComponentImpl implements Queue {
|
||||||
|
|
||||||
private volatile long createdTimestamp = -1;
|
private volatile long createdTimestamp = -1;
|
||||||
|
|
||||||
|
private final int initialQueueBufferSize;
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public boolean isSwept() {
|
public boolean isSwept() {
|
||||||
return swept;
|
return swept;
|
||||||
|
@ -753,6 +755,11 @@ public class QueueImpl extends CriticalComponentImpl implements Queue {
|
||||||
}
|
}
|
||||||
|
|
||||||
this.ringSize = queueConfiguration.getRingSize() == null ? ActiveMQDefaultConfiguration.getDefaultRingSize() : queueConfiguration.getRingSize();
|
this.ringSize = queueConfiguration.getRingSize() == null ? ActiveMQDefaultConfiguration.getDefaultRingSize() : queueConfiguration.getRingSize();
|
||||||
|
|
||||||
|
this.initialQueueBufferSize = this.addressSettings.getInitialQueueBufferSize() == null
|
||||||
|
? ActiveMQDefaultConfiguration.INITIAL_QUEUE_BUFFER_SIZE
|
||||||
|
: this.addressSettings.getInitialQueueBufferSize();
|
||||||
|
this.intermediateMessageReferences = new MpscUnboundedArrayQueue<>(initialQueueBufferSize);
|
||||||
}
|
}
|
||||||
|
|
||||||
// Bindable implementation -------------------------------------------------------------------------------------
|
// Bindable implementation -------------------------------------------------------------------------------------
|
||||||
|
@ -1092,6 +1099,11 @@ public class QueueImpl extends CriticalComponentImpl implements Queue {
|
||||||
return routingType;
|
return routingType;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public int getInitialQueueBufferSize() {
|
||||||
|
return this.initialQueueBufferSize;
|
||||||
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public void setRoutingType(RoutingType routingType) {
|
public void setRoutingType(RoutingType routingType) {
|
||||||
if (addressInfo.getRoutingTypes().contains(routingType)) {
|
if (addressInfo.getRoutingTypes().contains(routingType)) {
|
||||||
|
|
|
@ -533,6 +533,12 @@ public class AddressSettings implements Mergeable<AddressSettings>, Serializable
|
||||||
static {
|
static {
|
||||||
metaBean.add(Integer.class, "queuePrefetch", (t, p) -> t.queuePrefetch = p, t -> t.queuePrefetch);
|
metaBean.add(Integer.class, "queuePrefetch", (t, p) -> t.queuePrefetch = p, t -> t.queuePrefetch);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
static {
|
||||||
|
metaBean.add(Integer.class, "initialQueueBufferSize", (t, p) -> t.initialQueueBufferSize = p, t -> t.initialQueueBufferSize);
|
||||||
|
}
|
||||||
|
private Integer initialQueueBufferSize = null;
|
||||||
|
|
||||||
//from amq5
|
//from amq5
|
||||||
//make it transient
|
//make it transient
|
||||||
@Deprecated
|
@Deprecated
|
||||||
|
@ -1283,6 +1289,15 @@ public class AddressSettings implements Mergeable<AddressSettings>, Serializable
|
||||||
return this;
|
return this;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
public Integer getInitialQueueBufferSize() {
|
||||||
|
return initialQueueBufferSize;
|
||||||
|
}
|
||||||
|
|
||||||
|
public AddressSettings setInitialQueueBufferSize(int initialQueueBufferSize) {
|
||||||
|
this.initialQueueBufferSize = initialQueueBufferSize;
|
||||||
|
return this;
|
||||||
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Merge two AddressSettings instances in one instance
|
* Merge two AddressSettings instances in one instance
|
||||||
*
|
*
|
||||||
|
@ -1783,6 +1798,9 @@ public class AddressSettings implements Mergeable<AddressSettings>, Serializable
|
||||||
return false;
|
return false;
|
||||||
if (!Objects.equals(idCacheSize, that.idCacheSize))
|
if (!Objects.equals(idCacheSize, that.idCacheSize))
|
||||||
return false;
|
return false;
|
||||||
|
if (!Objects.equals(initialQueueBufferSize, that.initialQueueBufferSize)) {
|
||||||
|
return false;
|
||||||
|
}
|
||||||
return Objects.equals(queuePrefetch, that.queuePrefetch);
|
return Objects.equals(queuePrefetch, that.queuePrefetch);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -1865,11 +1883,13 @@ public class AddressSettings implements Mergeable<AddressSettings>, Serializable
|
||||||
result = 31 * result + (enableIngressTimestamp != null ? enableIngressTimestamp.hashCode() : 0);
|
result = 31 * result + (enableIngressTimestamp != null ? enableIngressTimestamp.hashCode() : 0);
|
||||||
result = 31 * result + (idCacheSize != null ? idCacheSize.hashCode() : 0);
|
result = 31 * result + (idCacheSize != null ? idCacheSize.hashCode() : 0);
|
||||||
result = 31 * result + (queuePrefetch != null ? queuePrefetch.hashCode() : 0);
|
result = 31 * result + (queuePrefetch != null ? queuePrefetch.hashCode() : 0);
|
||||||
|
result = 31 * result + (initialQueueBufferSize != null ? initialQueueBufferSize.hashCode() : 0);
|
||||||
return result;
|
return result;
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public String toString() {
|
public String toString() {
|
||||||
return "AddressSettings{" + "addressFullMessagePolicy=" + addressFullMessagePolicy + ", maxSizeBytes=" + maxSizeBytes + ", maxReadPageBytes=" + maxReadPageBytes + ", maxReadPageMessages=" + maxReadPageMessages + ", prefetchPageBytes=" + prefetchPageBytes + ", prefetchPageMessages=" + prefetchPageMessages + ", pageLimitBytes=" + pageLimitBytes + ", pageLimitMessages=" + pageLimitMessages + ", pageFullMessagePolicy=" + pageFullMessagePolicy + ", maxSizeMessages=" + maxSizeMessages + ", pageSizeBytes=" + pageSizeBytes + ", pageMaxCache=" + pageCacheMaxSize + ", dropMessagesWhenFull=" + dropMessagesWhenFull + ", maxDeliveryAttempts=" + maxDeliveryAttempts + ", messageCounterHistoryDayLimit=" + messageCounterHistoryDayLimit + ", redeliveryDelay=" + redeliveryDelay + ", redeliveryMultiplier=" + redeliveryMultiplier + ", redeliveryCollisionAvoidanceFactor=" + redeliveryCollisionAvoidanceFactor + ", maxRedeliveryDelay=" + maxRedeliveryDelay + ", deadLetterAddress=" + deadLetterAddress + ", expiryAddress=" + expiryAddress + ", expiryDelay=" + expiryDelay + ", minExpiryDelay=" + minExpiryDelay + ", maxExpiryDelay=" + maxExpiryDelay + ", defaultLastValueQueue=" + defaultLastValueQueue + ", defaultLastValueKey=" + defaultLastValueKey + ", defaultNonDestructive=" + defaultNonDestructive + ", defaultExclusiveQueue=" + defaultExclusiveQueue + ", defaultGroupRebalance=" + defaultGroupRebalance + ", defaultGroupRebalancePauseDispatch=" + defaultGroupRebalancePauseDispatch + ", defaultGroupBuckets=" + defaultGroupBuckets + ", defaultGroupFirstKey=" + defaultGroupFirstKey + ", redistributionDelay=" + redistributionDelay + ", sendToDLAOnNoRoute=" + sendToDLAOnNoRoute + ", slowConsumerThreshold=" + slowConsumerThreshold + ", slowConsumerThresholdMeasurementUnit=" + slowConsumerThresholdMeasurementUnit + ", slowConsumerCheckPeriod=" + slowConsumerCheckPeriod + ", slowConsumerPolicy=" + slowConsumerPolicy + ", autoCreateJmsQueues=" + autoCreateJmsQueues + ", autoDeleteJmsQueues=" + autoDeleteJmsQueues + ", autoCreateJmsTopics=" + autoCreateJmsTopics + ", autoDeleteJmsTopics=" + autoDeleteJmsTopics + ", autoCreateQueues=" + autoCreateQueues + ", autoDeleteQueues=" + autoDeleteQueues + ", autoDeleteCreatedQueues=" + autoDeleteCreatedQueues + ", autoDeleteQueuesDelay=" + autoDeleteQueuesDelay + ", autoDeleteQueuesSkipUsageCheck=" + autoDeleteQueuesSkipUsageCheck + ", autoDeleteQueuesMessageCount=" + autoDeleteQueuesMessageCount + ", defaultRingSize=" + defaultRingSize + ", retroactiveMessageCount=" + retroactiveMessageCount + ", configDeleteQueues=" + configDeleteQueues + ", autoCreateAddresses=" + autoCreateAddresses + ", autoDeleteAddresses=" + autoDeleteAddresses + ", autoDeleteAddressesDelay=" + autoDeleteAddressesDelay + ", autoDeleteAddressesSkipUsageCheck=" + autoDeleteAddressesSkipUsageCheck + ", configDeleteAddresses=" + configDeleteAddresses + ", configDeleteDiverts=" + configDeleteDiverts + ", managementBrowsePageSize=" + managementBrowsePageSize + ", maxSizeBytesRejectThreshold=" + maxSizeBytesRejectThreshold + ", defaultMaxConsumers=" + defaultMaxConsumers + ", defaultPurgeOnNoConsumers=" + defaultPurgeOnNoConsumers + ", defaultConsumersBeforeDispatch=" + defaultConsumersBeforeDispatch + ", defaultDelayBeforeDispatch=" + defaultDelayBeforeDispatch + ", defaultQueueRoutingType=" + defaultQueueRoutingType + ", defaultAddressRoutingType=" + defaultAddressRoutingType + ", defaultConsumerWindowSize=" + defaultConsumerWindowSize + ", autoCreateDeadLetterResources=" + autoCreateDeadLetterResources + ", deadLetterQueuePrefix=" + deadLetterQueuePrefix + ", deadLetterQueueSuffix=" + deadLetterQueueSuffix + ", autoCreateExpiryResources=" + autoCreateExpiryResources + ", expiryQueuePrefix=" + expiryQueuePrefix + ", expiryQueueSuffix=" + expiryQueueSuffix + ", enableMetrics=" + enableMetrics + ", managementMessageAttributeSizeLimit=" + managementMessageAttributeSizeLimit + ", enableIngressTimestamp=" + enableIngressTimestamp + ", idCacheSize=" + idCacheSize + ", queuePrefetch=" + queuePrefetch + '}';
|
return "AddressSettings{" + "addressFullMessagePolicy=" + addressFullMessagePolicy + ", maxSizeBytes=" + maxSizeBytes + ", maxReadPageBytes=" + maxReadPageBytes + ", maxReadPageMessages=" + maxReadPageMessages + ", prefetchPageBytes=" + prefetchPageBytes + ", prefetchPageMessages=" + prefetchPageMessages + ", pageLimitBytes=" + pageLimitBytes + ", pageLimitMessages=" + pageLimitMessages + ", pageFullMessagePolicy=" + pageFullMessagePolicy + ", maxSizeMessages=" + maxSizeMessages + ", pageSizeBytes=" + pageSizeBytes + ", pageMaxCache=" + pageCacheMaxSize + ", dropMessagesWhenFull=" + dropMessagesWhenFull + ", maxDeliveryAttempts=" + maxDeliveryAttempts + ", messageCounterHistoryDayLimit=" + messageCounterHistoryDayLimit + ", redeliveryDelay=" + redeliveryDelay + ", redeliveryMultiplier=" + redeliveryMultiplier + ", redeliveryCollisionAvoidanceFactor=" + redeliveryCollisionAvoidanceFactor + ", maxRedeliveryDelay=" + maxRedeliveryDelay + ", deadLetterAddress=" + deadLetterAddress + ", expiryAddress=" + expiryAddress + ", expiryDelay=" + expiryDelay + ", minExpiryDelay=" + minExpiryDelay + ", maxExpiryDelay=" + maxExpiryDelay + ", defaultLastValueQueue=" + defaultLastValueQueue + ", defaultLastValueKey=" + defaultLastValueKey + ", defaultNonDestructive=" + defaultNonDestructive + ", defaultExclusiveQueue=" + defaultExclusiveQueue + ", defaultGroupRebalance=" + defaultGroupRebalance + ", defaultGroupRebalancePauseDispatch=" + defaultGroupRebalancePauseDispatch + ", defaultGroupBuckets=" + defaultGroupBuckets + ", defaultGroupFirstKey=" + defaultGroupFirstKey + ", redistributionDelay=" + redistributionDelay + ", sendToDLAOnNoRoute=" + sendToDLAOnNoRoute + ", slowConsumerThreshold=" + slowConsumerThreshold + ", slowConsumerThresholdMeasurementUnit=" + slowConsumerThresholdMeasurementUnit + ", slowConsumerCheckPeriod=" + slowConsumerCheckPeriod + ", slowConsumerPolicy=" + slowConsumerPolicy + ", autoCreateJmsQueues=" + autoCreateJmsQueues + ", autoDeleteJmsQueues=" + autoDeleteJmsQueues + ", autoCreateJmsTopics=" + autoCreateJmsTopics + ", autoDeleteJmsTopics=" + autoDeleteJmsTopics + ", autoCreateQueues=" + autoCreateQueues + ", autoDeleteQueues=" + autoDeleteQueues + ", autoDeleteCreatedQueues=" + autoDeleteCreatedQueues + ", autoDeleteQueuesDelay=" + autoDeleteQueuesDelay + ", autoDeleteQueuesSkipUsageCheck=" + autoDeleteQueuesSkipUsageCheck + ", autoDeleteQueuesMessageCount=" + autoDeleteQueuesMessageCount + ", defaultRingSize=" + defaultRingSize + ", retroactiveMessageCount=" + retroactiveMessageCount + ", configDeleteQueues=" + configDeleteQueues + ", autoCreateAddresses=" + autoCreateAddresses + ", autoDeleteAddresses=" + autoDeleteAddresses + ", autoDeleteAddressesDelay=" + autoDeleteAddressesDelay + ", autoDeleteAddressesSkipUsageCheck=" + autoDeleteAddressesSkipUsageCheck + ", configDeleteAddresses=" + configDeleteAddresses + ", configDeleteDiverts=" + configDeleteDiverts + ", managementBrowsePageSize=" + managementBrowsePageSize + ", maxSizeBytesRejectThreshold=" + maxSizeBytesRejectThreshold + ", defaultMaxConsumers=" + defaultMaxConsumers + ", defaultPurgeOnNoConsumers=" + defaultPurgeOnNoConsumers + ", defaultConsumersBeforeDispatch=" + defaultConsumersBeforeDispatch + ", defaultDelayBeforeDispatch=" + defaultDelayBeforeDispatch + ", defaultQueueRoutingType=" + defaultQueueRoutingType + ", defaultAddressRoutingType=" + defaultAddressRoutingType + ", defaultConsumerWindowSize=" + defaultConsumerWindowSize + ", autoCreateDeadLetterResources=" + autoCreateDeadLetterResources + ", deadLetterQueuePrefix=" + deadLetterQueuePrefix + ", deadLetterQueueSuffix=" + deadLetterQueueSuffix + ", autoCreateExpiryResources=" + autoCreateExpiryResources + ", expiryQueuePrefix=" + expiryQueuePrefix + ", expiryQueueSuffix=" + expiryQueueSuffix + ", enableMetrics=" + enableMetrics + ", managementMessageAttributeSizeLimit=" + managementMessageAttributeSizeLimit + ", enableIngressTimestamp=" + enableIngressTimestamp + ", idCacheSize=" + idCacheSize + ", queuePrefetch=" + queuePrefetch + ", initialQueueBufferSize=" + initialQueueBufferSize
|
||||||
|
+ '}';
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -4465,6 +4465,15 @@
|
||||||
</xsd:annotation>
|
</xsd:annotation>
|
||||||
</xsd:element>
|
</xsd:element>
|
||||||
|
|
||||||
|
<xsd:element name="initial-queue-buffer-size" default="8192" type="xsd:int" maxOccurs="1" minOccurs="0">
|
||||||
|
<xsd:annotation>
|
||||||
|
<xsd:documentation>
|
||||||
|
This will set the initial intermediate message reference buffer size for all queues on the matching address. Will use the
|
||||||
|
default initial size if not configured.
|
||||||
|
</xsd:documentation>
|
||||||
|
</xsd:annotation>
|
||||||
|
</xsd:element>
|
||||||
|
|
||||||
</xsd:all>
|
</xsd:all>
|
||||||
|
|
||||||
<xsd:attribute name="match" type="xsd:string" use="required">
|
<xsd:attribute name="match" type="xsd:string" use="required">
|
||||||
|
|
|
@ -534,6 +534,7 @@ public class FileConfigurationTest extends AbstractConfigurationTestBase {
|
||||||
assertTrue(conf.getAddressSettings().get("a1").isEnableMetrics());
|
assertTrue(conf.getAddressSettings().get("a1").isEnableMetrics());
|
||||||
assertTrue(conf.getAddressSettings().get("a1").isEnableIngressTimestamp());
|
assertTrue(conf.getAddressSettings().get("a1").isEnableIngressTimestamp());
|
||||||
assertNull(conf.getAddressSettings().get("a1").getIDCacheSize());
|
assertNull(conf.getAddressSettings().get("a1").getIDCacheSize());
|
||||||
|
assertNull(conf.getAddressSettings().get("a1").getInitialQueueBufferSize());
|
||||||
|
|
||||||
assertEquals("a2.1", conf.getAddressSettings().get("a2").getDeadLetterAddress().toString());
|
assertEquals("a2.1", conf.getAddressSettings().get("a2").getDeadLetterAddress().toString());
|
||||||
assertTrue(conf.getAddressSettings().get("a2").isAutoCreateDeadLetterResources());
|
assertTrue(conf.getAddressSettings().get("a2").isAutoCreateDeadLetterResources());
|
||||||
|
@ -575,6 +576,7 @@ public class FileConfigurationTest extends AbstractConfigurationTestBase {
|
||||||
assertFalse(conf.getAddressSettings().get("a2").isEnableMetrics());
|
assertFalse(conf.getAddressSettings().get("a2").isEnableMetrics());
|
||||||
assertFalse(conf.getAddressSettings().get("a2").isEnableIngressTimestamp());
|
assertFalse(conf.getAddressSettings().get("a2").isEnableIngressTimestamp());
|
||||||
assertEquals(Integer.valueOf(500), conf.getAddressSettings().get("a2").getIDCacheSize());
|
assertEquals(Integer.valueOf(500), conf.getAddressSettings().get("a2").getIDCacheSize());
|
||||||
|
assertEquals(Integer.valueOf(128), conf.getAddressSettings().get("a2").getInitialQueueBufferSize());
|
||||||
|
|
||||||
assertEquals(111, conf.getMirrorAckManagerQueueAttempts());
|
assertEquals(111, conf.getMirrorAckManagerQueueAttempts());
|
||||||
assertEquals(222, conf.getMirrorAckManagerPageAttempts());
|
assertEquals(222, conf.getMirrorAckManagerPageAttempts());
|
||||||
|
|
|
@ -152,4 +152,16 @@ public class ValidatorsTest {
|
||||||
ValidatorsTest.success(Validators.NULL_OR_TWO_CHARACTERS, null);
|
ValidatorsTest.success(Validators.NULL_OR_TWO_CHARACTERS, null);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void testPOSITIVE_POWER_OF_TWO() {
|
||||||
|
ValidatorsTest.failure(Validators.POSITIVE_POWER_OF_TWO, 0);
|
||||||
|
ValidatorsTest.failure(Validators.POSITIVE_POWER_OF_TWO, -10);
|
||||||
|
ValidatorsTest.failure(Validators.POSITIVE_POWER_OF_TWO, 127);
|
||||||
|
|
||||||
|
ValidatorsTest.success(Validators.POSITIVE_POWER_OF_TWO, 2);
|
||||||
|
ValidatorsTest.success(Validators.POSITIVE_POWER_OF_TWO, 64);
|
||||||
|
ValidatorsTest.success(Validators.POSITIVE_POWER_OF_TWO, 1024);
|
||||||
|
ValidatorsTest.success(Validators.POSITIVE_POWER_OF_TWO, 16777216);
|
||||||
|
}
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|
|
@ -378,6 +378,11 @@ public class RoutingContextTest {
|
||||||
return 0;
|
return 0;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public int getInitialQueueBufferSize() {
|
||||||
|
return 0;
|
||||||
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public ReferenceCounter getConsumersRefCount() {
|
public ReferenceCounter getConsumersRefCount() {
|
||||||
return null;
|
return null;
|
||||||
|
|
|
@ -1717,5 +1717,10 @@ public class ScheduledDeliveryHandlerTest {
|
||||||
public void setExclusive(boolean exclusive) {
|
public void setExclusive(boolean exclusive) {
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public int getInitialQueueBufferSize() {
|
||||||
|
return 0;
|
||||||
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -92,6 +92,7 @@ public class AddressSettingsTest extends ServerTestBase {
|
||||||
addressSettingsToMerge.setMinExpiryDelay(888L);
|
addressSettingsToMerge.setMinExpiryDelay(888L);
|
||||||
addressSettingsToMerge.setMaxExpiryDelay(777L);
|
addressSettingsToMerge.setMaxExpiryDelay(777L);
|
||||||
addressSettingsToMerge.setIDCacheSize(5);
|
addressSettingsToMerge.setIDCacheSize(5);
|
||||||
|
addressSettingsToMerge.setInitialQueueBufferSize(256);
|
||||||
|
|
||||||
if (copy) {
|
if (copy) {
|
||||||
addressSettings = addressSettings.mergeCopy(addressSettingsToMerge);
|
addressSettings = addressSettings.mergeCopy(addressSettingsToMerge);
|
||||||
|
@ -113,6 +114,7 @@ public class AddressSettingsTest extends ServerTestBase {
|
||||||
assertEquals(Long.valueOf(888), addressSettings.getMinExpiryDelay());
|
assertEquals(Long.valueOf(888), addressSettings.getMinExpiryDelay());
|
||||||
assertEquals(Long.valueOf(777), addressSettings.getMaxExpiryDelay());
|
assertEquals(Long.valueOf(777), addressSettings.getMaxExpiryDelay());
|
||||||
assertEquals(Integer.valueOf(5), addressSettings.getIDCacheSize());
|
assertEquals(Integer.valueOf(5), addressSettings.getIDCacheSize());
|
||||||
|
assertEquals(Integer.valueOf(256), addressSettings.getInitialQueueBufferSize());
|
||||||
}
|
}
|
||||||
|
|
||||||
@Test
|
@Test
|
||||||
|
|
|
@ -639,6 +639,7 @@
|
||||||
<management-browse-page-size>400</management-browse-page-size>
|
<management-browse-page-size>400</management-browse-page-size>
|
||||||
<management-message-attribute-size-limit>265</management-message-attribute-size-limit>
|
<management-message-attribute-size-limit>265</management-message-attribute-size-limit>
|
||||||
<id-cache-size>500</id-cache-size>
|
<id-cache-size>500</id-cache-size>
|
||||||
|
<initial-queue-buffer-size>128</initial-queue-buffer-size>
|
||||||
</address-setting>
|
</address-setting>
|
||||||
</address-settings>
|
</address-settings>
|
||||||
<resource-limit-settings>
|
<resource-limit-settings>
|
||||||
|
|
|
@ -84,5 +84,6 @@
|
||||||
<retroactive-message-count>10</retroactive-message-count>
|
<retroactive-message-count>10</retroactive-message-count>
|
||||||
<enable-metrics>false</enable-metrics>
|
<enable-metrics>false</enable-metrics>
|
||||||
<id-cache-size>500</id-cache-size>
|
<id-cache-size>500</id-cache-size>
|
||||||
|
<initial-queue-buffer-size>128</initial-queue-buffer-size>
|
||||||
</address-setting>
|
</address-setting>
|
||||||
</address-settings>
|
</address-settings>
|
|
@ -84,5 +84,6 @@
|
||||||
<retroactive-message-count>10</retroactive-message-count>
|
<retroactive-message-count>10</retroactive-message-count>
|
||||||
<enable-metrics>false</enable-metrics>
|
<enable-metrics>false</enable-metrics>
|
||||||
<id-cache-size>500</id-cache-size>
|
<id-cache-size>500</id-cache-size>
|
||||||
|
<initial-queue-buffer-size>128</initial-queue-buffer-size>
|
||||||
</address-setting>
|
</address-setting>
|
||||||
</address-settings>
|
</address-settings>
|
|
@ -80,6 +80,7 @@ Here an example of an `address-setting` entry that might be found in the `broker
|
||||||
<enable-metrics>true</enable-metrics>
|
<enable-metrics>true</enable-metrics>
|
||||||
<enable-ingress-timestamp>false</enable-ingress-timestamp>
|
<enable-ingress-timestamp>false</enable-ingress-timestamp>
|
||||||
<id-cache-size>20000</id-cache-size>
|
<id-cache-size>20000</id-cache-size>
|
||||||
|
<initial-queue-buffer-size>8192</initial-queue-buffer-size>
|
||||||
</address-setting>
|
</address-setting>
|
||||||
</address-settings>
|
</address-settings>
|
||||||
----
|
----
|
||||||
|
@ -394,6 +395,11 @@ that helps to detect and prevent the processing of duplicate messages based on t
|
||||||
By default, the `id-cache-size` setting inherits from the global `id-cache-size`, with a default of `20000` elements if not explicitly configured.
|
By default, the `id-cache-size` setting inherits from the global `id-cache-size`, with a default of `20000` elements if not explicitly configured.
|
||||||
Read more about xref:duplicate-detection.adoc#configuring-the-duplicate-id-cache[duplicate id cache sizes].
|
Read more about xref:duplicate-detection.adoc#configuring-the-duplicate-id-cache[duplicate id cache sizes].
|
||||||
|
|
||||||
|
initial-queue-buffer-size::
|
||||||
|
defines the initial number of elements allocated initially on the JVM heap for the message reference buffer. This is allocated for each queue.
|
||||||
|
If there are many queues that are created but unlikely to be used, this can be configured to a smaller value to prevent large initial allocation.
|
||||||
|
By default, this value is `8192` if not explicitly configured. This must be a positive power of 2 (i.e. `0` is not an option).
|
||||||
|
|
||||||
## Literal Matches
|
## Literal Matches
|
||||||
|
|
||||||
A _literal_ match is a match that contains wildcards but should be applied _without regard_ to those wildcards. In other words, the wildcards should be ignored and the address settings should only be applied to the literal (i.e. exact) match.
|
A _literal_ match is a match that contains wildcards but should be applied _without regard_ to those wildcards. In other words, the wildcards should be ignored and the address settings should only be applied to the literal (i.e. exact) match.
|
||||||
|
|
|
@ -798,6 +798,11 @@ see `auto-create-queues` & `auto-create-addresses`
|
||||||
| Number of messages a management resource can browse
|
| Number of messages a management resource can browse
|
||||||
| 200
|
| 200
|
||||||
|
|
||||||
|
|
||||||
|
| xref:address-settings.adoc#address-settings[initial-queue-buffer-size]
|
||||||
|
| The number of elements in the intermediate message buffer allocated for each queue
|
||||||
|
| 8192
|
||||||
|
|
||||||
| xref:address-model.adoc#non-durable-subscription-queue[default-purge-on-no-consumers]
|
| xref:address-model.adoc#non-durable-subscription-queue[default-purge-on-no-consumers]
|
||||||
| `purge-on-no-consumers` value if none is set on the queue
|
| `purge-on-no-consumers` value if none is set on the queue
|
||||||
| `false`
|
| `false`
|
||||||
|
|
|
@ -1029,4 +1029,8 @@ public class FakeQueue extends CriticalComponentImpl implements Queue {
|
||||||
return 0;
|
return 0;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public int getInitialQueueBufferSize() {
|
||||||
|
return 0;
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -174,6 +174,7 @@ public class UpdateQueueTest extends ActiveMQTestBase {
|
||||||
assertEquals(10L, queue.getDelayBeforeDispatch());
|
assertEquals(10L, queue.getDelayBeforeDispatch());
|
||||||
assertEquals("newUser", queue.getUser().toString());
|
assertEquals("newUser", queue.getUser().toString());
|
||||||
assertEquals(180L, queue.getRingSize());
|
assertEquals(180L, queue.getRingSize());
|
||||||
|
assertEquals(8192, queue.getInitialQueueBufferSize());
|
||||||
|
|
||||||
factory = new ActiveMQConnectionFactory("vm://0");
|
factory = new ActiveMQConnectionFactory("vm://0");
|
||||||
|
|
||||||
|
|
|
@ -1247,6 +1247,14 @@ public class ActiveMQServerControlTest extends ManagementTestBase {
|
||||||
assertEquals(addressSettings.getDeadLetterAddress(), info.getDeadLetterAddress());
|
assertEquals(addressSettings.getDeadLetterAddress(), info.getDeadLetterAddress());
|
||||||
assertEquals(addressSettings.getExpiryAddress(), info.getExpiryAddress());
|
assertEquals(addressSettings.getExpiryAddress(), info.getExpiryAddress());
|
||||||
assertEquals(addressSettings.getRedeliveryDelay(), info.getRedeliveryDelay());
|
assertEquals(addressSettings.getRedeliveryDelay(), info.getRedeliveryDelay());
|
||||||
|
|
||||||
|
addressSettings.setInitialQueueBufferSize(64);
|
||||||
|
returnedSettings = serverControl.addAddressSettings("foo", addressSettings.toJSON());
|
||||||
|
info = AddressSettings.fromJSON(returnedSettings);
|
||||||
|
assertEquals(addressSettings.getDeadLetterAddress(), info.getDeadLetterAddress());
|
||||||
|
assertEquals(addressSettings.getExpiryAddress(), info.getExpiryAddress());
|
||||||
|
assertEquals(addressSettings.getRedeliveryDelay(), info.getRedeliveryDelay());
|
||||||
|
assertEquals(addressSettings.getInitialQueueBufferSize(), info.getInitialQueueBufferSize());
|
||||||
}
|
}
|
||||||
@TestTemplate
|
@TestTemplate
|
||||||
public void emptyAddressSettings() throws Exception {
|
public void emptyAddressSettings() throws Exception {
|
||||||
|
@ -1308,6 +1316,7 @@ public class ActiveMQServerControlTest extends ManagementTestBase {
|
||||||
assertEquals(addressSettings.getExpiryQueuePrefix(), info.getExpiryQueuePrefix());
|
assertEquals(addressSettings.getExpiryQueuePrefix(), info.getExpiryQueuePrefix());
|
||||||
assertEquals(addressSettings.getExpiryQueueSuffix(), info.getExpiryQueueSuffix());
|
assertEquals(addressSettings.getExpiryQueueSuffix(), info.getExpiryQueueSuffix());
|
||||||
assertEquals(addressSettings.isEnableMetrics(), info.isEnableMetrics());
|
assertEquals(addressSettings.isEnableMetrics(), info.isEnableMetrics());
|
||||||
|
assertEquals(addressSettings.getInitialQueueBufferSize(), info.getInitialQueueBufferSize());
|
||||||
}
|
}
|
||||||
@TestTemplate
|
@TestTemplate
|
||||||
public void testAddressSettings() throws Exception {
|
public void testAddressSettings() throws Exception {
|
||||||
|
@ -1368,6 +1377,7 @@ public class ActiveMQServerControlTest extends ManagementTestBase {
|
||||||
String expiryQueuePrefix = RandomUtil.randomString();
|
String expiryQueuePrefix = RandomUtil.randomString();
|
||||||
String expiryQueueSuffix = RandomUtil.randomString();
|
String expiryQueueSuffix = RandomUtil.randomString();
|
||||||
boolean enableMetrics = RandomUtil.randomBoolean();
|
boolean enableMetrics = RandomUtil.randomBoolean();
|
||||||
|
int initialQueueBufferSize = (int) Math.pow(2, 14);
|
||||||
|
|
||||||
AddressSettings addressSettings = new AddressSettings();
|
AddressSettings addressSettings = new AddressSettings();
|
||||||
addressSettings.setDeadLetterAddress(SimpleString.of(DLA))
|
addressSettings.setDeadLetterAddress(SimpleString.of(DLA))
|
||||||
|
@ -1422,7 +1432,8 @@ public class ActiveMQServerControlTest extends ManagementTestBase {
|
||||||
.setExpiryQueueSuffix(SimpleString.of(expiryQueueSuffix))
|
.setExpiryQueueSuffix(SimpleString.of(expiryQueueSuffix))
|
||||||
.setMinExpiryDelay(minExpiryDelay)
|
.setMinExpiryDelay(minExpiryDelay)
|
||||||
.setMaxExpiryDelay(maxExpiryDelay)
|
.setMaxExpiryDelay(maxExpiryDelay)
|
||||||
.setEnableMetrics(enableMetrics);
|
.setEnableMetrics(enableMetrics)
|
||||||
|
.setInitialQueueBufferSize(initialQueueBufferSize);
|
||||||
|
|
||||||
|
|
||||||
serverControl.addAddressSettings(addressMatch, addressSettings.toJSON());
|
serverControl.addAddressSettings(addressMatch, addressSettings.toJSON());
|
||||||
|
@ -1496,6 +1507,7 @@ public class ActiveMQServerControlTest extends ManagementTestBase {
|
||||||
assertEquals(expiryQueuePrefix, info.getExpiryQueuePrefix());
|
assertEquals(expiryQueuePrefix, info.getExpiryQueuePrefix());
|
||||||
assertEquals(expiryQueueSuffix, info.getExpiryQueueSuffix());
|
assertEquals(expiryQueueSuffix, info.getExpiryQueueSuffix());
|
||||||
assertEquals(enableMetrics, info.isEnableMetrics());
|
assertEquals(enableMetrics, info.isEnableMetrics());
|
||||||
|
assertEquals(initialQueueBufferSize, info.getInitialQueueBufferSize());
|
||||||
|
|
||||||
|
|
||||||
addressSettings.setMaxSizeBytes(-1).setPageSizeBytes(1000);
|
addressSettings.setMaxSizeBytes(-1).setPageSizeBytes(1000);
|
||||||
|
@ -1557,6 +1569,7 @@ public class ActiveMQServerControlTest extends ManagementTestBase {
|
||||||
assertEquals(expiryQueuePrefix, info.getExpiryQueuePrefix());
|
assertEquals(expiryQueuePrefix, info.getExpiryQueuePrefix());
|
||||||
assertEquals(expiryQueueSuffix, info.getExpiryQueueSuffix());
|
assertEquals(expiryQueueSuffix, info.getExpiryQueueSuffix());
|
||||||
assertEquals(enableMetrics, info.isEnableMetrics());
|
assertEquals(enableMetrics, info.isEnableMetrics());
|
||||||
|
assertEquals(initialQueueBufferSize, info.getInitialQueueBufferSize());
|
||||||
|
|
||||||
|
|
||||||
addressSettings.setMaxSizeBytes(-2).setPageSizeBytes(1000);
|
addressSettings.setMaxSizeBytes(-2).setPageSizeBytes(1000);
|
||||||
|
|
Loading…
Reference in New Issue