This commit is contained in:
Clebert Suconic 2020-04-23 17:56:51 -04:00
commit 46e1207942
11 changed files with 288 additions and 6 deletions

View File

@ -177,6 +177,10 @@ public final class FileConfigurationParser extends XMLConfigurationUtil {
private static final String EXPIRY_DELAY_NODE_NAME = "expiry-delay";
private static final String MIN_EXPIRY_DELAY_NODE_NAME = "min-expiry-delay";
private static final String MAX_EXPIRY_DELAY_NODE_NAME = "max-expiry-delay";
private static final String REDELIVERY_DELAY_NODE_NAME = "redelivery-delay";
private static final String REDELIVERY_DELAY_MULTIPLIER_NODE_NAME = "redelivery-delay-multiplier";
@ -1064,6 +1068,10 @@ public final class FileConfigurationParser extends XMLConfigurationUtil {
addressSettings.setExpiryAddress(queueName);
} else if (EXPIRY_DELAY_NODE_NAME.equalsIgnoreCase(name)) {
addressSettings.setExpiryDelay(XMLUtil.parseLong(child));
} else if (MIN_EXPIRY_DELAY_NODE_NAME.equalsIgnoreCase(name)) {
addressSettings.setMinExpiryDelay(XMLUtil.parseLong(child));
} else if (MAX_EXPIRY_DELAY_NODE_NAME.equalsIgnoreCase(name)) {
addressSettings.setMaxExpiryDelay(XMLUtil.parseLong(child));
} else if (REDELIVERY_DELAY_NODE_NAME.equalsIgnoreCase(name)) {
addressSettings.setRedeliveryDelay(XMLUtil.parseLong(child));
} else if (REDELIVERY_DELAY_MULTIPLIER_NODE_NAME.equalsIgnoreCase(name)) {

View File

@ -2830,6 +2830,8 @@ public class ActiveMQServerControlImpl extends AbstractControl implements Active
}
return settings.add("expiryDelay", addressSettings.getExpiryDelay())
.add("minExpiryDelay", addressSettings.getExpiryDelay())
.add("maxExpiryDelay", addressSettings.getExpiryDelay())
.add("maxDeliveryAttempts", addressSettings.getMaxDeliveryAttempts())
.add("pageCacheMaxSize", addressSettings.getPageCacheMaxSize())
.add("maxSizeBytes", addressSettings.getMaxSizeBytes())

View File

@ -1146,10 +1146,19 @@ public class PostOfficeImpl implements PostOffice, NotificationListener, Binding
// A -1 <expiry-delay> means don't do anything
if (expirationOverride >= 0) {
// only override the exiration on messages where the expiration hasn't been set by the user
// only override the expiration on messages where the expiration hasn't been set by the user
if (message.getExpiration() == 0) {
message.setExpiration(System.currentTimeMillis() + expirationOverride);
}
} else {
long minExpiration = addressSettingsRepository.getMatch(address.toString()).getMinExpiryDelay();
long maxExpiration = addressSettingsRepository.getMatch(address.toString()).getMaxExpiryDelay();
if (maxExpiration != AddressSettings.DEFAULT_MAX_EXPIRY_DELAY && (message.getExpiration() == 0 || message.getExpiration() > (System.currentTimeMillis() + maxExpiration))) {
message.setExpiration(System.currentTimeMillis() + maxExpiration);
} else if (minExpiration != AddressSettings.DEFAULT_MIN_EXPIRY_DELAY && message.getExpiration() < (System.currentTimeMillis() + minExpiration)) {
message.setExpiration(System.currentTimeMillis() + minExpiration);
}
}
}

View File

