This closes #2602
This commit is contained in:
commit
71ec3c6d3d
|
@ -222,6 +222,8 @@ public final class FileConfigurationParser extends XMLConfigurationUtil {
|
|||
|
||||
private static final String AUTO_DELETE_QUEUES = "auto-delete-queues";
|
||||
|
||||
private static final String AUTO_DELETE_CREATED_QUEUES = "auto-delete-created-queues";
|
||||
|
||||
private static final String AUTO_DELETE_QUEUES_DELAY = "auto-delete-queues-delay";
|
||||
|
||||
private static final String AUTO_DELETE_QUEUES_MESSAGE_COUNT = "auto-delete-queues-message-count";
|
||||
|
@ -1074,6 +1076,8 @@ public final class FileConfigurationParser extends XMLConfigurationUtil {
|
|||
addressSettings.setAutoCreateQueues(XMLUtil.parseBoolean(child));
|
||||
} else if (AUTO_DELETE_QUEUES.equalsIgnoreCase(name)) {
|
||||
addressSettings.setAutoDeleteQueues(XMLUtil.parseBoolean(child));
|
||||
} else if (AUTO_DELETE_CREATED_QUEUES.equalsIgnoreCase(name)) {
|
||||
addressSettings.setAutoDeleteCreatedQueues(XMLUtil.parseBoolean(child));
|
||||
} else if (AUTO_DELETE_QUEUES_DELAY.equalsIgnoreCase(name)) {
|
||||
long autoDeleteQueuesDelay = XMLUtil.parseLong(child);
|
||||
Validators.GE_ZERO.validate(AUTO_DELETE_QUEUES_DELAY, autoDeleteQueuesDelay);
|
||||
|
|
|
@ -955,7 +955,7 @@ public class ActiveMQServerControlImpl extends AbstractControl implements Active
|
|||
addressSettings.isDefaultNonDestructive(),
|
||||
addressSettings.getDefaultConsumersBeforeDispatch(),
|
||||
addressSettings.getDefaultDelayBeforeDispatch(),
|
||||
addressSettings.isAutoDeleteQueues(),
|
||||
addressSettings.isAutoDeleteCreatedQueues(),
|
||||
addressSettings.getAutoDeleteQueuesDelay(),
|
||||
addressSettings.getAutoDeleteQueuesMessageCount(),
|
||||
autoCreateAddress
|
||||
|
|
|
@ -1582,7 +1582,7 @@ public class PostOfficeImpl implements PostOffice, NotificationListener, Binding
|
|||
public void run() {
|
||||
for (Queue queue : getLocalQueues()) {
|
||||
if (!queue.isInternalQueue() && QueueManagerImpl.isAutoDelete(queue) && QueueManagerImpl.consumerCountCheck(queue) && QueueManagerImpl.delayCheck(queue) && QueueManagerImpl.messageCountCheck(queue)) {
|
||||
QueueManagerImpl.deleteAutoCreatedQueue(server, queue);
|
||||
QueueManagerImpl.performAutoDeleteQueue(server, queue);
|
||||
}
|
||||
}
|
||||
|
||||
|
|
|
@ -916,7 +916,6 @@ public class ActiveMQServerImpl implements ActiveMQServer {
|
|||
int defaultConsumerWindowSize = addressSettings.getDefaultConsumerWindowSize();
|
||||
boolean defaultGroupRebalance = addressSettings.isDefaultGroupRebalance();
|
||||
int defaultGroupBuckets = addressSettings.getDefaultGroupBuckets();
|
||||
boolean autoDeleteQueues = addressSettings.isAutoDeleteQueues();
|
||||
long autoDeleteQueuesDelay = addressSettings.getAutoDeleteQueuesDelay();
|
||||
long autoDeleteQueuesMessageCount = addressSettings.getAutoDeleteQueuesMessageCount();
|
||||
|
||||
|
@ -934,7 +933,7 @@ public class ActiveMQServerImpl implements ActiveMQServer {
|
|||
// make an exception for the management address (see HORNETQ-29)
|
||||
response = new QueueQueryResult(realName, managementAddress, true, false, null, -1, -1, autoCreateQueues, true, false, false, RoutingType.MULTICAST, -1, false, false, null, null, null, null, null, null, null, null, null, defaultConsumerWindowSize);
|
||||
} else {
|
||||
response = new QueueQueryResult(realName, addressName, true, false, null, 0, 0, autoCreateQueues, false, false, defaultPurgeOnNoConsumers, RoutingType.MULTICAST, defaultMaxConsumers, defaultExclusiveQueue, defaultGroupRebalance, defaultGroupBuckets, defaultLastValueQueue, defaultLastValueKey, defaultNonDestructive, defaultConsumersBeforeDispatch, defaultDelayBeforeDispatch, autoDeleteQueues, autoDeleteQueuesDelay, autoDeleteQueuesMessageCount, defaultConsumerWindowSize);
|
||||
response = new QueueQueryResult(realName, addressName, true, false, null, 0, 0, autoCreateQueues, false, false, defaultPurgeOnNoConsumers, RoutingType.MULTICAST, defaultMaxConsumers, defaultExclusiveQueue, defaultGroupRebalance, defaultGroupBuckets, defaultLastValueQueue, defaultLastValueKey, defaultNonDestructive, defaultConsumersBeforeDispatch, defaultDelayBeforeDispatch, isAutoDelete(false, addressSettings), autoDeleteQueuesDelay, autoDeleteQueuesMessageCount, defaultConsumerWindowSize);
|
||||
}
|
||||
|
||||
return response;
|
||||
|
@ -1761,13 +1760,13 @@ public class ActiveMQServerImpl implements ActiveMQServer {
|
|||
@Override
|
||||
public Queue createQueue(AddressInfo addressInfo, SimpleString queueName, SimpleString filter, SimpleString user, boolean durable, boolean temporary, boolean autoCreated, Integer maxConsumers, Boolean purgeOnNoConsumers, boolean autoCreateAddress) throws Exception {
|
||||
AddressSettings as = getAddressSettingsRepository().getMatch(addressInfo == null ? queueName.toString() : addressInfo.getName().toString());
|
||||
return createQueue(addressInfo, queueName, filter, user, durable, temporary, false, false, autoCreated, maxConsumers, purgeOnNoConsumers, as.isDefaultExclusiveQueue(), as.isDefaultGroupRebalance(), as.getDefaultGroupBuckets(), as.isDefaultLastValueQueue(), as.getDefaultLastValueKey(), as.isDefaultNonDestructive(), as.getDefaultConsumersBeforeDispatch(), as.getDefaultDelayBeforeDispatch(), as.isAutoDeleteQueues(), as.getAutoDeleteQueuesDelay(), as.getAutoDeleteQueuesMessageCount(), autoCreateAddress, false);
|
||||
return createQueue(addressInfo, queueName, filter, user, durable, temporary, false, false, autoCreated, maxConsumers, purgeOnNoConsumers, as.isDefaultExclusiveQueue(), as.isDefaultGroupRebalance(), as.getDefaultGroupBuckets(), as.isDefaultLastValueQueue(), as.getDefaultLastValueKey(), as.isDefaultNonDestructive(), as.getDefaultConsumersBeforeDispatch(), as.getDefaultDelayBeforeDispatch(), isAutoDelete(autoCreated, as), as.getAutoDeleteQueuesDelay(), as.getAutoDeleteQueuesMessageCount(), autoCreateAddress, false);
|
||||
}
|
||||
|
||||
@Override
|
||||
public Queue createQueue(AddressInfo addressInfo, SimpleString queueName, SimpleString filter, SimpleString user, boolean durable, boolean temporary, boolean autoCreated, Integer maxConsumers, Boolean purgeOnNoConsumers, Boolean exclusive, Boolean lastValue, boolean autoCreateAddress) throws Exception {
|
||||
AddressSettings as = getAddressSettingsRepository().getMatch(addressInfo == null ? queueName.toString() : addressInfo.getName().toString());
|
||||
return createQueue(addressInfo, queueName, filter, user, durable, temporary, false, false, autoCreated, maxConsumers, purgeOnNoConsumers, exclusive, as.isDefaultGroupRebalance(), as.getDefaultGroupBuckets(), lastValue, as.getDefaultLastValueKey(), as.isDefaultNonDestructive(), as.getDefaultConsumersBeforeDispatch(), as.getDefaultDelayBeforeDispatch(), as.isAutoDeleteQueues(), as.getAutoDeleteQueuesDelay(), as.getAutoDeleteQueuesMessageCount(), autoCreateAddress, false);
|
||||
return createQueue(addressInfo, queueName, filter, user, durable, temporary, false, false, autoCreated, maxConsumers, purgeOnNoConsumers, exclusive, as.isDefaultGroupRebalance(), as.getDefaultGroupBuckets(), lastValue, as.getDefaultLastValueKey(), as.isDefaultNonDestructive(), as.getDefaultConsumersBeforeDispatch(), as.getDefaultDelayBeforeDispatch(), isAutoDelete(autoCreated, as), as.getAutoDeleteQueuesDelay(), as.getAutoDeleteQueuesMessageCount(), autoCreateAddress, false);
|
||||
}
|
||||
|
||||
@Override
|
||||
|
@ -1775,13 +1774,16 @@ public class ActiveMQServerImpl implements ActiveMQServer {
|
|||
return createQueue(addressInfo, queueName, filter, user, durable, temporary, false, false, autoCreated, maxConsumers, purgeOnNoConsumers, exclusive, groupRebalance, groupBuckets, lastValue, lastValueKey, nonDestructive, consumersBeforeDispatch, delayBeforeDispatch, autoDelete, autoDeleteDelay, autoDeleteMessageCount, autoCreateAddress, false);
|
||||
}
|
||||
|
||||
private static boolean isAutoDelete(boolean autoCreated, AddressSettings addressSettings) {
|
||||
return autoCreated ? addressSettings.isAutoDeleteQueues() : addressSettings.isAutoDeleteCreatedQueues();
|
||||
}
|
||||
|
||||
@Override
|
||||
public Queue createQueue(SimpleString address, RoutingType routingType, SimpleString queueName, SimpleString filter,
|
||||
SimpleString user, boolean durable, boolean temporary, boolean ignoreIfExists, boolean transientQueue,
|
||||
boolean autoCreated, int maxConsumers, boolean purgeOnNoConsumers, boolean autoCreateAddress) throws Exception {
|
||||
AddressSettings as = getAddressSettingsRepository().getMatch(address == null ? queueName.toString() : address.toString());
|
||||
return createQueue(address, routingType, queueName, filter, user, durable, temporary, ignoreIfExists, transientQueue, autoCreated, maxConsumers, purgeOnNoConsumers, as.isDefaultExclusiveQueue(), as.isDefaultGroupRebalance(), as.getDefaultGroupBuckets(), as.isDefaultLastValueQueue(), as.getDefaultLastValueKey(), as.isDefaultNonDestructive(), as.getDefaultConsumersBeforeDispatch(), as.getDefaultDelayBeforeDispatch(), as.isAutoDeleteQueues(), as.getAutoDeleteQueuesDelay(), as.getAutoDeleteQueuesMessageCount(), autoCreateAddress);
|
||||
return createQueue(address, routingType, queueName, filter, user, durable, temporary, ignoreIfExists, transientQueue, autoCreated, maxConsumers, purgeOnNoConsumers, as.isDefaultExclusiveQueue(), as.isDefaultGroupRebalance(), as.getDefaultGroupBuckets(), as.isDefaultLastValueQueue(), as.getDefaultLastValueKey(), as.isDefaultNonDestructive(), as.getDefaultConsumersBeforeDispatch(), as.getDefaultDelayBeforeDispatch(), isAutoDelete(autoCreated, as), as.getAutoDeleteQueuesDelay(), as.getAutoDeleteQueuesMessageCount(), autoCreateAddress);
|
||||
}
|
||||
|
||||
@Override
|
||||
|
@ -1789,7 +1791,7 @@ public class ActiveMQServerImpl implements ActiveMQServer {
|
|||
SimpleString user, boolean durable, boolean temporary, boolean ignoreIfExists, boolean transientQueue,
|
||||
boolean autoCreated, int maxConsumers, boolean purgeOnNoConsumers, boolean exclusive, boolean lastValue, boolean autoCreateAddress) throws Exception {
|
||||
AddressSettings as = getAddressSettingsRepository().getMatch(address == null ? queueName.toString() : address.toString());
|
||||
return createQueue(address, routingType, queueName, filter, user, durable, temporary, ignoreIfExists, transientQueue, autoCreated, maxConsumers, purgeOnNoConsumers, exclusive, as.isDefaultGroupRebalance(), as.getDefaultGroupBuckets(), lastValue, as.getDefaultLastValueKey(), as.isDefaultNonDestructive(), as.getDefaultConsumersBeforeDispatch(), as.getDefaultDelayBeforeDispatch(), as.isAutoDeleteQueues(), as.getAutoDeleteQueuesDelay(), as.getAutoDeleteQueuesMessageCount(), autoCreateAddress);
|
||||
return createQueue(address, routingType, queueName, filter, user, durable, temporary, ignoreIfExists, transientQueue, autoCreated, maxConsumers, purgeOnNoConsumers, exclusive, as.isDefaultGroupRebalance(), as.getDefaultGroupBuckets(), lastValue, as.getDefaultLastValueKey(), as.isDefaultNonDestructive(), as.getDefaultConsumersBeforeDispatch(), as.getDefaultDelayBeforeDispatch(), isAutoDelete(autoCreated, as), as.getAutoDeleteQueuesDelay(), as.getAutoDeleteQueuesMessageCount(), autoCreateAddress);
|
||||
}
|
||||
|
||||
|
||||
|
@ -1826,7 +1828,7 @@ public class ActiveMQServerImpl implements ActiveMQServer {
|
|||
boolean exclusive,
|
||||
boolean lastValue) throws Exception {
|
||||
AddressSettings as = getAddressSettingsRepository().getMatch(address == null ? name.toString() : address.toString());
|
||||
createSharedQueue(address, routingType, name, filterString, user, durable, maxConsumers, purgeOnNoConsumers, exclusive, as.isDefaultGroupRebalance(), as.getDefaultGroupBuckets(), lastValue, as.getDefaultLastValueKey(), as.isDefaultNonDestructive(), as.getDefaultConsumersBeforeDispatch(), as.getDefaultDelayBeforeDispatch(), as.isAutoDeleteQueues(), as.getAutoDeleteQueuesDelay(), as.getAutoDeleteQueuesMessageCount());
|
||||
createSharedQueue(address, routingType, name, filterString, user, durable, maxConsumers, purgeOnNoConsumers, exclusive, as.isDefaultGroupRebalance(), as.getDefaultGroupBuckets(), lastValue, as.getDefaultLastValueKey(), as.isDefaultNonDestructive(), as.getDefaultConsumersBeforeDispatch(), as.getDefaultDelayBeforeDispatch(), isAutoDelete(false, as), as.getAutoDeleteQueuesDelay(), as.getAutoDeleteQueuesMessageCount());
|
||||
}
|
||||
|
||||
@Override
|
||||
|
@ -2908,7 +2910,7 @@ public class ActiveMQServerImpl implements ActiveMQServer {
|
|||
} else {
|
||||
// if the address::queue doesn't exist then create it
|
||||
try {
|
||||
createQueue(new AddressInfo(SimpleString.toSimpleString(config.getAddress())).addRoutingType(config.getRoutingType()), queueName, SimpleString.toSimpleString(config.getFilterString()), SimpleString.toSimpleString(config.getUser()), config.isDurable(), false, false, false, false, maxConsumers, config.getPurgeOnNoConsumers(), isExclusive, groupRebalance, groupBuckets, isLastValue, lastValueKey, isNonDestructive, consumersBeforeDispatch, delayBeforeDispatch, as.isAutoDeleteQueues(), as.getAutoDeleteQueuesDelay(), as.getAutoDeleteQueuesMessageCount(), true, true);
|
||||
createQueue(new AddressInfo(SimpleString.toSimpleString(config.getAddress())).addRoutingType(config.getRoutingType()), queueName, SimpleString.toSimpleString(config.getFilterString()), SimpleString.toSimpleString(config.getUser()), config.isDurable(), false, false, false, false, maxConsumers, config.getPurgeOnNoConsumers(), isExclusive, groupRebalance, groupBuckets, isLastValue, lastValueKey, isNonDestructive, consumersBeforeDispatch, delayBeforeDispatch, isAutoDelete(false, as), as.getAutoDeleteQueuesDelay(), as.getAutoDeleteQueuesMessageCount(), true, true);
|
||||
} catch (ActiveMQQueueExistsException e) {
|
||||
// the queue may exist on a *different* address
|
||||
ActiveMQServerLogger.LOGGER.warn(e.getMessage());
|
||||
|
|
|
@ -41,7 +41,7 @@ public class QueueManagerImpl extends ReferenceCounterUtil implements QueueManag
|
|||
}
|
||||
|
||||
if (isAutoDelete(queue) && consumerCountCheck(queue) && delayCheck(queue) && messageCountCheck(queue)) {
|
||||
deleteAutoCreatedQueue(server, queue);
|
||||
performAutoDeleteQueue(server, queue);
|
||||
} else if (queue.isPurgeOnNoConsumers()) {
|
||||
purge(queue);
|
||||
}
|
||||
|
@ -61,11 +61,11 @@ public class QueueManagerImpl extends ReferenceCounterUtil implements QueueManag
|
|||
}
|
||||
}
|
||||
|
||||
public static void deleteAutoCreatedQueue(ActiveMQServer server, Queue queue) {
|
||||
public static void performAutoDeleteQueue(ActiveMQServer server, Queue queue) {
|
||||
SimpleString queueName = queue.getName();
|
||||
AddressSettings settings = server.getAddressSettingsRepository().getMatch(queue.getAddress().toString());
|
||||
if (ActiveMQServerLogger.LOGGER.isDebugEnabled()) {
|
||||
ActiveMQServerLogger.LOGGER.info("deleting auto-created queue \"" + queueName + ".\" consumerCount = " + queue.getConsumerCount() + "; messageCount = " + queue.getMessageCount() + "; isAutoDeleteQueues = " + settings.isAutoDeleteQueues());
|
||||
ActiveMQServerLogger.LOGGER.info("deleting auto-created queue \"" + queueName + ".\" consumerCount = " + queue.getConsumerCount() + "; messageCount = " + queue.getMessageCount() + "; isAutoDelete = " + queue.isAutoDelete());
|
||||
}
|
||||
|
||||
try {
|
||||
|
@ -76,7 +76,7 @@ public class QueueManagerImpl extends ReferenceCounterUtil implements QueueManag
|
|||
}
|
||||
|
||||
public static boolean isAutoDelete(Queue queue) {
|
||||
return queue.isAutoCreated() && queue.isAutoDelete();
|
||||
return queue.isAutoDelete();
|
||||
}
|
||||
|
||||
public static boolean messageCountCheck(Queue queue) {
|
||||
|
|
|
@ -875,7 +875,7 @@ public class ServerSessionImpl implements ServerSession, FailureListener {
|
|||
nonDestructive == null ? as.isDefaultNonDestructive() : nonDestructive,
|
||||
consumersBeforeDispatch == null ? as.getDefaultConsumersBeforeDispatch() : consumersBeforeDispatch,
|
||||
delayBeforeDispatch == null ? as.getDefaultDelayBeforeDispatch() : delayBeforeDispatch,
|
||||
autoDelete == null ? as.isAutoDeleteQueues() : autoDelete,
|
||||
autoDelete == null ? as.isAutoDeleteCreatedQueues() : autoDelete,
|
||||
autoDeleteDelay == null ? as.getAutoDeleteQueuesDelay() : delayBeforeDispatch,
|
||||
autoDeleteMessageCount == null ? as.getAutoDeleteQueuesMessageCount() : autoDeleteMessageCount);
|
||||
}
|
||||
|
|
|
@ -72,6 +72,8 @@ public class AddressSettings implements Mergeable<AddressSettings>, Serializable
|
|||
|
||||
public static final boolean DEFAULT_AUTO_DELETE_QUEUES = true;
|
||||
|
||||
public static final boolean DEFAULT_AUTO_DELETE_CREATED_QUEUES = false;
|
||||
|
||||
public static final long DEFAULT_AUTO_DELETE_QUEUES_DELAY = 0;
|
||||
|
||||
public static final long DEFAULT_AUTO_DELETE_QUEUES_MESSAGE_COUNT = 0;
|
||||
|
@ -169,6 +171,8 @@ public class AddressSettings implements Mergeable<AddressSettings>, Serializable
|
|||
|
||||
private Boolean autoDeleteQueues = null;
|
||||
|
||||
private Boolean autoDeleteCreatedQueues = null;
|
||||
|
||||
private Long autoDeleteQueuesDelay = null;
|
||||
|
||||
private Long autoDeleteQueuesMessageCount = null;
|
||||
|
@ -234,6 +238,7 @@ public class AddressSettings implements Mergeable<AddressSettings>, Serializable
|
|||
this.autoDeleteJmsTopics = other.autoDeleteJmsTopics;
|
||||
this.autoCreateQueues = other.autoCreateQueues;
|
||||
this.autoDeleteQueues = other.autoDeleteQueues;
|
||||
this.autoDeleteCreatedQueues = other.autoDeleteCreatedQueues;
|
||||
this.autoDeleteQueuesDelay = other.autoDeleteQueuesDelay;
|
||||
this.configDeleteQueues = other.configDeleteQueues;
|
||||
this.autoCreateAddresses = other.autoCreateAddresses;
|
||||
|
@ -319,6 +324,16 @@ public class AddressSettings implements Mergeable<AddressSettings>, Serializable
|
|||
return this;
|
||||
}
|
||||
|
||||
public AddressSettings setAutoDeleteCreatedQueues(Boolean autoDeleteCreatedQueues) {
|
||||
this.autoDeleteCreatedQueues = autoDeleteCreatedQueues;
|
||||
return this;
|
||||
}
|
||||
|
||||
public boolean isAutoDeleteCreatedQueues() {
|
||||
return autoDeleteCreatedQueues != null ? autoDeleteCreatedQueues : AddressSettings.DEFAULT_AUTO_DELETE_CREATED_QUEUES;
|
||||
}
|
||||
|
||||
|
||||
public long getAutoDeleteQueuesDelay() {
|
||||
return autoDeleteQueuesDelay != null ? autoDeleteQueuesDelay : AddressSettings.DEFAULT_AUTO_DELETE_QUEUES_DELAY;
|
||||
}
|
||||
|
@ -779,6 +794,9 @@ public class AddressSettings implements Mergeable<AddressSettings>, Serializable
|
|||
if (autoDeleteQueues == null) {
|
||||
autoDeleteQueues = merged.autoDeleteQueues;
|
||||
}
|
||||
if (autoDeleteCreatedQueues == null) {
|
||||
autoDeleteCreatedQueues = merged.autoDeleteCreatedQueues;
|
||||
}
|
||||
if (autoDeleteQueuesDelay == null) {
|
||||
autoDeleteQueuesDelay = merged.autoDeleteQueuesDelay;
|
||||
}
|
||||
|
@ -999,6 +1017,10 @@ public class AddressSettings implements Mergeable<AddressSettings>, Serializable
|
|||
if (buffer.readableBytes() > 0) {
|
||||
autoDeleteQueuesMessageCount = BufferHelper.readNullableLong(buffer);
|
||||
}
|
||||
|
||||
if (buffer.readableBytes() > 0) {
|
||||
autoDeleteCreatedQueues = BufferHelper.readNullableBoolean(buffer);
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
|
@ -1047,7 +1069,8 @@ public class AddressSettings implements Mergeable<AddressSettings>, Serializable
|
|||
BufferHelper.sizeOfNullableLong(autoDeleteAddressesDelay) +
|
||||
BufferHelper.sizeOfNullableBoolean(defaultGroupRebalance) +
|
||||
BufferHelper.sizeOfNullableInteger(defaultGroupBuckets) +
|
||||
BufferHelper.sizeOfNullableLong(autoDeleteQueuesMessageCount);
|
||||
BufferHelper.sizeOfNullableLong(autoDeleteQueuesMessageCount) +
|
||||
BufferHelper.sizeOfNullableBoolean(autoDeleteQueues);
|
||||
}
|
||||
|
||||
@Override
|
||||
|
@ -1144,6 +1167,7 @@ public class AddressSettings implements Mergeable<AddressSettings>, Serializable
|
|||
|
||||
BufferHelper.writeNullableLong(buffer, autoDeleteQueuesMessageCount);
|
||||
|
||||
BufferHelper.writeNullableBoolean(buffer, autoDeleteCreatedQueues);
|
||||
|
||||
}
|
||||
|
||||
|
@ -1182,6 +1206,7 @@ public class AddressSettings implements Mergeable<AddressSettings>, Serializable
|
|||
result = prime * result + ((autoDeleteJmsTopics == null) ? 0 : autoDeleteJmsTopics.hashCode());
|
||||
result = prime * result + ((autoCreateQueues == null) ? 0 : autoCreateQueues.hashCode());
|
||||
result = prime * result + ((autoDeleteQueues == null) ? 0 : autoDeleteQueues.hashCode());
|
||||
result = prime * result + ((autoDeleteCreatedQueues == null) ? 0 : autoDeleteCreatedQueues.hashCode());
|
||||
result = prime * result + ((autoDeleteQueuesDelay == null) ? 0 : autoDeleteQueuesDelay.hashCode());
|
||||
result = prime * result + ((autoDeleteQueuesMessageCount == null) ? 0 : autoDeleteQueuesMessageCount.hashCode());
|
||||
result = prime * result + ((configDeleteQueues == null) ? 0 : configDeleteQueues.hashCode());
|
||||
|
@ -1356,6 +1381,11 @@ public class AddressSettings implements Mergeable<AddressSettings>, Serializable
|
|||
return false;
|
||||
} else if (!autoDeleteQueues.equals(other.autoDeleteQueues))
|
||||
return false;
|
||||
if (autoDeleteCreatedQueues == null) {
|
||||
if (other.autoDeleteCreatedQueues != null)
|
||||
return false;
|
||||
} else if (!autoDeleteCreatedQueues.equals(other.autoDeleteCreatedQueues))
|
||||
return false;
|
||||
if (autoDeleteQueuesDelay == null) {
|
||||
if (other.autoDeleteQueuesDelay != null)
|
||||
return false;
|
||||
|
@ -1526,6 +1556,8 @@ public class AddressSettings implements Mergeable<AddressSettings>, Serializable
|
|||
autoCreateQueues +
|
||||
", autoDeleteQueues=" +
|
||||
autoDeleteQueues +
|
||||
", autoDeleteCreatedQueues=" +
|
||||
autoDeleteCreatedQueues +
|
||||
", autoDeleteQueuesDelay=" +
|
||||
autoDeleteQueuesDelay +
|
||||
", autoDeleteQueuesMessageCount=" +
|
||||
|
|
|
@ -3160,6 +3160,14 @@
|
|||
</xsd:annotation>
|
||||
</xsd:element>
|
||||
|
||||
<xsd:element name="auto-delete-created-queues" type="xsd:boolean" default="true" maxOccurs="1" minOccurs="0">
|
||||
<xsd:annotation>
|
||||
<xsd:documentation>
|
||||
whether or not to delete created queues when the queue has 0 consumers and 0 messages
|
||||
</xsd:documentation>
|
||||
</xsd:annotation>
|
||||
</xsd:element>
|
||||
|
||||
<xsd:element name="auto-delete-queues-delay" type="xsd:long" default="0" maxOccurs="1" minOccurs="0">
|
||||
<xsd:annotation>
|
||||
<xsd:documentation>
|
||||
|
|
|
@ -597,6 +597,7 @@ that would be found in the `broker.xml` file.
|
|||
<auto-delete-jms-topics>true</auto-delete-jms-topics> <!-- deprecated! see auto-delete-addresses -->
|
||||
<auto-create-queues>true</auto-create-queues>
|
||||
<auto-delete-queues>true</auto-delete-queues>
|
||||
<auto-delete-created-queues>false</auto-delete-created-queues>
|
||||
<auto-delete-queues-delay>0</auto-delete-queues-delay>
|
||||
<auto-delete-queues-message-count>0</auto-delete-queues-message-count>
|
||||
<config-delete-queues>OFF</config-delete-queues>
|
||||
|
@ -765,6 +766,11 @@ auto-created queues when they have both 0 consumers and the message count is
|
|||
less than or equal to `auto-delete-queues-message-count`. Default is
|
||||
`true`.
|
||||
|
||||
`auto-delete-created-queues`. Whether or not the broker should automatically delete
|
||||
created queues when they have both 0 consumers and the message count is
|
||||
less than or equal to `auto-delete-queues-message-count`. Default is
|
||||
`false`.
|
||||
|
||||
`auto-delete-queues-delay`. How long to wait (in milliseconds) before deleting
|
||||
auto-created queues after the queue has 0 consumers and the message count is
|
||||
less than or equal to `auto-delete-queues-message-count`.
|
||||
|
|
|
@ -229,6 +229,7 @@ Name | Description | Default
|
|||
[auto-delete-jms-topics](address-model.md#configuring-addresses-and-queues-via-address-settings)| **deprecated** Delete JMS topics automatically; see `auto-create-queues` & `auto-create-addresses` | `true`
|
||||
[auto-create-queues](address-model.md#configuring-addresses-and-queues-via-address-settings) | Create queues automatically | `true`
|
||||
[auto-delete-queues](address-model.md#configuring-addresses-and-queues-via-address-settings) | Delete auto-created queues automatically | `true`
|
||||
[auto-delete-created-queues](address-model.md#configuring-addresses-and-queues-via-address-settings) | Delete created queues automatically | `false`
|
||||
[auto-delete-queues-delay](address-model.md#configuring-addresses-and-queues-via-address-settings) | Delay for deleting auto-created queues | 0
|
||||
[config-delete-queues](config-reload.md)| How to deal with queues deleted from XML at runtime| `OFF`
|
||||
[auto-create-addresses](address-model.md#configuring-addresses-and-queues-via-address-settings) | Create addresses automatically | `true`
|
||||
|
|
|
@ -23,6 +23,8 @@ import javax.jms.MessageConsumer;
|
|||
import javax.jms.MessageProducer;
|
||||
import javax.jms.Queue;
|
||||
import javax.jms.Session;
|
||||
import javax.jms.TextMessage;
|
||||
import javax.jms.Topic;
|
||||
import org.apache.activemq.artemis.api.core.SimpleString;
|
||||
import org.apache.activemq.artemis.core.config.Configuration;
|
||||
import org.apache.activemq.artemis.core.postoffice.QueueBinding;
|
||||
|
@ -110,6 +112,125 @@ public class QueueAutoDeleteTest extends JMSTestBase {
|
|||
}
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testAutoDeleteTopicDurableSubscriptionQueue() throws Exception {
|
||||
ConnectionFactory fact = getCF();
|
||||
Connection connection = fact.createConnection();
|
||||
connection.start();
|
||||
|
||||
try {
|
||||
|
||||
Session session = connection.createSession(false, Session.CLIENT_ACKNOWLEDGE);
|
||||
|
||||
String testQueueName = getName();
|
||||
String sub = testQueueName + "/mysub";
|
||||
|
||||
Topic topic = session.createTopic(testQueueName + "?auto-delete=true");
|
||||
ActiveMQDestination activeMQDestination = (ActiveMQDestination) topic;
|
||||
|
||||
assertEquals(testQueueName, topic.getTopicName());
|
||||
assertEquals(true, activeMQDestination.getQueueAttributes().getAutoDelete());
|
||||
|
||||
|
||||
MessageConsumer consumer = session.createSharedDurableConsumer(topic, sub);
|
||||
|
||||
QueueBinding queueBinding = (QueueBinding) server.getPostOffice().getBinding(SimpleString.toSimpleString(sub));
|
||||
assertTrue(queueBinding.getQueue().isAutoDelete());
|
||||
assertEquals(0, queueBinding.getQueue().getMessageCount());
|
||||
|
||||
MessageProducer producer = session.createProducer(topic);
|
||||
producer.send(session.createTextMessage("hello1"));
|
||||
producer.send(session.createTextMessage("hello2"));
|
||||
|
||||
Message message = consumer.receive(100);
|
||||
assertNotNull(message);
|
||||
assertEquals("hello1", ((TextMessage)message).getText());
|
||||
message.acknowledge();
|
||||
|
||||
consumer.close();
|
||||
|
||||
queueBinding = (QueueBinding) server.getPostOffice().getBinding(SimpleString.toSimpleString(sub));
|
||||
assertNotNull(queueBinding);
|
||||
|
||||
consumer = session.createSharedDurableConsumer(topic, sub);
|
||||
message = consumer.receive(100);
|
||||
assertNotNull(message);
|
||||
assertEquals("hello2", ((TextMessage)message).getText());
|
||||
message.acknowledge();
|
||||
|
||||
consumer.close();
|
||||
|
||||
//Wait longer than scan period.
|
||||
Thread.sleep(20);
|
||||
|
||||
queueBinding = (QueueBinding) server.getPostOffice().getBinding(SimpleString.toSimpleString(sub));
|
||||
assertNull(queueBinding);
|
||||
|
||||
|
||||
} finally {
|
||||
connection.close();
|
||||
}
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testAutoDeleteTopicDefaultDurableSubscriptionQueue() throws Exception {
|
||||
ConnectionFactory fact = getCF();
|
||||
Connection connection = fact.createConnection();
|
||||
connection.start();
|
||||
|
||||
try {
|
||||
|
||||
Session session = connection.createSession(false, Session.CLIENT_ACKNOWLEDGE);
|
||||
|
||||
String testQueueName = getName();
|
||||
String sub = testQueueName + "/mysub";
|
||||
|
||||
Topic topic = session.createTopic(testQueueName);
|
||||
|
||||
assertEquals(testQueueName, topic.getTopicName());
|
||||
|
||||
|
||||
MessageConsumer consumer = session.createSharedDurableConsumer(topic, sub);
|
||||
|
||||
QueueBinding queueBinding = (QueueBinding) server.getPostOffice().getBinding(SimpleString.toSimpleString(sub));
|
||||
assertFalse(queueBinding.getQueue().isAutoDelete());
|
||||
assertEquals(0, queueBinding.getQueue().getMessageCount());
|
||||
|
||||
MessageProducer producer = session.createProducer(topic);
|
||||
producer.send(session.createTextMessage("hello1"));
|
||||
producer.send(session.createTextMessage("hello2"));
|
||||
|
||||
Message message = consumer.receive(100);
|
||||
assertNotNull(message);
|
||||
assertEquals("hello1", ((TextMessage)message).getText());
|
||||
message.acknowledge();
|
||||
|
||||
consumer.close();
|
||||
|
||||
queueBinding = (QueueBinding) server.getPostOffice().getBinding(SimpleString.toSimpleString(sub));
|
||||
assertNotNull(queueBinding);
|
||||
|
||||
consumer = session.createSharedDurableConsumer(topic, sub);
|
||||
message = consumer.receive(100);
|
||||
assertNotNull(message);
|
||||
assertEquals("hello2", ((TextMessage)message).getText());
|
||||
message.acknowledge();
|
||||
|
||||
consumer.close();
|
||||
|
||||
//Wait longer than scan period.
|
||||
Thread.sleep(20);
|
||||
|
||||
queueBinding = (QueueBinding) server.getPostOffice().getBinding(SimpleString.toSimpleString(sub));
|
||||
assertNotNull(queueBinding);
|
||||
|
||||
|
||||
} finally {
|
||||
connection.close();
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
@Test
|
||||
public void testAutoDeleteOff() throws Exception {
|
||||
ConnectionFactory fact = getCF();
|
||||
|
|
Loading…
Reference in New Issue