ARTEMIS-2587 auto-create dead-letter resources

This is a reimplementation of the IndividualDeadLetterQueueStrategy
from 5.x in a way that makes sense with the Artemis addressing model.
This commit is contained in:
Justin Bertram 2020-02-14 12:39:19 -06:00 committed by Clebert Suconic
parent d3efc24ffb
commit b76f3b3a0d
19 changed files with 883 additions and 18 deletions

View File

@ -1387,6 +1387,63 @@ public interface ActiveMQServerControl {
@Parameter(desc = "factor by which to modify the redelivery delay slightly to avoid collisions", name = "redeliveryCollisionAvoidanceFactor") double redeliveryCollisionAvoidanceFactor,
@Parameter(desc = "the number of messages to preserve for future queues created on the matching address", name = "retroactiveMessageCount") long retroactiveMessageCount) throws Exception;
/**
* adds a new address setting for a specific address
*/
@Operation(desc = "Add address settings for addresses matching the addressMatch", impact = MBeanOperationInfo.ACTION)
void addAddressSettings(@Parameter(desc = "an address match", name = "addressMatch") String addressMatch,
@Parameter(desc = "the dead letter address setting", name = "DLA") String DLA,
@Parameter(desc = "the expiry address setting", name = "expiryAddress") String expiryAddress,
@Parameter(desc = "the expiry delay setting", name = "expiryDelay") long expiryDelay,
@Parameter(desc = "are any queues created for this address a last value queue", name = "lastValueQueue") boolean lastValueQueue,
@Parameter(desc = "the delivery attempts", name = "deliveryAttempts") int deliveryAttempts,
@Parameter(desc = "the max size in bytes", name = "maxSizeBytes") long maxSizeBytes,
@Parameter(desc = "the page size in bytes", name = "pageSizeBytes") int pageSizeBytes,
@Parameter(desc = "the max number of pages in the soft memory cache", name = "pageMaxCacheSize") int pageMaxCacheSize,
@Parameter(desc = "the redelivery delay", name = "redeliveryDelay") long redeliveryDelay,
@Parameter(desc = "the redelivery delay multiplier", name = "redeliveryMultiplier") double redeliveryMultiplier,
@Parameter(desc = "the maximum redelivery delay", name = "maxRedeliveryDelay") long maxRedeliveryDelay,
@Parameter(desc = "the redistribution delay", name = "redistributionDelay") long redistributionDelay,
@Parameter(desc = "do we send to the DLA when there is no where to route the message", name = "sendToDLAOnNoRoute") boolean sendToDLAOnNoRoute,
@Parameter(desc = "the policy to use when the address is full", name = "addressFullMessagePolicy") String addressFullMessagePolicy,
@Parameter(desc = "when a consumer falls below this threshold in terms of messages consumed per second it will be considered 'slow'", name = "slowConsumerThreshold") long slowConsumerThreshold,
@Parameter(desc = "how often (in seconds) to check for slow consumers", name = "slowConsumerCheckPeriod") long slowConsumerCheckPeriod,
@Parameter(desc = "the policy to use when a slow consumer is detected", name = "slowConsumerPolicy") String slowConsumerPolicy,
@Parameter(desc = "allow jms queues to be created automatically", name = "autoCreateJmsQueues") boolean autoCreateJmsQueues,
@Parameter(desc = "allow auto-created jms queues to be deleted automatically", name = "autoDeleteJmsQueues") boolean autoDeleteJmsQueues,
@Parameter(desc = "allow jms topics to be created automatically", name = "autoCreateJmsTopics") boolean autoCreateJmsTopics,
@Parameter(desc = "allow auto-created jms topics to be deleted automatically", name = "autoDeleteJmsTopics") boolean autoDeleteJmsTopics,
@Parameter(desc = "allow queues to be created automatically", name = "autoCreateQueues") boolean autoCreateQueues,
@Parameter(desc = "allow auto-created queues to be deleted automatically", name = "autoDeleteQueues") boolean autoDeleteQueues,
@Parameter(desc = "allow addresses to be created automatically", name = "autoCreateAddresses") boolean autoCreateAddresses,
@Parameter(desc = "allow auto-created addresses to be deleted automatically", name = "autoDeleteAddresses") boolean autoDeleteAddresses,
@Parameter(desc = "how to deal with queues deleted from XML at runtime", name = "configDeleteQueues") String configDeleteQueues,
@Parameter(desc = "how to deal with addresses deleted from XML at runtime", name = "configDeleteAddresses") String configDeleteAddresses,
@Parameter(desc = "used with `BLOCK`, the max size an address can reach before messages are rejected; works in combination with `max-size-bytes` for AMQP clients only", name = "maxSizeBytesRejectThreshold") long maxSizeBytesRejectThreshold,
@Parameter(desc = "last-value-key value if none is set on the queue", name = "defaultLastValueKey") String defaultLastValueKey,
@Parameter(desc = "non-destructive value if none is set on the queue", name = "defaultNonDestructive") boolean defaultNonDestructive,
@Parameter(desc = "exclusive value if none is set on the queue", name = "defaultExclusiveQueue") boolean defaultExclusiveQueue,
@Parameter(desc = "group-rebalance value if none is set on the queue", name = "defaultGroupRebalance") boolean defaultGroupRebalance,
@Parameter(desc = "group-buckets value if none is set on the queue", name = "defaultGroupBuckets") int defaultGroupBuckets,
@Parameter(desc = "group-first-key value if none is set on the queue", name = "defaultGroupFirstKey") String defaultGroupFirstKey,
@Parameter(desc = "max-consumers value if none is set on the queue", name = "defaultMaxConsumers") int defaultMaxConsumers,
@Parameter(desc = "purge-on-no-consumers value if none is set on the queue", name = "defaultPurgeOnNoConsumers") boolean defaultPurgeOnNoConsumers,
@Parameter(desc = "consumers-before-dispatch value if none is set on the queue", name = "defaultConsumersBeforeDispatch") int defaultConsumersBeforeDispatch,
@Parameter(desc = "delay-before-dispatch value if none is set on the queue", name = "defaultDelayBeforeDispatch") long defaultDelayBeforeDispatch,
@Parameter(desc = "routing-type value if none is set on the queue", name = "defaultQueueRoutingType") String defaultQueueRoutingType,
@Parameter(desc = "routing-type value if none is set on the address", name = "defaultAddressRoutingType") String defaultAddressRoutingType,
@Parameter(desc = "consumer-window-size value if none is set on the queue", name = "defaultConsumerWindowSize") int defaultConsumerWindowSize,
@Parameter(desc = "ring-size value if none is set on the queue", name = "defaultRingSize") long defaultRingSize,
@Parameter(desc = "allow created queues to be deleted automatically", name = "autoDeleteCreatedQueues") boolean autoDeleteCreatedQueues,
@Parameter(desc = "delay for deleting auto-created queues", name = "autoDeleteQueuesDelay") long autoDeleteQueuesDelay,
@Parameter(desc = "the message count the queue must be at or below before it can be auto deleted", name = "autoDeleteQueuesMessageCount") long autoDeleteQueuesMessageCount,
@Parameter(desc = "delay for deleting auto-created addresses", name = "autoDeleteAddressesDelay") long autoDeleteAddressesDelay,
@Parameter(desc = "factor by which to modify the redelivery delay slightly to avoid collisions", name = "redeliveryCollisionAvoidanceFactor") double redeliveryCollisionAvoidanceFactor,
@Parameter(desc = "the number of messages to preserve for future queues created on the matching address", name = "retroactiveMessageCount") long retroactiveMessageCount,
@Parameter(desc = "allow dead-letter address & queue to be created automatically", name = "autoCreateDeadLetterResources") boolean autoCreateDeadLetterResources,
@Parameter(desc = "prefix to use on auto-create dead-letter queue", name = "deadLetterQueuePrefix") String deadLetterQueuePrefix,
@Parameter(desc = "suffix to use on auto-create dead-letter queue", name = "deadLetterQueueSuffix") String deadLetterQueueSuffix) throws Exception;
@Operation(desc = "Remove address settings", impact = MBeanOperationInfo.ACTION)
void removeAddressSettings(@Parameter(desc = "an address match", name = "addressMatch") String addressMatch) throws Exception;

View File

