ARTEMIS-5142 support never expiring incoming messages
This commit is contained in:
parent
e22f0ecba2
commit
a7a70d6595
|
@ -304,6 +304,11 @@ public final class AddressSettingsInfo {
|
|||
}
|
||||
private long maxExpiryDelay;
|
||||
|
||||
static {
|
||||
META_BEAN.add(Boolean.class, "noExpiry", (o, p) -> o.noExpiry = p, o -> o.noExpiry);
|
||||
}
|
||||
private boolean noExpiry;
|
||||
|
||||
static {
|
||||
META_BEAN.add(Boolean.class, "enableMetrics", (o, p) -> o.enableMetrics = p, o -> o.enableMetrics);
|
||||
}
|
||||
|
@ -556,6 +561,10 @@ public final class AddressSettingsInfo {
|
|||
return maxExpiryDelay;
|
||||
}
|
||||
|
||||
public boolean isNoExpiry() {
|
||||
return noExpiry;
|
||||
}
|
||||
|
||||
public boolean isEnableMetrics() {
|
||||
return enableMetrics;
|
||||
}
|
||||
|
|
|
@ -233,6 +233,8 @@ public final class FileConfigurationParser extends XMLConfigurationUtil {
|
|||
|
||||
private static final String MAX_EXPIRY_DELAY_NODE_NAME = "max-expiry-delay";
|
||||
|
||||
private static final String NO_EXPIRY_NODE_NAME = "no-expiry";
|
||||
|
||||
private static final String REDELIVERY_DELAY_NODE_NAME = "redelivery-delay";
|
||||
|
||||
private static final String REDELIVERY_DELAY_MULTIPLIER_NODE_NAME = "redelivery-delay-multiplier";
|
||||
|
@ -1331,6 +1333,8 @@ public final class FileConfigurationParser extends XMLConfigurationUtil {
|
|||
addressSettings.setMinExpiryDelay(XMLUtil.parseLong(child));
|
||||
} else if (MAX_EXPIRY_DELAY_NODE_NAME.equalsIgnoreCase(name)) {
|
||||
addressSettings.setMaxExpiryDelay(XMLUtil.parseLong(child));
|
||||
} else if (NO_EXPIRY_NODE_NAME.equalsIgnoreCase(name)) {
|
||||
addressSettings.setNoExpiry(XMLUtil.parseBoolean(child));
|
||||
} else if (REDELIVERY_DELAY_NODE_NAME.equalsIgnoreCase(name)) {
|
||||
addressSettings.setRedeliveryDelay(XMLUtil.parseLong(child));
|
||||
} else if (REDELIVERY_DELAY_MULTIPLIER_NODE_NAME.equalsIgnoreCase(name)) {
|
||||
|
|
|
@ -1355,28 +1355,44 @@ public class PostOfficeImpl implements PostOffice, NotificationListener, Binding
|
|||
return status;
|
||||
}
|
||||
|
||||
// HORNETQ-1029
|
||||
private static void applyExpiryDelay(Message message, AddressSettings settings) {
|
||||
protected static void applyExpiryDelay(Message message, AddressSettings settings) {
|
||||
long expirationOverride = settings.getExpiryDelay();
|
||||
|
||||
if (settings.isNoExpiry()) {
|
||||
if (message.getExpiration() != 0) {
|
||||
message.setExpiration(0);
|
||||
message.reencode();
|
||||
}
|
||||
} else if (expirationOverride >= 0) {
|
||||
// A -1 <expiry-delay> means don't do anything
|
||||
if (expirationOverride >= 0) {
|
||||
// only override the expiration on messages where the expiration hasn't been set by the user
|
||||
if (message.getExpiration() == 0) {
|
||||
message.setExpiration(System.currentTimeMillis() + expirationOverride);
|
||||
// only override the expiration on messages where the expiration hasn't been set by the user
|
||||
setExpiration(message, expirationOverride);
|
||||
}
|
||||
} else {
|
||||
long minExpiration = settings.getMinExpiryDelay();
|
||||
long maxExpiration = settings.getMaxExpiryDelay();
|
||||
|
||||
if (maxExpiration != AddressSettings.DEFAULT_MAX_EXPIRY_DELAY && (message.getExpiration() == 0 || message.getExpiration() > (System.currentTimeMillis() + maxExpiration))) {
|
||||
message.setExpiration(System.currentTimeMillis() + maxExpiration);
|
||||
if (message.getExpiration() == 0) {
|
||||
// if the incoming message has NO expiration then apply the max if set and if not set then apply the min if set
|
||||
if (maxExpiration != AddressSettings.DEFAULT_MAX_EXPIRY_DELAY) {
|
||||
setExpiration(message, maxExpiration);
|
||||
} else if (minExpiration != AddressSettings.DEFAULT_MIN_EXPIRY_DELAY) {
|
||||
setExpiration(message, minExpiration);
|
||||
}
|
||||
} else if (maxExpiration != AddressSettings.DEFAULT_MAX_EXPIRY_DELAY && message.getExpiration() > (System.currentTimeMillis() + maxExpiration)) {
|
||||
setExpiration(message, maxExpiration);
|
||||
} else if (minExpiration != AddressSettings.DEFAULT_MIN_EXPIRY_DELAY && message.getExpiration() < (System.currentTimeMillis() + minExpiration)) {
|
||||
message.setExpiration(System.currentTimeMillis() + minExpiration);
|
||||
setExpiration(message, minExpiration);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
private static void setExpiration(Message m, long expiration) {
|
||||
m.setExpiration(System.currentTimeMillis() + expiration);
|
||||
m.reencode();
|
||||
}
|
||||
|
||||
@Override
|
||||
public MessageReference reload(final Message message, final Queue queue, final Transaction tx) throws Exception {
|
||||
|
||||
|
|
|
@ -117,6 +117,8 @@ public class AddressSettings implements Mergeable<AddressSettings>, Serializable
|
|||
|
||||
public static final long DEFAULT_MAX_EXPIRY_DELAY = -1;
|
||||
|
||||
public static final boolean DEFAULT_NO_EXPIRY = false;
|
||||
|
||||
public static final boolean DEFAULT_SEND_TO_DLA_ON_NO_ROUTE = false;
|
||||
|
||||
public static final long DEFAULT_SLOW_CONSUMER_THRESHOLD = -1;
|
||||
|
@ -266,6 +268,11 @@ public class AddressSettings implements Mergeable<AddressSettings>, Serializable
|
|||
}
|
||||
private Long maxExpiryDelay = null;
|
||||
|
||||
static {
|
||||
metaBean.add(Boolean.class, "noExpiry", (t, p) -> t.noExpiry = p, t -> t.noExpiry);
|
||||
}
|
||||
private Boolean noExpiry = null;
|
||||
|
||||
static {
|
||||
metaBean.add(Boolean.class, "defaultLastValueQueue", (t, p) -> t.defaultLastValueQueue = p, t -> t.defaultLastValueQueue);
|
||||
}
|
||||
|
@ -1050,6 +1057,15 @@ public class AddressSettings implements Mergeable<AddressSettings>, Serializable
|
|||
return this;
|
||||
}
|
||||
|
||||
public Boolean isNoExpiry() {
|
||||
return noExpiry != null ? noExpiry : AddressSettings.DEFAULT_NO_EXPIRY;
|
||||
}
|
||||
|
||||
public AddressSettings setNoExpiry(final Boolean noExpiry) {
|
||||
this.noExpiry = noExpiry;
|
||||
return this;
|
||||
}
|
||||
|
||||
public boolean isSendToDLAOnNoRoute() {
|
||||
return sendToDLAOnNoRoute != null ? sendToDLAOnNoRoute : AddressSettings.DEFAULT_SEND_TO_DLA_ON_NO_ROUTE;
|
||||
}
|
||||
|
@ -1694,6 +1710,8 @@ public class AddressSettings implements Mergeable<AddressSettings>, Serializable
|
|||
return false;
|
||||
if (!Objects.equals(maxExpiryDelay, that.maxExpiryDelay))
|
||||
return false;
|
||||
if (!Objects.equals(noExpiry, that.noExpiry))
|
||||
return false;
|
||||
if (!Objects.equals(defaultLastValueQueue, that.defaultLastValueQueue))
|
||||
return false;
|
||||
if (!Objects.equals(defaultLastValueKey, that.defaultLastValueKey))
|
||||
|
@ -1830,6 +1848,7 @@ public class AddressSettings implements Mergeable<AddressSettings>, Serializable
|
|||
result = 31 * result + (expiryDelay != null ? expiryDelay.hashCode() : 0);
|
||||
result = 31 * result + (minExpiryDelay != null ? minExpiryDelay.hashCode() : 0);
|
||||
result = 31 * result + (maxExpiryDelay != null ? maxExpiryDelay.hashCode() : 0);
|
||||
result = 31 * result + (noExpiry != null ? noExpiry.hashCode() : 0);
|
||||
result = 31 * result + (defaultLastValueQueue != null ? defaultLastValueQueue.hashCode() : 0);
|
||||
result = 31 * result + (defaultLastValueKey != null ? defaultLastValueKey.hashCode() : 0);
|
||||
result = 31 * result + (defaultNonDestructive != null ? defaultNonDestructive.hashCode() : 0);
|
||||
|
@ -1889,7 +1908,7 @@ public class AddressSettings implements Mergeable<AddressSettings>, Serializable
|
|||
|
||||
@Override
|
||||
public String toString() {
|
||||
return "AddressSettings{" + "addressFullMessagePolicy=" + addressFullMessagePolicy + ", maxSizeBytes=" + maxSizeBytes + ", maxReadPageBytes=" + maxReadPageBytes + ", maxReadPageMessages=" + maxReadPageMessages + ", prefetchPageBytes=" + prefetchPageBytes + ", prefetchPageMessages=" + prefetchPageMessages + ", pageLimitBytes=" + pageLimitBytes + ", pageLimitMessages=" + pageLimitMessages + ", pageFullMessagePolicy=" + pageFullMessagePolicy + ", maxSizeMessages=" + maxSizeMessages + ", pageSizeBytes=" + pageSizeBytes + ", pageMaxCache=" + pageCacheMaxSize + ", dropMessagesWhenFull=" + dropMessagesWhenFull + ", maxDeliveryAttempts=" + maxDeliveryAttempts + ", messageCounterHistoryDayLimit=" + messageCounterHistoryDayLimit + ", redeliveryDelay=" + redeliveryDelay + ", redeliveryMultiplier=" + redeliveryMultiplier + ", redeliveryCollisionAvoidanceFactor=" + redeliveryCollisionAvoidanceFactor + ", maxRedeliveryDelay=" + maxRedeliveryDelay + ", deadLetterAddress=" + deadLetterAddress + ", expiryAddress=" + expiryAddress + ", expiryDelay=" + expiryDelay + ", minExpiryDelay=" + minExpiryDelay + ", maxExpiryDelay=" + maxExpiryDelay + ", defaultLastValueQueue=" + defaultLastValueQueue + ", defaultLastValueKey=" + defaultLastValueKey + ", defaultNonDestructive=" + defaultNonDestructive + ", defaultExclusiveQueue=" + defaultExclusiveQueue + ", defaultGroupRebalance=" + defaultGroupRebalance + ", defaultGroupRebalancePauseDispatch=" + defaultGroupRebalancePauseDispatch + ", defaultGroupBuckets=" + defaultGroupBuckets + ", defaultGroupFirstKey=" + defaultGroupFirstKey + ", redistributionDelay=" + redistributionDelay + ", sendToDLAOnNoRoute=" + sendToDLAOnNoRoute + ", slowConsumerThreshold=" + slowConsumerThreshold + ", slowConsumerThresholdMeasurementUnit=" + slowConsumerThresholdMeasurementUnit + ", slowConsumerCheckPeriod=" + slowConsumerCheckPeriod + ", slowConsumerPolicy=" + slowConsumerPolicy + ", autoCreateJmsQueues=" + autoCreateJmsQueues + ", autoDeleteJmsQueues=" + autoDeleteJmsQueues + ", autoCreateJmsTopics=" + autoCreateJmsTopics + ", autoDeleteJmsTopics=" + autoDeleteJmsTopics + ", autoCreateQueues=" + autoCreateQueues + ", autoDeleteQueues=" + autoDeleteQueues + ", autoDeleteCreatedQueues=" + autoDeleteCreatedQueues + ", autoDeleteQueuesDelay=" + autoDeleteQueuesDelay + ", autoDeleteQueuesSkipUsageCheck=" + autoDeleteQueuesSkipUsageCheck + ", autoDeleteQueuesMessageCount=" + autoDeleteQueuesMessageCount + ", defaultRingSize=" + defaultRingSize + ", retroactiveMessageCount=" + retroactiveMessageCount + ", configDeleteQueues=" + configDeleteQueues + ", autoCreateAddresses=" + autoCreateAddresses + ", autoDeleteAddresses=" + autoDeleteAddresses + ", autoDeleteAddressesDelay=" + autoDeleteAddressesDelay + ", autoDeleteAddressesSkipUsageCheck=" + autoDeleteAddressesSkipUsageCheck + ", configDeleteAddresses=" + configDeleteAddresses + ", configDeleteDiverts=" + configDeleteDiverts + ", managementBrowsePageSize=" + managementBrowsePageSize + ", maxSizeBytesRejectThreshold=" + maxSizeBytesRejectThreshold + ", defaultMaxConsumers=" + defaultMaxConsumers + ", defaultPurgeOnNoConsumers=" + defaultPurgeOnNoConsumers + ", defaultConsumersBeforeDispatch=" + defaultConsumersBeforeDispatch + ", defaultDelayBeforeDispatch=" + defaultDelayBeforeDispatch + ", defaultQueueRoutingType=" + defaultQueueRoutingType + ", defaultAddressRoutingType=" + defaultAddressRoutingType + ", defaultConsumerWindowSize=" + defaultConsumerWindowSize + ", autoCreateDeadLetterResources=" + autoCreateDeadLetterResources + ", deadLetterQueuePrefix=" + deadLetterQueuePrefix + ", deadLetterQueueSuffix=" + deadLetterQueueSuffix + ", autoCreateExpiryResources=" + autoCreateExpiryResources + ", expiryQueuePrefix=" + expiryQueuePrefix + ", expiryQueueSuffix=" + expiryQueueSuffix + ", enableMetrics=" + enableMetrics + ", managementMessageAttributeSizeLimit=" + managementMessageAttributeSizeLimit + ", enableIngressTimestamp=" + enableIngressTimestamp + ", idCacheSize=" + idCacheSize + ", queuePrefetch=" + queuePrefetch + ", initialQueueBufferSize=" + initialQueueBufferSize
|
||||
return "AddressSettings{" + "addressFullMessagePolicy=" + addressFullMessagePolicy + ", maxSizeBytes=" + maxSizeBytes + ", maxReadPageBytes=" + maxReadPageBytes + ", maxReadPageMessages=" + maxReadPageMessages + ", prefetchPageBytes=" + prefetchPageBytes + ", prefetchPageMessages=" + prefetchPageMessages + ", pageLimitBytes=" + pageLimitBytes + ", pageLimitMessages=" + pageLimitMessages + ", pageFullMessagePolicy=" + pageFullMessagePolicy + ", maxSizeMessages=" + maxSizeMessages + ", pageSizeBytes=" + pageSizeBytes + ", pageMaxCache=" + pageCacheMaxSize + ", dropMessagesWhenFull=" + dropMessagesWhenFull + ", maxDeliveryAttempts=" + maxDeliveryAttempts + ", messageCounterHistoryDayLimit=" + messageCounterHistoryDayLimit + ", redeliveryDelay=" + redeliveryDelay + ", redeliveryMultiplier=" + redeliveryMultiplier + ", redeliveryCollisionAvoidanceFactor=" + redeliveryCollisionAvoidanceFactor + ", maxRedeliveryDelay=" + maxRedeliveryDelay + ", deadLetterAddress=" + deadLetterAddress + ", expiryAddress=" + expiryAddress + ", expiryDelay=" + expiryDelay + ", minExpiryDelay=" + minExpiryDelay + ", maxExpiryDelay=" + maxExpiryDelay + ", noExpiry=" + noExpiry + ", defaultLastValueQueue=" + defaultLastValueQueue + ", defaultLastValueKey=" + defaultLastValueKey + ", defaultNonDestructive=" + defaultNonDestructive + ", defaultExclusiveQueue=" + defaultExclusiveQueue + ", defaultGroupRebalance=" + defaultGroupRebalance + ", defaultGroupRebalancePauseDispatch=" + defaultGroupRebalancePauseDispatch + ", defaultGroupBuckets=" + defaultGroupBuckets + ", defaultGroupFirstKey=" + defaultGroupFirstKey + ", redistributionDelay=" + redistributionDelay + ", sendToDLAOnNoRoute=" + sendToDLAOnNoRoute + ", slowConsumerThreshold=" + slowConsumerThreshold + ", slowConsumerThresholdMeasurementUnit=" + slowConsumerThresholdMeasurementUnit + ", slowConsumerCheckPeriod=" + slowConsumerCheckPeriod + ", slowConsumerPolicy=" + slowConsumerPolicy + ", autoCreateJmsQueues=" + autoCreateJmsQueues + ", autoDeleteJmsQueues=" + autoDeleteJmsQueues + ", autoCreateJmsTopics=" + autoCreateJmsTopics + ", autoDeleteJmsTopics=" + autoDeleteJmsTopics + ", autoCreateQueues=" + autoCreateQueues + ", autoDeleteQueues=" + autoDeleteQueues + ", autoDeleteCreatedQueues=" + autoDeleteCreatedQueues + ", autoDeleteQueuesDelay=" + autoDeleteQueuesDelay + ", autoDeleteQueuesSkipUsageCheck=" + autoDeleteQueuesSkipUsageCheck + ", autoDeleteQueuesMessageCount=" + autoDeleteQueuesMessageCount + ", defaultRingSize=" + defaultRingSize + ", retroactiveMessageCount=" + retroactiveMessageCount + ", configDeleteQueues=" + configDeleteQueues + ", autoCreateAddresses=" + autoCreateAddresses + ", autoDeleteAddresses=" + autoDeleteAddresses + ", autoDeleteAddressesDelay=" + autoDeleteAddressesDelay + ", autoDeleteAddressesSkipUsageCheck=" + autoDeleteAddressesSkipUsageCheck + ", configDeleteAddresses=" + configDeleteAddresses + ", configDeleteDiverts=" + configDeleteDiverts + ", managementBrowsePageSize=" + managementBrowsePageSize + ", maxSizeBytesRejectThreshold=" + maxSizeBytesRejectThreshold + ", defaultMaxConsumers=" + defaultMaxConsumers + ", defaultPurgeOnNoConsumers=" + defaultPurgeOnNoConsumers + ", defaultConsumersBeforeDispatch=" + defaultConsumersBeforeDispatch + ", defaultDelayBeforeDispatch=" + defaultDelayBeforeDispatch + ", defaultQueueRoutingType=" + defaultQueueRoutingType + ", defaultAddressRoutingType=" + defaultAddressRoutingType + ", defaultConsumerWindowSize=" + defaultConsumerWindowSize + ", autoCreateDeadLetterResources=" + autoCreateDeadLetterResources + ", deadLetterQueuePrefix=" + deadLetterQueuePrefix + ", deadLetterQueueSuffix=" + deadLetterQueueSuffix + ", autoCreateExpiryResources=" + autoCreateExpiryResources + ", expiryQueuePrefix=" + expiryQueuePrefix + ", expiryQueueSuffix=" + expiryQueueSuffix + ", enableMetrics=" + enableMetrics + ", managementMessageAttributeSizeLimit=" + managementMessageAttributeSizeLimit + ", enableIngressTimestamp=" + enableIngressTimestamp + ", idCacheSize=" + idCacheSize + ", queuePrefetch=" + queuePrefetch + ", initialQueueBufferSize=" + initialQueueBufferSize
|
||||
+ '}';
|
||||
}
|
||||
}
|
||||
|
|
|
@ -3871,6 +3871,14 @@
|
|||
</xsd:annotation>
|
||||
</xsd:element>
|
||||
|
||||
<xsd:element name="no-expiry" type="xsd:boolean" default="false" maxOccurs="1" minOccurs="0">
|
||||
<xsd:annotation>
|
||||
<xsd:documentation>
|
||||
Overrides the expiration time for all messages so that they never expire.
|
||||
</xsd:documentation>
|
||||
</xsd:annotation>
|
||||
</xsd:element>
|
||||
|
||||
<xsd:element name="expiry-delay" type="xsd:long" default="-1" maxOccurs="1" minOccurs="0">
|
||||
<xsd:annotation>
|
||||
<xsd:documentation>
|
||||
|
|
|
@ -506,6 +506,7 @@ public class FileConfigurationTest extends AbstractConfigurationTestBase {
|
|||
assertEquals(1L, (long) conf.getAddressSettings().get("a1").getExpiryDelay());
|
||||
assertEquals(2L, (long) conf.getAddressSettings().get("a1").getMinExpiryDelay());
|
||||
assertEquals(3L, (long) conf.getAddressSettings().get("a1").getMaxExpiryDelay());
|
||||
assertTrue(conf.getAddressSettings().get("a1").isNoExpiry());
|
||||
assertEquals(AddressSettings.DEFAULT_AUTO_CREATE_EXPIRY_RESOURCES, conf.getAddressSettings().get("a1").isAutoCreateExpiryResources());
|
||||
assertEquals(AddressSettings.DEFAULT_EXPIRY_QUEUE_PREFIX, conf.getAddressSettings().get("a1").getExpiryQueuePrefix());
|
||||
assertEquals(AddressSettings.DEFAULT_EXPIRY_QUEUE_SUFFIX, conf.getAddressSettings().get("a1").getExpiryQueueSuffix());
|
||||
|
@ -547,6 +548,7 @@ public class FileConfigurationTest extends AbstractConfigurationTestBase {
|
|||
assertEquals(-1L, (long) conf.getAddressSettings().get("a2").getExpiryDelay());
|
||||
assertEquals(-1L, (long) conf.getAddressSettings().get("a2").getMinExpiryDelay());
|
||||
assertEquals(-1L, (long) conf.getAddressSettings().get("a2").getMaxExpiryDelay());
|
||||
assertFalse(conf.getAddressSettings().get("a2").isNoExpiry());
|
||||
assertTrue(conf.getAddressSettings().get("a2").isAutoCreateDeadLetterResources());
|
||||
assertEquals("", conf.getAddressSettings().get("a2").getExpiryQueuePrefix().toString());
|
||||
assertEquals(".EXP", conf.getAddressSettings().get("a2").getExpiryQueueSuffix().toString());
|
||||
|
|
|
@ -0,0 +1,308 @@
|
|||
/*
|
||||
* Licensed to the Apache Software Foundation (ASF) under one or more
|
||||
* contributor license agreements. See the NOTICE file distributed with
|
||||
* this work for additional information regarding copyright ownership.
|
||||
* The ASF licenses this file to You under the Apache License, Version 2.0
|
||||
* (the "License"); you may not use this file except in compliance with
|
||||
* the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
package org.apache.activemq.artemis.core.postoffice.impl;
|
||||
|
||||
import org.apache.activemq.artemis.api.core.Message;
|
||||
import org.apache.activemq.artemis.core.settings.impl.AddressSettings;
|
||||
import org.junit.jupiter.api.Test;
|
||||
import org.mockito.ArgumentCaptor;
|
||||
import org.mockito.Mockito;
|
||||
|
||||
import static org.junit.jupiter.api.Assertions.assertNotNull;
|
||||
import static org.junit.jupiter.api.Assertions.assertTrue;
|
||||
|
||||
public class PostOfficeImplTest {
|
||||
|
||||
private static final int EXPIRATION_DELTA = 5000;
|
||||
|
||||
@Test
|
||||
public void testNoExpiryWhenExpirationSetLow() {
|
||||
Message mockMessage = Mockito.mock(Message.class);
|
||||
Mockito.when(mockMessage.getExpiration()).thenReturn(1L);
|
||||
PostOfficeImpl.applyExpiryDelay(mockMessage, new AddressSettings().setNoExpiry(true));
|
||||
Mockito.verify(mockMessage).setExpiration(0);
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testNoExpiryWhenExpirationSetHigh() {
|
||||
Message mockMessage = Mockito.mock(Message.class);
|
||||
Mockito.when(mockMessage.getExpiration()).thenReturn(Long.MAX_VALUE);
|
||||
PostOfficeImpl.applyExpiryDelay(mockMessage, new AddressSettings().setNoExpiry(true));
|
||||
Mockito.verify(mockMessage).setExpiration(0);
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testNoExpiryWhenExpirationNotSet() {
|
||||
Message mockMessage = Mockito.mock(Message.class);
|
||||
Mockito.when(mockMessage.getExpiration()).thenReturn(0L);
|
||||
PostOfficeImpl.applyExpiryDelay(mockMessage, new AddressSettings().setNoExpiry(true));
|
||||
Mockito.verify(mockMessage, Mockito.never()).setExpiration(Mockito.anyLong());
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testExpiryDelayWhenExpirationNotSet() {
|
||||
Message mockMessage = Mockito.mock(Message.class);
|
||||
Mockito.when(mockMessage.getExpiration()).thenReturn(0L);
|
||||
final long expiryDelay = 123456L;
|
||||
final long startTime = System.currentTimeMillis();
|
||||
|
||||
PostOfficeImpl.applyExpiryDelay(mockMessage, new AddressSettings().setExpiryDelay(expiryDelay));
|
||||
|
||||
final ArgumentCaptor<Long> captor = ArgumentCaptor.forClass(Long.class);
|
||||
Mockito.verify(mockMessage).setExpiration(captor.capture());
|
||||
|
||||
final long expectedExpirationLow = startTime + expiryDelay;
|
||||
final long expectedExpirationHigh = expectedExpirationLow + EXPIRATION_DELTA; // Allowing a delta
|
||||
final Long actualExpirationSet = captor.getValue();
|
||||
|
||||
assertExpirationSetAsExpected(expectedExpirationLow, expectedExpirationHigh, actualExpirationSet);
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testExpiryDelayWhenExpirationSet() {
|
||||
Message mockMessage = Mockito.mock(Message.class);
|
||||
Mockito.when(mockMessage.getExpiration()).thenReturn(1L);
|
||||
PostOfficeImpl.applyExpiryDelay(mockMessage, new AddressSettings().setExpiryDelay(9999L));
|
||||
Mockito.verify(mockMessage, Mockito.never()).setExpiration(Mockito.anyLong());
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testMinExpiryDelayWhenExpirationNotSet() {
|
||||
Message mockMessage = Mockito.mock(Message.class);
|
||||
Mockito.when(mockMessage.getExpiration()).thenReturn(0L);
|
||||
final long minExpiryDelay = 123456L;
|
||||
final long startTime = System.currentTimeMillis();
|
||||
|
||||
PostOfficeImpl.applyExpiryDelay(mockMessage, new AddressSettings().setMinExpiryDelay(minExpiryDelay));
|
||||
|
||||
final ArgumentCaptor<Long> captor = ArgumentCaptor.forClass(Long.class);
|
||||
Mockito.verify(mockMessage).setExpiration(captor.capture());
|
||||
|
||||
final long expectedExpirationLow = startTime + minExpiryDelay;
|
||||
final long expectedExpirationHigh = expectedExpirationLow + EXPIRATION_DELTA; // Allowing a delta
|
||||
final Long actualExpirationSet = captor.getValue();
|
||||
|
||||
assertExpirationSetAsExpected(expectedExpirationLow, expectedExpirationHigh, actualExpirationSet);
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testMinExpiryDelayWhenExpirationSet() {
|
||||
Message mockMessage = Mockito.mock(Message.class);
|
||||
long origExpiration = 1234L;
|
||||
Mockito.when(mockMessage.getExpiration()).thenReturn(origExpiration);
|
||||
final long minExpiryDelay = 123456L;
|
||||
assertTrue(minExpiryDelay > origExpiration);
|
||||
final long startTime = System.currentTimeMillis();
|
||||
|
||||
PostOfficeImpl.applyExpiryDelay(mockMessage, new AddressSettings().setMinExpiryDelay(minExpiryDelay));
|
||||
|
||||
final ArgumentCaptor<Long> captor = ArgumentCaptor.forClass(Long.class);
|
||||
Mockito.verify(mockMessage).setExpiration(captor.capture());
|
||||
|
||||
final long expectedExpirationLow = startTime + minExpiryDelay;
|
||||
final long expectedExpirationHigh = expectedExpirationLow + EXPIRATION_DELTA; // Allowing a delta
|
||||
final Long actualExpirationSet = captor.getValue();
|
||||
|
||||
assertExpirationSetAsExpected(expectedExpirationLow, expectedExpirationHigh, actualExpirationSet);
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testMaxExpiryDelayWhenExpirationNotSet() {
|
||||
Message mockMessage = Mockito.mock(Message.class);
|
||||
Mockito.when(mockMessage.getExpiration()).thenReturn(0L);
|
||||
final long maxExpiryDelay = 123456L;
|
||||
final long startTime = System.currentTimeMillis();
|
||||
|
||||
PostOfficeImpl.applyExpiryDelay(mockMessage, new AddressSettings().setMaxExpiryDelay(maxExpiryDelay));
|
||||
|
||||
final ArgumentCaptor<Long> captor = ArgumentCaptor.forClass(Long.class);
|
||||
Mockito.verify(mockMessage).setExpiration(captor.capture());
|
||||
|
||||
final long expectedExpirationLow = startTime + maxExpiryDelay;
|
||||
final long expectedExpirationHigh = expectedExpirationLow + EXPIRATION_DELTA; // Allowing a delta
|
||||
final Long actualExpirationSet = captor.getValue();
|
||||
|
||||
assertExpirationSetAsExpected(expectedExpirationLow, expectedExpirationHigh, actualExpirationSet);
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testMaxExpiryDelayWhenExpirationSet() {
|
||||
Message mockMessage = Mockito.mock(Message.class);
|
||||
Mockito.when(mockMessage.getExpiration()).thenReturn(Long.MAX_VALUE);
|
||||
final long maxExpiryDelay = 123456L;
|
||||
final long startTime = System.currentTimeMillis();
|
||||
|
||||
PostOfficeImpl.applyExpiryDelay(mockMessage, new AddressSettings().setMaxExpiryDelay(maxExpiryDelay));
|
||||
|
||||
final ArgumentCaptor<Long> captor = ArgumentCaptor.forClass(Long.class);
|
||||
Mockito.verify(mockMessage).setExpiration(captor.capture());
|
||||
|
||||
final long expectedExpirationLow = startTime + maxExpiryDelay;
|
||||
final long expectedExpirationHigh = expectedExpirationLow + EXPIRATION_DELTA; // Allowing a delta
|
||||
final Long actualExpirationSet = captor.getValue();
|
||||
|
||||
assertExpirationSetAsExpected(expectedExpirationLow, expectedExpirationHigh, actualExpirationSet);
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testMinAndMaxExpiryDelayWhenExpirationNotSet() {
|
||||
Message mockMessage = Mockito.mock(Message.class);
|
||||
long origExpiration = 0L;
|
||||
Mockito.when(mockMessage.getExpiration()).thenReturn(origExpiration);
|
||||
final long minExpiryDelay = 100_000L;
|
||||
final long maxExpiryDelay = 300_000L;
|
||||
final long startTime = System.currentTimeMillis();
|
||||
|
||||
PostOfficeImpl.applyExpiryDelay(mockMessage, new AddressSettings().setMinExpiryDelay(minExpiryDelay).setMaxExpiryDelay(maxExpiryDelay));
|
||||
|
||||
final ArgumentCaptor<Long> captor = ArgumentCaptor.forClass(Long.class);
|
||||
Mockito.verify(mockMessage).setExpiration(captor.capture());
|
||||
|
||||
final long expectedExpirationLow = startTime + maxExpiryDelay;
|
||||
final long expectedExpirationHigh = expectedExpirationLow + EXPIRATION_DELTA; // Allowing a delta
|
||||
final Long actualExpirationSet = captor.getValue();
|
||||
|
||||
assertExpirationSetAsExpected(expectedExpirationLow, expectedExpirationHigh, actualExpirationSet);
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testMinAndMaxExpiryDelayWhenExpirationSetInbetween() {
|
||||
Message mockMessage = Mockito.mock(Message.class);
|
||||
final long startTime = System.currentTimeMillis();
|
||||
long origExpiration = startTime + 200_000L;
|
||||
Mockito.when(mockMessage.getExpiration()).thenReturn(origExpiration);
|
||||
final long minExpiryDelay = 100_000L;
|
||||
final long maxExpiryDelay = 300_000L;
|
||||
|
||||
PostOfficeImpl.applyExpiryDelay(mockMessage, new AddressSettings().setMinExpiryDelay(minExpiryDelay).setMaxExpiryDelay(maxExpiryDelay));
|
||||
|
||||
Mockito.verify(mockMessage, Mockito.never()).setExpiration(Mockito.anyLong());
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testMinAndMaxExpiryDelayWhenExpirationSetAbove() {
|
||||
Message mockMessage = Mockito.mock(Message.class);
|
||||
final long startTime = System.currentTimeMillis();
|
||||
long origExpiration = startTime + 400_000L;
|
||||
Mockito.when(mockMessage.getExpiration()).thenReturn(origExpiration);
|
||||
final long minExpiryDelay = 100_000L;
|
||||
final long maxExpiryDelay = 300_000L;
|
||||
|
||||
PostOfficeImpl.applyExpiryDelay(mockMessage, new AddressSettings().setMinExpiryDelay(minExpiryDelay).setMaxExpiryDelay(maxExpiryDelay));
|
||||
|
||||
final ArgumentCaptor<Long> captor = ArgumentCaptor.forClass(Long.class);
|
||||
Mockito.verify(mockMessage).setExpiration(captor.capture());
|
||||
|
||||
final long expectedExpirationLow = startTime + maxExpiryDelay;
|
||||
final long expectedExpirationHigh = expectedExpirationLow + EXPIRATION_DELTA; // Allowing a delta
|
||||
final Long actualExpirationSet = captor.getValue();
|
||||
|
||||
assertExpirationSetAsExpected(expectedExpirationLow, expectedExpirationHigh, actualExpirationSet);
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testMinAndMaxExpiryDelayWhenExpirationSetBelow() {
|
||||
Message mockMessage = Mockito.mock(Message.class);
|
||||
final long startTime = System.currentTimeMillis();
|
||||
long origExpiration = startTime + 50_000;
|
||||
Mockito.when(mockMessage.getExpiration()).thenReturn(origExpiration);
|
||||
final long minExpiryDelay = 100_000L;
|
||||
final long maxExpiryDelay = 300_000L;
|
||||
|
||||
PostOfficeImpl.applyExpiryDelay(mockMessage, new AddressSettings().setMinExpiryDelay(minExpiryDelay).setMaxExpiryDelay(maxExpiryDelay));
|
||||
|
||||
final ArgumentCaptor<Long> captor = ArgumentCaptor.forClass(Long.class);
|
||||
Mockito.verify(mockMessage).setExpiration(captor.capture());
|
||||
|
||||
final long expectedExpirationLow = startTime + minExpiryDelay;
|
||||
final long expectedExpirationHigh = expectedExpirationLow + EXPIRATION_DELTA; // Allowing a delta
|
||||
final Long actualExpirationSet = captor.getValue();
|
||||
|
||||
assertExpirationSetAsExpected(expectedExpirationLow, expectedExpirationHigh, actualExpirationSet);
|
||||
}
|
||||
|
||||
private void assertExpirationSetAsExpected(final long expectedExpirationLow, final long expectedExpirationHigh, final Long actualExpirationSet) {
|
||||
assertNotNull(actualExpirationSet);
|
||||
|
||||
assertTrue(actualExpirationSet >= expectedExpirationLow, () -> "Expected set expiration of at least " + expectedExpirationLow + ", but was: " + actualExpirationSet);
|
||||
assertTrue(actualExpirationSet < expectedExpirationHigh, "Expected set expiration less than " + expectedExpirationHigh + ", but was: " + actualExpirationSet);
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testPrecedencNoExpiryOverExpiryDelay() {
|
||||
Message mockMessage = Mockito.mock(Message.class);
|
||||
Mockito.when(mockMessage.getExpiration()).thenReturn(0L);
|
||||
PostOfficeImpl.applyExpiryDelay(mockMessage, new AddressSettings().setNoExpiry(true).setExpiryDelay(10L));
|
||||
Mockito.verify(mockMessage, Mockito.never()).setExpiration(Mockito.anyLong());
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testPrecedencNoExpiryOverMaxExpiryDelay() {
|
||||
Message mockMessage = Mockito.mock(Message.class);
|
||||
Mockito.when(mockMessage.getExpiration()).thenReturn(0L);
|
||||
PostOfficeImpl.applyExpiryDelay(mockMessage, new AddressSettings().setNoExpiry(true).setMaxExpiryDelay(10L));
|
||||
Mockito.verify(mockMessage, Mockito.never()).setExpiration(Mockito.anyLong());
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testPrecedencNoExpiryOverMinExpiryDelay() {
|
||||
Message mockMessage = Mockito.mock(Message.class);
|
||||
Mockito.when(mockMessage.getExpiration()).thenReturn(0L);
|
||||
PostOfficeImpl.applyExpiryDelay(mockMessage, new AddressSettings().setNoExpiry(true).setMinExpiryDelay(10L));
|
||||
Mockito.verify(mockMessage, Mockito.never()).setExpiration(Mockito.anyLong());
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testPrecedencExpiryDelayOverMaxExpiryDelay() {
|
||||
Message mockMessage = Mockito.mock(Message.class);
|
||||
Mockito.when(mockMessage.getExpiration()).thenReturn(0L);
|
||||
final long expiryDelay = 1000L;
|
||||
final long maxExpiryDelay = 999999999L;
|
||||
final long startTime = System.currentTimeMillis();
|
||||
|
||||
PostOfficeImpl.applyExpiryDelay(mockMessage, new AddressSettings().setExpiryDelay(expiryDelay).setMaxExpiryDelay(maxExpiryDelay));
|
||||
|
||||
final ArgumentCaptor<Long> captor = ArgumentCaptor.forClass(Long.class);
|
||||
Mockito.verify(mockMessage).setExpiration(captor.capture());
|
||||
|
||||
final long expectedExpirationLow = startTime + expiryDelay;
|
||||
final long expectedExpirationHigh = expectedExpirationLow + EXPIRATION_DELTA; // Allowing a delta
|
||||
final Long actualExpirationSet = captor.getValue();
|
||||
|
||||
assertExpirationSetAsExpected(expectedExpirationLow, expectedExpirationHigh, actualExpirationSet);
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testPrecedencExpiryDelayOverMinExpiryDelay() {
|
||||
Message mockMessage = Mockito.mock(Message.class);
|
||||
Mockito.when(mockMessage.getExpiration()).thenReturn(0L);
|
||||
final long expiryDelay = 1000L;
|
||||
final long minExpiryDelay = 999999999L;
|
||||
final long startTime = System.currentTimeMillis();
|
||||
|
||||
PostOfficeImpl.applyExpiryDelay(mockMessage, new AddressSettings().setExpiryDelay(expiryDelay).setMinExpiryDelay(minExpiryDelay));
|
||||
|
||||
final ArgumentCaptor<Long> captor = ArgumentCaptor.forClass(Long.class);
|
||||
Mockito.verify(mockMessage).setExpiration(captor.capture());
|
||||
|
||||
final long expectedExpirationLow = startTime + expiryDelay;
|
||||
final long expectedExpirationHigh = expectedExpirationLow + EXPIRATION_DELTA; // Allowing a delta
|
||||
final Long actualExpirationSet = captor.getValue();
|
||||
|
||||
assertExpirationSetAsExpected(expectedExpirationLow, expectedExpirationHigh, actualExpirationSet);
|
||||
}
|
||||
}
|
|
@ -18,6 +18,7 @@ package org.apache.activemq.artemis.core.settings;
|
|||
|
||||
import static org.junit.jupiter.api.Assertions.assertEquals;
|
||||
import static org.junit.jupiter.api.Assertions.assertNull;
|
||||
import static org.junit.jupiter.api.Assertions.assertTrue;
|
||||
|
||||
import java.lang.invoke.MethodHandles;
|
||||
|
||||
|
@ -60,6 +61,7 @@ public class AddressSettingsTest extends ServerTestBase {
|
|||
assertEquals(AddressSettings.DEFAULT_AUTO_DELETE_ADDRESSES, addressSettings.isAutoDeleteAddresses());
|
||||
assertEquals(ActiveMQDefaultConfiguration.getDefaultPurgeOnNoConsumers(), addressSettings.isDefaultPurgeOnNoConsumers());
|
||||
assertEquals(Integer.valueOf(ActiveMQDefaultConfiguration.getDefaultMaxQueueConsumers()), addressSettings.getDefaultMaxConsumers());
|
||||
assertEquals(AddressSettings.DEFAULT_NO_EXPIRY, addressSettings.isNoExpiry());
|
||||
}
|
||||
|
||||
@Test
|
||||
|
@ -93,6 +95,7 @@ public class AddressSettingsTest extends ServerTestBase {
|
|||
addressSettingsToMerge.setMaxExpiryDelay(777L);
|
||||
addressSettingsToMerge.setIDCacheSize(5);
|
||||
addressSettingsToMerge.setInitialQueueBufferSize(256);
|
||||
addressSettingsToMerge.setNoExpiry(true);
|
||||
|
||||
if (copy) {
|
||||
addressSettings = addressSettings.mergeCopy(addressSettingsToMerge);
|
||||
|
@ -115,6 +118,7 @@ public class AddressSettingsTest extends ServerTestBase {
|
|||
assertEquals(Long.valueOf(777), addressSettings.getMaxExpiryDelay());
|
||||
assertEquals(Integer.valueOf(5), addressSettings.getIDCacheSize());
|
||||
assertEquals(Integer.valueOf(256), addressSettings.getInitialQueueBufferSize());
|
||||
assertTrue(addressSettings.isNoExpiry());
|
||||
}
|
||||
|
||||
@Test
|
||||
|
@ -139,6 +143,7 @@ public class AddressSettingsTest extends ServerTestBase {
|
|||
addressSettingsToMerge.setMessageCounterHistoryDayLimit(1002);
|
||||
addressSettingsToMerge.setAddressFullMessagePolicy(AddressFullMessagePolicy.DROP);
|
||||
addressSettingsToMerge.setMaxSizeBytesRejectThreshold(10 * 1024);
|
||||
addressSettingsToMerge.setNoExpiry(true);
|
||||
if (copy) {
|
||||
addressSettings = addressSettings.mergeCopy(addressSettingsToMerge);
|
||||
} else {
|
||||
|
@ -166,6 +171,7 @@ public class AddressSettingsTest extends ServerTestBase {
|
|||
assertEquals(addressSettings.getRedeliveryMultiplier(), 2.5, 0.000001);
|
||||
assertEquals(AddressFullMessagePolicy.DROP, addressSettings.getAddressFullMessagePolicy());
|
||||
assertEquals(addressSettings.getMaxSizeBytesRejectThreshold(), 10 * 1024);
|
||||
assertTrue(addressSettings.isNoExpiry());
|
||||
}
|
||||
|
||||
@Test
|
||||
|
@ -236,6 +242,7 @@ public class AddressSettingsTest extends ServerTestBase {
|
|||
addressSettings.setRedeliveryDelay(1003);
|
||||
addressSettings.setRedeliveryMultiplier(1.0);
|
||||
addressSettings.setAddressFullMessagePolicy(AddressFullMessagePolicy.DROP);
|
||||
addressSettings.setNoExpiry(true);
|
||||
|
||||
String json = addressSettings.toJSON();
|
||||
logger.info("Json:: {}", json);
|
||||
|
|
|
@ -577,6 +577,7 @@
|
|||
<expiry-delay>1</expiry-delay>
|
||||
<min-expiry-delay>2</min-expiry-delay>
|
||||
<max-expiry-delay>3</max-expiry-delay>
|
||||
<no-expiry>true</no-expiry>
|
||||
<redelivery-delay>1</redelivery-delay>
|
||||
<redelivery-collision-avoidance-factor>0.5</redelivery-collision-avoidance-factor>
|
||||
<max-size-bytes>817M</max-size-bytes>
|
||||
|
|
|
@ -21,6 +21,7 @@
|
|||
<expiry-delay>1</expiry-delay>
|
||||
<min-expiry-delay>2</min-expiry-delay>
|
||||
<max-expiry-delay>3</max-expiry-delay>
|
||||
<no-expiry>true</no-expiry>
|
||||
<redelivery-delay>1</redelivery-delay>
|
||||
<redelivery-collision-avoidance-factor>0.5</redelivery-collision-avoidance-factor>
|
||||
<max-size-bytes>817M</max-size-bytes>
|
||||
|
|
|
@ -21,6 +21,7 @@
|
|||
<expiry-delay>1</expiry-delay>
|
||||
<min-expiry-delay>2</min-expiry-delay>
|
||||
<max-expiry-delay>3</max-expiry-delay>
|
||||
<no-expiry>true</no-expiry>
|
||||
<redelivery-delay>1</redelivery-delay>
|
||||
<redelivery-collision-avoidance-factor>0.5</redelivery-collision-avoidance-factor>
|
||||
<max-size-bytes>817M</max-size-bytes>
|
||||
|
|
|
@ -31,7 +31,10 @@ Here an example of an `address-setting` entry that might be found in the `broker
|
|||
<auto-create-expiry-resources>false</auto-create-expiry-resources>
|
||||
<expiry-queue-prefix></expiry-queue-prefix>
|
||||
<expiry-queue-suffix></expiry-queue-suffix>
|
||||
<expiry-delay>123</expiry-delay>
|
||||
<no-expiry>false</no-expiry>
|
||||
<expiry-delay>-1</expiry-delay>
|
||||
<min-expiry-delay>-1</min-expiry-delay>
|
||||
<max-expiry-delay>-1</max-expiry-delay>
|
||||
<redelivery-delay>5000</redelivery-delay>
|
||||
<redelivery-delay-multiplier>1.0</redelivery-delay-multiplier>
|
||||
<redelivery-collision-avoidance-factor>0.0</redelivery-collision-avoidance-factor>
|
||||
|
@ -123,12 +126,24 @@ The suffix used for automatically created expiry queues.
|
|||
Default is empty.
|
||||
Read more in the chapter about xref:message-expiry.adoc#message-expiry[message expiry].
|
||||
|
||||
no-expiry::
|
||||
If `true` this overrides the expiration time for _all_ messages so that they never expire.
|
||||
The default is `false`.
|
||||
Read more about xref:message-expiry.adoc#configuring-expiry-delay[message expiry].
|
||||
|
||||
expiry-delay::
|
||||
The expiration time that will be used for messages which are using the default expiration time (i.e. 0).
|
||||
For example, if `expiry-delay` is set to "10" and a message which is using the default expiration time (i.e. 0) arrives then its expiration time of "0" will be changed to "10." However, if a message which is using an expiration time of "20" arrives then its expiration time will remain unchanged.
|
||||
Setting `expiry-delay` to "-1" will disable this feature.
|
||||
The default is "-1".
|
||||
Read more about xref:message-expiry.adoc#configuring-expiry-addresses[message expiry].
|
||||
The expiration time that will be used for messages which are using the default expiration time (i.e. `0`).
|
||||
For example, if `expiry-delay` is set to `10` and a message which is using the default expiration time (i.e. `0`) arrives then its expiration time of `0` will be changed to `10`.
|
||||
However, if a message which is using an expiration time of `20` arrives then its expiration time will remain unchanged.
|
||||
Setting `expiry-delay` to `-1` will disable this feature.
|
||||
The default is `-1`.
|
||||
Read more about xref:message-expiry.adoc#configuring-expiry-delay[message expiry].
|
||||
|
||||
min-expiry-delay::
|
||||
max-expiry-delay::
|
||||
These are applied if the aforementioned `expiry-delay` isn't set.
|
||||
Unlike `expiry-delay`, they can impact the expiration of a message even if that message is using a non-default expiration time.
|
||||
There are a xref:message-expiry.adoc#configuring-expiry-delay[handful of rules] which dictate the behavior of these settings.
|
||||
|
||||
max-delivery-attempts::
|
||||
defines how many time a cancelled message can be redelivered before sending to the `dead-letter-address`.
|
||||
|
|
|
@ -36,7 +36,40 @@ a Long property containing the _actual expiration time_ of the expired message
|
|||
|
||||
== Configuring Expiry Delay
|
||||
|
||||
Default expiry delay can be configured in the address-setting configuration:
|
||||
There are multiple address-settings which you can use to modify the expiry delay for incoming messages:
|
||||
|
||||
. `no-expiry`
|
||||
. `expiry-delay`
|
||||
. `max-expiry-delay` & `min-expiry-delay`
|
||||
|
||||
These settings are applied exclusively in this order of precedence. For example, if `no-expiry` is set and `expiry-delay` is also set then `expiry-delay` is ignored completely and `no-expiry` is enforced.
|
||||
|
||||
[WARNING]
|
||||
====
|
||||
If you set any of these values for the `expiry-address` then messages which expire will have corresponding new expiry delays potentially causing the expired messages to themselves expire and be removed completely from the broker.
|
||||
====
|
||||
|
||||
Let's look at each of these in turn.
|
||||
|
||||
=== Never Expire
|
||||
|
||||
If you want to force messages to _never_ expire regardless of their existing settings then set `no-expiry` to `true`, e.g.:
|
||||
|
||||
[,xml]
|
||||
----
|
||||
<!-- messages will never expire -->
|
||||
<address-setting match="exampleQueue">
|
||||
<no-expiry>true</no-expiry>
|
||||
</address-setting>
|
||||
----
|
||||
|
||||
For example, if `no-expiry` is set to `true` and a message which is using an expiration of `10` arrives then its expiration time of `10` will be changed to `0`.
|
||||
|
||||
The default is `false`.
|
||||
|
||||
=== Modify Default Expiry
|
||||
|
||||
To modify the expiry delay on a message using the _default expiration_ (i.e. `0`) set `expiry-delay`, e.g.
|
||||
|
||||
[,xml]
|
||||
----
|
||||
|
@ -47,14 +80,14 @@ Default expiry delay can be configured in the address-setting configuration:
|
|||
</address-setting>
|
||||
----
|
||||
|
||||
`expiry-delay` defines the expiration time in milliseconds that will be used for messages which are using the default expiration time (i.e. 0).
|
||||
For example, if `expiry-delay` is set to `10` and a message which is using the default expiration time (i.e. `0`) arrives then its expiration time of `0` will be changed to `10`.
|
||||
However, if a message which is using an expiration time of `20` arrives then its expiration time will remain unchanged.
|
||||
|
||||
For example, if `expiry-delay` is set to "10" and a message which is using the default expiration time (i.e. 10) arrives then its expiration time of "0" will be changed to "10." However, if a message which is using an expiration time of "20" arrives then its expiration time will remain unchanged.
|
||||
Setting `expiry-delay` to "-1" will disable this feature.
|
||||
This value is measured in milliseconds. The default is `-1` (i.e. disabled).
|
||||
|
||||
The default is `-1`.
|
||||
=== Enforce an Expiry Range
|
||||
|
||||
If `expiry-delay` is _not set_ then minimum and maximum expiry delay values can be configured in the address-setting configuration.
|
||||
To enforce a range of expiry delay values
|
||||
|
||||
[,xml]
|
||||
----
|
||||
|
@ -67,20 +100,17 @@ If `expiry-delay` is _not set_ then minimum and maximum expiry delay values can
|
|||
Semantics are as follows:
|
||||
|
||||
* Messages _without_ an expiration will be set to `max-expiry-delay`.
|
||||
If `max-expiry-delay` is not defined then the message will be set to `min-expiry-delay`.
|
||||
If `min-expiry-delay` is not defined then the message will not be changed.
|
||||
* Messages with an expiration _above_ `max-expiry-delay` will be set to `max-expiry-delay`
|
||||
* Messages with an expiration _below_ `min-expiry-delay` will be set to `min-expiry-delay`
|
||||
* Messages with an expiration _within_ `min-expiry-delay` and `max-expiry-delay` range will not be changed
|
||||
* Any value set for `expiry-delay` other than the default (i.e. `-1`) will override the aforementioned min/max settings.
|
||||
** If `max-expiry-delay` is not defined then the message will be set to `min-expiry-delay`.
|
||||
** If `min-expiry-delay` is not defined then the message will not be changed.
|
||||
* Messages with an expiration _above_ `max-expiry-delay` will be set to `max-expiry-delay`.
|
||||
* Messages with an expiration _below_ `min-expiry-delay` will be set to `min-expiry-delay`.
|
||||
* Messages with an expiration _within_ `min-expiry-delay` and `max-expiry-delay` range will not be changed.
|
||||
|
||||
The default for both `min-expiry-delay` and `max-expiry-delay` is `-1` (i.e. disabled).
|
||||
These values are measured in milliseconds. The default for both is `-1` (i.e. disabled).
|
||||
|
||||
[WARNING]
|
||||
====
|
||||
**If you set expiry-delay, or min/max-expiry-delay, on the expiration target address beware of the following:**
|
||||
|
||||
* Messages will get a new expiration when moved to the expiry queue, rather than being set to 0 as usual, and so may disappear after the new expiration.
|
||||
Setting a value of `0` for `max-expiry-delay` will cause messages to expire _immediately_.
|
||||
====
|
||||
|
||||
== Configuring Expiry Addresses
|
||||
|
|
|
@ -0,0 +1,275 @@
|
|||
/*
|
||||
* Licensed to the Apache Software Foundation (ASF) under one or more
|
||||
* contributor license agreements. See the NOTICE file distributed with
|
||||
* this work for additional information regarding copyright ownership.
|
||||
* The ASF licenses this file to You under the Apache License, Version 2.0
|
||||
* (the "License"); you may not use this file except in compliance with
|
||||
* the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
package org.apache.activemq.artemis.tests.integration.jms.multiprotocol;
|
||||
|
||||
import javax.jms.BytesMessage;
|
||||
import javax.jms.Connection;
|
||||
import javax.jms.MessageConsumer;
|
||||
import javax.jms.MessageProducer;
|
||||
import javax.jms.Queue;
|
||||
import javax.jms.Session;
|
||||
import java.lang.invoke.MethodHandles;
|
||||
|
||||
import org.apache.activemq.artemis.core.settings.impl.AddressSettings;
|
||||
import org.apache.activemq.artemis.utils.RandomUtil;
|
||||
import org.junit.jupiter.api.Test;
|
||||
import org.junit.jupiter.api.Timeout;
|
||||
import org.slf4j.Logger;
|
||||
import org.slf4j.LoggerFactory;
|
||||
|
||||
import static org.junit.jupiter.api.Assertions.assertEquals;
|
||||
import static org.junit.jupiter.api.Assertions.assertNotNull;
|
||||
import static org.junit.jupiter.api.Assertions.assertTrue;
|
||||
|
||||
public class JMSMessageExpiryTest extends MultiprotocolJMSClientTestSupport {
|
||||
|
||||
protected static final Logger logger = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
|
||||
|
||||
private final long EXPIRY_DELAY = 10_000_000L;
|
||||
|
||||
@Test
|
||||
@Timeout(30)
|
||||
public void testCoreMessageExpiryDelay() throws Exception {
|
||||
testExpiry(CoreConnection, DelayType.NORMAL, false);
|
||||
}
|
||||
|
||||
@Test
|
||||
@Timeout(30)
|
||||
public void testAmqpMessageExpiryDelay() throws Exception {
|
||||
testExpiry(AMQPConnection, DelayType.NORMAL, false);
|
||||
}
|
||||
|
||||
@Test
|
||||
@Timeout(30)
|
||||
public void testOpenWireMessageExpiryDelay() throws Exception {
|
||||
testExpiry(OpenWireConnection, DelayType.NORMAL, false);
|
||||
}
|
||||
|
||||
@Test
|
||||
@Timeout(30)
|
||||
public void testCoreLargeMessageExpiryDelay() throws Exception {
|
||||
testExpiry(CoreConnection, DelayType.NORMAL, false, true, false);
|
||||
}
|
||||
|
||||
@Test
|
||||
@Timeout(30)
|
||||
public void testAmqpLargeMessageExpiryDelay() throws Exception {
|
||||
testExpiry(AMQPConnection, DelayType.NORMAL, false, true, false);
|
||||
}
|
||||
|
||||
@Test
|
||||
@Timeout(30)
|
||||
public void testOpenWireLargeMessageExpiryDelay() throws Exception {
|
||||
testExpiry(OpenWireConnection, DelayType.NORMAL, false, true, false);
|
||||
}
|
||||
|
||||
@Test
|
||||
@Timeout(30)
|
||||
public void testCoreLargeMessageExpiryDelayWithBrokerRestart() throws Exception {
|
||||
testExpiry(CoreConnection, DelayType.NORMAL, false, true, true);
|
||||
}
|
||||
|
||||
@Test
|
||||
@Timeout(30)
|
||||
public void testAmqpLargeMessageExpiryDelayWithBrokerRestart() throws Exception {
|
||||
testExpiry(AMQPConnection, DelayType.NORMAL, false, true, true);
|
||||
}
|
||||
|
||||
@Test
|
||||
@Timeout(30)
|
||||
public void testOpenWireLargeMessageExpiryDelayWithBrokerRestart() throws Exception {
|
||||
testExpiry(OpenWireConnection, DelayType.NORMAL, false, true, true);
|
||||
}
|
||||
|
||||
@Test
|
||||
@Timeout(30)
|
||||
public void testCoreMessageExpiryDelayWithBrokerRestart() throws Exception {
|
||||
testExpiry(CoreConnection, DelayType.NORMAL, false, false, true);
|
||||
}
|
||||
|
||||
@Test
|
||||
@Timeout(30)
|
||||
public void testAmqpMessageExpiryDelayWithBrokerRestart() throws Exception {
|
||||
testExpiry(AMQPConnection, DelayType.NORMAL, false, false, true);
|
||||
}
|
||||
|
||||
@Test
|
||||
@Timeout(30)
|
||||
public void testOpenWireMessageExpiryDelayWithBrokerRestart() throws Exception {
|
||||
testExpiry(OpenWireConnection, DelayType.NORMAL, false, false, true);
|
||||
}
|
||||
|
||||
@Test
|
||||
@Timeout(30)
|
||||
public void testCoreMaxExpiryDelayNoExpiration() throws Exception {
|
||||
testExpiry(CoreConnection, DelayType.MAX, false);
|
||||
}
|
||||
|
||||
@Test
|
||||
@Timeout(30)
|
||||
public void testAmqpMaxExpiryDelayNoExpiration() throws Exception {
|
||||
testExpiry(AMQPConnection, DelayType.MAX, false);
|
||||
}
|
||||
|
||||
@Test
|
||||
@Timeout(30)
|
||||
public void testOpenWireMaxExpiryDelayNoExpiration() throws Exception {
|
||||
testExpiry(OpenWireConnection, DelayType.MAX, false);
|
||||
}
|
||||
|
||||
@Test
|
||||
@Timeout(30)
|
||||
public void testCoreMinExpiryDelayNoExpiration() throws Exception {
|
||||
testExpiry(CoreConnection, DelayType.MIN, false);
|
||||
}
|
||||
|
||||
@Test
|
||||
@Timeout(30)
|
||||
public void testAmqpMinExpiryDelayNoExpiration() throws Exception {
|
||||
testExpiry(AMQPConnection, DelayType.MIN, false);
|
||||
}
|
||||
|
||||
@Test
|
||||
@Timeout(30)
|
||||
public void testOpenWireMinExpiryDelayNoExpiration() throws Exception {
|
||||
testExpiry(OpenWireConnection, DelayType.MIN, false);
|
||||
}
|
||||
|
||||
@Test
|
||||
@Timeout(30)
|
||||
public void testCoreMaxExpiryDelayWithExpiration() throws Exception {
|
||||
testExpiry(CoreConnection, DelayType.MAX, true);
|
||||
}
|
||||
|
||||
@Test
|
||||
@Timeout(30)
|
||||
public void testAmqpMaxExpiryDelayWithExpiration() throws Exception {
|
||||
testExpiry(AMQPConnection, DelayType.MAX, true);
|
||||
}
|
||||
|
||||
@Test
|
||||
@Timeout(30)
|
||||
public void testOpenWireMaxExpiryDelayWithExpiration() throws Exception {
|
||||
testExpiry(OpenWireConnection, DelayType.MAX, true);
|
||||
}
|
||||
|
||||
@Test
|
||||
@Timeout(30)
|
||||
public void testCoreMinExpiryDelayWithExpiration() throws Exception {
|
||||
testExpiry(CoreConnection, DelayType.MIN, true);
|
||||
}
|
||||
|
||||
@Test
|
||||
@Timeout(30)
|
||||
public void testAmqpMinExpiryDelayWithExpiration() throws Exception {
|
||||
testExpiry(AMQPConnection, DelayType.MIN, true);
|
||||
}
|
||||
|
||||
@Test
|
||||
@Timeout(30)
|
||||
public void testOpenWireMinExpiryDelayWithExpiration() throws Exception {
|
||||
testExpiry(OpenWireConnection, DelayType.MIN, true);
|
||||
}
|
||||
|
||||
@Test
|
||||
@Timeout(30)
|
||||
public void testCoreMessageNoExpiry() throws Exception {
|
||||
testExpiry(CoreConnection, DelayType.NEVER, true);
|
||||
}
|
||||
|
||||
@Test
|
||||
@Timeout(30)
|
||||
public void testAmqpMessageNoExpiry() throws Exception {
|
||||
testExpiry(AMQPConnection, DelayType.NEVER, true);
|
||||
}
|
||||
|
||||
@Test
|
||||
@Timeout(30)
|
||||
public void testOpenWireMessageNoExpiry() throws Exception {
|
||||
testExpiry(OpenWireConnection, DelayType.NEVER, true);
|
||||
}
|
||||
|
||||
private void testExpiry(ConnectionSupplier supplier, DelayType delayType, boolean setTimeToLive) throws Exception {
|
||||
testExpiry(supplier, delayType, setTimeToLive, false, false);
|
||||
}
|
||||
|
||||
private void testExpiry(ConnectionSupplier supplier, DelayType delayType, boolean setTimeToLive, boolean useLargeMessage, boolean restartBroker) throws Exception {
|
||||
AddressSettings addressSettings = new AddressSettings();
|
||||
if (delayType == DelayType.NORMAL) {
|
||||
addressSettings.setExpiryDelay(EXPIRY_DELAY);
|
||||
} else if (delayType == DelayType.MIN) {
|
||||
addressSettings.setMinExpiryDelay(EXPIRY_DELAY);
|
||||
} else if (delayType == DelayType.MAX) {
|
||||
addressSettings.setMaxExpiryDelay(EXPIRY_DELAY);
|
||||
} else if (delayType == DelayType.NEVER) {
|
||||
addressSettings.setNoExpiry(true);
|
||||
}
|
||||
server.getAddressSettingsRepository().addMatch(getQueueName(), addressSettings);
|
||||
|
||||
Connection producerConnection = supplier.createConnection();
|
||||
Session session = producerConnection.createSession(false, Session.AUTO_ACKNOWLEDGE);
|
||||
Queue q = session.createQueue(getQueueName());
|
||||
MessageProducer producer = session.createProducer(q);
|
||||
if (setTimeToLive) {
|
||||
if (delayType == DelayType.MIN) {
|
||||
producer.setTimeToLive(EXPIRY_DELAY / 2);
|
||||
} else if (delayType == DelayType.MAX) {
|
||||
producer.setTimeToLive(EXPIRY_DELAY * 2);
|
||||
} else if (delayType == DelayType.NEVER) {
|
||||
producer.setTimeToLive(EXPIRY_DELAY);
|
||||
}
|
||||
}
|
||||
BytesMessage m = session.createBytesMessage();
|
||||
if (useLargeMessage) {
|
||||
m.writeBytes(RandomUtil.randomBytes(server.getConfiguration().getJournalBufferSize_NIO() * 2));
|
||||
}
|
||||
long start = System.currentTimeMillis();
|
||||
producer.send(m);
|
||||
producerConnection.close();
|
||||
if (useLargeMessage) {
|
||||
validateNoFilesOnLargeDir(getLargeMessagesDir(), 1);
|
||||
}
|
||||
if (restartBroker) {
|
||||
server.stop();
|
||||
server.start();
|
||||
}
|
||||
Connection consumerConnection = supplier.createConnection();
|
||||
session = consumerConnection.createSession(false, Session.AUTO_ACKNOWLEDGE);
|
||||
q = session.createQueue(getQueueName());
|
||||
MessageConsumer consumer = session.createConsumer(q);
|
||||
consumerConnection.start();
|
||||
m = (BytesMessage) consumer.receive(1500);
|
||||
long stop = System.currentTimeMillis();
|
||||
assertNotNull(m);
|
||||
consumerConnection.close();
|
||||
if (delayType == DelayType.NEVER) {
|
||||
assertEquals(0, m.getJMSExpiration());
|
||||
} else {
|
||||
long duration = stop - start;
|
||||
long delayOnMessage = m.getJMSExpiration() - stop;
|
||||
assertTrue(delayOnMessage >= (EXPIRY_DELAY - duration));
|
||||
assertTrue(delayOnMessage <= EXPIRY_DELAY);
|
||||
}
|
||||
if (useLargeMessage) {
|
||||
validateNoFilesOnLargeDir();
|
||||
}
|
||||
}
|
||||
|
||||
enum DelayType {
|
||||
NORMAL, MIN, MAX, NEVER;
|
||||
}
|
||||
}
|
Loading…
Reference in New Issue