@ -100,6 +100,10 @@ public class AddressSettings implements Mergeable<AddressSettings>, Serializable
public static final long DEFAULT_EXPIRY_DELAY = -1;
public static final long DEFAULT_MIN_EXPIRY_DELAY = -1;
public static final long DEFAULT_MAX_EXPIRY_DELAY = -1;
public static final boolean DEFAULT_SEND_TO_DLA_ON_NO_ROUTE = false;
public static final long DEFAULT_SLOW_CONSUMER_THRESHOLD = -1;
@ -149,6 +153,10 @@ public class AddressSettings implements Mergeable<AddressSettings>, Serializable
private Long expiryDelay = AddressSettings.DEFAULT_EXPIRY_DELAY;
private Long minExpiryDelay = AddressSettings.DEFAULT_MIN_EXPIRY_DELAY;
private Long maxExpiryDelay = AddressSettings.DEFAULT_MAX_EXPIRY_DELAY;
private Boolean defaultLastValueQueue = null;
private SimpleString defaultLastValueKey = null;
@ -264,6 +272,8 @@ public class AddressSettings implements Mergeable<AddressSettings>, Serializable
this.expiryQueuePrefix = other.expiryQueuePrefix;
this.expiryQueueSuffix = other.expiryQueueSuffix;
this.expiryDelay = other.expiryDelay;
this.minExpiryDelay = other.minExpiryDelay;
this.maxExpiryDelay = other.maxExpiryDelay;
this.defaultLastValueQueue = other.defaultLastValueQueue;
this.defaultLastValueKey = other.defaultLastValueKey;
this.defaultNonDestructive = other.defaultNonDestructive;
@ -677,6 +687,24 @@ public class AddressSettings implements Mergeable<AddressSettings>, Serializable
return this;
}
public Long getMinExpiryDelay() {
return minExpiryDelay;
}
public AddressSettings setMinExpiryDelay(final Long minExpiryDelay) {
this.minExpiryDelay = minExpiryDelay;
return this;
}
public Long getMaxExpiryDelay() {
return maxExpiryDelay;
}
public AddressSettings setMaxExpiryDelay(final Long maxExpiryDelay) {
this.maxExpiryDelay = maxExpiryDelay;
return this;
}
public boolean isSendToDLAOnNoRoute() {
return sendToDLAOnNoRoute != null ? sendToDLAOnNoRoute : AddressSettings.DEFAULT_SEND_TO_DLA_ON_NO_ROUTE;
}
@ -900,6 +928,12 @@ public class AddressSettings implements Mergeable<AddressSettings>, Serializable
if (expiryDelay == null) {
expiryDelay = merged.expiryDelay;
}
if (minExpiryDelay == null) {
minExpiryDelay = merged.minExpiryDelay;
}
if (maxExpiryDelay == null) {
maxExpiryDelay = merged.maxExpiryDelay;
}
if (redistributionDelay == null) {
redistributionDelay = merged.redistributionDelay;
}
@ -1231,6 +1265,14 @@ public class AddressSettings implements Mergeable<AddressSettings>, Serializable
if (buffer.readableBytes() > 0) {
expiryQueueSuffix = buffer.readNullableSimpleString();
}
if (buffer.readableBytes() > 0) {
minExpiryDelay = BufferHelper.readNullableLong(buffer);
}
if (buffer.readableBytes() > 0) {
maxExpiryDelay = BufferHelper.readNullableLong(buffer);
}
}
@Override
@ -1250,6 +1292,8 @@ public class AddressSettings implements Mergeable<AddressSettings>, Serializable
SimpleString.sizeofNullableString(deadLetterAddress) +
SimpleString.sizeofNullableString(expiryAddress) +
BufferHelper.sizeOfNullableLong(expiryDelay) +
BufferHelper.sizeOfNullableLong(minExpiryDelay) +
BufferHelper.sizeOfNullableLong(maxExpiryDelay) +
BufferHelper.sizeOfNullableBoolean(defaultLastValueQueue) +
BufferHelper.sizeOfNullableLong(redistributionDelay) +
BufferHelper.sizeOfNullableBoolean(sendToDLAOnNoRoute) +
@ -1408,6 +1452,10 @@ public class AddressSettings implements Mergeable<AddressSettings>, Serializable
buffer.writeNullableSimpleString(expiryQueuePrefix);
buffer.writeNullableSimpleString(expiryQueueSuffix);
BufferHelper.writeNullableLong(buffer, minExpiryDelay);
BufferHelper.writeNullableLong(buffer, maxExpiryDelay);
}
/* (non-Javadoc)
@ -1422,6 +1470,8 @@ public class AddressSettings implements Mergeable<AddressSettings>, Serializable
result = prime * result + ((dropMessagesWhenFull == null) ? 0 : dropMessagesWhenFull.hashCode());
result = prime * result + ((expiryAddress == null) ? 0 : expiryAddress.hashCode());
result = prime * result + ((expiryDelay == null) ? 0 : expiryDelay.hashCode());
result = prime * result + ((minExpiryDelay == null) ? 0 : expiryDelay.hashCode());
result = prime * result + ((maxExpiryDelay == null) ? 0 : expiryDelay.hashCode());
result = prime * result + ((defaultLastValueQueue == null) ? 0 : defaultLastValueQueue.hashCode());
result = prime * result + ((defaultLastValueKey == null) ? 0 : defaultLastValueKey.hashCode());
result = prime * result + ((defaultNonDestructive == null) ? 0 : defaultNonDestructive.hashCode());
@ -1515,6 +1565,16 @@ public class AddressSettings implements Mergeable<AddressSettings>, Serializable
return false;
} else if (!expiryDelay.equals(other.expiryDelay))
return false;
if (minExpiryDelay == null) {
if (other.minExpiryDelay != null)
return false;
} else if (!minExpiryDelay.equals(other.minExpiryDelay))
return false;
if (maxExpiryDelay == null) {
if (other.maxExpiryDelay != null)
return false;
} else if (!maxExpiryDelay.equals(other.maxExpiryDelay))
return false;
if (defaultLastValueQueue == null) {
if (other.defaultLastValueQueue != null)
return false;
@ -1817,6 +1877,10 @@ public class AddressSettings implements Mergeable<AddressSettings>, Serializable
expiryAddress +
", expiryDelay=" +
expiryDelay +
", minExpiryDelay=" +
minExpiryDelay +
", maxExpiryDelay=" +
maxExpiryDelay +
", defaultLastValueQueue=" +
defaultLastValueQueue +
", defaultLastValueKey=" +

View File

@ -3136,6 +3136,22 @@
</xsd:annotation>
</xsd:element>
<xsd:element name="min-expiry-delay" type="xsd:long" default="-1" maxOccurs="1" minOccurs="0">
<xsd:annotation>
<xsd:documentation>
Overrides the expiration time for messages using a lower value. "-1" disables this setting.
</xsd:documentation>
</xsd:annotation>
</xsd:element>
<xsd:element name="max-expiry-delay" type="xsd:long" default="-1" maxOccurs="1" minOccurs="0">
<xsd:annotation>
<xsd:documentation>
Overrides the expiration time for messages using a higher value. "-1" disables this setting.
</xsd:documentation>
</xsd:annotation>
</xsd:element>
<xsd:element name="redelivery-delay" type="xsd:long" default="0" maxOccurs="1" minOccurs="0">
<xsd:annotation>
<xsd:documentation>

View File

@ -345,6 +345,9 @@ public class FileConfigurationTest extends ConfigurationImplTest {
assertEquals(AddressSettings.DEFAULT_DEAD_LETTER_QUEUE_PREFIX, conf.getAddressesSettings().get("a1").getDeadLetterQueuePrefix());
assertEquals(AddressSettings.DEFAULT_DEAD_LETTER_QUEUE_SUFFIX, conf.getAddressesSettings().get("a1").getDeadLetterQueueSuffix());
assertEquals("a1.2", conf.getAddressesSettings().get("a1").getExpiryAddress().toString());
assertEquals(1L, (long) conf.getAddressesSettings().get("a1").getExpiryDelay());
assertEquals(2L, (long) conf.getAddressesSettings().get("a1").getMinExpiryDelay());
assertEquals(3L, (long) conf.getAddressesSettings().get("a1").getMaxExpiryDelay());
assertEquals(AddressSettings.DEFAULT_AUTO_CREATE_EXPIRY_RESOURCES, conf.getAddressesSettings().get("a1").isAutoCreateExpiryResources());
assertEquals(AddressSettings.DEFAULT_EXPIRY_QUEUE_PREFIX, conf.getAddressesSettings().get("a1").getExpiryQueuePrefix());
assertEquals(AddressSettings.DEFAULT_EXPIRY_QUEUE_SUFFIX, conf.getAddressesSettings().get("a1").getExpiryQueueSuffix());
@ -375,6 +378,9 @@ public class FileConfigurationTest extends ConfigurationImplTest {
assertEquals("", conf.getAddressesSettings().get("a2").getDeadLetterQueuePrefix().toString());
assertEquals(".DLQ", conf.getAddressesSettings().get("a2").getDeadLetterQueueSuffix().toString());
assertEquals("a2.2", conf.getAddressesSettings().get("a2").getExpiryAddress().toString());
assertEquals(-1L, (long) conf.getAddressesSettings().get("a2").getExpiryDelay());
assertEquals(-1L, (long) conf.getAddressesSettings().get("a2").getMinExpiryDelay());
assertEquals(-1L, (long) conf.getAddressesSettings().get("a2").getMaxExpiryDelay());
assertEquals(true, conf.getAddressesSettings().get("a2").isAutoCreateDeadLetterResources());
assertEquals("", conf.getAddressesSettings().get("a2").getExpiryQueuePrefix().toString());
assertEquals(".EXP", conf.getAddressesSettings().get("a2").getExpiryQueueSuffix().toString());

View File

@ -413,6 +413,9 @@
<address-setting match="a1">
<dead-letter-address>a1.1</dead-letter-address>
<expiry-address>a1.2</expiry-address>
<expiry-delay>1</expiry-delay>
<min-expiry-delay>2</min-expiry-delay>
<max-expiry-delay>3</max-expiry-delay>
<redelivery-delay>1</redelivery-delay>
<redelivery-collision-avoidance-factor>0.5</redelivery-collision-avoidance-factor>
<max-size-bytes>817M</max-size-bytes>

View File

@ -18,6 +18,9 @@
<address-setting match="a1">
<dead-letter-address>a1.1</dead-letter-address>
<expiry-address>a1.2</expiry-address>
<expiry-delay>1</expiry-delay>
<min-expiry-delay>2</min-expiry-delay>
<max-expiry-delay>3</max-expiry-delay>
<redelivery-delay>1</redelivery-delay>
<redelivery-collision-avoidance-factor>0.5</redelivery-collision-avoidance-factor>
<max-size-bytes>817M</max-size-bytes>

View File

@ -2992,6 +2992,22 @@
</xsd:annotation>
</xsd:element>
<xsd:element name="min-expiry-delay" type="xsd:long" default="-1" maxOccurs="1" minOccurs="0">
<xsd:annotation>
<xsd:documentation>
Overrides the expiration time for messages using a lower value. "-1" disables this setting.
</xsd:documentation>
</xsd:annotation>
</xsd:element>
<xsd:element name="max-expiry-delay" type="xsd:long" default="-1" maxOccurs="1" minOccurs="0">
<xsd:annotation>
<xsd:documentation>
Overrides the expiration time for messages using a higher value. "-1" disables this setting.
</xsd:documentation>
</xsd:annotation>
</xsd:element>
<xsd:element name="redelivery-delay" type="xsd:long" default="0" maxOccurs="1" minOccurs="0">
<xsd:annotation>
<xsd:documentation>

View File

@ -48,7 +48,7 @@ properties:
## Configuring Expiry Delay
Default Expiry delay can be configured in the address-setting configuration:
Default expiry delay can be configured in the address-setting configuration:
```xml
<!-- expired messages in exampleQueue will be sent to the expiry address expiryQueue -->
@ -62,11 +62,35 @@ Default Expiry delay can be configured in the address-setting configuration:
which are using the default expiration time (i.e. 0).
For example, if `expiry-delay` is set to "10" and a message which is using the default
expiration time (i.e.10) arrives then its expiration time of "0" will be changed to "10."
expiration time (i.e. 10) arrives then its expiration time of "0" will be changed to "10."
However, if a message which is using an expiration time of "20" arrives then its expiration
time will remain unchanged. Setting `expiry-delay` to "-1" will disable this feature.
The default is "-1".
The default is `-1`.
If `expiry-delay` is *not set* then minimum and maximum expiry delay values can be configured
in the address-setting configuration.
```xml
<address-setting match="exampleQueue">
<min-expiry-delay>10</min-expiry-delay>
<max-expiry-delay>100</max-expiry-delay>
</address-setting>
```
Semantics are as follows:
- Messages _without_ an expiration will be set to `max-expiry-delay`. If `max-expiry-delay`
is not defined then the message will be set to `min-expiry-delay`. If `min-expiry-delay`
is not defined then the message will not be changed.
- Messages with an expiration _above_ `max-expiry-delay` will be set to `max-expiry-delay`
- Messages with an expiration _below_ `min-expiry-delay` will be set to `min-expiry-delay`
- Messages with an expiration _within_ `min-expiry-delay` and `max-expiry-delay` range will
not be changed
- Any value set for `expiry-delay` other than the default (i.e. `-1`) will override the
aforementioned min/max settings.
The default for both `min-expiry-delay` and `max-expiry-delay` is `-1` (i.e. disabled).
## Configuring Expiry Addresses

View File

@ -34,7 +34,9 @@ import org.junit.Test;
public class MessageExpirationTest extends ActiveMQTestBase {
private static final int EXPIRATION = 200;
private static final int EXPIRATION = 1000;
private static final int MIN_EXPIRATION = 500;
private static final int MAX_EXPIRATION = 1500;
private ActiveMQServer server;
@ -92,6 +94,135 @@ public class MessageExpirationTest extends ActiveMQTestBase {
session.deleteQueue(queue);
}
@Test
public void testMessageWithNoExpirationMinExpiryDelayOverride() throws Exception {
SimpleString address = RandomUtil.randomSimpleString();
SimpleString queue = RandomUtil.randomSimpleString();
session.close();
session = addClientSession(sf.createSession(false, true, false));
session.createQueue(new QueueConfiguration(queue).setAddress(address).setDurable(false));
ClientProducer producer = session.createProducer(address);
ClientMessage message = session.createMessage(false);
AddressSettings addressSettings = new AddressSettings().setMinExpiryDelay((long) MIN_EXPIRATION);
server.getAddressSettingsRepository().addMatch(address.toString(), addressSettings);
producer.send(message);
long start = System.currentTimeMillis();
org.apache.activemq.artemis.utils.Wait.assertTrue(() -> server.locateQueue(queue).getMessagesExpired() == 1, MIN_EXPIRATION + 200, 50);
assertTrue(System.currentTimeMillis() - start > MIN_EXPIRATION);
session.deleteQueue(queue);
}
@Test
public void testMessageWithTooSmallExpirationMinExpiryDelayOverride() throws Exception {
SimpleString address = RandomUtil.randomSimpleString();
SimpleString queue = RandomUtil.randomSimpleString();
session.close();
session = addClientSession(sf.createSession(false, true, false));
session.createQueue(new QueueConfiguration(queue).setAddress(address).setDurable(false));
ClientProducer producer = session.createProducer(address);
ClientMessage message = session.createMessage(false);
AddressSettings addressSettings = new AddressSettings().setMinExpiryDelay((long) MIN_EXPIRATION);
server.getAddressSettingsRepository().addMatch(address.toString(), addressSettings);
message.setExpiration(System.currentTimeMillis() + (MIN_EXPIRATION / 2));
producer.send(message);
long start = System.currentTimeMillis();
org.apache.activemq.artemis.utils.Wait.assertTrue(() -> server.locateQueue(queue).getMessagesExpired() == 1, MIN_EXPIRATION + 200, 50);
assertTrue(System.currentTimeMillis() - start > MIN_EXPIRATION);
session.deleteQueue(queue);
}
@Test
public void testMessageWithNoExpirationMaxExpiryDelayOverride() throws Exception {
SimpleString address = RandomUtil.randomSimpleString();
SimpleString queue = RandomUtil.randomSimpleString();
session.close();
session = addClientSession(sf.createSession(false, true, false));
session.createQueue(new QueueConfiguration(queue).setAddress(address).setDurable(false));
ClientProducer producer = session.createProducer(address);
ClientMessage message = session.createMessage(false);
AddressSettings addressSettings = new AddressSettings().setMaxExpiryDelay((long) MAX_EXPIRATION);
server.getAddressSettingsRepository().addMatch(address.toString(), addressSettings);
producer.send(message);
long start = System.currentTimeMillis();
org.apache.activemq.artemis.utils.Wait.assertTrue(() -> server.locateQueue(queue).getMessagesExpired() == 1, MAX_EXPIRATION + 200, 50);
assertTrue(System.currentTimeMillis() - start <= (MAX_EXPIRATION + 200));
session.deleteQueue(queue);
}
@Test
public void testMessageWithTooLargeExpirationMaxExpiryDelayOverride() throws Exception {
SimpleString address = RandomUtil.randomSimpleString();
SimpleString queue = RandomUtil.randomSimpleString();
session.close();
session = addClientSession(sf.createSession(false, true, false));
session.createQueue(new QueueConfiguration(queue).setAddress(address).setDurable(false));
ClientProducer producer = session.createProducer(address);
ClientMessage message = session.createMessage(false);
AddressSettings addressSettings = new AddressSettings().setMaxExpiryDelay((long) MAX_EXPIRATION);
server.getAddressSettingsRepository().addMatch(address.toString(), addressSettings);
message.setExpiration(System.currentTimeMillis() + (MAX_EXPIRATION * 2));
producer.send(message);
long start = System.currentTimeMillis();
org.apache.activemq.artemis.utils.Wait.assertTrue(() -> server.locateQueue(queue).getMessagesExpired() == 1, MAX_EXPIRATION + 100, 50);
assertTrue(System.currentTimeMillis() - start <= (MAX_EXPIRATION + 200));
session.deleteQueue(queue);
}
@Test
public void testMessageWithAcceptableExpirationMinMaxExpiryDelayOverride() throws Exception {
SimpleString address = RandomUtil.randomSimpleString();
SimpleString queue = RandomUtil.randomSimpleString();
session.close();
session = addClientSession(sf.createSession(false, true, false));
session.createQueue(new QueueConfiguration(queue).setAddress(address).setDurable(false));
ClientProducer producer = session.createProducer(address);
ClientMessage message = session.createMessage(false);
AddressSettings addressSettings = new AddressSettings().setMinExpiryDelay((long) MIN_EXPIRATION).setMaxExpiryDelay((long) MAX_EXPIRATION);
server.getAddressSettingsRepository().addMatch(address.toString(), addressSettings);
message.setExpiration(System.currentTimeMillis() + EXPIRATION);
producer.send(message);
long start = System.currentTimeMillis();
org.apache.activemq.artemis.utils.Wait.assertTrue(() -> server.locateQueue(queue).getMessagesExpired() == 1, EXPIRATION + 100, 50);
assertTrue(System.currentTimeMillis() - start > MIN_EXPIRATION);
assertTrue(System.currentTimeMillis() - start < MAX_EXPIRATION);
session.deleteQueue(queue);
}
@Override
@Before
public void setUp() throws Exception {
@ -99,7 +230,7 @@ public class MessageExpirationTest extends ActiveMQTestBase {
server = createServer(false);
server.getConfiguration().setMessageExpiryScanPeriod(500);
server.getConfiguration().setMessageExpiryScanPeriod(100);
server.start();
locator = createInVMNonHALocator();