@ -117,6 +117,12 @@ public final class AddressSettingsInfo {
private final long retroactiveMessageCount;
private final boolean autoCreateDeadLetterResources;
private final String deadLetterQueuePrefix;
private final String deadLetterQueueSuffix;
// Static --------------------------------------------------------
public static AddressSettingsInfo from(final String jsonString) {
@ -167,7 +173,10 @@ public final class AddressSettingsInfo {
object.getJsonNumber("autoDeleteQueuesMessageCount").longValue(),
object.getJsonNumber("autoDeleteAddressesDelay").longValue(),
object.getJsonNumber("redeliveryCollisionAvoidanceFactor").doubleValue(),
object.getJsonNumber("retroactiveMessageCount").longValue());
object.getJsonNumber("retroactiveMessageCount").longValue(),
object.getBoolean("autoCreateDeadLetterResources"),
object.getString("deadLetterQueuePrefix"),
object.getString("deadLetterQueueSuffix"));
}
// Constructors --------------------------------------------------
@ -218,7 +227,10 @@ public final class AddressSettingsInfo {
long autoDeleteQueuesMessageCount,
long autoDeleteAddressesDelay,
double redeliveryCollisionAvoidanceFactor,
long retroactiveMessageCount) {
long retroactiveMessageCount,
boolean autoCreateDeadLetterResources,
String deadLetterQueuePrefix,
String deadLetterQueueSuffix) {
this.addressFullMessagePolicy = addressFullMessagePolicy;
this.maxSizeBytes = maxSizeBytes;
this.pageSizeBytes = pageSizeBytes;
@ -266,6 +278,9 @@ public final class AddressSettingsInfo {
this.autoDeleteAddressesDelay = autoDeleteAddressesDelay;
this.redeliveryCollisionAvoidanceFactor = redeliveryCollisionAvoidanceFactor;
this.retroactiveMessageCount = retroactiveMessageCount;
this.autoCreateDeadLetterResources = autoCreateDeadLetterResources;
this.deadLetterQueuePrefix = deadLetterQueuePrefix;
this.deadLetterQueueSuffix = deadLetterQueueSuffix;
}
// Public --------------------------------------------------------
@ -465,5 +480,17 @@ public final class AddressSettingsInfo {
public long getRetroactiveMessageCount() {
return retroactiveMessageCount;
}
public boolean isAutoCreateDeadLetterResources() {
return autoCreateDeadLetterResources;
}
public String getDeadLetterQueuePrefix() {
return deadLetterQueuePrefix;
}
public String getDeadLetterQueueSuffix() {
return deadLetterQueueSuffix;
}
}

View File

@ -77,7 +77,10 @@ public class AddressSettingsInfoTest {
"\"autoDeleteQueuesMessageCount\":8,\n" +
"\"autoDeleteAddressesDelay\":3003,\n" +
"\"redeliveryCollisionAvoidanceFactor\":1.1,\n" +
"\"retroactiveMessageCount\":101\n" +
"\"retroactiveMessageCount\":101,\n" +
"\"autoCreateDeadLetterResources\":true,\n" +
"\"deadLetterQueuePrefix\":\"FOO.\",\n" +
"\"deadLetterQueueSuffix\":\".FOO\"\n" +
"}";
AddressSettingsInfo addressSettingsInfo = AddressSettingsInfo.from(json);
assertEquals("fullPolicy", addressSettingsInfo.getAddressFullMessagePolicy());
@ -127,6 +130,9 @@ public class AddressSettingsInfoTest {
assertEquals(3003, addressSettingsInfo.getAutoDeleteAddressesDelay());
assertEquals(1.1, addressSettingsInfo.getRedeliveryCollisionAvoidanceFactor(), 0);
assertEquals(101, addressSettingsInfo.getRetroactiveMessageCount());
assertTrue(addressSettingsInfo.isAutoCreateDeadLetterResources());
assertEquals("FOO.", addressSettingsInfo.getDeadLetterQueuePrefix());
assertEquals(".FOO", addressSettingsInfo.getDeadLetterQueueSuffix());
}
}

View File

@ -161,6 +161,12 @@ public final class FileConfigurationParser extends XMLConfigurationUtil {
private static final String DEAD_LETTER_ADDRESS_NODE_NAME = "dead-letter-address";
private static final String AUTO_CREATE_DEAD_LETTER_RESOURCES_NODE_NAME = "auto-create-dead-letter-resources";
private static final String DEAD_LETTER_QUEUE_PREFIX_NODE_NAME = "dead-letter-queue-prefix";
private static final String DEAD_LETTER_QUEUE_SUFFIX_NODE_NAME = "dead-letter-queue-suffix";
private static final String EXPIRY_ADDRESS_NODE_NAME = "expiry-address";
private static final String EXPIRY_DELAY_NODE_NAME = "expiry-delay";
@ -1185,6 +1191,12 @@ public final class FileConfigurationParser extends XMLConfigurationUtil {
long retroactiveMessageCount = XMLUtil.parseLong(child);
Validators.GE_ZERO.validate(RETROACTIVE_MESSAGE_COUNT, retroactiveMessageCount);
addressSettings.setRetroactiveMessageCount(retroactiveMessageCount);
} else if (AUTO_CREATE_DEAD_LETTER_RESOURCES_NODE_NAME.equalsIgnoreCase(name)) {
addressSettings.setAutoCreateDeadLetterResources(XMLUtil.parseBoolean(child));
} else if (DEAD_LETTER_QUEUE_PREFIX_NODE_NAME.equalsIgnoreCase(name)) {
addressSettings.setDeadLetterQueuePrefix(new SimpleString(getTrimmedTextContent(child)));
} else if (DEAD_LETTER_QUEUE_SUFFIX_NODE_NAME.equalsIgnoreCase(name)) {
addressSettings.setDeadLetterQueueSuffix(new SimpleString(getTrimmedTextContent(child)));
}
}
return setting;

View File

@ -2753,6 +2753,9 @@ public class ActiveMQServerControlImpl extends AbstractControl implements Active
.add("autoDeleteAddressesDelay", addressSettings.getAutoDeleteAddressesDelay())
.add("redeliveryCollisionAvoidanceFactor", addressSettings.getRedeliveryCollisionAvoidanceFactor())
.add("retroactiveMessageCount", addressSettings.getRetroactiveMessageCount())
.add("autoCreateDeadLetterResources", addressSettings.isAutoCreateDeadLetterResources())
.add("deadLetterQueuePrefix", addressSettings.getDeadLetterQueuePrefix().toString())
.add("deadLetterQueueSuffix", addressSettings.getDeadLetterQueueSuffix().toString())
.build()
.toString();
}
@ -2936,6 +2939,113 @@ public class ActiveMQServerControlImpl extends AbstractControl implements Active
final long autoDeleteAddressesDelay,
final double redeliveryCollisionAvoidanceFactor,
final long retroactiveMessageCount) throws Exception {
addAddressSettings(address,
DLA,
expiryAddress,
expiryDelay,
defaultLastValueQueue,
maxDeliveryAttempts,
maxSizeBytes,
pageSizeBytes,
pageMaxCacheSize,
redeliveryDelay,
redeliveryMultiplier,
maxRedeliveryDelay,
redistributionDelay,
sendToDLAOnNoRoute,
addressFullMessagePolicy,
slowConsumerThreshold,
slowConsumerCheckPeriod,
slowConsumerPolicy,
autoCreateJmsQueues,
autoDeleteJmsQueues,
autoCreateJmsTopics,
autoDeleteJmsTopics,
autoCreateQueues,
autoDeleteQueues,
autoCreateAddresses,
autoDeleteAddresses,
configDeleteQueues,
configDeleteAddresses,
maxSizeBytesRejectThreshold,
defaultLastValueKey,
defaultNonDestructive,
defaultExclusiveQueue,
defaultGroupRebalance,
defaultGroupBuckets,
defaultGroupFirstKey,
defaultMaxConsumers,
defaultPurgeOnNoConsumers,
defaultConsumersBeforeDispatch,
defaultDelayBeforeDispatch,
defaultQueueRoutingType,
defaultAddressRoutingType,
defaultConsumerWindowSize,
defaultRingSize,
autoDeleteCreatedQueues,
autoDeleteQueuesDelay,
autoDeleteQueuesMessageCount,
autoDeleteAddressesDelay,
redeliveryCollisionAvoidanceFactor,
retroactiveMessageCount,
AddressSettings.DEFAULT_AUTO_CREATE_DEAD_LETTER_RESOURCES,
AddressSettings.DEFAULT_DEAD_LETTER_QUEUE_PREFIX.toString(),
AddressSettings.DEFAULT_DEAD_LETTER_QUEUE_SUFFIX.toString());
}
@Override
public void addAddressSettings(final String address,
final String DLA,
final String expiryAddress,
final long expiryDelay,
final boolean defaultLastValueQueue,
final int maxDeliveryAttempts,
final long maxSizeBytes,
final int pageSizeBytes,
final int pageMaxCacheSize,
final long redeliveryDelay,
final double redeliveryMultiplier,
final long maxRedeliveryDelay,
final long redistributionDelay,
final boolean sendToDLAOnNoRoute,
final String addressFullMessagePolicy,
final long slowConsumerThreshold,
final long slowConsumerCheckPeriod,
final String slowConsumerPolicy,
final boolean autoCreateJmsQueues,
final boolean autoDeleteJmsQueues,
final boolean autoCreateJmsTopics,
final boolean autoDeleteJmsTopics,
final boolean autoCreateQueues,
final boolean autoDeleteQueues,
final boolean autoCreateAddresses,
final boolean autoDeleteAddresses,
final String configDeleteQueues,
final String configDeleteAddresses,
final long maxSizeBytesRejectThreshold,
final String defaultLastValueKey,
final boolean defaultNonDestructive,
final boolean defaultExclusiveQueue,
final boolean defaultGroupRebalance,
final int defaultGroupBuckets,
final String defaultGroupFirstKey,
final int defaultMaxConsumers,
final boolean defaultPurgeOnNoConsumers,
final int defaultConsumersBeforeDispatch,
final long defaultDelayBeforeDispatch,
final String defaultQueueRoutingType,
final String defaultAddressRoutingType,
final int defaultConsumerWindowSize,
final long defaultRingSize,
final boolean autoDeleteCreatedQueues,
final long autoDeleteQueuesDelay,
final long autoDeleteQueuesMessageCount,
final long autoDeleteAddressesDelay,
final double redeliveryCollisionAvoidanceFactor,
final long retroactiveMessageCount,
final boolean autoCreateDeadLetterResources,
final String deadLetterQueuePrefix,
final String deadLetterQueueSuffix) throws Exception {
if (AuditLogger.isEnabled()) {
AuditLogger.addAddressSettings(this.server, address, DLA, expiryAddress, expiryDelay, defaultLastValueQueue, maxDeliveryAttempts,
maxSizeBytes, pageSizeBytes, pageMaxCacheSize, redeliveryDelay, redeliveryMultiplier,
@ -2947,7 +3057,8 @@ public class ActiveMQServerControlImpl extends AbstractControl implements Active
defaultGroupFirstKey, defaultMaxConsumers, defaultPurgeOnNoConsumers, defaultConsumersBeforeDispatch,
defaultDelayBeforeDispatch, defaultQueueRoutingType, defaultAddressRoutingType, defaultConsumerWindowSize,
defaultRingSize, autoDeleteCreatedQueues, autoDeleteQueuesDelay, autoDeleteQueuesMessageCount,
autoDeleteAddressesDelay, redeliveryCollisionAvoidanceFactor, retroactiveMessageCount);
autoDeleteAddressesDelay, redeliveryCollisionAvoidanceFactor, retroactiveMessageCount, autoCreateDeadLetterResources,
deadLetterQueuePrefix, deadLetterQueueSuffix);
}
checkStarted();
@ -3009,6 +3120,9 @@ public class ActiveMQServerControlImpl extends AbstractControl implements Active
addressSettings.setAutoDeleteAddressesDelay(autoDeleteAddressesDelay);
addressSettings.setRedeliveryCollisionAvoidanceFactor(redeliveryCollisionAvoidanceFactor);
addressSettings.setRetroactiveMessageCount(retroactiveMessageCount);
addressSettings.setAutoCreateDeadLetterResources(autoCreateDeadLetterResources);
addressSettings.setDeadLetterQueuePrefix(deadLetterQueuePrefix == null ? null : new SimpleString(deadLetterQueuePrefix));
addressSettings.setDeadLetterQueueSuffix(deadLetterQueueSuffix == null ? null : new SimpleString(deadLetterQueueSuffix));
server.getAddressSettingsRepository().addMatch(address, addressSettings);

