ARTEMIS-4162 support deleting addresses & queues w/o usage check
There are certain use-cases where addresses will be auto-created and never have a direct binding created on them. Because of this they will never be auto-deleted. If a large number of these addresses build up they will consume a problematic amount of heap space. One specific example of this use-case is an MQTT subscriber with a wild-card subscription and a large number of MQTT producers sending one or two messages a large number of different MQTT topics covered by the wild-card. Since no bindings are ever created on any of these individual addresses (e.g. from a subscription queue) they will never be auto-deleted, but they will eventually consume a large amount of heap. The only way to deal with these addresses is to manually delete them. There are also situations where queues may be created and never have any messages sent to them or never have a consumer connect. These queues will never be auto-deleted so they must be deleted manually. This commit adds the ability to configure the broker to skip the usage check so that these kinds of addresses and queues can be deleted automatically.
This commit is contained in:
parent
6874556f7c
commit
b76c672305
|
@ -287,6 +287,8 @@ public final class FileConfigurationParser extends XMLConfigurationUtil {
|
|||
|
||||
private static final String AUTO_DELETE_QUEUES_MESSAGE_COUNT = "auto-delete-queues-message-count";
|
||||
|
||||
private static final String AUTO_DELETE_QUEUES_SKIP_USAGE_CHECK = "auto-delete-queues-skip-usage-check";
|
||||
|
||||
private static final String CONFIG_DELETE_QUEUES = "config-delete-queues";
|
||||
|
||||
private static final String AUTO_CREATE_ADDRESSES = "auto-create-addresses";
|
||||
|
@ -295,6 +297,8 @@ public final class FileConfigurationParser extends XMLConfigurationUtil {
|
|||
|
||||
private static final String AUTO_DELETE_ADDRESSES_DELAY = "auto-delete-addresses-delay";
|
||||
|
||||
private static final String AUTO_DELETE_ADDRESSES_SKIP_USAGE_CHECK = "auto-delete-addresses-skip-usage-check";
|
||||
|
||||
private static final String CONFIG_DELETE_ADDRESSES = "config-delete-addresses";
|
||||
|
||||
private static final String CONFIG_DELETE_DIVERTS = "config-delete-diverts";
|
||||
|
@ -337,8 +341,6 @@ public final class FileConfigurationParser extends XMLConfigurationUtil {
|
|||
|
||||
private static final String ENABLE_INGRESS_TIMESTAMP = "enable-ingress-timestamp";
|
||||
|
||||
private static final String SUPPRESS_SESSION_NOTIFICATIONS = "suppress-session-notifications";
|
||||
|
||||
private boolean validateAIO = false;
|
||||
|
||||
private boolean printPageMaxSizeUsed = false;
|
||||
|
@ -1372,6 +1374,8 @@ public final class FileConfigurationParser extends XMLConfigurationUtil {
|
|||
long autoDeleteQueuesMessageCount = XMLUtil.parseLong(child);
|
||||
Validators.MINUS_ONE_OR_GE_ZERO.validate(AUTO_DELETE_QUEUES_MESSAGE_COUNT, autoDeleteQueuesMessageCount);
|
||||
addressSettings.setAutoDeleteQueuesMessageCount(autoDeleteQueuesMessageCount);
|
||||
} else if (AUTO_DELETE_QUEUES_SKIP_USAGE_CHECK.equalsIgnoreCase(name)) {
|
||||
addressSettings.setAutoDeleteQueuesSkipUsageCheck(XMLUtil.parseBoolean(child));
|
||||
} else if (CONFIG_DELETE_QUEUES.equalsIgnoreCase(name)) {
|
||||
String value = getTrimmedTextContent(child);
|
||||
Validators.DELETION_POLICY_TYPE.validate(CONFIG_DELETE_QUEUES, value);
|
||||
|
@ -1385,6 +1389,8 @@ public final class FileConfigurationParser extends XMLConfigurationUtil {
|
|||
long autoDeleteAddressesDelay = XMLUtil.parseLong(child);
|
||||
Validators.GE_ZERO.validate(AUTO_DELETE_ADDRESSES_DELAY, autoDeleteAddressesDelay);
|
||||
addressSettings.setAutoDeleteAddressesDelay(autoDeleteAddressesDelay);
|
||||
} else if (AUTO_DELETE_ADDRESSES_SKIP_USAGE_CHECK.equalsIgnoreCase(name)) {
|
||||
addressSettings.setAutoDeleteAddressesSkipUsageCheck(XMLUtil.parseBoolean(child));
|
||||
} else if (CONFIG_DELETE_ADDRESSES.equalsIgnoreCase(name)) {
|
||||
String value = getTrimmedTextContent(child);
|
||||
Validators.DELETION_POLICY_TYPE.validate(CONFIG_DELETE_ADDRESSES, value);
|
||||
|
|
|
@ -83,13 +83,8 @@ public interface AddressManager {
|
|||
|
||||
void scanAddresses(MirrorController mirrorController) throws Exception;
|
||||
|
||||
boolean checkAutoRemoveAddress(SimpleString address,
|
||||
AddressInfo addressInfo,
|
||||
boolean checkAutoRemoveAddress(AddressInfo addressInfo,
|
||||
AddressSettings settings,
|
||||
boolean ignoreDelay) throws Exception;
|
||||
|
||||
boolean checkAutoRemoveAddress(SimpleString address,
|
||||
AddressInfo addressInfo,
|
||||
AddressSettings settings) throws Exception;
|
||||
|
||||
}
|
||||
|
|
|
@ -1038,8 +1038,8 @@ public class PostOfficeImpl implements PostOffice, NotificationListener, Binding
|
|||
|
||||
@Override
|
||||
public boolean isAddressBound(final SimpleString address) throws Exception {
|
||||
Bindings bindings = lookupBindingsForAddress(address);
|
||||
return bindings != null && !bindings.getBindings().isEmpty();
|
||||
Collection<Binding> bindings = getDirectBindings(address);
|
||||
return bindings != null && !bindings.isEmpty();
|
||||
}
|
||||
|
||||
@Override
|
||||
|
@ -1970,15 +1970,16 @@ public class PostOfficeImpl implements PostOffice, NotificationListener, Binding
|
|||
}
|
||||
}
|
||||
|
||||
private static boolean queueWasUsed(Queue queue) {
|
||||
return queue.getMessagesExpired() > 0 || queue.getMessagesAcknowledged() > 0 || queue.getMessagesKilled() > 0 || queue.getConsumerRemovedTimestamp() != -1;
|
||||
private static boolean queueWasUsed(Queue queue, AddressSettings settings) {
|
||||
return queue.getMessagesExpired() > 0 || queue.getMessagesAcknowledged() > 0 || queue.getMessagesKilled() > 0 || queue.getConsumerRemovedTimestamp() != -1 || settings.getAutoDeleteQueuesSkipUsageCheck();
|
||||
}
|
||||
|
||||
/** To be used by the AddressQueueReaper.
|
||||
* It is also exposed for tests through PostOfficeTestAccessor */
|
||||
void reapAddresses(boolean initialCheck) {
|
||||
getLocalQueues().forEach(queue -> {
|
||||
if (!queue.isInternalQueue() && QueueManagerImpl.isAutoDelete(queue) && QueueManagerImpl.consumerCountCheck(queue) && (initialCheck || QueueManagerImpl.delayCheck(queue)) && QueueManagerImpl.messageCountCheck(queue) && (initialCheck || queueWasUsed(queue))) {
|
||||
AddressSettings settings = addressSettingsRepository.getMatch(queue.getAddress().toString());
|
||||
if (!queue.isInternalQueue() && queue.isAutoDelete() && QueueManagerImpl.consumerCountCheck(queue) && (initialCheck || QueueManagerImpl.delayCheck(queue, settings)) && QueueManagerImpl.messageCountCheck(queue) && (initialCheck || queueWasUsed(queue, settings))) {
|
||||
if (initialCheck || queue.isSwept()) {
|
||||
if (logger.isDebugEnabled()) {
|
||||
if (initialCheck) {
|
||||
|
@ -2003,7 +2004,7 @@ public class PostOfficeImpl implements PostOffice, NotificationListener, Binding
|
|||
AddressSettings settings = addressSettingsRepository.getMatch(address.toString());
|
||||
|
||||
try {
|
||||
if (addressManager.checkAutoRemoveAddress(address, addressInfo, settings, initialCheck)) {
|
||||
if (addressManager.checkAutoRemoveAddress(addressInfo, settings, initialCheck)) {
|
||||
if (initialCheck || addressInfo.isSwept()) {
|
||||
|
||||
server.autoRemoveAddressInfo(address, null);
|
||||
|
@ -2033,12 +2034,6 @@ public class PostOfficeImpl implements PostOffice, NotificationListener, Binding
|
|||
}
|
||||
}
|
||||
|
||||
public boolean checkAutoRemoveAddress(SimpleString address,
|
||||
AddressInfo addressInfo,
|
||||
AddressSettings settings) throws Exception {
|
||||
return settings.isAutoDeleteAddresses() && addressInfo != null && addressInfo.isAutoCreated() && !isAddressBound(address) && addressInfo.getBindingRemovedTimestamp() != -1 && (System.currentTimeMillis() - addressInfo.getBindingRemovedTimestamp() >= settings.getAutoDeleteAddressesDelay());
|
||||
}
|
||||
|
||||
private Stream<Queue> getLocalQueues() {
|
||||
return addressManager.getBindings()
|
||||
.filter(binding -> binding.getType() == BindingType.LOCAL_QUEUE)
|
||||
|
|
|
@ -368,17 +368,18 @@ public class SimpleAddressManager implements AddressManager {
|
|||
}
|
||||
|
||||
@Override
|
||||
public boolean checkAutoRemoveAddress(SimpleString address,
|
||||
AddressInfo addressInfo,
|
||||
AddressSettings settings) throws Exception {
|
||||
return checkAutoRemoveAddress(address, addressInfo, settings, false);
|
||||
public boolean checkAutoRemoveAddress(AddressInfo addressInfo,
|
||||
AddressSettings settings,
|
||||
boolean ignoreDelay) throws Exception {
|
||||
return settings.isAutoDeleteAddresses() && addressInfo != null && addressInfo.isAutoCreated() && !bindingsFactory.isAddressBound(addressInfo.getName()) && addressWasUsed(addressInfo, settings) && (ignoreDelay || delayCheck(addressInfo, settings));
|
||||
}
|
||||
|
||||
@Override
|
||||
public boolean checkAutoRemoveAddress(SimpleString address,
|
||||
AddressInfo addressInfo,
|
||||
AddressSettings settings, boolean ignoreDelay) throws Exception {
|
||||
return settings.isAutoDeleteAddresses() && addressInfo != null && addressInfo.isAutoCreated() && !bindingsFactory.isAddressBound(address) && (ignoreDelay || addressInfo.getBindingRemovedTimestamp() != -1 && (System.currentTimeMillis() - addressInfo.getBindingRemovedTimestamp() >= settings.getAutoDeleteAddressesDelay()));
|
||||
private boolean delayCheck(AddressInfo addressInfo, AddressSettings settings) {
|
||||
return (!settings.isAutoDeleteAddressesSkipUsageCheck() && System.currentTimeMillis() - addressInfo.getBindingRemovedTimestamp() >= settings.getAutoDeleteAddressesDelay()) || (settings.isAutoDeleteAddressesSkipUsageCheck() && System.currentTimeMillis() - addressInfo.getCreatedTimestamp() >= settings.getAutoDeleteAddressesDelay());
|
||||
}
|
||||
|
||||
private boolean addressWasUsed(AddressInfo addressInfo, AddressSettings settings) {
|
||||
return addressInfo.getBindingRemovedTimestamp() != -1 || settings.isAutoDeleteAddressesSkipUsageCheck();
|
||||
}
|
||||
|
||||
@Override
|
||||
|
|
|
@ -1540,10 +1540,10 @@ public interface ActiveMQServerLogger {
|
|||
@LogMessage(id = 224111, value = "Both 'whitelist' and 'allowlist' detected. Configuration 'whitelist' is deprecated, please use only the 'allowlist' configuration", level = LogMessage.Level.WARN)
|
||||
void useOnlyAllowList();
|
||||
|
||||
@LogMessage(id = 224112, value = "Auto removing Queue {} with queueID={} on address={}", level = LogMessage.Level.INFO)
|
||||
@LogMessage(id = 224112, value = "Auto removing queue {} with queueID={} on address={}", level = LogMessage.Level.INFO)
|
||||
void autoRemoveQueue(String name, long queueID, String address);
|
||||
|
||||
@LogMessage(id = 224113, value = "Auto removing Address {}", level = LogMessage.Level.INFO)
|
||||
@LogMessage(id = 224113, value = "Auto removing address {}", level = LogMessage.Level.INFO)
|
||||
void autoRemoveAddress(String name);
|
||||
|
||||
@LogMessage(id = 224114, value = "Address control block, blocking message production on address '{}'. Clients will not get further credit.", level = LogMessage.Level.INFO)
|
||||
|
|
|
@ -542,4 +542,8 @@ public interface Queue extends Bindable,CriticalComponent {
|
|||
default QueueConfiguration getQueueConfiguration() {
|
||||
return null;
|
||||
}
|
||||
|
||||
default long getCreatedTimestamp() {
|
||||
return -1;
|
||||
}
|
||||
}
|
||||
|
|
|
@ -73,13 +73,14 @@ public class AddressInfo {
|
|||
|
||||
private static final AtomicLongFieldUpdater<AddressInfo> unRoutedMessageCountUpdater = AtomicLongFieldUpdater.newUpdater(AddressInfo.class, "unRoutedMessageCount");
|
||||
|
||||
private long bindingRemovedTimestamp = -1;
|
||||
private volatile long bindingRemovedTimestamp = -1;
|
||||
|
||||
private volatile boolean paused = false;
|
||||
|
||||
private PostOffice postOffice;
|
||||
private StorageManager storageManager;
|
||||
private HierarchicalRepositoryChangeListener repositoryChangeListener;
|
||||
private long createdTimestamp = -1;
|
||||
|
||||
public boolean isSwept() {
|
||||
return swept;
|
||||
|
@ -96,11 +97,11 @@ public class AddressInfo {
|
|||
}
|
||||
|
||||
public AddressInfo(String name) {
|
||||
this(SimpleString.toSimpleString(name), EnumSet.noneOf(RoutingType.class));
|
||||
this(SimpleString.toSimpleString(name), EMPTY_ROUTING_TYPES);
|
||||
}
|
||||
|
||||
public AddressInfo(SimpleString name) {
|
||||
this(name, EnumSet.noneOf(RoutingType.class));
|
||||
this(name, EMPTY_ROUTING_TYPES);
|
||||
}
|
||||
|
||||
/**
|
||||
|
@ -110,6 +111,7 @@ public class AddressInfo {
|
|||
*/
|
||||
public AddressInfo(SimpleString name, EnumSet<RoutingType> routingTypes) {
|
||||
this.name = CompositeAddress.extractAddressName(name);
|
||||
this.createdTimestamp = System.currentTimeMillis();
|
||||
setRoutingTypes(routingTypes);
|
||||
}
|
||||
|
||||
|
@ -120,6 +122,7 @@ public class AddressInfo {
|
|||
*/
|
||||
public AddressInfo(SimpleString name, RoutingType routingType) {
|
||||
this.name = CompositeAddress.extractAddressName(name);
|
||||
this.createdTimestamp = System.currentTimeMillis();
|
||||
addRoutingType(routingType);
|
||||
}
|
||||
|
||||
|
@ -313,6 +316,9 @@ public class AddressInfo {
|
|||
buff.append("}");
|
||||
buff.append(", autoCreated=").append(autoCreated);
|
||||
buff.append(", paused=").append(paused);
|
||||
buff.append(", bindingRemovedTimestamp=").append(bindingRemovedTimestamp);
|
||||
buff.append(", swept=").append(swept);
|
||||
buff.append(", createdTimestamp=").append(createdTimestamp);
|
||||
buff.append("]");
|
||||
return buff.toString();
|
||||
}
|
||||
|
@ -388,6 +394,7 @@ public class AddressInfo {
|
|||
}
|
||||
builder.add("routingTypes", arrayBuilder);
|
||||
}
|
||||
builder.add("created-timestamp", createdTimestamp);
|
||||
|
||||
return builder.build().toString();
|
||||
}
|
||||
|
@ -412,6 +419,9 @@ public class AddressInfo {
|
|||
JsonNumber jsonNumber = (JsonNumber)rtValue;
|
||||
this.addRoutingType(RoutingType.getType((byte)jsonNumber.intValue()));
|
||||
}
|
||||
} else if (key.equals("created-timestamp")) {
|
||||
JsonNumber jsonLong = (JsonNumber) value;
|
||||
this.createdTimestamp = jsonLong.longValue();
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -427,4 +437,7 @@ public class AddressInfo {
|
|||
return result;
|
||||
}
|
||||
|
||||
public long getCreatedTimestamp() {
|
||||
return createdTimestamp;
|
||||
}
|
||||
}
|
||||
|
|
|
@ -347,6 +347,8 @@ public class QueueImpl extends CriticalComponentImpl implements Queue {
|
|||
|
||||
private volatile long ringSize;
|
||||
|
||||
private volatile long createdTimestamp = -1;
|
||||
|
||||
@Override
|
||||
public boolean isSwept() {
|
||||
return swept;
|
||||
|
@ -635,6 +637,8 @@ public class QueueImpl extends CriticalComponentImpl implements Queue {
|
|||
final QueueFactory factory) {
|
||||
super(server == null ? EmptyCriticalAnalyzer.getInstance() : server.getCriticalAnalyzer(), CRITICAL_PATHS);
|
||||
|
||||
this.createdTimestamp = System.currentTimeMillis();
|
||||
|
||||
this.id = queueConfiguration.getId();
|
||||
|
||||
this.address = queueConfiguration.getAddress();
|
||||
|
@ -1603,6 +1607,11 @@ public class QueueImpl extends CriticalComponentImpl implements Queue {
|
|||
this.ringSize = ringSize;
|
||||
}
|
||||
|
||||
@Override
|
||||
public long getCreatedTimestamp() {
|
||||
return createdTimestamp;
|
||||
}
|
||||
|
||||
public long getMessageCountForRing() {
|
||||
return (long) pendingMetrics.getMessageCount();
|
||||
}
|
||||
|
|
|
@ -65,7 +65,6 @@ public class QueueManagerImpl extends ReferenceCounterUtil implements QueueManag
|
|||
|
||||
public static void performAutoDeleteQueue(ActiveMQServer server, Queue queue) {
|
||||
SimpleString queueName = queue.getName();
|
||||
AddressSettings settings = server.getAddressSettingsRepository().getMatch(queue.getAddress().toString());
|
||||
|
||||
if (logger.isDebugEnabled()) {
|
||||
logger.debug("deleting auto-created queue \"{}\": consumerCount = {}; messageCount = {}; isAutoDelete = {}", queueName, queue.getConsumerCount(), queue.getMessageCount(), queue.isAutoDelete());
|
||||
|
@ -80,16 +79,12 @@ public class QueueManagerImpl extends ReferenceCounterUtil implements QueueManag
|
|||
}
|
||||
}
|
||||
|
||||
public static boolean isAutoDelete(Queue queue) {
|
||||
return queue.isAutoDelete();
|
||||
}
|
||||
|
||||
public static boolean messageCountCheck(Queue queue) {
|
||||
return queue.getAutoDeleteMessageCount() == -1 || queue.getMessageCount() <= queue.getAutoDeleteMessageCount();
|
||||
}
|
||||
|
||||
public static boolean delayCheck(Queue queue) {
|
||||
return System.currentTimeMillis() - queue.getConsumerRemovedTimestamp() >= queue.getAutoDeleteDelay();
|
||||
public static boolean delayCheck(Queue queue, AddressSettings settings) {
|
||||
return (!settings.getAutoDeleteQueuesSkipUsageCheck() && System.currentTimeMillis() - queue.getConsumerRemovedTimestamp() >= queue.getAutoDeleteDelay()) || (settings.getAutoDeleteQueuesSkipUsageCheck() && System.currentTimeMillis() - queue.getCreatedTimestamp() >= queue.getAutoDeleteDelay());
|
||||
}
|
||||
|
||||
public static boolean consumerCountCheck(Queue queue) {
|
||||
|
|
|
@ -82,6 +82,8 @@ public class AddressSettings implements Mergeable<AddressSettings>, Serializable
|
|||
|
||||
public static final long DEFAULT_AUTO_DELETE_QUEUES_DELAY = 0;
|
||||
|
||||
public static final boolean DEFAULT_AUTO_DELETE_QUEUES_SKIP_USAGE_CHECK = false;
|
||||
|
||||
public static final long DEFAULT_AUTO_DELETE_QUEUES_MESSAGE_COUNT = 0;
|
||||
|
||||
public static final DeletionPolicy DEFAULT_CONFIG_DELETE_QUEUES = DeletionPolicy.OFF;
|
||||
|
@ -92,6 +94,8 @@ public class AddressSettings implements Mergeable<AddressSettings>, Serializable
|
|||
|
||||
public static final long DEFAULT_AUTO_DELETE_ADDRESSES_DELAY = 0;
|
||||
|
||||
public static final boolean DEFAULT_AUTO_DELETE_ADDRESSES_SKIP_USAGE_CHECK = false;
|
||||
|
||||
public static final DeletionPolicy DEFAULT_CONFIG_DELETE_ADDRESSES = DeletionPolicy.OFF;
|
||||
|
||||
public static final DeletionPolicy DEFAULT_CONFIG_DELETE_DIVERTS = DeletionPolicy.OFF;
|
||||
|
@ -231,6 +235,8 @@ public class AddressSettings implements Mergeable<AddressSettings>, Serializable
|
|||
|
||||
private Long autoDeleteQueuesDelay = null;
|
||||
|
||||
private Boolean autoDeleteQueuesSkipUsageCheck = null;
|
||||
|
||||
private Long autoDeleteQueuesMessageCount = null;
|
||||
|
||||
private Long defaultRingSize = null;
|
||||
|
@ -245,6 +251,8 @@ public class AddressSettings implements Mergeable<AddressSettings>, Serializable
|
|||
|
||||
private Long autoDeleteAddressesDelay = null;
|
||||
|
||||
private Boolean autoDeleteAddressesSkipUsageCheck = null;
|
||||
|
||||
private DeletionPolicy configDeleteAddresses = null;
|
||||
|
||||
private DeletionPolicy configDeleteDiverts = null;
|
||||
|
@ -335,10 +343,12 @@ public class AddressSettings implements Mergeable<AddressSettings>, Serializable
|
|||
this.autoDeleteQueues = other.autoDeleteQueues;
|
||||
this.autoDeleteCreatedQueues = other.autoDeleteCreatedQueues;
|
||||
this.autoDeleteQueuesDelay = other.autoDeleteQueuesDelay;
|
||||
this.autoDeleteQueuesSkipUsageCheck = other.autoDeleteQueuesSkipUsageCheck;
|
||||
this.configDeleteQueues = other.configDeleteQueues;
|
||||
this.autoCreateAddresses = other.autoCreateAddresses;
|
||||
this.autoDeleteAddresses = other.autoDeleteAddresses;
|
||||
this.autoDeleteAddressesDelay = other.autoDeleteAddressesDelay;
|
||||
this.autoDeleteAddressesSkipUsageCheck = other.autoDeleteAddressesSkipUsageCheck;
|
||||
this.configDeleteAddresses = other.configDeleteAddresses;
|
||||
this.configDeleteDiverts = other.configDeleteDiverts;
|
||||
this.managementBrowsePageSize = other.managementBrowsePageSize;
|
||||
|
@ -446,6 +456,15 @@ public class AddressSettings implements Mergeable<AddressSettings>, Serializable
|
|||
return this;
|
||||
}
|
||||
|
||||
public boolean getAutoDeleteQueuesSkipUsageCheck() {
|
||||
return autoDeleteQueuesSkipUsageCheck != null ? autoDeleteQueuesSkipUsageCheck : AddressSettings.DEFAULT_AUTO_DELETE_QUEUES_SKIP_USAGE_CHECK;
|
||||
}
|
||||
|
||||
public AddressSettings setAutoDeleteQueuesSkipUsageCheck(final boolean autoDeleteQueuesSkipUsageCheck) {
|
||||
this.autoDeleteQueuesSkipUsageCheck = autoDeleteQueuesSkipUsageCheck;
|
||||
return this;
|
||||
}
|
||||
|
||||
public long getAutoDeleteQueuesMessageCount() {
|
||||
return autoDeleteQueuesMessageCount != null ? autoDeleteQueuesMessageCount : AddressSettings.DEFAULT_AUTO_DELETE_QUEUES_MESSAGE_COUNT;
|
||||
}
|
||||
|
@ -491,6 +510,15 @@ public class AddressSettings implements Mergeable<AddressSettings>, Serializable
|
|||
return this;
|
||||
}
|
||||
|
||||
public boolean isAutoDeleteAddressesSkipUsageCheck() {
|
||||
return autoDeleteAddressesSkipUsageCheck != null ? autoDeleteAddressesSkipUsageCheck : AddressSettings.DEFAULT_AUTO_DELETE_ADDRESSES_SKIP_USAGE_CHECK;
|
||||
}
|
||||
|
||||
public AddressSettings setAutoDeleteAddressesSkipUsageCheck(final boolean autoDeleteAddressesSkipUsageCheck) {
|
||||
this.autoDeleteAddressesSkipUsageCheck = autoDeleteAddressesSkipUsageCheck;
|
||||
return this;
|
||||
}
|
||||
|
||||
public DeletionPolicy getConfigDeleteAddresses() {
|
||||
return configDeleteAddresses != null ? configDeleteAddresses : AddressSettings.DEFAULT_CONFIG_DELETE_ADDRESSES;
|
||||
}
|
||||
|
@ -1151,6 +1179,9 @@ public class AddressSettings implements Mergeable<AddressSettings>, Serializable
|
|||
if (autoDeleteQueuesDelay == null) {
|
||||
autoDeleteQueuesDelay = merged.autoDeleteQueuesDelay;
|
||||
}
|
||||
if (autoDeleteQueuesSkipUsageCheck == null) {
|
||||
autoDeleteQueuesSkipUsageCheck = merged.autoDeleteQueuesSkipUsageCheck;
|
||||
}
|
||||
if (autoDeleteQueuesMessageCount == null) {
|
||||
autoDeleteQueuesMessageCount = merged.autoDeleteQueuesMessageCount;
|
||||
}
|
||||
|
@ -1166,6 +1197,9 @@ public class AddressSettings implements Mergeable<AddressSettings>, Serializable
|
|||
if (autoDeleteAddressesDelay == null) {
|
||||
autoDeleteAddressesDelay = merged.autoDeleteAddressesDelay;
|
||||
}
|
||||
if (autoDeleteAddressesSkipUsageCheck == null) {
|
||||
autoDeleteAddressesSkipUsageCheck = merged.autoDeleteAddressesSkipUsageCheck;
|
||||
}
|
||||
if (configDeleteAddresses == null) {
|
||||
configDeleteAddresses = merged.configDeleteAddresses;
|
||||
}
|
||||
|
@ -1535,6 +1569,14 @@ public class AddressSettings implements Mergeable<AddressSettings>, Serializable
|
|||
pageFullMessagePolicy = null;
|
||||
}
|
||||
}
|
||||
|
||||
if (buffer.readableBytes() > 0) {
|
||||
autoDeleteQueuesSkipUsageCheck = BufferHelper.readNullableBoolean(buffer);
|
||||
}
|
||||
|
||||
if (buffer.readableBytes() > 0) {
|
||||
autoDeleteAddressesSkipUsageCheck = BufferHelper.readNullableBoolean(buffer);
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
|
@ -1584,7 +1626,9 @@ public class AddressSettings implements Mergeable<AddressSettings>, Serializable
|
|||
SimpleString.sizeofNullableString(defaultLastValueKey) +
|
||||
BufferHelper.sizeOfNullableBoolean(defaultNonDestructive) +
|
||||
BufferHelper.sizeOfNullableLong(autoDeleteQueuesDelay) +
|
||||
BufferHelper.sizeOfNullableBoolean(autoDeleteQueuesSkipUsageCheck) +
|
||||
BufferHelper.sizeOfNullableLong(autoDeleteAddressesDelay) +
|
||||
BufferHelper.sizeOfNullableBoolean(autoDeleteAddressesSkipUsageCheck) +
|
||||
BufferHelper.sizeOfNullableBoolean(defaultGroupRebalance) +
|
||||
BufferHelper.sizeOfNullableInteger(defaultGroupBuckets) +
|
||||
SimpleString.sizeofNullableString(defaultGroupFirstKey) +
|
||||
|
@ -1755,6 +1799,9 @@ public class AddressSettings implements Mergeable<AddressSettings>, Serializable
|
|||
|
||||
buffer.writeNullableSimpleString(pageFullMessagePolicy != null ? new SimpleString(pageFullMessagePolicy.toString()) : null);
|
||||
|
||||
BufferHelper.writeNullableBoolean(buffer, autoDeleteQueuesSkipUsageCheck);
|
||||
|
||||
BufferHelper.writeNullableBoolean(buffer, autoDeleteAddressesSkipUsageCheck);
|
||||
}
|
||||
|
||||
/* (non-Javadoc)
|
||||
|
@ -1797,11 +1844,13 @@ public class AddressSettings implements Mergeable<AddressSettings>, Serializable
|
|||
result = prime * result + ((autoDeleteQueues == null) ? 0 : autoDeleteQueues.hashCode());
|
||||
result = prime * result + ((autoDeleteCreatedQueues == null) ? 0 : autoDeleteCreatedQueues.hashCode());
|
||||
result = prime * result + ((autoDeleteQueuesDelay == null) ? 0 : autoDeleteQueuesDelay.hashCode());
|
||||
result = prime * result + ((autoDeleteQueuesSkipUsageCheck == null) ? 0 : autoDeleteQueuesSkipUsageCheck.hashCode());
|
||||
result = prime * result + ((autoDeleteQueuesMessageCount == null) ? 0 : autoDeleteQueuesMessageCount.hashCode());
|
||||
result = prime * result + ((configDeleteQueues == null) ? 0 : configDeleteQueues.hashCode());
|
||||
result = prime * result + ((autoCreateAddresses == null) ? 0 : autoCreateAddresses.hashCode());
|
||||
result = prime * result + ((autoDeleteAddresses == null) ? 0 : autoDeleteAddresses.hashCode());
|
||||
result = prime * result + ((autoDeleteAddressesDelay == null) ? 0 : autoDeleteAddressesDelay.hashCode());
|
||||
result = prime * result + ((autoDeleteAddressesSkipUsageCheck == null) ? 0 : autoDeleteAddressesSkipUsageCheck.hashCode());
|
||||
result = prime * result + ((configDeleteAddresses == null) ? 0 : configDeleteAddresses.hashCode());
|
||||
result = prime * result + ((configDeleteDiverts == null) ? 0 : configDeleteDiverts.hashCode());
|
||||
result = prime * result + ((managementBrowsePageSize == null) ? 0 : managementBrowsePageSize.hashCode());
|
||||
|
@ -2015,6 +2064,11 @@ public class AddressSettings implements Mergeable<AddressSettings>, Serializable
|
|||
return false;
|
||||
} else if (!autoDeleteQueuesDelay.equals(other.autoDeleteQueuesDelay))
|
||||
return false;
|
||||
if (autoDeleteQueuesSkipUsageCheck == null) {
|
||||
if (other.autoDeleteQueuesSkipUsageCheck != null)
|
||||
return false;
|
||||
} else if (!autoDeleteQueuesSkipUsageCheck.equals(other.autoDeleteQueuesSkipUsageCheck))
|
||||
return false;
|
||||
if (autoDeleteQueuesMessageCount == null) {
|
||||
if (other.autoDeleteQueuesMessageCount != null)
|
||||
return false;
|
||||
|
@ -2040,6 +2094,11 @@ public class AddressSettings implements Mergeable<AddressSettings>, Serializable
|
|||
return false;
|
||||
} else if (!autoDeleteAddressesDelay.equals(other.autoDeleteAddressesDelay))
|
||||
return false;
|
||||
if (autoDeleteAddressesSkipUsageCheck == null) {
|
||||
if (other.autoDeleteAddressesSkipUsageCheck != null)
|
||||
return false;
|
||||
} else if (!autoDeleteAddressesSkipUsageCheck.equals(other.autoDeleteAddressesSkipUsageCheck))
|
||||
return false;
|
||||
if (configDeleteAddresses == null) {
|
||||
if (other.configDeleteAddresses != null)
|
||||
return false;
|
||||
|
@ -2309,6 +2368,8 @@ public class AddressSettings implements Mergeable<AddressSettings>, Serializable
|
|||
autoDeleteCreatedQueues +
|
||||
", autoDeleteQueuesDelay=" +
|
||||
autoDeleteQueuesDelay +
|
||||
", autoDeleteQueuesSkipUsageCheck=" +
|
||||
autoDeleteQueuesSkipUsageCheck +
|
||||
", autoDeleteQueuesMessageCount=" +
|
||||
autoDeleteQueuesMessageCount +
|
||||
", configDeleteQueues=" +
|
||||
|
@ -2319,6 +2380,8 @@ public class AddressSettings implements Mergeable<AddressSettings>, Serializable
|
|||
autoDeleteAddresses +
|
||||
", autoDeleteAddressesDelay=" +
|
||||
autoDeleteAddressesDelay +
|
||||
", autoDeleteAddressesSkipUsageCheck=" +
|
||||
autoDeleteAddressesSkipUsageCheck +
|
||||
", configDeleteAddresses=" +
|
||||
configDeleteAddresses +
|
||||
", configDeleteDiverts=" +
|
||||
|
|
|
@ -4090,6 +4090,14 @@
|
|||
</xsd:annotation>
|
||||
</xsd:element>
|
||||
|
||||
<xsd:element name="auto-delete-queues-skip-usage-check" type="xsd:boolean" default="false" maxOccurs="1" minOccurs="0">
|
||||
<xsd:annotation>
|
||||
<xsd:documentation>
|
||||
whether or not to check that the queue has actually be used before auto-deleting it
|
||||
</xsd:documentation>
|
||||
</xsd:annotation>
|
||||
</xsd:element>
|
||||
|
||||
<xsd:element name="config-delete-queues" default="OFF" maxOccurs="1" minOccurs="0">
|
||||
<xsd:annotation>
|
||||
<xsd:documentation>
|
||||
|
@ -4132,6 +4140,14 @@
|
|||
</xsd:annotation>
|
||||
</xsd:element>
|
||||
|
||||
<xsd:element name="auto-delete-addresses-skip-usage-check" type="xsd:boolean" default="false" maxOccurs="1" minOccurs="0">
|
||||
<xsd:annotation>
|
||||
<xsd:documentation>
|
||||
whether or not to check that the address has actually be used before auto-deleting it
|
||||
</xsd:documentation>
|
||||
</xsd:annotation>
|
||||
</xsd:element>
|
||||
|
||||
<xsd:element name="config-delete-addresses" default="OFF" maxOccurs="1" minOccurs="0">
|
||||
<xsd:annotation>
|
||||
<xsd:documentation>
|
||||
|
|
|
@ -450,7 +450,9 @@ public class FileConfigurationTest extends ConfigurationImplTest {
|
|||
assertEquals(true, conf.getAddressSettings().get("a1").isAutoCreateJmsTopics());
|
||||
assertEquals(true, conf.getAddressSettings().get("a1").isAutoDeleteJmsTopics());
|
||||
assertEquals(0, conf.getAddressSettings().get("a1").getAutoDeleteQueuesDelay());
|
||||
assertEquals(false, conf.getAddressSettings().get("a1").getAutoDeleteQueuesSkipUsageCheck());
|
||||
assertEquals(0, conf.getAddressSettings().get("a1").getAutoDeleteAddressesDelay());
|
||||
assertEquals(false, conf.getAddressSettings().get("a1").isAutoDeleteAddressesSkipUsageCheck());
|
||||
assertEquals(false, conf.getAddressSettings().get("a1").isDefaultPurgeOnNoConsumers());
|
||||
assertEquals(Integer.valueOf(5), conf.getAddressSettings().get("a1").getDefaultMaxConsumers());
|
||||
assertEquals(RoutingType.ANYCAST, conf.getAddressSettings().get("a1").getDefaultQueueRoutingType());
|
||||
|
@ -486,7 +488,9 @@ public class FileConfigurationTest extends ConfigurationImplTest {
|
|||
assertEquals(false, conf.getAddressSettings().get("a2").isAutoCreateJmsTopics());
|
||||
assertEquals(false, conf.getAddressSettings().get("a2").isAutoDeleteJmsTopics());
|
||||
assertEquals(500, conf.getAddressSettings().get("a2").getAutoDeleteQueuesDelay());
|
||||
assertEquals(true, conf.getAddressSettings().get("a2").getAutoDeleteQueuesSkipUsageCheck());
|
||||
assertEquals(1000, conf.getAddressSettings().get("a2").getAutoDeleteAddressesDelay());
|
||||
assertEquals(true, conf.getAddressSettings().get("a2").isAutoDeleteAddressesSkipUsageCheck());
|
||||
assertEquals(true, conf.getAddressSettings().get("a2").isDefaultPurgeOnNoConsumers());
|
||||
assertEquals(Integer.valueOf(15), conf.getAddressSettings().get("a2").getDefaultMaxConsumers());
|
||||
assertEquals(RoutingType.MULTICAST, conf.getAddressSettings().get("a2").getDefaultQueueRoutingType());
|
||||
|
|
|
@ -22,4 +22,9 @@ public class PostOfficeTestAccessor {
|
|||
postOffice.reapAddresses(false);
|
||||
}
|
||||
|
||||
public static void sweepAndReapAddresses(PostOfficeImpl postOffice) {
|
||||
reapAddresses(postOffice);
|
||||
reapAddresses(postOffice);
|
||||
reapAddresses(postOffice);
|
||||
}
|
||||
}
|
||||
|
|
|
@ -554,9 +554,11 @@
|
|||
<auto-create-queues>false</auto-create-queues>
|
||||
<auto-delete-queues>false</auto-delete-queues>
|
||||
<auto-delete-queues-delay>500</auto-delete-queues-delay>
|
||||
<auto-delete-queues-skip-usage-check>true</auto-delete-queues-skip-usage-check>
|
||||
<auto-create-addresses>false</auto-create-addresses>
|
||||
<auto-delete-addresses>false</auto-delete-addresses>
|
||||
<auto-delete-addresses-delay>1000</auto-delete-addresses-delay>
|
||||
<auto-delete-addresses-skip-usage-check>true</auto-delete-addresses-skip-usage-check>
|
||||
<default-purge-on-no-consumers>true</default-purge-on-no-consumers>
|
||||
<default-max-consumers>15</default-max-consumers>
|
||||
<default-queue-routing-type>MULTICAST</default-queue-routing-type>
|
||||
|
|
|
@ -71,9 +71,11 @@
|
|||
<auto-create-queues>false</auto-create-queues>
|
||||
<auto-delete-queues>false</auto-delete-queues>
|
||||
<auto-delete-queues-delay>500</auto-delete-queues-delay>
|
||||
<auto-delete-queues-skip-usage-check>true</auto-delete-queues-skip-usage-check>
|
||||
<auto-create-addresses>false</auto-create-addresses>
|
||||
<auto-delete-addresses>false</auto-delete-addresses>
|
||||
<auto-delete-addresses-delay>1000</auto-delete-addresses-delay>
|
||||
<auto-delete-addresses-skip-usage-check>true</auto-delete-addresses-skip-usage-check>
|
||||
<default-purge-on-no-consumers>true</default-purge-on-no-consumers>
|
||||
<default-max-consumers>15</default-max-consumers>
|
||||
<default-queue-routing-type>MULTICAST</default-queue-routing-type>
|
||||
|
|
|
@ -71,9 +71,11 @@
|
|||
<auto-create-queues>false</auto-create-queues>
|
||||
<auto-delete-queues>false</auto-delete-queues>
|
||||
<auto-delete-queues-delay>500</auto-delete-queues-delay>
|
||||
<auto-delete-queues-skip-usage-check>true</auto-delete-queues-skip-usage-check>
|
||||
<auto-create-addresses>false</auto-create-addresses>
|
||||
<auto-delete-addresses>false</auto-delete-addresses>
|
||||
<auto-delete-addresses-delay>1000</auto-delete-addresses-delay>
|
||||
<auto-delete-addresses-skip-usage-check>true</auto-delete-addresses-skip-usage-check>
|
||||
<default-purge-on-no-consumers>true</default-purge-on-no-consumers>
|
||||
<default-max-consumers>15</default-max-consumers>
|
||||
<default-queue-routing-type>MULTICAST</default-queue-routing-type>
|
||||
|
|
|
@ -111,7 +111,7 @@ public class Wait {
|
|||
|
||||
|
||||
public static void assertEquals(Object obj, ObjectCondition condition, long timeout, long sleepMillis) throws Exception {
|
||||
boolean result = waitFor(() -> (obj == condition || obj.equals(condition.getObject())), timeout, sleepMillis);
|
||||
boolean result = waitFor(() -> (obj == condition || (obj != null && obj.equals(condition.getObject()))), timeout, sleepMillis);
|
||||
|
||||
if (!result) {
|
||||
Assert.assertEquals(obj, condition.getObject());
|
||||
|
|
|
@ -44,11 +44,13 @@ that would be found in the `broker.xml` file.
|
|||
<auto-delete-created-queues>false</auto-delete-created-queues>
|
||||
<auto-delete-queues-delay>0</auto-delete-queues-delay>
|
||||
<auto-delete-queues-message-count>0</auto-delete-queues-message-count>
|
||||
<auto-delete-queues-skip-usage-check>false</auto-delete-queues-skip-usage-check>
|
||||
<config-delete-queues>OFF</config-delete-queues>
|
||||
<config-delete-diverts>OFF</config-delete-diverts>
|
||||
<auto-create-addresses>true</auto-create-addresses>
|
||||
<auto-delete-addresses>true</auto-delete-addresses>
|
||||
<auto-delete-addresses-delay>0</auto-delete-addresses-delay>
|
||||
<auto-delete-addresses-skip-usage-check>false</auto-delete-addresses-skip-usage-check>
|
||||
<config-delete-addresses>OFF</config-delete-addresses>
|
||||
<management-browse-page-size>200</management-browse-page-size>
|
||||
<management-message-attribute-size-limit>256</management-message-attribute-size-limit>
|
||||
|
@ -264,6 +266,15 @@ less than or equal to before deleting auto-created queues.
|
|||
To disable message count check `-1` can be set.
|
||||
Default is `0` (empty queue).
|
||||
|
||||
`auto-delete-queues-skip-usage-check`. A queue will only be auto-deleted by
|
||||
default if it has actually been "used." A queue is considered "used" if any
|
||||
messages have been sent to it or any consumers have connected to it during its
|
||||
life. However, there are use-cases where it's useful to skip this check. When
|
||||
set to `true` it is **imperative** to also set `auto-delete-queues-delay` to a
|
||||
value greater than `0` otherwise queues may be deleted almost immediately after
|
||||
being created. In this case the queue will be deleted based on when it was
|
||||
created rather then when it was last "used." Default is `false`.
|
||||
|
||||
**Note:** the above auto-delete address settings can also be configured
|
||||
individually at the queue level when a client auto creates the queue.
|
||||
|
||||
|
@ -296,6 +307,15 @@ is `0` (delete immediately). The broker's `address-queue-scan-period` controls
|
|||
how often (in milliseconds) addresses are scanned for potential deletion. Use
|
||||
`-1` to disable scanning. The default scan value is `30000`.
|
||||
|
||||
`auto-delete-addresses-skip-usage-check`. An address will only be auto-deleted by
|
||||
default if it has actually been "used." An address is considered "used" if any
|
||||
queues have been created on it during its life. However, there are use-cases
|
||||
where it's useful to skip this check. When set to `true` it is **imperative** to
|
||||
also set `auto-delete-addresses-delay` to a value greater than `0` otherwise
|
||||
addresses may be deleted almost immediately after being created. In this case
|
||||
the address will be deleted based on when it was created rather then when it was
|
||||
last "used." Default is `false`.
|
||||
|
||||
`config-delete-addresses`. How the broker should handle addresses deleted on
|
||||
config reload, by delete policy: `OFF` or `FORCE`. Default is `OFF`. Read more
|
||||
about [configuration reload](config-reload.md).
|
||||
|
|
|
@ -16,14 +16,27 @@
|
|||
*/
|
||||
package org.apache.activemq.artemis.tests.integration.client;
|
||||
|
||||
import java.util.ArrayList;
|
||||
import java.util.Arrays;
|
||||
import java.util.List;
|
||||
import java.util.concurrent.CountDownLatch;
|
||||
import java.util.concurrent.TimeUnit;
|
||||
|
||||
import org.apache.activemq.artemis.api.core.QueueConfiguration;
|
||||
import org.apache.activemq.artemis.api.core.RoutingType;
|
||||
import org.apache.activemq.artemis.api.core.SimpleString;
|
||||
import org.apache.activemq.artemis.api.core.client.ClientConsumer;
|
||||
import org.apache.activemq.artemis.api.core.client.ClientProducer;
|
||||
import org.apache.activemq.artemis.api.core.client.ClientSession;
|
||||
import org.apache.activemq.artemis.api.core.client.ClientSessionFactory;
|
||||
import org.apache.activemq.artemis.api.core.client.ServerLocator;
|
||||
import org.apache.activemq.artemis.core.postoffice.impl.PostOfficeImpl;
|
||||
import org.apache.activemq.artemis.core.postoffice.impl.PostOfficeTestAccessor;
|
||||
import org.apache.activemq.artemis.core.server.ActiveMQServer;
|
||||
import org.apache.activemq.artemis.api.core.RoutingType;
|
||||
import org.apache.activemq.artemis.core.server.impl.AddressInfo;
|
||||
import org.apache.activemq.artemis.core.settings.impl.AddressSettings;
|
||||
import org.apache.activemq.artemis.tests.util.ActiveMQTestBase;
|
||||
import org.apache.activemq.artemis.tests.util.RandomUtil;
|
||||
import org.apache.activemq.artemis.tests.util.Wait;
|
||||
import org.junit.Before;
|
||||
import org.junit.Test;
|
||||
|
@ -43,7 +56,7 @@ public class AutoDeleteAddressTest extends ActiveMQTestBase {
|
|||
super.setUp();
|
||||
locator = createInVMNonHALocator();
|
||||
server = createServer(false);
|
||||
server.getConfiguration().setAddressQueueScanPeriod(10);
|
||||
server.getConfiguration().setAddressQueueScanPeriod(0);
|
||||
|
||||
server.start();
|
||||
cf = createSessionFactory(locator);
|
||||
|
@ -55,6 +68,7 @@ public class AutoDeleteAddressTest extends ActiveMQTestBase {
|
|||
server.createQueue(new QueueConfiguration(queueA).setAddress(addressA).setRoutingType(RoutingType.ANYCAST).setAutoCreated(true));
|
||||
assertNotNull(server.getAddressInfo(addressA));
|
||||
cf.createSession().createConsumer(queueA).close();
|
||||
PostOfficeTestAccessor.sweepAndReapAddresses((PostOfficeImpl) server.getPostOffice());
|
||||
Wait.assertTrue(() -> server.getAddressInfo(addressA) == null);
|
||||
}
|
||||
|
||||
|
@ -64,6 +78,68 @@ public class AutoDeleteAddressTest extends ActiveMQTestBase {
|
|||
server.createQueue(new QueueConfiguration(queueA).setAddress(addressA).setRoutingType(RoutingType.ANYCAST).setAutoCreated(true));
|
||||
assertNotNull(server.getAddressInfo(addressA));
|
||||
cf.createSession().createConsumer(queueA).close();
|
||||
PostOfficeTestAccessor.sweepAndReapAddresses((PostOfficeImpl) server.getPostOffice());
|
||||
assertNotNull(server.getAddressInfo(addressA));
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testAutoDeleteAutoCreatedAddressSkipUsageCheckWithDelay() throws Exception {
|
||||
final long DELAY = 1500;
|
||||
server.getAddressSettingsRepository().addMatch(addressA.toString(), new AddressSettings().setAutoDeleteAddresses(true).setAutoDeleteAddressesSkipUsageCheck(true).setAutoDeleteAddressesDelay(DELAY));
|
||||
server.addAddressInfo(new AddressInfo(addressA).setAutoCreated(true));
|
||||
long start = System.currentTimeMillis();
|
||||
assertNotNull(server.getAddressInfo(addressA));
|
||||
while (System.currentTimeMillis() - start <= DELAY) {
|
||||
PostOfficeTestAccessor.sweepAndReapAddresses((PostOfficeImpl) server.getPostOffice());
|
||||
assertNotNull(server.getAddressInfo(addressA));
|
||||
Thread.sleep(100);
|
||||
}
|
||||
PostOfficeTestAccessor.sweepAndReapAddresses((PostOfficeImpl) server.getPostOffice());
|
||||
assertNull(server.getAddressInfo(addressA));
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testAutoDeleteAddressWithWildcardAddress() throws Exception {
|
||||
String prefix = "topic";
|
||||
server.getAddressSettingsRepository().addMatch(prefix + ".#", new AddressSettings().setAutoDeleteAddresses(true).setAutoDeleteAddressesSkipUsageCheck(true));
|
||||
String wildcardAddress = prefix + ".#";
|
||||
String queue = RandomUtil.randomString();
|
||||
final int MESSAGE_COUNT = 10;
|
||||
final CountDownLatch latch = new CountDownLatch(MESSAGE_COUNT);
|
||||
|
||||
server.createQueue(new QueueConfiguration(queue).setAddress(wildcardAddress).setRoutingType(RoutingType.ANYCAST).setAutoCreated(true));
|
||||
|
||||
ClientSession consumerSession = cf.createSession();
|
||||
ClientConsumer consumer = consumerSession.createConsumer(queue);
|
||||
consumer.setMessageHandler(message -> latch.countDown());
|
||||
consumerSession.start();
|
||||
|
||||
ClientSession producerSession = cf.createSession();
|
||||
ClientProducer producer = producerSession.createProducer();
|
||||
|
||||
List<String> addresses = new ArrayList<>();
|
||||
for (int i = 0; i < MESSAGE_COUNT; i++) {
|
||||
String address = prefix + "." + RandomUtil.randomString();
|
||||
addresses.add(address);
|
||||
server.addAddressInfo(new AddressInfo(address).setAutoCreated(true));
|
||||
producer.send(address, producerSession.createMessage(false));
|
||||
}
|
||||
producerSession.close();
|
||||
|
||||
assertTrue(latch.await(2, TimeUnit.SECONDS));
|
||||
|
||||
for (String address : addresses) {
|
||||
assertNotNull(server.getAddressInfo(SimpleString.toSimpleString(address)));
|
||||
Wait.assertEquals(true, () -> Arrays.asList(server.getPagingManager().getStoreNames()).contains(SimpleString.toSimpleString(address)), 2000, 100);
|
||||
}
|
||||
|
||||
PostOfficeTestAccessor.sweepAndReapAddresses((PostOfficeImpl) server.getPostOffice());
|
||||
|
||||
for (String address : addresses) {
|
||||
assertNull(server.getAddressInfo(SimpleString.toSimpleString(address)));
|
||||
Wait.assertEquals(false, () -> Arrays.asList(server.getPagingManager().getStoreNames()).contains(SimpleString.toSimpleString(address)), 2000, 100);
|
||||
}
|
||||
|
||||
consumerSession.close();
|
||||
}
|
||||
}
|
||||
|
|
|
@ -17,13 +17,15 @@
|
|||
package org.apache.activemq.artemis.tests.integration.client;
|
||||
|
||||
import org.apache.activemq.artemis.api.core.QueueConfiguration;
|
||||
import org.apache.activemq.artemis.api.core.RoutingType;
|
||||
import org.apache.activemq.artemis.api.core.SimpleString;
|
||||
import org.apache.activemq.artemis.api.core.client.ClientProducer;
|
||||
import org.apache.activemq.artemis.api.core.client.ClientSession;
|
||||
import org.apache.activemq.artemis.api.core.client.ClientSessionFactory;
|
||||
import org.apache.activemq.artemis.api.core.client.ServerLocator;
|
||||
import org.apache.activemq.artemis.core.postoffice.impl.PostOfficeImpl;
|
||||
import org.apache.activemq.artemis.core.postoffice.impl.PostOfficeTestAccessor;
|
||||
import org.apache.activemq.artemis.core.server.ActiveMQServer;
|
||||
import org.apache.activemq.artemis.api.core.RoutingType;
|
||||
import org.apache.activemq.artemis.core.settings.impl.AddressSettings;
|
||||
import org.apache.activemq.artemis.tests.util.ActiveMQTestBase;
|
||||
import org.apache.activemq.artemis.tests.util.Wait;
|
||||
|
@ -101,4 +103,29 @@ public class AutoDeleteQueueTest extends ActiveMQTestBase {
|
|||
assertNotNull(server.locateQueue(queueA));
|
||||
assertFalse(Wait.waitFor(() -> server.locateQueue(queueA) == null, 5000, 100));
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testAutoDeleteAutoCreatedQueueWithoutUsage() throws Exception {
|
||||
server.getAddressSettingsRepository().addMatch(addressA.toString(), new AddressSettings().setAutoDeleteQueuesSkipUsageCheck(true));
|
||||
server.createQueue(new QueueConfiguration(queueA).setAddress(addressA).setRoutingType(RoutingType.ANYCAST).setAutoCreated(true));
|
||||
assertNotNull(server.locateQueue(queueA));
|
||||
PostOfficeTestAccessor.sweepAndReapAddresses((PostOfficeImpl) server.getPostOffice());
|
||||
Wait.assertTrue(() -> server.locateQueue(queueA) == null, 2000, 100);
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testAutoDeleteAutoCreatedQueueWithoutUsageWithDelay() throws Exception {
|
||||
final long DELAY = 1500;
|
||||
server.getAddressSettingsRepository().addMatch(addressA.toString(), new AddressSettings().setAutoDeleteQueuesSkipUsageCheck(true).setAutoDeleteQueuesDelay(DELAY));
|
||||
server.createQueue(new QueueConfiguration(queueA).setAddress(addressA).setRoutingType(RoutingType.ANYCAST).setAutoCreated(true));
|
||||
long start = System.currentTimeMillis();
|
||||
assertNotNull(server.locateQueue(queueA));
|
||||
while (System.currentTimeMillis() - start <= DELAY) {
|
||||
PostOfficeTestAccessor.sweepAndReapAddresses((PostOfficeImpl) server.getPostOffice());
|
||||
assertNotNull(server.locateQueue(queueA));
|
||||
Thread.sleep(100);
|
||||
}
|
||||
PostOfficeTestAccessor.sweepAndReapAddresses((PostOfficeImpl) server.getPostOffice());
|
||||
assertNull(server.locateQueue(queueA));
|
||||
}
|
||||
}
|
||||
|
|
|
@ -30,6 +30,8 @@ import org.apache.activemq.artemis.api.core.RoutingType;
|
|||
import org.apache.activemq.artemis.api.core.SimpleString;
|
||||
import org.apache.activemq.artemis.core.paging.impl.PagingManagerImpl;
|
||||
import org.apache.activemq.artemis.core.paging.impl.PagingManagerImplAccessor;
|
||||
import org.apache.activemq.artemis.core.postoffice.impl.PostOfficeImpl;
|
||||
import org.apache.activemq.artemis.core.postoffice.impl.PostOfficeTestAccessor;
|
||||
import org.apache.activemq.artemis.core.protocol.mqtt.MQTTReasonCodes;
|
||||
import org.apache.activemq.artemis.core.protocol.mqtt.MQTTUtil;
|
||||
import org.apache.activemq.artemis.core.server.Queue;
|
||||
|
@ -447,4 +449,45 @@ public class MQTT5Test extends MQTT5TestSupport {
|
|||
// verify the shared subscription queue is removed after all the subscribers disconnect
|
||||
Wait.assertTrue(() -> server.locateQueue(SUB_NAME.concat(".").concat(TOPIC)) == null, 2000, 100);
|
||||
}
|
||||
|
||||
@Test(timeout = DEFAULT_TIMEOUT)
|
||||
public void testAutoDeleteAddressWithWildcardSubscription() throws Exception {
|
||||
String prefix = "topic";
|
||||
server.getAddressSettingsRepository().addMatch(prefix + ".#", new AddressSettings().setAutoDeleteAddresses(true).setAutoDeleteAddressesSkipUsageCheck(true));
|
||||
String topic = prefix + "/#";
|
||||
final int MESSAGE_COUNT = 100;
|
||||
final CountDownLatch latch = new CountDownLatch(MESSAGE_COUNT);
|
||||
|
||||
MqttClient consumer = createPahoClient("consumer");
|
||||
consumer.connect();
|
||||
consumer.subscribe(topic, AT_LEAST_ONCE);
|
||||
consumer.setCallback(new LatchedMqttCallback(latch));
|
||||
|
||||
MqttClient producer = createPahoClient("producer");
|
||||
producer.connect();
|
||||
|
||||
List<String> addresses = new ArrayList<>();
|
||||
for (int i = 0; i < MESSAGE_COUNT; i++) {
|
||||
String address = prefix + "/" + RandomUtil.randomString();
|
||||
addresses.add(address.replace('/', '.'));
|
||||
producer.publish(address, new MqttMessage());
|
||||
}
|
||||
producer.disconnect();
|
||||
producer.close();
|
||||
|
||||
assertTrue(latch.await(2, TimeUnit.SECONDS));
|
||||
|
||||
for (String address : addresses) {
|
||||
assertNotNull(server.getAddressInfo(SimpleString.toSimpleString(address)));
|
||||
}
|
||||
|
||||
PostOfficeTestAccessor.sweepAndReapAddresses((PostOfficeImpl) server.getPostOffice());
|
||||
|
||||
for (String address : addresses) {
|
||||
assertNull(server.getAddressInfo(SimpleString.toSimpleString(address)));
|
||||
}
|
||||
|
||||
consumer.disconnect();
|
||||
consumer.close();
|
||||
}
|
||||
}
|
||||
|
|
Loading…
Reference in New Issue