This closes #4966
This commit is contained in:
commit
618738d48a
|
@ -512,6 +512,8 @@ public final class ActiveMQDefaultConfiguration {
|
|||
|
||||
public static final long DEFAULT_GLOBAL_MAX_MESSAGES = -1;
|
||||
|
||||
public static final int INITIAL_QUEUE_BUFFER_SIZE = 8192;
|
||||
|
||||
public static final int DEFAULT_MAX_DISK_USAGE;
|
||||
|
||||
static {
|
||||
|
@ -1967,4 +1969,10 @@ public final class ActiveMQDefaultConfiguration {
|
|||
return DEFAULT_MIRROR_PAGE_TRANSACTION;
|
||||
}
|
||||
|
||||
/**
|
||||
* the initial size of the intermediate message buffer used for queues
|
||||
*/
|
||||
public static int getInitialQueueBufferSize() {
|
||||
return INITIAL_QUEUE_BUFFER_SIZE;
|
||||
}
|
||||
}
|
||||
|
|
|
@ -309,6 +309,11 @@ public final class AddressSettingsInfo {
|
|||
}
|
||||
private boolean enableMetrics;
|
||||
|
||||
static {
|
||||
META_BEAN.add(Integer.class, "initialQueueBufferSize", (t, p) -> t.initialQueueBufferSize = p, t -> t.initialQueueBufferSize);
|
||||
}
|
||||
private int initialQueueBufferSize;
|
||||
|
||||
|
||||
public static AddressSettingsInfo fromJSON(final String jsonString) {
|
||||
AddressSettingsInfo newInfo = new AddressSettingsInfo();
|
||||
|
@ -554,5 +559,9 @@ public final class AddressSettingsInfo {
|
|||
public boolean isEnableMetrics() {
|
||||
return enableMetrics;
|
||||
}
|
||||
|
||||
public int getInitialQueueBufferSize() {
|
||||
return initialQueueBufferSize;
|
||||
}
|
||||
}
|
||||
|
||||
|
|
|
@ -107,6 +107,14 @@ public final class Validators {
|
|||
}
|
||||
};
|
||||
|
||||
public static final Validator<Number> POSITIVE_POWER_OF_TWO = (name, value) -> {
|
||||
if ((value.longValue() & (value.longValue() - 1)) == 0 && value.longValue() > 0) {
|
||||
return value;
|
||||
} else {
|
||||
throw ActiveMQMessageBundle.BUNDLE.positivePowerOfTwo(name, value);
|
||||
}
|
||||
};
|
||||
|
||||
public static final Validator<Number> MINUS_ONE_OR_POSITIVE_INT = (name, value) -> {
|
||||
if (value.longValue() == -1 || (value.longValue() > 0 && value.longValue() <= Integer.MAX_VALUE)) {
|
||||
return value;
|
||||
|
|
|
@ -141,6 +141,7 @@ import static org.apache.activemq.artemis.core.config.impl.Validators.PAGE_FULL_
|
|||
import static org.apache.activemq.artemis.core.config.impl.Validators.PERCENTAGE;
|
||||
import static org.apache.activemq.artemis.core.config.impl.Validators.PERCENTAGE_OR_MINUS_ONE;
|
||||
import static org.apache.activemq.artemis.core.config.impl.Validators.POSITIVE_INT;
|
||||
import static org.apache.activemq.artemis.core.config.impl.Validators.POSITIVE_POWER_OF_TWO;
|
||||
import static org.apache.activemq.artemis.core.config.impl.Validators.ROUTING_TYPE;
|
||||
import static org.apache.activemq.artemis.core.config.impl.Validators.SLOW_CONSUMER_POLICY_TYPE;
|
||||
import static org.apache.activemq.artemis.core.config.impl.Validators.SLOW_CONSUMER_THRESHOLD_MEASUREMENT_UNIT;
|
||||
|
@ -388,6 +389,8 @@ public final class FileConfigurationParser extends XMLConfigurationUtil {
|
|||
|
||||
private static final String MIRROR_PAGE_TRANSACTION = "mirror-page-transaction";
|
||||
|
||||
private static final String INITIAL_QUEUE_BUFFER_SIZE = "initial-queue-buffer-size";
|
||||
|
||||
private boolean validateAIO = false;
|
||||
|
||||
private boolean printPageMaxSizeUsed = false;
|
||||
|
@ -1472,6 +1475,8 @@ public final class FileConfigurationParser extends XMLConfigurationUtil {
|
|||
addressSettings.setEnableIngressTimestamp(XMLUtil.parseBoolean(child));
|
||||
} else if (ID_CACHE_SIZE.equalsIgnoreCase(name)) {
|
||||
addressSettings.setIDCacheSize(GE_ZERO.validate(ID_CACHE_SIZE, XMLUtil.parseInt(child)).intValue());
|
||||
} else if (INITIAL_QUEUE_BUFFER_SIZE.equalsIgnoreCase(name)) {
|
||||
addressSettings.setInitialQueueBufferSize(POSITIVE_POWER_OF_TWO.validate(INITIAL_QUEUE_BUFFER_SIZE, XMLUtil.parseInt(child)).intValue());
|
||||
}
|
||||
}
|
||||
return setting;
|
||||
|
|
|
@ -559,4 +559,6 @@ public interface ActiveMQMessageBundle {
|
|||
@Message(id = 229255, value = "Bridge {} cannot be {}. Current state: {}")
|
||||
ActiveMQIllegalStateException bridgeOperationCannotBeExecuted(String bridgeName, String failedOp, BridgeImpl.State currentState);
|
||||
|
||||
@Message(id = 229256, value = "{} must be a positive power of 2 (actual value: {})")
|
||||
IllegalArgumentException positivePowerOfTwo(String name, Number val);
|
||||
}
|
||||
|
|
|
@ -178,6 +178,8 @@ public interface Queue extends Bindable,CriticalComponent {
|
|||
|
||||
long getRingSize();
|
||||
|
||||
int getInitialQueueBufferSize();
|
||||
|
||||
default boolean isMirrorController() {
|
||||
return false;
|
||||
}
|
||||
|
|
|
@ -213,7 +213,7 @@ public class QueueImpl extends CriticalComponentImpl implements Queue {
|
|||
// Messages will first enter intermediateMessageReferences
|
||||
// Before they are added to messageReferences
|
||||
// This is to avoid locking the queue on the producer
|
||||
private final MpscUnboundedArrayQueue<MessageReference> intermediateMessageReferences = new MpscUnboundedArrayQueue<>(8192);
|
||||
private final MpscUnboundedArrayQueue<MessageReference> intermediateMessageReferences;
|
||||
|
||||
// This is where messages are stored
|
||||
protected final PriorityLinkedList<MessageReference> messageReferences = new PriorityLinkedListImpl<>(QueueImpl.NUM_PRIORITIES, MessageReferenceImpl.getSequenceComparator());
|
||||
|
@ -365,6 +365,8 @@ public class QueueImpl extends CriticalComponentImpl implements Queue {
|
|||
|
||||
private volatile long createdTimestamp = -1;
|
||||
|
||||
private final int initialQueueBufferSize;
|
||||
|
||||
@Override
|
||||
public boolean isSwept() {
|
||||
return swept;
|
||||
|
@ -753,6 +755,11 @@ public class QueueImpl extends CriticalComponentImpl implements Queue {
|
|||
}
|
||||
|
||||
this.ringSize = queueConfiguration.getRingSize() == null ? ActiveMQDefaultConfiguration.getDefaultRingSize() : queueConfiguration.getRingSize();
|
||||
|
||||
this.initialQueueBufferSize = this.addressSettings.getInitialQueueBufferSize() == null
|
||||
? ActiveMQDefaultConfiguration.INITIAL_QUEUE_BUFFER_SIZE
|
||||
: this.addressSettings.getInitialQueueBufferSize();
|
||||
this.intermediateMessageReferences = new MpscUnboundedArrayQueue<>(initialQueueBufferSize);
|
||||
}
|
||||
|
||||
// Bindable implementation -------------------------------------------------------------------------------------
|
||||
|
@ -1092,6 +1099,11 @@ public class QueueImpl extends CriticalComponentImpl implements Queue {
|
|||
return routingType;
|
||||
}
|
||||
|
||||
@Override
|
||||
public int getInitialQueueBufferSize() {
|
||||
return this.initialQueueBufferSize;
|
||||
}
|
||||
|
||||
@Override
|
||||
public void setRoutingType(RoutingType routingType) {
|
||||
if (addressInfo.getRoutingTypes().contains(routingType)) {
|
||||
|
|
|
@ -533,6 +533,12 @@ public class AddressSettings implements Mergeable<AddressSettings>, Serializable
|
|||
static {
|
||||
metaBean.add(Integer.class, "queuePrefetch", (t, p) -> t.queuePrefetch = p, t -> t.queuePrefetch);
|
||||
}
|
||||
|
||||
static {
|
||||
metaBean.add(Integer.class, "initialQueueBufferSize", (t, p) -> t.initialQueueBufferSize = p, t -> t.initialQueueBufferSize);
|
||||
}
|
||||
private Integer initialQueueBufferSize = null;
|
||||
|
||||
//from amq5
|
||||
//make it transient
|
||||
@Deprecated
|
||||
|
@ -1283,6 +1289,15 @@ public class AddressSettings implements Mergeable<AddressSettings>, Serializable
|
|||
return this;
|
||||
}
|
||||
|
||||
public Integer getInitialQueueBufferSize() {
|
||||
return initialQueueBufferSize;
|
||||
}
|
||||
|
||||
public AddressSettings setInitialQueueBufferSize(int initialQueueBufferSize) {
|
||||
this.initialQueueBufferSize = initialQueueBufferSize;
|
||||
return this;
|
||||
}
|
||||
|
||||
/**
|
||||
* Merge two AddressSettings instances in one instance
|
||||
*
|
||||
|
@ -1783,6 +1798,9 @@ public class AddressSettings implements Mergeable<AddressSettings>, Serializable
|
|||
return false;
|
||||
if (!Objects.equals(idCacheSize, that.idCacheSize))
|
||||
return false;
|
||||
if (!Objects.equals(initialQueueBufferSize, that.initialQueueBufferSize)) {
|
||||
return false;
|
||||
}
|
||||
return Objects.equals(queuePrefetch, that.queuePrefetch);
|
||||
}
|
||||
|
||||
|
@ -1865,11 +1883,13 @@ public class AddressSettings implements Mergeable<AddressSettings>, Serializable
|
|||
result = 31 * result + (enableIngressTimestamp != null ? enableIngressTimestamp.hashCode() : 0);
|
||||
result = 31 * result + (idCacheSize != null ? idCacheSize.hashCode() : 0);
|
||||
result = 31 * result + (queuePrefetch != null ? queuePrefetch.hashCode() : 0);
|
||||
result = 31 * result + (initialQueueBufferSize != null ? initialQueueBufferSize.hashCode() : 0);
|
||||
return result;
|
||||
}
|
||||
|
||||
@Override
|
||||
public String toString() {
|
||||
return "AddressSettings{" + "addressFullMessagePolicy=" + addressFullMessagePolicy + ", maxSizeBytes=" + maxSizeBytes + ", maxReadPageBytes=" + maxReadPageBytes + ", maxReadPageMessages=" + maxReadPageMessages + ", prefetchPageBytes=" + prefetchPageBytes + ", prefetchPageMessages=" + prefetchPageMessages + ", pageLimitBytes=" + pageLimitBytes + ", pageLimitMessages=" + pageLimitMessages + ", pageFullMessagePolicy=" + pageFullMessagePolicy + ", maxSizeMessages=" + maxSizeMessages + ", pageSizeBytes=" + pageSizeBytes + ", pageMaxCache=" + pageCacheMaxSize + ", dropMessagesWhenFull=" + dropMessagesWhenFull + ", maxDeliveryAttempts=" + maxDeliveryAttempts + ", messageCounterHistoryDayLimit=" + messageCounterHistoryDayLimit + ", redeliveryDelay=" + redeliveryDelay + ", redeliveryMultiplier=" + redeliveryMultiplier + ", redeliveryCollisionAvoidanceFactor=" + redeliveryCollisionAvoidanceFactor + ", maxRedeliveryDelay=" + maxRedeliveryDelay + ", deadLetterAddress=" + deadLetterAddress + ", expiryAddress=" + expiryAddress + ", expiryDelay=" + expiryDelay + ", minExpiryDelay=" + minExpiryDelay + ", maxExpiryDelay=" + maxExpiryDelay + ", defaultLastValueQueue=" + defaultLastValueQueue + ", defaultLastValueKey=" + defaultLastValueKey + ", defaultNonDestructive=" + defaultNonDestructive + ", defaultExclusiveQueue=" + defaultExclusiveQueue + ", defaultGroupRebalance=" + defaultGroupRebalance + ", defaultGroupRebalancePauseDispatch=" + defaultGroupRebalancePauseDispatch + ", defaultGroupBuckets=" + defaultGroupBuckets + ", defaultGroupFirstKey=" + defaultGroupFirstKey + ", redistributionDelay=" + redistributionDelay + ", sendToDLAOnNoRoute=" + sendToDLAOnNoRoute + ", slowConsumerThreshold=" + slowConsumerThreshold + ", slowConsumerThresholdMeasurementUnit=" + slowConsumerThresholdMeasurementUnit + ", slowConsumerCheckPeriod=" + slowConsumerCheckPeriod + ", slowConsumerPolicy=" + slowConsumerPolicy + ", autoCreateJmsQueues=" + autoCreateJmsQueues + ", autoDeleteJmsQueues=" + autoDeleteJmsQueues + ", autoCreateJmsTopics=" + autoCreateJmsTopics + ", autoDeleteJmsTopics=" + autoDeleteJmsTopics + ", autoCreateQueues=" + autoCreateQueues + ", autoDeleteQueues=" + autoDeleteQueues + ", autoDeleteCreatedQueues=" + autoDeleteCreatedQueues + ", autoDeleteQueuesDelay=" + autoDeleteQueuesDelay + ", autoDeleteQueuesSkipUsageCheck=" + autoDeleteQueuesSkipUsageCheck + ", autoDeleteQueuesMessageCount=" + autoDeleteQueuesMessageCount + ", defaultRingSize=" + defaultRingSize + ", retroactiveMessageCount=" + retroactiveMessageCount + ", configDeleteQueues=" + configDeleteQueues + ", autoCreateAddresses=" + autoCreateAddresses + ", autoDeleteAddresses=" + autoDeleteAddresses + ", autoDeleteAddressesDelay=" + autoDeleteAddressesDelay + ", autoDeleteAddressesSkipUsageCheck=" + autoDeleteAddressesSkipUsageCheck + ", configDeleteAddresses=" + configDeleteAddresses + ", configDeleteDiverts=" + configDeleteDiverts + ", managementBrowsePageSize=" + managementBrowsePageSize + ", maxSizeBytesRejectThreshold=" + maxSizeBytesRejectThreshold + ", defaultMaxConsumers=" + defaultMaxConsumers + ", defaultPurgeOnNoConsumers=" + defaultPurgeOnNoConsumers + ", defaultConsumersBeforeDispatch=" + defaultConsumersBeforeDispatch + ", defaultDelayBeforeDispatch=" + defaultDelayBeforeDispatch + ", defaultQueueRoutingType=" + defaultQueueRoutingType + ", defaultAddressRoutingType=" + defaultAddressRoutingType + ", defaultConsumerWindowSize=" + defaultConsumerWindowSize + ", autoCreateDeadLetterResources=" + autoCreateDeadLetterResources + ", deadLetterQueuePrefix=" + deadLetterQueuePrefix + ", deadLetterQueueSuffix=" + deadLetterQueueSuffix + ", autoCreateExpiryResources=" + autoCreateExpiryResources + ", expiryQueuePrefix=" + expiryQueuePrefix + ", expiryQueueSuffix=" + expiryQueueSuffix + ", enableMetrics=" + enableMetrics + ", managementMessageAttributeSizeLimit=" + managementMessageAttributeSizeLimit + ", enableIngressTimestamp=" + enableIngressTimestamp + ", idCacheSize=" + idCacheSize + ", queuePrefetch=" + queuePrefetch + '}';
|
||||
return "AddressSettings{" + "addressFullMessagePolicy=" + addressFullMessagePolicy + ", maxSizeBytes=" + maxSizeBytes + ", maxReadPageBytes=" + maxReadPageBytes + ", maxReadPageMessages=" + maxReadPageMessages + ", prefetchPageBytes=" + prefetchPageBytes + ", prefetchPageMessages=" + prefetchPageMessages + ", pageLimitBytes=" + pageLimitBytes + ", pageLimitMessages=" + pageLimitMessages + ", pageFullMessagePolicy=" + pageFullMessagePolicy + ", maxSizeMessages=" + maxSizeMessages + ", pageSizeBytes=" + pageSizeBytes + ", pageMaxCache=" + pageCacheMaxSize + ", dropMessagesWhenFull=" + dropMessagesWhenFull + ", maxDeliveryAttempts=" + maxDeliveryAttempts + ", messageCounterHistoryDayLimit=" + messageCounterHistoryDayLimit + ", redeliveryDelay=" + redeliveryDelay + ", redeliveryMultiplier=" + redeliveryMultiplier + ", redeliveryCollisionAvoidanceFactor=" + redeliveryCollisionAvoidanceFactor + ", maxRedeliveryDelay=" + maxRedeliveryDelay + ", deadLetterAddress=" + deadLetterAddress + ", expiryAddress=" + expiryAddress + ", expiryDelay=" + expiryDelay + ", minExpiryDelay=" + minExpiryDelay + ", maxExpiryDelay=" + maxExpiryDelay + ", defaultLastValueQueue=" + defaultLastValueQueue + ", defaultLastValueKey=" + defaultLastValueKey + ", defaultNonDestructive=" + defaultNonDestructive + ", defaultExclusiveQueue=" + defaultExclusiveQueue + ", defaultGroupRebalance=" + defaultGroupRebalance + ", defaultGroupRebalancePauseDispatch=" + defaultGroupRebalancePauseDispatch + ", defaultGroupBuckets=" + defaultGroupBuckets + ", defaultGroupFirstKey=" + defaultGroupFirstKey + ", redistributionDelay=" + redistributionDelay + ", sendToDLAOnNoRoute=" + sendToDLAOnNoRoute + ", slowConsumerThreshold=" + slowConsumerThreshold + ", slowConsumerThresholdMeasurementUnit=" + slowConsumerThresholdMeasurementUnit + ", slowConsumerCheckPeriod=" + slowConsumerCheckPeriod + ", slowConsumerPolicy=" + slowConsumerPolicy + ", autoCreateJmsQueues=" + autoCreateJmsQueues + ", autoDeleteJmsQueues=" + autoDeleteJmsQueues + ", autoCreateJmsTopics=" + autoCreateJmsTopics + ", autoDeleteJmsTopics=" + autoDeleteJmsTopics + ", autoCreateQueues=" + autoCreateQueues + ", autoDeleteQueues=" + autoDeleteQueues + ", autoDeleteCreatedQueues=" + autoDeleteCreatedQueues + ", autoDeleteQueuesDelay=" + autoDeleteQueuesDelay + ", autoDeleteQueuesSkipUsageCheck=" + autoDeleteQueuesSkipUsageCheck + ", autoDeleteQueuesMessageCount=" + autoDeleteQueuesMessageCount + ", defaultRingSize=" + defaultRingSize + ", retroactiveMessageCount=" + retroactiveMessageCount + ", configDeleteQueues=" + configDeleteQueues + ", autoCreateAddresses=" + autoCreateAddresses + ", autoDeleteAddresses=" + autoDeleteAddresses + ", autoDeleteAddressesDelay=" + autoDeleteAddressesDelay + ", autoDeleteAddressesSkipUsageCheck=" + autoDeleteAddressesSkipUsageCheck + ", configDeleteAddresses=" + configDeleteAddresses + ", configDeleteDiverts=" + configDeleteDiverts + ", managementBrowsePageSize=" + managementBrowsePageSize + ", maxSizeBytesRejectThreshold=" + maxSizeBytesRejectThreshold + ", defaultMaxConsumers=" + defaultMaxConsumers + ", defaultPurgeOnNoConsumers=" + defaultPurgeOnNoConsumers + ", defaultConsumersBeforeDispatch=" + defaultConsumersBeforeDispatch + ", defaultDelayBeforeDispatch=" + defaultDelayBeforeDispatch + ", defaultQueueRoutingType=" + defaultQueueRoutingType + ", defaultAddressRoutingType=" + defaultAddressRoutingType + ", defaultConsumerWindowSize=" + defaultConsumerWindowSize + ", autoCreateDeadLetterResources=" + autoCreateDeadLetterResources + ", deadLetterQueuePrefix=" + deadLetterQueuePrefix + ", deadLetterQueueSuffix=" + deadLetterQueueSuffix + ", autoCreateExpiryResources=" + autoCreateExpiryResources + ", expiryQueuePrefix=" + expiryQueuePrefix + ", expiryQueueSuffix=" + expiryQueueSuffix + ", enableMetrics=" + enableMetrics + ", managementMessageAttributeSizeLimit=" + managementMessageAttributeSizeLimit + ", enableIngressTimestamp=" + enableIngressTimestamp + ", idCacheSize=" + idCacheSize + ", queuePrefetch=" + queuePrefetch + ", initialQueueBufferSize=" + initialQueueBufferSize
|
||||
+ '}';
|
||||
}
|
||||
}
|
||||
|
|
|
@ -4465,6 +4465,15 @@
|
|||
</xsd:annotation>
|
||||
</xsd:element>
|
||||
|
||||
<xsd:element name="initial-queue-buffer-size" default="8192" type="xsd:int" maxOccurs="1" minOccurs="0">
|
||||
<xsd:annotation>
|
||||
<xsd:documentation>
|
||||
This will set the initial intermediate message reference buffer size for all queues on the matching address. Will use the
|
||||
default initial size if not configured.
|
||||
</xsd:documentation>
|
||||
</xsd:annotation>
|
||||
</xsd:element>
|
||||
|
||||
</xsd:all>
|
||||
|
||||
<xsd:attribute name="match" type="xsd:string" use="required">
|
||||
|
|
|
@ -534,6 +534,7 @@ public class FileConfigurationTest extends AbstractConfigurationTestBase {
|
|||
assertTrue(conf.getAddressSettings().get("a1").isEnableMetrics());
|
||||
assertTrue(conf.getAddressSettings().get("a1").isEnableIngressTimestamp());
|
||||
assertNull(conf.getAddressSettings().get("a1").getIDCacheSize());
|
||||
assertNull(conf.getAddressSettings().get("a1").getInitialQueueBufferSize());
|
||||
|
||||
assertEquals("a2.1", conf.getAddressSettings().get("a2").getDeadLetterAddress().toString());
|
||||
assertTrue(conf.getAddressSettings().get("a2").isAutoCreateDeadLetterResources());
|
||||
|
@ -575,6 +576,7 @@ public class FileConfigurationTest extends AbstractConfigurationTestBase {
|
|||
assertFalse(conf.getAddressSettings().get("a2").isEnableMetrics());
|
||||
assertFalse(conf.getAddressSettings().get("a2").isEnableIngressTimestamp());
|
||||
assertEquals(Integer.valueOf(500), conf.getAddressSettings().get("a2").getIDCacheSize());
|
||||
assertEquals(Integer.valueOf(128), conf.getAddressSettings().get("a2").getInitialQueueBufferSize());
|
||||
|
||||
assertEquals(111, conf.getMirrorAckManagerQueueAttempts());
|
||||
assertEquals(222, conf.getMirrorAckManagerPageAttempts());
|
||||
|
|
|
@ -152,4 +152,16 @@ public class ValidatorsTest {
|
|||
ValidatorsTest.success(Validators.NULL_OR_TWO_CHARACTERS, null);
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testPOSITIVE_POWER_OF_TWO() {
|
||||
ValidatorsTest.failure(Validators.POSITIVE_POWER_OF_TWO, 0);
|
||||
ValidatorsTest.failure(Validators.POSITIVE_POWER_OF_TWO, -10);
|
||||
ValidatorsTest.failure(Validators.POSITIVE_POWER_OF_TWO, 127);
|
||||
|
||||
ValidatorsTest.success(Validators.POSITIVE_POWER_OF_TWO, 2);
|
||||
ValidatorsTest.success(Validators.POSITIVE_POWER_OF_TWO, 64);
|
||||
ValidatorsTest.success(Validators.POSITIVE_POWER_OF_TWO, 1024);
|
||||
ValidatorsTest.success(Validators.POSITIVE_POWER_OF_TWO, 16777216);
|
||||
}
|
||||
|
||||
}
|
||||
|
|
|
@ -378,6 +378,11 @@ public class RoutingContextTest {
|
|||
return 0;
|
||||
}
|
||||
|
||||
@Override
|
||||
public int getInitialQueueBufferSize() {
|
||||
return 0;
|
||||
}
|
||||
|
||||
@Override
|
||||
public ReferenceCounter getConsumersRefCount() {
|
||||
return null;
|
||||
|
|
|
@ -1717,5 +1717,10 @@ public class ScheduledDeliveryHandlerTest {
|
|||
public void setExclusive(boolean exclusive) {
|
||||
|
||||
}
|
||||
|
||||
@Override
|
||||
public int getInitialQueueBufferSize() {
|
||||
return 0;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
|
@ -92,6 +92,7 @@ public class AddressSettingsTest extends ServerTestBase {
|
|||
addressSettingsToMerge.setMinExpiryDelay(888L);
|
||||
addressSettingsToMerge.setMaxExpiryDelay(777L);
|
||||
addressSettingsToMerge.setIDCacheSize(5);
|
||||
addressSettingsToMerge.setInitialQueueBufferSize(256);
|
||||
|
||||
if (copy) {
|
||||
addressSettings = addressSettings.mergeCopy(addressSettingsToMerge);
|
||||
|
@ -113,6 +114,7 @@ public class AddressSettingsTest extends ServerTestBase {
|
|||
assertEquals(Long.valueOf(888), addressSettings.getMinExpiryDelay());
|
||||
assertEquals(Long.valueOf(777), addressSettings.getMaxExpiryDelay());
|
||||
assertEquals(Integer.valueOf(5), addressSettings.getIDCacheSize());
|
||||
assertEquals(Integer.valueOf(256), addressSettings.getInitialQueueBufferSize());
|
||||
}
|
||||
|
||||
@Test
|
||||
|
|
|
@ -639,6 +639,7 @@
|
|||
<management-browse-page-size>400</management-browse-page-size>
|
||||
<management-message-attribute-size-limit>265</management-message-attribute-size-limit>
|
||||
<id-cache-size>500</id-cache-size>
|
||||
<initial-queue-buffer-size>128</initial-queue-buffer-size>
|
||||
</address-setting>
|
||||
</address-settings>
|
||||
<resource-limit-settings>
|
||||
|
|
|
@ -84,5 +84,6 @@
|
|||
<retroactive-message-count>10</retroactive-message-count>
|
||||
<enable-metrics>false</enable-metrics>
|
||||
<id-cache-size>500</id-cache-size>
|
||||
<initial-queue-buffer-size>128</initial-queue-buffer-size>
|
||||
</address-setting>
|
||||
</address-settings>
|
|
@ -84,5 +84,6 @@
|
|||
<retroactive-message-count>10</retroactive-message-count>
|
||||
<enable-metrics>false</enable-metrics>
|
||||
<id-cache-size>500</id-cache-size>
|
||||
<initial-queue-buffer-size>128</initial-queue-buffer-size>
|
||||
</address-setting>
|
||||
</address-settings>
|
|
@ -80,6 +80,7 @@ Here an example of an `address-setting` entry that might be found in the `broker
|
|||
<enable-metrics>true</enable-metrics>
|
||||
<enable-ingress-timestamp>false</enable-ingress-timestamp>
|
||||
<id-cache-size>20000</id-cache-size>
|
||||
<initial-queue-buffer-size>8192</initial-queue-buffer-size>
|
||||
</address-setting>
|
||||
</address-settings>
|
||||
----
|
||||
|
@ -394,6 +395,11 @@ that helps to detect and prevent the processing of duplicate messages based on t
|
|||
By default, the `id-cache-size` setting inherits from the global `id-cache-size`, with a default of `20000` elements if not explicitly configured.
|
||||
Read more about xref:duplicate-detection.adoc#configuring-the-duplicate-id-cache[duplicate id cache sizes].
|
||||
|
||||
initial-queue-buffer-size::
|
||||
defines the initial number of elements allocated initially on the JVM heap for the message reference buffer. This is allocated for each queue.
|
||||
If there are many queues that are created but unlikely to be used, this can be configured to a smaller value to prevent large initial allocation.
|
||||
By default, this value is `8192` if not explicitly configured. This must be a positive power of 2 (i.e. `0` is not an option).
|
||||
|
||||
## Literal Matches
|
||||
|
||||
A _literal_ match is a match that contains wildcards but should be applied _without regard_ to those wildcards. In other words, the wildcards should be ignored and the address settings should only be applied to the literal (i.e. exact) match.
|
||||
|
|
|
@ -798,6 +798,11 @@ see `auto-create-queues` & `auto-create-addresses`
|
|||
| Number of messages a management resource can browse
|
||||
| 200
|
||||
|
||||
|
||||
| xref:address-settings.adoc#address-settings[initial-queue-buffer-size]
|
||||
| The number of elements in the intermediate message buffer allocated for each queue
|
||||
| 8192
|
||||
|
||||
| xref:address-model.adoc#non-durable-subscription-queue[default-purge-on-no-consumers]
|
||||
| `purge-on-no-consumers` value if none is set on the queue
|
||||
| `false`
|
||||
|
|
|
@ -1029,4 +1029,8 @@ public class FakeQueue extends CriticalComponentImpl implements Queue {
|
|||
return 0;
|
||||
}
|
||||
|
||||
@Override
|
||||
public int getInitialQueueBufferSize() {
|
||||
return 0;
|
||||
}
|
||||
}
|
||||
|
|
|
@ -174,6 +174,7 @@ public class UpdateQueueTest extends ActiveMQTestBase {
|
|||
assertEquals(10L, queue.getDelayBeforeDispatch());
|
||||
assertEquals("newUser", queue.getUser().toString());
|
||||
assertEquals(180L, queue.getRingSize());
|
||||
assertEquals(8192, queue.getInitialQueueBufferSize());
|
||||
|
||||
factory = new ActiveMQConnectionFactory("vm://0");
|
||||
|
||||
|
|
|
@ -1247,6 +1247,14 @@ public class ActiveMQServerControlTest extends ManagementTestBase {
|
|||
assertEquals(addressSettings.getDeadLetterAddress(), info.getDeadLetterAddress());
|
||||
assertEquals(addressSettings.getExpiryAddress(), info.getExpiryAddress());
|
||||
assertEquals(addressSettings.getRedeliveryDelay(), info.getRedeliveryDelay());
|
||||
|
||||
addressSettings.setInitialQueueBufferSize(64);
|
||||
returnedSettings = serverControl.addAddressSettings("foo", addressSettings.toJSON());
|
||||
info = AddressSettings.fromJSON(returnedSettings);
|
||||
assertEquals(addressSettings.getDeadLetterAddress(), info.getDeadLetterAddress());
|
||||
assertEquals(addressSettings.getExpiryAddress(), info.getExpiryAddress());
|
||||
assertEquals(addressSettings.getRedeliveryDelay(), info.getRedeliveryDelay());
|
||||
assertEquals(addressSettings.getInitialQueueBufferSize(), info.getInitialQueueBufferSize());
|
||||
}
|
||||
@TestTemplate
|
||||
public void emptyAddressSettings() throws Exception {
|
||||
|
@ -1308,6 +1316,7 @@ public class ActiveMQServerControlTest extends ManagementTestBase {
|
|||
assertEquals(addressSettings.getExpiryQueuePrefix(), info.getExpiryQueuePrefix());
|
||||
assertEquals(addressSettings.getExpiryQueueSuffix(), info.getExpiryQueueSuffix());
|
||||
assertEquals(addressSettings.isEnableMetrics(), info.isEnableMetrics());
|
||||
assertEquals(addressSettings.getInitialQueueBufferSize(), info.getInitialQueueBufferSize());
|
||||
}
|
||||
@TestTemplate
|
||||
public void testAddressSettings() throws Exception {
|
||||
|
@ -1368,6 +1377,7 @@ public class ActiveMQServerControlTest extends ManagementTestBase {
|
|||
String expiryQueuePrefix = RandomUtil.randomString();
|
||||
String expiryQueueSuffix = RandomUtil.randomString();
|
||||
boolean enableMetrics = RandomUtil.randomBoolean();
|
||||
int initialQueueBufferSize = (int) Math.pow(2, 14);
|
||||
|
||||
AddressSettings addressSettings = new AddressSettings();
|
||||
addressSettings.setDeadLetterAddress(SimpleString.of(DLA))
|
||||
|
@ -1422,7 +1432,8 @@ public class ActiveMQServerControlTest extends ManagementTestBase {
|
|||
.setExpiryQueueSuffix(SimpleString.of(expiryQueueSuffix))
|
||||
.setMinExpiryDelay(minExpiryDelay)
|
||||
.setMaxExpiryDelay(maxExpiryDelay)
|
||||
.setEnableMetrics(enableMetrics);
|
||||
.setEnableMetrics(enableMetrics)
|
||||
.setInitialQueueBufferSize(initialQueueBufferSize);
|
||||
|
||||
|
||||
serverControl.addAddressSettings(addressMatch, addressSettings.toJSON());
|
||||
|
@ -1496,6 +1507,7 @@ public class ActiveMQServerControlTest extends ManagementTestBase {
|
|||
assertEquals(expiryQueuePrefix, info.getExpiryQueuePrefix());
|
||||
assertEquals(expiryQueueSuffix, info.getExpiryQueueSuffix());
|
||||
assertEquals(enableMetrics, info.isEnableMetrics());
|
||||
assertEquals(initialQueueBufferSize, info.getInitialQueueBufferSize());
|
||||
|
||||
|
||||
addressSettings.setMaxSizeBytes(-1).setPageSizeBytes(1000);
|
||||
|
@ -1557,6 +1569,7 @@ public class ActiveMQServerControlTest extends ManagementTestBase {
|
|||
assertEquals(expiryQueuePrefix, info.getExpiryQueuePrefix());
|
||||
assertEquals(expiryQueueSuffix, info.getExpiryQueueSuffix());
|
||||
assertEquals(enableMetrics, info.isEnableMetrics());
|
||||
assertEquals(initialQueueBufferSize, info.getInitialQueueBufferSize());
|
||||
|
||||
|
||||
addressSettings.setMaxSizeBytes(-2).setPageSizeBytes(1000);
|
||||
|
|
Loading…
Reference in New Issue