View File

@ -3340,6 +3340,9 @@ public class QueueImpl extends CriticalComponentImpl implements Queue {
final MessageReference ref,
final SimpleString deadLetterAddress) throws Exception {
if (deadLetterAddress != null) {
createDeadLetterResources();
Bindings bindingList = postOffice.lookupBindingsForAddress(deadLetterAddress);
if (bindingList == null || bindingList.getBindings().isEmpty()) {
@ -3359,6 +3362,17 @@ public class QueueImpl extends CriticalComponentImpl implements Queue {
return false;
}
private void createDeadLetterResources() throws Exception {
AddressSettings addressSettings = server.getAddressSettingsRepository().getMatch(getAddress().toString());
if (addressSettings.isAutoCreateDeadLetterResources() && !getAddress().equals(addressSettings.getDeadLetterAddress())) {
if (addressSettings.getDeadLetterAddress() != null && addressSettings.getDeadLetterAddress().length() != 0) {
SimpleString dlqName = addressSettings.getDeadLetterQueuePrefix().concat(getAddress()).concat(addressSettings.getDeadLetterQueueSuffix());
SimpleString dlqFilter = new SimpleString(String.format("%s = '%s'", Message.HDR_ORIGINAL_ADDRESS, getAddress()));
server.createQueue(addressSettings.getDeadLetterAddress(), RoutingType.MULTICAST, dlqName, dlqFilter, null, true, false, true, false, true, addressSettings.getDefaultMaxConsumers(), addressSettings.isDefaultPurgeOnNoConsumers(), addressSettings.isDefaultExclusiveQueue(), addressSettings.isDefaultLastValueQueue(), true);
}
}
}
private void move(final Transaction originalTX,
final SimpleString address,
final Binding binding,

View File

@ -109,6 +109,12 @@ public class AddressSettings implements Mergeable<AddressSettings>, Serializable
// Default address drop threshold, applied to address settings with BLOCK policy. -1 means no threshold enabled.
public static final long DEFAULT_ADDRESS_REJECT_THRESHOLD = -1;
public static final boolean DEFAULT_AUTO_CREATE_DEAD_LETTER_RESOURCES = false;
public static final SimpleString DEFAULT_DEAD_LETTER_QUEUE_PREFIX = SimpleString.toSimpleString("DLQ.");
public static final SimpleString DEFAULT_DEAD_LETTER_QUEUE_SUFFIX = SimpleString.toSimpleString("");
private AddressFullMessagePolicy addressFullMessagePolicy = null;
private Long maxSizeBytes = null;
@ -215,6 +221,12 @@ public class AddressSettings implements Mergeable<AddressSettings>, Serializable
private Integer defaultConsumerWindowSize = null;
private Boolean autoCreateDeadLetterResources = null;
private SimpleString deadLetterQueuePrefix = null;
private SimpleString deadLetterQueueSuffix = null;
//from amq5
//make it transient
private transient Integer queuePrefetch = null;
@ -232,6 +244,9 @@ public class AddressSettings implements Mergeable<AddressSettings>, Serializable
this.redeliveryCollisionAvoidanceFactor = other.redeliveryCollisionAvoidanceFactor;
this.maxRedeliveryDelay = other.maxRedeliveryDelay;
this.deadLetterAddress = other.deadLetterAddress;
this.autoCreateDeadLetterResources = other.autoCreateDeadLetterResources;
this.deadLetterQueuePrefix = other.deadLetterQueuePrefix;
this.deadLetterQueueSuffix = other.deadLetterQueueSuffix;
this.expiryAddress = other.expiryAddress;
this.expiryDelay = other.expiryDelay;
this.defaultLastValueQueue = other.defaultLastValueQueue;
@ -629,6 +644,33 @@ public class AddressSettings implements Mergeable<AddressSettings>, Serializable
return this;
}
public boolean isAutoCreateDeadLetterResources() {
return autoCreateDeadLetterResources != null ? autoCreateDeadLetterResources : AddressSettings.DEFAULT_AUTO_CREATE_DEAD_LETTER_RESOURCES;
}
public AddressSettings setAutoCreateDeadLetterResources(final boolean value) {
autoCreateDeadLetterResources = value;
return this;
}
public SimpleString getDeadLetterQueuePrefix() {
return deadLetterQueuePrefix != null ? deadLetterQueuePrefix : AddressSettings.DEFAULT_DEAD_LETTER_QUEUE_PREFIX;
}
public AddressSettings setDeadLetterQueuePrefix(final SimpleString value) {
deadLetterQueuePrefix = value;
return this;
}
public SimpleString getDeadLetterQueueSuffix() {
return deadLetterQueueSuffix != null ? deadLetterQueueSuffix : AddressSettings.DEFAULT_DEAD_LETTER_QUEUE_SUFFIX;
}
public AddressSettings setDeadLetterQueueSuffix(final SimpleString value) {
deadLetterQueueSuffix = value;
return this;
}
public long getRedistributionDelay() {
return redistributionDelay != null ? redistributionDelay : AddressSettings.DEFAULT_REDISTRIBUTION_DELAY;
}
@ -933,6 +975,15 @@ public class AddressSettings implements Mergeable<AddressSettings>, Serializable
if (retroactiveMessageCount == null) {
retroactiveMessageCount = merged.retroactiveMessageCount;
}
if (autoCreateDeadLetterResources == null) {
autoCreateDeadLetterResources = merged.autoCreateDeadLetterResources;
}
if (deadLetterQueuePrefix == null) {
deadLetterQueuePrefix = merged.deadLetterQueuePrefix;
}
if (deadLetterQueueSuffix == null) {
deadLetterQueueSuffix = merged.deadLetterQueueSuffix;
}
}
@Override
@ -1105,6 +1156,18 @@ public class AddressSettings implements Mergeable<AddressSettings>, Serializable
if (buffer.readableBytes() > 0) {
retroactiveMessageCount = BufferHelper.readNullableLong(buffer);
}
if (buffer.readableBytes() > 0) {
autoCreateDeadLetterResources = BufferHelper.readNullableBoolean(buffer);
}
if (buffer.readableBytes() > 0) {
deadLetterQueuePrefix = buffer.readNullableSimpleString();
}
if (buffer.readableBytes() > 0) {
deadLetterQueueSuffix = buffer.readNullableSimpleString();
}
}
@Override
@ -1158,7 +1221,10 @@ public class AddressSettings implements Mergeable<AddressSettings>, Serializable
BufferHelper.sizeOfNullableLong(autoDeleteQueuesMessageCount) +
BufferHelper.sizeOfNullableBoolean(autoDeleteCreatedQueues) +
BufferHelper.sizeOfNullableLong(defaultRingSize) +
BufferHelper.sizeOfNullableLong(retroactiveMessageCount);
BufferHelper.sizeOfNullableLong(retroactiveMessageCount) +
BufferHelper.sizeOfNullableBoolean(autoCreateDeadLetterResources) +
SimpleString.sizeofNullableString(deadLetterQueuePrefix) +
SimpleString.sizeofNullableString(deadLetterQueueSuffix);
}
@Override
@ -1264,6 +1330,12 @@ public class AddressSettings implements Mergeable<AddressSettings>, Serializable
buffer.writeNullableSimpleString(defaultGroupFirstKey);
BufferHelper.writeNullableLong(buffer, retroactiveMessageCount);
BufferHelper.writeNullableBoolean(buffer, autoCreateDeadLetterResources);
buffer.writeNullableSimpleString(deadLetterQueuePrefix);
buffer.writeNullableSimpleString(deadLetterQueueSuffix);
}
/* (non-Javadoc)
@ -1325,6 +1397,9 @@ public class AddressSettings implements Mergeable<AddressSettings>, Serializable
result = prime * result + ((defaultGroupFirstKey == null) ? 0 : defaultGroupFirstKey.hashCode());
result = prime * result + ((defaultRingSize == null) ? 0 : defaultRingSize.hashCode());
result = prime * result + ((retroactiveMessageCount == null) ? 0 : retroactiveMessageCount.hashCode());
result = prime * result + ((autoCreateDeadLetterResources == null) ? 0 : autoCreateDeadLetterResources.hashCode());
result = prime * result + ((deadLetterQueuePrefix == null) ? 0 : deadLetterQueuePrefix.hashCode());
result = prime * result + ((deadLetterQueueSuffix == null) ? 0 : deadLetterQueueSuffix.hashCode());
return result;
}
@ -1613,6 +1688,25 @@ public class AddressSettings implements Mergeable<AddressSettings>, Serializable
return false;
} else if (!retroactiveMessageCount.equals(other.retroactiveMessageCount))
return false;
if (autoCreateDeadLetterResources == null) {
if (other.autoCreateDeadLetterResources != null)
return false;
} else if (!autoCreateDeadLetterResources.equals(other.autoCreateDeadLetterResources))
return false;
if (deadLetterQueuePrefix == null) {
if (other.deadLetterQueuePrefix != null)
return false;
} else if (!deadLetterQueuePrefix.equals(other.deadLetterQueuePrefix))
return false;
if (deadLetterQueueSuffix == null) {
if (other.deadLetterQueueSuffix != null)
return false;
} else if (!deadLetterQueueSuffix.equals(other.deadLetterQueueSuffix))
return false;
return true;
}
@ -1722,6 +1816,12 @@ public class AddressSettings implements Mergeable<AddressSettings>, Serializable
defaultRingSize +
", retroactiveMessageCount=" +
retroactiveMessageCount +
", autoCreateDeadLetterResources=" +
autoCreateDeadLetterResources +
", deadLetterQueuePrefix=" +
deadLetterQueuePrefix +
", deadLetterQueueSuffix=" +
deadLetterQueueSuffix +
"]";
}
}

View File

@ -3061,7 +3061,7 @@
</xsd:documentation>
</xsd:annotation>
<xsd:all>
<xsd:element maxOccurs="1" minOccurs="0" name="dead-letter-address" type="xsd:string">
<xsd:element name="dead-letter-address" type="xsd:string" maxOccurs="1" minOccurs="0">
<xsd:annotation>
<xsd:documentation>
the address to send dead messages to
@ -3069,6 +3069,31 @@
</xsd:annotation>
</xsd:element>
<xsd:element name="auto-create-dead-letter-resources" type="xsd:boolean" default="false" maxOccurs="1" minOccurs="0">
<xsd:annotation>
<xsd:documentation>
whether or not to automatically create the dead-letter-address and/or a corresponding queue
on that address when a message found to be undeliverable
</xsd:documentation>
</xsd:annotation>
</xsd:element>
<xsd:element name="dead-letter-queue-prefix" type="xsd:string" default="DLQ." maxOccurs="1" minOccurs="0">
<xsd:annotation>
<xsd:documentation>
the prefix to use for auto-created dead letter queues
</xsd:documentation>
</xsd:annotation>
</xsd:element>
<xsd:element name="dead-letter-queue-suffix" type="xsd:string" default="" maxOccurs="1" minOccurs="0">
<xsd:annotation>
<xsd:documentation>
the suffix to use for auto-created dead letter queues
</xsd:documentation>
</xsd:annotation>
</xsd:element>
<xsd:element name="expiry-address" type="xsd:string" maxOccurs="1" minOccurs="0">
<xsd:annotation>
<xsd:documentation>

View File

@ -58,6 +58,7 @@ import org.apache.activemq.artemis.core.server.metrics.ActiveMQMetricsPlugin;
import org.apache.activemq.artemis.core.server.metrics.plugins.SimpleMetricsPlugin;
import org.apache.activemq.artemis.core.server.plugin.ActiveMQServerBasePlugin;
import org.apache.activemq.artemis.core.server.plugin.ActiveMQServerPlugin;
import org.apache.activemq.artemis.core.settings.impl.AddressSettings;
import org.apache.activemq.artemis.core.settings.impl.SlowConsumerPolicy;
import org.apache.activemq.artemis.utils.RandomUtil;
import org.apache.activemq.artemis.utils.critical.CriticalAnalyzerPolicy;
@ -341,6 +342,9 @@ public class FileConfigurationTest extends ConfigurationImplTest {
assertTrue(conf.getAddressesSettings().get("a2") != null);
assertEquals("a1.1", conf.getAddressesSettings().get("a1").getDeadLetterAddress().toString());
assertEquals(AddressSettings.DEFAULT_AUTO_CREATE_DEAD_LETTER_RESOURCES, conf.getAddressesSettings().get("a1").isAutoCreateDeadLetterResources());
assertEquals(AddressSettings.DEFAULT_DEAD_LETTER_QUEUE_PREFIX, conf.getAddressesSettings().get("a1").getDeadLetterQueuePrefix());
assertEquals(AddressSettings.DEFAULT_DEAD_LETTER_QUEUE_SUFFIX, conf.getAddressesSettings().get("a1").getDeadLetterQueueSuffix());
assertEquals("a1.2", conf.getAddressesSettings().get("a1").getExpiryAddress().toString());
assertEquals(1, conf.getAddressesSettings().get("a1").getRedeliveryDelay());
assertEquals(0.5, conf.getAddressesSettings().get("a1").getRedeliveryCollisionAvoidanceFactor(), 0);
@ -365,6 +369,9 @@ public class FileConfigurationTest extends ConfigurationImplTest {
assertEquals(0, conf.getAddressesSettings().get("a1").getRetroactiveMessageCount());
assertEquals("a2.1", conf.getAddressesSettings().get("a2").getDeadLetterAddress().toString());
assertEquals(true, conf.getAddressesSettings().get("a2").isAutoCreateDeadLetterResources());
assertEquals("", conf.getAddressesSettings().get("a2").getDeadLetterQueuePrefix().toString());
assertEquals(".DLQ", conf.getAddressesSettings().get("a2").getDeadLetterQueueSuffix().toString());
assertEquals("a2.2", conf.getAddressesSettings().get("a2").getExpiryAddress().toString());
assertEquals(5, conf.getAddressesSettings().get("a2").getRedeliveryDelay());
assertEquals(0.0, conf.getAddressesSettings().get("a2").getRedeliveryCollisionAvoidanceFactor(), 0);

View File

@ -438,6 +438,9 @@
</address-setting>
<address-setting match="a2">
<dead-letter-address>a2.1</dead-letter-address>
<auto-create-dead-letter-resources>true</auto-create-dead-letter-resources>
<dead-letter-queue-prefix></dead-letter-queue-prefix>
<dead-letter-queue-suffix>.DLQ</dead-letter-queue-suffix>
<expiry-address>a2.2</expiry-address>
<redelivery-delay>5</redelivery-delay>
<max-size-bytes>932489234928324</max-size-bytes>

View File

@ -43,6 +43,9 @@
</address-setting>
<address-setting match="a2">
<dead-letter-address>a2.1</dead-letter-address>
<auto-create-dead-letter-resources>true</auto-create-dead-letter-resources>
<dead-letter-queue-prefix></dead-letter-queue-prefix>
<dead-letter-queue-suffix>.DLQ</dead-letter-queue-suffix>
<expiry-address>a2.2</expiry-address>
<redelivery-delay>5</redelivery-delay>
<max-size-bytes>932489234928324</max-size-bytes>

View File

@ -2917,7 +2917,7 @@
</xsd:documentation>
</xsd:annotation>
<xsd:all>
<xsd:element maxOccurs="1" minOccurs="0" name="dead-letter-address" type="xsd:string">
<xsd:element name="dead-letter-address" type="xsd:string" maxOccurs="1" minOccurs="0">
<xsd:annotation>
<xsd:documentation>
the address to send dead messages to
@ -2925,6 +2925,31 @@
</xsd:annotation>
</xsd:element>
<xsd:element name="auto-create-dead-letter-resources" type="xsd:boolean" default="false" maxOccurs="1" minOccurs="0">
<xsd:annotation>
<xsd:documentation>
whether or not to automatically create the dead-letter-address and/or a corresponding queue
on that address when any matching queue is created
</xsd:documentation>
</xsd:annotation>
</xsd:element>
<xsd:element name="dead-letter-queue-prefix" type="xsd:string" default="DLQ." maxOccurs="1" minOccurs="0">
<xsd:annotation>
<xsd:documentation>
the prefix to use for auto-created dead letter queues
</xsd:documentation>
</xsd:annotation>
</xsd:element>
<xsd:element name="dead-letter-queue-suffix" type="xsd:string" default="" maxOccurs="1" minOccurs="0">
<xsd:annotation>
<xsd:documentation>
the suffix to use for auto-created dead letter queues
</xsd:documentation>
</xsd:annotation>
</xsd:element>
<xsd:element name="expiry-address" type="xsd:string" maxOccurs="1" minOccurs="0">
<xsd:annotation>
<xsd:documentation>

View File

@ -569,6 +569,9 @@ that would be found in the `broker.xml` file.
<address-settings>
<address-setting match="order.foo">
<dead-letter-address>DLA</dead-letter-address>
<auto-create-dead-letter-resources>false</auto-create-dead-letter-resources>
<dead-letter-queue-prefix>DLQ.</dead-letter-queue-prefix>
<dead-letter-queue-suffix></dead-letter-queue-suffix>
<expiry-address>ExpiryQueue</expiry-address>
<expiry-delay>123</expiry-delay>
<redelivery-delay>5000</redelivery-delay>
@ -636,6 +639,19 @@ exceed `max-delivery-attempts`. If no address is defined here then such
messages will simply be discarded. Read more about [undelivered
messages](undelivered-messages.md#configuring-dead-letter-addresses).
`auto-create-dead-letter-resources` determines whether or not the broker will
automatically create the defined `dead-letter-address` and a corresponding
dead-letter queue when a message is undeliverable. Read more in the chapter
about [undelivered messages](undelivered-messages.md).
`dead-letter-queue-prefix` defines the prefix used for automatically created
dead-letter queues. Read more in the chapter about
[undelivered messages](undelivered-messages.md).
`dead-letter-queue-suffix` defines the suffix used for automatically created
dead-letter queues. Read more in the chapter about
[undelivered messages](undelivered-messages.md).
`expiry-address` defines where to send a message that has expired. If no
address is defined here then such messages will simply be discarded. Read more
about [message expiry](message-expiry.md#configuring-expiry-addresses).

View File

@ -204,6 +204,9 @@ Name | Description | Default
---|---|---
[match](address-model.md) | The filter to apply to the setting | n/a
[dead-letter-address](undelivered-messages.md) | Dead letter address | n/a
[auto-create-dead-letter-resources](undelivered-messages.md) | Whether or not to auto-create dead-letter address and/or queue | `false`
[dead-letter-queue-prefix](undelivered-messages.md) | Prefix to use for auto-created dead-letter queues | `DLQ.`
[dead-letter-queue-suffix](undelivered-messages.md) | Suffix to use for auto-created dead-letter queues | `` (empty)
[expiry-address](message-expiry.md) | Expired messages address | n/a
[expiry-delay](message-expiry.md) | Expiration time override; -1 don't override | -1
[redelivery-delay](undelivered-messages.md) | Time to wait before redelivering a message | 0

View File

@ -100,11 +100,11 @@ must be between 0.0 and 1.0.
`java.util.Random`)
1. Delivery Attempt 1. (Unsuccessful)
2. Wait Delay Period: 875 // 1000 + (1000 * ((0.5 * __-1__) * __.25__)
2. Wait Delay Period: 875 // 1000 + (1000 * ((0.5 \* __-1__) * __.25__)
3. Delivery Attempt 2. (Unsuccessful)
4. Wait Delay Period: 1375 // 1000 + (1000 * ((0.5 * __1__) * __.75__)
4. Wait Delay Period: 1375 // 1000 + (1000 * ((0.5 \* __1__) * __.75__)
5. Delivery Attempt 3: (Unsuccessful)
6. Wait Delay Period: 975 // 1000 + (1000 * ((0.5 * __-1__) * __.05__)
6. Wait Delay Period: 975 // 1000 + (1000 * ((0.5 \* __-1__) * __.05__)
This feature can be particularly useful in environments where there are
multiple consumers on the same queue all interacting transactionally
@ -117,8 +117,8 @@ small, configurable amount these redelivery "collisions" can be avoided.
### Example
See [the examples chapter](examples.md) for an example which shows how delayed redelivery is configured
and used with JMS.
See [the examples chapter](examples.md) for an example which shows how
delayed redelivery is configured and used with JMS.
## Dead Letter Addresses
@ -145,7 +145,7 @@ Dead letter address is defined in the address-setting configuration:
<!-- undelivered messages in exampleQueue will be sent to the dead letter address
deadLetterQueue after 3 unsuccessful delivery attempts -->
<address-setting match="exampleQueue">
<dead-letter-address>deadLetterQueue</dead-letter-address>
<dead-letter-address>deadLetterAddress</dead-letter-address>
<max-delivery-attempts>3</max-delivery-attempts>
</address-setting>
```
@ -178,10 +178,63 @@ the following properties:
a String property containing the *original queue* of the dead letter
message
### Automatically Creating Dead Letter Resources
It's common to segregate undelivered messages by their original address.
For example, a message sent to the `stocks` address that couldn't be
delivered for some reason might be ultimately routed to the `DLQ.stocks`
queue, and likewise a message sent to the `orders` address that couldn't
be delivered might be routed to the `DLQ.orders` queue.
Using this pattern can make it easy to track and administrate
undelivered messages. However, it can pose a challenge in environments
which predominantly use auto-created addresses and queues. Typically
administrators in those environments don't want to manually create
an `address-setting` to configure the `dead-letter-address` much less
the actual `address` and `queue` to hold the undelivered messages.
The solution to this problem is to set the `auto-create-dead-letter-resources`
`address-setting` to `true` (it's `false` by default) so that the
broker will create the `address` and `queue` to deal with the
undelivered messages automatically. The `address` created will be the
one defined by the `dead-letter-address`. A `MULTICAST` `queue` will be
created on that `address`. It will be named by the `address` to which
the message was originally sent, and it will have a filter defined using
the aforementioned `_AMQ_ORIG_ADDRESS` property so that it will only
receive messages sent to the relevant `address`. The `queue` name can be
configured with a prefix and suffix. See the relevant settings in the
table below:
`address-setting`|default
---|---
`dead-letter-queue-prefix`|`DLQ.`
`dead-letter-queue-suffix`|`` (empty string)
Here is an example configuration:
```xml
<address-setting match="#">
<dead-letter-address>DLA</dead-letter-address>
<max-delivery-attempts>3</max-delivery-attempts>
<auto-create-dead-letter-resources>true</auto-create-dead-letter-resources>
<dead-letter-queue-prefix></dead-letter-queue-prefix> <!-- override the default -->
<dead-letter-queue-suffix>.DLQ</dead-letter-queue-suffix>
</address-setting>
```
The queue holding the undeliverable messages can be accessed directly
either by using the queue's name by itself (e.g. when using the core
client) or by using the fully qualified queue name (e.g. when using
a JMS client) just like any other queue. Also, note that the queue is
auto-created which means it will be auto-deleted as per the relevant
`address-settings`.
### Example
See: Dead Letter section of the [Examples](examples.md) for an example
that shows how dead letter is configured and used with JMS.
that shows how dead letter resources can be statically configured and
used with JMS.
## Delivery Count Persistence

View File

@ -754,6 +754,9 @@ public class ActiveMQServerControlTest extends ManagementTestBase {
long autoDeleteAddressesDelay = RandomUtil.randomPositiveLong();
double redeliveryCollisionAvoidanceFactor = RandomUtil.randomDouble();
long retroactiveMessageCount = RandomUtil.randomPositiveLong();
boolean autoCreateDeadLetterResources = RandomUtil.randomBoolean();
String deadLetterQueuePrefix = RandomUtil.randomString();
String deadLetterQueueSuffix = RandomUtil.randomString();
serverControl.addAddressSettings(addressMatch,
DLA,
@ -803,7 +806,10 @@ public class ActiveMQServerControlTest extends ManagementTestBase {
autoDeleteQueuesMessageCount,
autoDeleteAddressesDelay,
redeliveryCollisionAvoidanceFactor,
retroactiveMessageCount);
retroactiveMessageCount,
autoCreateDeadLetterResources,
deadLetterQueuePrefix,
deadLetterQueueSuffix);
boolean ex = false;
try {
@ -855,7 +861,10 @@ public class ActiveMQServerControlTest extends ManagementTestBase {
autoDeleteQueuesMessageCount,
autoDeleteAddressesDelay,
redeliveryCollisionAvoidanceFactor,
retroactiveMessageCount);
retroactiveMessageCount,
autoCreateDeadLetterResources,
deadLetterQueuePrefix,
deadLetterQueueSuffix);
} catch (Exception expected) {
ex = true;
}
@ -914,6 +923,9 @@ public class ActiveMQServerControlTest extends ManagementTestBase {
assertEquals(autoDeleteAddressesDelay, info.getAutoDeleteAddressesDelay());
assertEquals(redeliveryCollisionAvoidanceFactor, info.getRedeliveryCollisionAvoidanceFactor(), 0);
assertEquals(retroactiveMessageCount, info.getRetroactiveMessageCount());
assertEquals(autoCreateDeadLetterResources, info.isAutoCreateDeadLetterResources());
assertEquals(deadLetterQueuePrefix, info.getDeadLetterQueuePrefix());
assertEquals(deadLetterQueueSuffix, info.getDeadLetterQueueSuffix());
serverControl.addAddressSettings(addressMatch,
DLA,
@ -963,7 +975,10 @@ public class ActiveMQServerControlTest extends ManagementTestBase {
autoDeleteQueuesMessageCount,
autoDeleteAddressesDelay,
redeliveryCollisionAvoidanceFactor,
retroactiveMessageCount);
retroactiveMessageCount,
autoCreateDeadLetterResources,
deadLetterQueuePrefix,
deadLetterQueueSuffix);
jsonString = serverControl.getAddressSettingsAsJSON(exactAddress);
info = AddressSettingsInfo.from(jsonString);
@ -1015,6 +1030,9 @@ public class ActiveMQServerControlTest extends ManagementTestBase {
assertEquals(autoDeleteAddressesDelay, info.getAutoDeleteAddressesDelay());
assertEquals(redeliveryCollisionAvoidanceFactor, info.getRedeliveryCollisionAvoidanceFactor(), 0);
assertEquals(retroactiveMessageCount, info.getRetroactiveMessageCount());
assertEquals(autoCreateDeadLetterResources, info.isAutoCreateDeadLetterResources());
assertEquals(deadLetterQueuePrefix, info.getDeadLetterQueuePrefix());
assertEquals(deadLetterQueueSuffix, info.getDeadLetterQueueSuffix());
ex = false;
try {
@ -1066,7 +1084,10 @@ public class ActiveMQServerControlTest extends ManagementTestBase {
autoDeleteQueuesMessageCount,
autoDeleteAddressesDelay,
redeliveryCollisionAvoidanceFactor,
retroactiveMessageCount);
retroactiveMessageCount,
autoCreateDeadLetterResources,
deadLetterQueuePrefix,
deadLetterQueueSuffix);
} catch (Exception e) {
ex = true;
}

View File

@ -1008,6 +1008,114 @@ public class ActiveMQServerControlUsingCoreTest extends ActiveMQServerControlTes
retroactiveMessageCount);
}
@Override
public void addAddressSettings(@Parameter(desc = "an address match", name = "addressMatch") String addressMatch,
@Parameter(desc = "the dead letter address setting", name = "DLA") String DLA,
@Parameter(desc = "the expiry address setting", name = "expiryAddress") String expiryAddress,
@Parameter(desc = "the expiry delay setting", name = "expiryDelay") long expiryDelay,
@Parameter(desc = "are any queues created for this address a last value queue", name = "lastValueQueue") boolean lastValueQueue,
@Parameter(desc = "the delivery attempts", name = "deliveryAttempts") int deliveryAttempts,
@Parameter(desc = "the max size in bytes", name = "maxSizeBytes") long maxSizeBytes,
@Parameter(desc = "the page size in bytes", name = "pageSizeBytes") int pageSizeBytes,
@Parameter(desc = "the max number of pages in the soft memory cache", name = "pageMaxCacheSize") int pageMaxCacheSize,
@Parameter(desc = "the redelivery delay", name = "redeliveryDelay") long redeliveryDelay,
@Parameter(desc = "the redelivery delay multiplier", name = "redeliveryMultiplier") double redeliveryMultiplier,
@Parameter(desc = "the maximum redelivery delay", name = "maxRedeliveryDelay") long maxRedeliveryDelay,
@Parameter(desc = "the redistribution delay", name = "redistributionDelay") long redistributionDelay,
@Parameter(desc = "do we send to the DLA when there is no where to route the message", name = "sendToDLAOnNoRoute") boolean sendToDLAOnNoRoute,
@Parameter(desc = "the policy to use when the address is full", name = "addressFullMessagePolicy") String addressFullMessagePolicy,
@Parameter(desc = "when a consumer falls below this threshold in terms of messages consumed per second it will be considered 'slow'", name = "slowConsumerThreshold") long slowConsumerThreshold,
@Parameter(desc = "how often (in seconds) to check for slow consumers", name = "slowConsumerCheckPeriod") long slowConsumerCheckPeriod,
@Parameter(desc = "the policy to use when a slow consumer is detected", name = "slowConsumerPolicy") String slowConsumerPolicy,
@Parameter(desc = "allow jms queues to be created automatically", name = "autoCreateJmsQueues") boolean autoCreateJmsQueues,
@Parameter(desc = "allow auto-created jms queues to be deleted automatically", name = "autoDeleteJmsQueues") boolean autoDeleteJmsQueues,
@Parameter(desc = "allow jms topics to be created automatically", name = "autoCreateJmsTopics") boolean autoCreateJmsTopics,
@Parameter(desc = "allow auto-created jms topics to be deleted automatically", name = "autoDeleteJmsTopics") boolean autoDeleteJmsTopics,
@Parameter(desc = "allow queues to be created automatically", name = "autoCreateQueues") boolean autoCreateQueues,
@Parameter(desc = "allow auto-created queues to be deleted automatically", name = "autoDeleteQueues") boolean autoDeleteQueues,
@Parameter(desc = "allow topics to be created automatically", name = "autoCreateAddresses") boolean autoCreateAddresses,
@Parameter(desc = "allow auto-created topics to be deleted automatically", name = "autoDeleteAddresses") boolean autoDeleteAddresses,
@Parameter(desc = "how to deal with queues deleted from XML at runtime", name = "configDeleteQueues") String configDeleteQueues,
@Parameter(desc = "how to deal with addresses deleted from XML at runtime", name = "configDeleteAddresses") String configDeleteAddresses,
@Parameter(desc = "used with `BLOCK`, the max size an address can reach before messages are rejected; works in combination with `max-size-bytes` for AMQP clients only", name = "maxSizeBytesRejectThreshold") long maxSizeBytesRejectThreshold,
@Parameter(desc = "last-value-key value if none is set on the queue", name = "defaultLastValueKey") String defaultLastValueKey,
@Parameter(desc = "non-destructive value if none is set on the queue", name = "defaultNonDestructive") boolean defaultNonDestructive,
@Parameter(desc = "exclusive value if none is set on the queue", name = "defaultExclusiveQueue") boolean defaultExclusiveQueue,
@Parameter(desc = "group-rebalance value if none is set on the queue", name = "defaultGroupRebalance") boolean defaultGroupRebalance,
@Parameter(desc = "group-buckets value if none is set on the queue", name = "defaultGroupBuckets") int defaultGroupBuckets,
@Parameter(desc = "group-first-key value if none is set on the queue", name = "defaultGroupFirstKey") String defaultGroupFirstKey,
@Parameter(desc = "max-consumers value if none is set on the queue", name = "defaultMaxConsumers") int defaultMaxConsumers,
@Parameter(desc = "purge-on-no-consumers value if none is set on the queue", name = "defaultPurgeOnNoConsumers") boolean defaultPurgeOnNoConsumers,
@Parameter(desc = "consumers-before-dispatch value if none is set on the queue", name = "defaultConsumersBeforeDispatch") int defaultConsumersBeforeDispatch,
@Parameter(desc = "delay-before-dispatch value if none is set on the queue", name = "defaultDelayBeforeDispatch") long defaultDelayBeforeDispatch,
@Parameter(desc = "routing-type value if none is set on the queue", name = "defaultQueueRoutingType") String defaultQueueRoutingType,
@Parameter(desc = "routing-type value if none is set on the address", name = "defaultAddressRoutingType") String defaultAddressRoutingType,
@Parameter(desc = "consumer-window-size value if none is set on the queue", name = "defaultConsumerWindowSize") int defaultConsumerWindowSize,
@Parameter(desc = "ring-size value if none is set on the queue", name = "defaultRingSize") long defaultRingSize,
@Parameter(desc = "allow created queues to be deleted automatically", name = "autoDeleteCreatedQueues") boolean autoDeleteCreatedQueues,
@Parameter(desc = "delay for deleting auto-created queues", name = "autoDeleteQueuesDelay") long autoDeleteQueuesDelay,
@Parameter(desc = "the message count the queue must be at or below before it can be auto deleted", name = "autoDeleteQueuesMessageCount") long autoDeleteQueuesMessageCount,
@Parameter(desc = "delay for deleting auto-created addresses", name = "autoDeleteAddressesDelay") long autoDeleteAddressesDelay,
@Parameter(desc = "factor by which to modify the redelivery delay slightly to avoid collisions", name = "redeliveryCollisionAvoidanceFactor") double redeliveryCollisionAvoidanceFactor,
@Parameter(desc = "the number of messages to preserve for future queues created on the matching address", name = "retroactiveMessageCount") long retroactiveMessageCount,
@Parameter(desc = "allow dead-letter address & queue to be created automatically", name = "autoCreateDeadLetterResources") boolean autoCreateDeadLetterResources,
@Parameter(desc = "prefix to use on auto-create dead-letter queue", name = "deadLetterQueuePrefix") String deadLetterQueuePrefix,
@Parameter(desc = "suffix to use on auto-create dead-letter queue", name = "deadLetterQueueSuffix") String deadLetterQueueSuffix) throws Exception {
proxy.invokeOperation("addAddressSettings",
addressMatch,
DLA,
expiryAddress,
expiryDelay,
lastValueQueue,
deliveryAttempts,
maxSizeBytes,
pageSizeBytes,
pageMaxCacheSize,
redeliveryDelay,
redeliveryMultiplier,
maxRedeliveryDelay,
redistributionDelay,
sendToDLAOnNoRoute,
addressFullMessagePolicy,
slowConsumerThreshold,
slowConsumerCheckPeriod,
slowConsumerPolicy,
autoCreateJmsQueues,
autoDeleteJmsQueues,
autoCreateJmsTopics,
autoDeleteJmsTopics,
autoCreateQueues,
autoDeleteQueues,
autoCreateAddresses,
autoDeleteAddresses,
configDeleteQueues,
configDeleteAddresses,
maxSizeBytesRejectThreshold,
defaultLastValueKey,
defaultNonDestructive,
defaultExclusiveQueue,
defaultGroupRebalance,
defaultGroupBuckets,
defaultGroupFirstKey,
defaultMaxConsumers,
defaultPurgeOnNoConsumers,
defaultConsumersBeforeDispatch,
defaultDelayBeforeDispatch,
defaultQueueRoutingType,
defaultAddressRoutingType,
defaultConsumerWindowSize,
defaultRingSize,
autoDeleteCreatedQueues,
autoDeleteQueuesDelay,
autoDeleteQueuesMessageCount,
autoDeleteAddressesDelay,
redeliveryCollisionAvoidanceFactor,
retroactiveMessageCount,
autoCreateDeadLetterResources,
deadLetterQueuePrefix,
deadLetterQueueSuffix);
}
@Override
public String listNetworkTopology() throws Exception {
return (String) proxy.invokeOperation("listNetworkTopology");

View File

@ -1073,6 +1073,61 @@ public class QueueControlTest extends ManagementTestBase {
clientConsumer.close();
}
/**
* Test retry - get a message from auto-created DLA/DLQ and put on original queue.
*/
@Test
public void testRetryMessageWithAutoCreatedResources() throws Exception {
final SimpleString dla = new SimpleString("DLA");
final SimpleString qName = new SimpleString("q1");
final SimpleString adName = new SimpleString("ad1");
final String sampleText = "Put me on DLQ";
AddressSettings addressSettings = server.getAddressSettingsRepository().getMatch(adName.toString());
final SimpleString dlq = addressSettings.getDeadLetterQueuePrefix().concat(adName).concat(addressSettings.getDeadLetterQueueSuffix());
server.getAddressSettingsRepository().addMatch(adName.toString(), new AddressSettings().setMaxDeliveryAttempts(1).setDeadLetterAddress(dla).setAutoCreateDeadLetterResources(true));
session.createQueue(adName, RoutingType.MULTICAST, qName, null, durable);
// Send message to queue.
ClientProducer producer = session.createProducer(adName);
producer.send(createTextMessage(session, sampleText));
session.start();
ClientConsumer clientConsumer = session.createConsumer(qName);
ClientMessage clientMessage = clientConsumer.receive(500);
clientMessage.acknowledge();
Assert.assertNotNull(clientMessage);
Assert.assertEquals(clientMessage.getBodyBuffer().readString(), sampleText);
// force a rollback to DLQ
session.rollback();
clientMessage = clientConsumer.receiveImmediate();
Assert.assertNull(clientMessage);
QueueControl queueControl = createManagementControl(dla, dlq);
assertMessageMetrics(queueControl, 1, true);
final long messageID = getFirstMessageId(queueControl);
// Retry the message - i.e. it should go from DLQ to original Queue.
Assert.assertTrue(queueControl.retryMessage(messageID));
// Assert DLQ is empty...
Assert.assertEquals(0, getMessageCount(queueControl));
assertMessageMetrics(queueControl, 0, durable);
// .. and that the message is now on the original queue once more.
clientMessage = clientConsumer.receive(500);
clientMessage.acknowledge();
Assert.assertNotNull(clientMessage);
Assert.assertEquals(sampleText, clientMessage.getBodyBuffer().readString());
clientConsumer.close();
}
@Test
public void testRetryMessageWithoutDLQ() throws Exception {
final SimpleString qName = new SimpleString("q1");

View File

@ -0,0 +1,216 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.activemq.artemis.tests.integration.server;
import javax.jms.JMSContext;
import org.apache.activemq.artemis.api.core.RoutingType;
import org.apache.activemq.artemis.api.core.SimpleString;
import org.apache.activemq.artemis.api.core.client.ClientConsumer;
import org.apache.activemq.artemis.api.core.client.ClientMessage;
import org.apache.activemq.artemis.api.core.client.ClientProducer;
import org.apache.activemq.artemis.api.core.client.ClientSession;
import org.apache.activemq.artemis.api.core.client.ClientSessionFactory;
import org.apache.activemq.artemis.api.core.client.ServerLocator;
import org.apache.activemq.artemis.core.server.ActiveMQServer;
import org.apache.activemq.artemis.core.server.Queue;
import org.apache.activemq.artemis.core.settings.impl.AddressSettings;
import org.apache.activemq.artemis.jms.client.ActiveMQConnectionFactory;
import org.apache.activemq.artemis.tests.util.ActiveMQTestBase;
import org.apache.activemq.artemis.tests.util.RandomUtil;
import org.apache.activemq.artemis.tests.util.Wait;
import org.apache.activemq.artemis.utils.CompositeAddress;
import org.junit.Before;
import org.junit.Test;
public class AutoCreateDeadLetterResourcesTest extends ActiveMQTestBase {
public final SimpleString addressA = new SimpleString("addressA");
public final SimpleString queueA = new SimpleString("queueA");
public final SimpleString dla = new SimpleString("myDLA");
private ActiveMQServer server;
@Override
@Before
public void setUp() throws Exception {
super.setUp();
server = createServer(false);
// set common address settings needed for all tests; make sure to use getMatch instead of addMatch in invidual tests or these will be overwritten
server.getAddressSettingsRepository().addMatch("#", new AddressSettings().setAutoCreateDeadLetterResources(true).setDeadLetterAddress(dla).setMaxDeliveryAttempts(1));
server.start();
}
@Test
public void testAutoCreationOfDeadLetterResources() throws Exception {
int before = server.getActiveMQServerControl().getQueueNames().length;
triggerDlaDelivery();
assertNotNull(server.getAddressInfo(dla));
assertNotNull(server.locateQueue(AddressSettings.DEFAULT_DEAD_LETTER_QUEUE_PREFIX.concat(addressA).concat(AddressSettings.DEFAULT_DEAD_LETTER_QUEUE_SUFFIX)));
assertEquals(2, server.getActiveMQServerControl().getQueueNames().length - before);
}
@Test
public void testAutoCreationOfDeadLetterResourcesWithNullDLA() throws Exception {
testAutoCreationOfDeadLetterResourcesWithNoDLA(null);
}
@Test
public void testAutoCreationOfDeadLetterResourcesWithEmptyDLA() throws Exception {
testAutoCreationOfDeadLetterResourcesWithNoDLA(SimpleString.toSimpleString(""));
}
private void testAutoCreationOfDeadLetterResourcesWithNoDLA(SimpleString dla) throws Exception {
server.getAddressSettingsRepository().getMatch("#").setDeadLetterAddress(dla);
int before = server.getActiveMQServerControl().getQueueNames().length;
triggerDlaDelivery();
if (dla != null) {
assertNull(server.getAddressInfo(dla));
}
assertNull(server.locateQueue(AddressSettings.DEFAULT_DEAD_LETTER_QUEUE_PREFIX.concat(addressA).concat(AddressSettings.DEFAULT_DEAD_LETTER_QUEUE_SUFFIX)));
assertEquals(1, server.getActiveMQServerControl().getQueueNames().length - before);
}
@Test
public void testAutoCreateDeadLetterQueuePrefix() throws Exception {
SimpleString prefix = RandomUtil.randomSimpleString();
server.getAddressSettingsRepository().getMatch("#").setDeadLetterQueuePrefix(prefix);
triggerDlaDelivery();
assertNotNull(server.locateQueue(prefix.concat(addressA).concat(AddressSettings.DEFAULT_DEAD_LETTER_QUEUE_SUFFIX)));
}
@Test
public void testAutoCreateDeadLetterQueueSuffix() throws Exception {
SimpleString suffix = RandomUtil.randomSimpleString();
server.getAddressSettingsRepository().getMatch("#").setDeadLetterQueueSuffix(suffix);
triggerDlaDelivery();
assertNotNull(server.locateQueue(AddressSettings.DEFAULT_DEAD_LETTER_QUEUE_PREFIX.concat(addressA).concat(suffix)));
}
@Test
public void testAutoCreateDeadLetterQueuePrefixAndSuffix() throws Exception {
SimpleString prefix = RandomUtil.randomSimpleString();
SimpleString suffix = RandomUtil.randomSimpleString();
server.getAddressSettingsRepository().getMatch("#").setDeadLetterQueuePrefix(prefix).setDeadLetterQueueSuffix(suffix);
triggerDlaDelivery();
assertNotNull(server.locateQueue(prefix.concat(addressA).concat(suffix)));
}
@Test
public void testAutoCreatedDeadLetterFilterAnycast() throws Exception {
testAutoCreatedDeadLetterFilter(RoutingType.ANYCAST);
}
@Test
public void testAutoCreatedDeadLetterFilterMulticast() throws Exception {
testAutoCreatedDeadLetterFilter(RoutingType.MULTICAST);
}
private void testAutoCreatedDeadLetterFilter(RoutingType routingType) throws Exception {
final int ITERATIONS = 100;
final int MESSAGE_COUNT = 10;
for (int i = 0; i < ITERATIONS; i++) {
SimpleString address = RandomUtil.randomSimpleString();
SimpleString queue = RandomUtil.randomSimpleString();
server.createQueue(address, routingType, queue, null, true, false);
ServerLocator locator = createInVMNonHALocator();
ClientSessionFactory cf = createSessionFactory(locator);
ClientSession s = addClientSession(cf.createSession(true, false));
ClientProducer p = s.createProducer(address);
for (int j = 0; j < MESSAGE_COUNT; j++) {
p.send(s.createMessage(true).setRoutingType(routingType));
}
p.close();
ClientConsumer consumer = s.createConsumer(queue);
s.start();
for (int j = 0; j < MESSAGE_COUNT; j++) {
ClientMessage message = consumer.receive();
assertNotNull(message);
message.acknowledge();
}
s.rollback();
Queue dlq = server.locateQueue(AddressSettings.DEFAULT_DEAD_LETTER_QUEUE_PREFIX.concat(address).concat(AddressSettings.DEFAULT_DEAD_LETTER_QUEUE_SUFFIX));
assertNotNull(dlq);
Wait.assertEquals(MESSAGE_COUNT, dlq::getMessageCount);
}
assertEquals(ITERATIONS, server.getPostOffice().getBindingsForAddress(dla).getBindings().size());
}
@Test
public void testAutoDeletionAndRecreationOfDeadLetterResources() throws Exception {
SimpleString dlqName = AddressSettings.DEFAULT_DEAD_LETTER_QUEUE_PREFIX.concat(addressA).concat(AddressSettings.DEFAULT_DEAD_LETTER_QUEUE_SUFFIX);
triggerDlaDelivery();
// consume the message from the DLQ so it will be auto-deleted
ServerLocator locator = createInVMNonHALocator();
ClientSessionFactory sessionFactory = createSessionFactory(locator);
ClientSession session = addClientSession(sessionFactory.createSession(true, true));
ClientConsumer consumer = session.createConsumer(dlqName);
session.start();
ClientMessage message = consumer.receive();
assertNotNull(message);
message.acknowledge();
consumer.close();
session.close();
sessionFactory.close();
locator.close();
Wait.assertTrue(() -> server.locateQueue(dlqName) == null, 2000, 100);
server.destroyQueue(queueA);
triggerDlaDelivery();
assertNotNull(server.getAddressInfo(dla));
assertNotNull(server.locateQueue(dlqName));
}
@Test
public void testWithJMSFQQN() throws Exception {
SimpleString dlqName = AddressSettings.DEFAULT_DEAD_LETTER_QUEUE_PREFIX.concat(addressA).concat(AddressSettings.DEFAULT_DEAD_LETTER_QUEUE_SUFFIX);
String fqqn = CompositeAddress.toFullyQualified(dla, dlqName).toString();
triggerDlaDelivery();
JMSContext context = new ActiveMQConnectionFactory("vm://0").createContext();
context.start();
assertNotNull(context.createConsumer(context.createQueue(fqqn)).receive(2000));
}
private void triggerDlaDelivery() throws Exception {
server.createQueue(addressA, RoutingType.ANYCAST, queueA, null, true, false);
ServerLocator locator = createInVMNonHALocator();
ClientSessionFactory sessionFactory = createSessionFactory(locator);
ClientSession session = addClientSession(sessionFactory.createSession(true, false));
ClientProducer producer = addClientProducer(session.createProducer(addressA));
producer.send(session.createMessage(true));
producer.close();
ClientConsumer consumer = addClientConsumer(session.createConsumer(queueA));
session.start();
ClientMessage message = consumer.receive();
assertNotNull(message);
message.acknowledge();
session.rollback();
session.close();
sessionFactory.close();
locator.close();
}
}