ARTEMIS-2263 Support queue level auto-delete configuration

Add ability to configure when creating auto created queues at the queue level
Add support for configuring message count check
Add test cases
Update docs
This commit is contained in:
Michael André Pearce 2019-02-26 17:37:31 +00:00
parent 4c84b119cc
commit 9b01e9521c
38 changed files with 1074 additions and 131 deletions

View File

@ -35,6 +35,9 @@ public class QueueAttributes implements Serializable {
public static final String CONSUMERS_BEFORE_DISPATCH = "consumers-before-dispatch";
public static final String DELAY_BEFORE_DISPATCH = "delay-before-dispatch";
public static final String CONSUMER_PRIORITY = "consumer-priority";
public static final String AUTO_DELETE = "auto-delete";
public static final String AUTO_DELETE_DELAY = "auto-delete-delay";
public static final String AUTO_DELETE_MESSAGE_COUNT = "auto-delete-message-count";
private RoutingType routingType;
private SimpleString filterString;
@ -50,6 +53,10 @@ public class QueueAttributes implements Serializable {
private Integer consumersBeforeDispatch;
private Long delayBeforeDispatch;
private Integer consumerPriority;
private Boolean autoDelete;
private Long autoDeleteDelay;
private Long autoDeleteMessageCount;
public void set(String key, String value) {
if (key != null && value != null) {
@ -81,6 +88,12 @@ public class QueueAttributes implements Serializable {
setGroupRebalance(Boolean.valueOf(value));
} else if (key.equals(GROUP_BUCKETS)) {
setGroupBuckets(Integer.valueOf(value));
} else if (key.equals(AUTO_DELETE)) {
setAutoDelete(Boolean.valueOf(value));
} else if (key.equals(AUTO_DELETE_DELAY)) {
setAutoDeleteDelay(Long.valueOf(value));
} else if (key.equals(AUTO_DELETE_MESSAGE_COUNT)) {
setAutoDeleteMessageCount(Long.valueOf(value));
}
}
}
@ -210,4 +223,31 @@ public class QueueAttributes implements Serializable {
this.groupBuckets = groupBuckets;
return this;
}
public Boolean getAutoDelete() {
return autoDelete;
}
public QueueAttributes setAutoDelete(Boolean autoDelete) {
this.autoDelete = autoDelete;
return this;
}
public Long getAutoDeleteDelay() {
return autoDeleteDelay;
}
public QueueAttributes setAutoDeleteDelay(Long autoDeleteDelay) {
this.autoDeleteDelay = autoDeleteDelay;
return this;
}
public Long getAutoDeleteMessageCount() {
return autoDeleteMessageCount;
}
public QueueAttributes setAutoDeleteMessageCount(Long autoDeleteMessageCount) {
this.autoDeleteMessageCount = autoDeleteMessageCount;
return this;
}
}

View File

@ -501,6 +501,12 @@ public final class ActiveMQDefaultConfiguration {
public static final boolean DEFAULT_PURGE_ON_NO_CONSUMERS = false;
public static final boolean DEFAULT_QUEUE_AUTO_DELETE = true;
public static final long DEFAULT_QUEUE_AUTO_DELETE_DELAY = 0;
public static final long DEFAULT_QUEUE_AUTO_DELETE_MESSAGE_COUNT = 0;
public static final int DEFAULT_CONSUMERS_BEFORE_DISPATCH = 0;
public static final long DEFAULT_DELAY_BEFORE_DISPATCH = -1;
@ -1366,6 +1372,18 @@ public final class ActiveMQDefaultConfiguration {
return DEFAULT_PURGE_ON_NO_CONSUMERS;
}
public static boolean getDefaultQueueAutoDelete() {
return DEFAULT_QUEUE_AUTO_DELETE;
}
public static long getDefaultQueueAutoDeleteDelay() {
return DEFAULT_QUEUE_AUTO_DELETE_DELAY;
}
public static long getDefaultQueueAutoDeleteMessageCount() {
return DEFAULT_QUEUE_AUTO_DELETE_MESSAGE_COUNT;
}
public static int getDefaultConsumersBeforeDispatch() {
return DEFAULT_CONSUMERS_BEFORE_DISPATCH;
}

View File

@ -162,6 +162,12 @@ public interface ClientSession extends XAResource, AutoCloseable {
Boolean isGroupRebalance();
Integer getGroupBuckets();
Boolean isAutoDelete();
Long getAutoDeleteDelay();
Long getAutoDeleteMessageCount();
}
// Lifecycle operations ------------------------------------------

View File

@ -605,6 +605,9 @@ public interface ActiveMQServerControl {
@Parameter(name = "nonDestructive", desc = "If the queue is non-destructive") boolean nonDestructive,
@Parameter(name = "consumersBeforeDispatch", desc = "Number of consumers needed before dispatch can start") int consumersBeforeDispatch,
@Parameter(name = "delayBeforeDispatch", desc = "Delay to wait before dispatching if number of consumers before dispatch is not met") long delayBeforeDispatch,
@Parameter(name = "autoDelete", desc = "If the queue should be deleted once no consumers") boolean autoDelete,
@Parameter(name = "autoDeleteDelay", desc = "How long to wait (in milliseconds) before deleting auto-created queues after the queue has 0 consumers") long autoDeleteDelay,
@Parameter(name = "autoDeleteMessageCount", desc = "The message count the queue must be at or below before it can be evaluated to be auto deleted, 0 waits until empty queue (default) and -1 disables this check") long autoDeleteMessageCount,
@Parameter(name = "autoCreateAddress", desc = "Create an address with default values should a matching address not be found") boolean autoCreateAddress) throws Exception;
/**

View File

@ -64,6 +64,12 @@ public class QueueQueryImpl implements ClientSession.QueueQuery {
private final Long delayBeforeDispatch;
private final Boolean autoDelete;
private final Long autoDeleteDelay;
private final Long autoDeleteMessageCount;
private final Integer defaultConsumerWindowSize;
@ -122,7 +128,7 @@ public class QueueQueryImpl implements ClientSession.QueueQuery {
final Boolean exclusive,
final Boolean lastValue,
final Integer defaultConsumerWindowSize) {
this(durable, temporary, consumerCount, messageCount, filterString, address, name, exists, autoCreateQueues, maxConsumers, autoCreated, purgeOnNoConsumers, routingType, exclusive, null, null, lastValue, null, null, null, null, defaultConsumerWindowSize);
this(durable, temporary, consumerCount, messageCount, filterString, address, name, exists, autoCreateQueues, maxConsumers, autoCreated, purgeOnNoConsumers, routingType, exclusive, null, null, lastValue, null, null, null, null, null, null, null, defaultConsumerWindowSize);
}
public QueueQueryImpl(final boolean durable,
@ -146,6 +152,9 @@ public class QueueQueryImpl implements ClientSession.QueueQuery {
final Boolean nonDestructive,
final Integer consumersBeforeDispatch,
final Long delayBeforeDispatch,
final Boolean autoDelete,
final Long autoDeleteDelay,
final Long autoDeleteMessageCount,
final Integer defaultConsumerWindowSize) {
this.durable = durable;
this.temporary = temporary;
@ -168,6 +177,9 @@ public class QueueQueryImpl implements ClientSession.QueueQuery {
this.nonDestructive = nonDestructive;
this.consumersBeforeDispatch = consumersBeforeDispatch;
this.delayBeforeDispatch = delayBeforeDispatch;
this.autoDelete = autoDelete;
this.autoDeleteDelay = autoDeleteDelay;
this.autoDeleteMessageCount = autoDeleteMessageCount;
this.defaultConsumerWindowSize = defaultConsumerWindowSize;
}
@ -280,5 +292,20 @@ public class QueueQueryImpl implements ClientSession.QueueQuery {
public Integer getGroupBuckets() {
return groupBuckets;
}
@Override
public Boolean isAutoDelete() {
return autoDelete;
}
@Override
public Long getAutoDeleteDelay() {
return autoDeleteDelay;
}
@Override
public Long getAutoDeleteMessageCount() {
return autoDeleteMessageCount;
}
}

View File

@ -321,6 +321,9 @@ public class ActiveMQSessionContext extends SessionContext {
queueAttributes.getNonDestructive(),
queueAttributes.getConsumersBeforeDispatch(),
queueAttributes.getDelayBeforeDispatch(),
queueAttributes.getAutoDelete(),
queueAttributes.getAutoDeleteDelay(),
queueAttributes.getAutoDeleteMessageCount(),
true), PacketImpl.NULL_RESPONSE);
}
@ -873,7 +876,7 @@ public class ActiveMQSessionContext extends SessionContext {
// We try to recreate any non-durable or auto-created queues, since they might not be there on failover/reconnect.
// This allows e.g. JMS non durable subs and temporary queues to continue to be used after failover/reconnection
if (!queueInfo.isDurable() || queueInfo.isAutoCreated()) {
CreateQueueMessage_V2 createQueueRequest = new CreateQueueMessage_V2(queueInfo.getAddress(), queueInfo.getName(), queueInfo.getRoutingType(), queueInfo.getFilterString(), queueInfo.isDurable(), queueInfo.isTemporary(), queueInfo.getMaxConsumers(), queueInfo.isPurgeOnNoConsumers(), queueInfo.isAutoCreated(), false, queueInfo.isExclusive(), queueInfo.isGroupRebalance(), queueInfo.getGroupBuckets(), queueInfo.isLastValue(), queueInfo.getLastValueKey(), queueInfo.isNonDestructive(), queueInfo.getConsumersBeforeDispatch(), queueInfo.getDelayBeforeDispatch());
CreateQueueMessage_V2 createQueueRequest = new CreateQueueMessage_V2(queueInfo.getAddress(), queueInfo.getName(), queueInfo.getRoutingType(), queueInfo.getFilterString(), queueInfo.isDurable(), queueInfo.isTemporary(), queueInfo.getMaxConsumers(), queueInfo.isPurgeOnNoConsumers(), queueInfo.isAutoCreated(), false, queueInfo.isExclusive(), queueInfo.isGroupRebalance(), queueInfo.getGroupBuckets(), queueInfo.isLastValue(), queueInfo.getLastValueKey(), queueInfo.isNonDestructive(), queueInfo.getConsumersBeforeDispatch(), queueInfo.getDelayBeforeDispatch(), queueInfo.isAutoDelete(), queueInfo.getAutoDeleteDelay(), queueInfo.getAutoDeleteMessageCount());
sendPacketWithoutLock(sessionChannel, createQueueRequest);
}

View File

@ -48,6 +48,12 @@ public class CreateQueueMessage_V2 extends CreateQueueMessage {
private Long delayBeforeDispatch;
private Boolean autoDelete;
private Long autoDeleteDelay;
private Long autoDeleteMessageCount;
public CreateQueueMessage_V2(final SimpleString address,
final SimpleString queueName,
final boolean temporary,
@ -72,7 +78,10 @@ public class CreateQueueMessage_V2 extends CreateQueueMessage {
queueAttributes.getLastValueKey(),
queueAttributes.getNonDestructive(),
queueAttributes.getConsumersBeforeDispatch(),
queueAttributes.getDelayBeforeDispatch()
queueAttributes.getDelayBeforeDispatch(),
queueAttributes.getAutoDelete(),
queueAttributes.getAutoDeleteDelay(),
queueAttributes.getAutoDeleteMessageCount()
);
}
@ -93,7 +102,10 @@ public class CreateQueueMessage_V2 extends CreateQueueMessage {
final SimpleString lastValueKey,
final Boolean nonDestructive,
final Integer consumersBeforeDispatch,
final Long delayBeforeDispatch) {
final Long delayBeforeDispatch,
final Boolean autoDelete,
final Long autoDeleteDelay,
final Long autoDeleteMessageCount) {
this();
this.address = address;
@ -114,6 +126,9 @@ public class CreateQueueMessage_V2 extends CreateQueueMessage {
this.nonDestructive = nonDestructive;
this.consumersBeforeDispatch = consumersBeforeDispatch;
this.delayBeforeDispatch = delayBeforeDispatch;
this.autoDelete = autoDelete;
this.autoDeleteDelay = autoDeleteDelay;
this.autoDeleteMessageCount = autoDeleteMessageCount;
}
public CreateQueueMessage_V2() {
@ -137,6 +152,10 @@ public class CreateQueueMessage_V2 extends CreateQueueMessage {
buff.append(", nonDestructive=" + nonDestructive);
buff.append(", consumersBeforeDispatch=" + consumersBeforeDispatch);
buff.append(", delayBeforeDispatch=" + delayBeforeDispatch);
buff.append(", autoDelete=" + autoDelete);
buff.append(", autoDeleteDelay=" + autoDeleteDelay);
buff.append(", autoDeleteMessageCount=" + autoDeleteMessageCount);
buff.append("]");
return buff.toString();
}
@ -237,6 +256,30 @@ public class CreateQueueMessage_V2 extends CreateQueueMessage {
this.groupBuckets = groupBuckets;
}
public Boolean isAutoDelete() {
return autoDelete;
}
public void setAutoDelete(Boolean autoDelete) {
this.autoDelete = autoDelete;
}
public Long getAutoDeleteDelay() {
return autoDeleteDelay;
}
public void setAutoDeleteDelay(Long autoDeleteDelay) {
this.autoDeleteDelay = autoDeleteDelay;
}
public Long getAutoDeleteMessageCount() {
return autoDeleteMessageCount;
}
public void setAutoDeleteMessageCount(Long autoDeleteMessageCount) {
this.autoDeleteMessageCount = autoDeleteMessageCount;
}
@Override
public void encodeRest(final ActiveMQBuffer buffer) {
super.encodeRest(buffer);
@ -252,6 +295,10 @@ public class CreateQueueMessage_V2 extends CreateQueueMessage {
BufferHelper.writeNullableLong(buffer, delayBeforeDispatch);
BufferHelper.writeNullableBoolean(buffer, groupRebalance);
BufferHelper.writeNullableInteger(buffer, groupBuckets);
BufferHelper.writeNullableBoolean(buffer, autoDelete);
BufferHelper.writeNullableLong(buffer, autoDeleteDelay);
BufferHelper.writeNullableLong(buffer, autoDeleteMessageCount);
}
@Override
@ -272,6 +319,9 @@ public class CreateQueueMessage_V2 extends CreateQueueMessage {
delayBeforeDispatch = BufferHelper.readNullableLong(buffer);
groupRebalance = BufferHelper.readNullableBoolean(buffer);
groupBuckets = BufferHelper.readNullableInteger(buffer);
autoDelete = BufferHelper.readNullableBoolean(buffer);
autoDeleteDelay = BufferHelper.readNullableLong(buffer);
autoDeleteMessageCount = BufferHelper.readNullableLong(buffer);
}
}
@ -291,6 +341,9 @@ public class CreateQueueMessage_V2 extends CreateQueueMessage {
result = prime * result + (nonDestructive == null ? 0 : nonDestructive ? 1231 : 1237);
result = prime * result + (consumersBeforeDispatch == null ? 0 : consumersBeforeDispatch.hashCode());
result = prime * result + (delayBeforeDispatch == null ? 0 : delayBeforeDispatch.hashCode());
result = prime * result + (autoDelete == null ? 0 : autoDelete.hashCode());
result = prime * result + (autoDeleteDelay == null ? 0 : autoDeleteDelay.hashCode());
result = prime * result + (autoDeleteMessageCount == null ? 0 : autoDeleteMessageCount.hashCode());
return result;
}
@ -349,6 +402,21 @@ public class CreateQueueMessage_V2 extends CreateQueueMessage {
return false;
} else if (!delayBeforeDispatch.equals(other.delayBeforeDispatch))
return false;
if (autoDelete == null) {
if (other.autoDelete != null)
return false;
} else if (!autoDelete.equals(other.autoDelete))
return false;
if (autoDeleteDelay == null) {
if (other.autoDeleteDelay != null)
return false;
} else if (!autoDeleteDelay.equals(other.autoDeleteDelay))
return false;
if (autoDeleteMessageCount == null) {
if (other.autoDeleteMessageCount != null)
return false;
} else if (!autoDeleteMessageCount.equals(other.autoDeleteMessageCount))
return false;
if (routingType == null) {
if (other.routingType != null)
return false;

View File

@ -34,6 +34,9 @@ public class CreateSharedQueueMessage_V2 extends CreateSharedQueueMessage {
private Boolean nonDestructive;
private Integer consumersBeforeDispatch;
private Long delayBeforeDispatch;
private Boolean autoDelete;
private Long autoDeleteDelay;
private Long autoDeleteMessageCount;
public CreateSharedQueueMessage_V2(final SimpleString address,
final SimpleString queueName,
@ -50,6 +53,9 @@ public class CreateSharedQueueMessage_V2 extends CreateSharedQueueMessage {
final Boolean nonDestructive,
final Integer consumersBeforeDispatch,
final Long delayBeforeDispatch,
final Boolean autoDelete,
final Long autoDeleteDelay,
final Long autoDeleteMessageCount,
final boolean requiresResponse) {
this();
@ -68,6 +74,9 @@ public class CreateSharedQueueMessage_V2 extends CreateSharedQueueMessage {
this.nonDestructive = nonDestructive;
this.consumersBeforeDispatch = consumersBeforeDispatch;
this.delayBeforeDispatch = delayBeforeDispatch;
this.autoDelete = autoDelete;
this.autoDeleteDelay = autoDeleteDelay;
this.autoDeleteMessageCount = autoDeleteMessageCount;
this.requiresResponse = requiresResponse;
}
@ -163,6 +172,30 @@ public class CreateSharedQueueMessage_V2 extends CreateSharedQueueMessage {
this.groupBuckets = groupBuckets;
}
public Boolean isAutoDelete() {
return autoDelete;
}
public void setAutoDelete(Boolean autoDelete) {
this.autoDelete = autoDelete;
}
public Long getAutoDeleteDelay() {
return autoDeleteDelay;
}
public void setAutoDeleteDelay(Long autoDeleteDelay) {
this.autoDeleteDelay = autoDeleteDelay;
}
public Long getAutoDeleteMessageCount() {
return autoDeleteMessageCount;
}
public void setAutoDeleteMessageCount(Long autoDeleteMessageCount) {
this.autoDeleteMessageCount = autoDeleteMessageCount;
}
@Override
public String toString() {
StringBuffer buff = new StringBuffer(getParentString());
@ -181,6 +214,9 @@ public class CreateSharedQueueMessage_V2 extends CreateSharedQueueMessage {
buff.append(", nonDestructive=" + nonDestructive);
buff.append(", consumersBeforeDispatch=" + consumersBeforeDispatch);
buff.append(", delayBeforeDispatch=" + delayBeforeDispatch);
buff.append(", autoDelete=" + autoDelete);
buff.append(", autoDeleteDelay=" + autoDeleteDelay);
buff.append(", autoDeleteMessageCount=" + autoDeleteMessageCount);
buff.append(", requiresResponse=" + requiresResponse);
buff.append("]");
return buff.toString();
@ -204,6 +240,10 @@ public class CreateSharedQueueMessage_V2 extends CreateSharedQueueMessage {
BufferHelper.writeNullableLong(buffer, delayBeforeDispatch);
BufferHelper.writeNullableBoolean(buffer, groupRebalance);
BufferHelper.writeNullableInteger(buffer, groupBuckets);
BufferHelper.writeNullableBoolean(buffer, autoDelete);
BufferHelper.writeNullableLong(buffer, autoDeleteDelay);
BufferHelper.writeNullableLong(buffer, autoDeleteMessageCount);
}
@Override
@ -227,6 +267,9 @@ public class CreateSharedQueueMessage_V2 extends CreateSharedQueueMessage {
delayBeforeDispatch = BufferHelper.readNullableLong(buffer);
groupRebalance = BufferHelper.readNullableBoolean(buffer);
groupBuckets = BufferHelper.readNullableInteger(buffer);
autoDelete = BufferHelper.readNullableBoolean(buffer);
autoDeleteDelay = BufferHelper.readNullableLong(buffer);
autoDeleteMessageCount = BufferHelper.readNullableLong(buffer);
}
}
@ -250,6 +293,9 @@ public class CreateSharedQueueMessage_V2 extends CreateSharedQueueMessage {
result = prime * result + (nonDestructive == null ? 0 : nonDestructive ? 1231 : 1237);
result = prime * result + (consumersBeforeDispatch == null ? 0 : consumersBeforeDispatch.hashCode());
result = prime * result + (delayBeforeDispatch == null ? 0 : delayBeforeDispatch.hashCode());
result = prime * result + (autoDelete == null ? 0 : autoDelete.hashCode());
result = prime * result + (autoDeleteDelay == null ? 0 : autoDeleteDelay.hashCode());
result = prime * result + (autoDeleteMessageCount == null ? 0 : autoDeleteMessageCount.hashCode());
return result;
}
@ -334,6 +380,21 @@ public class CreateSharedQueueMessage_V2 extends CreateSharedQueueMessage {
return false;
} else if (!delayBeforeDispatch.equals(other.delayBeforeDispatch))
return false;
if (autoDelete == null) {
if (other.autoDelete != null)
return false;
} else if (!autoDelete.equals(other.autoDelete))
return false;
if (autoDeleteDelay == null) {
if (other.autoDeleteDelay != null)
return false;
} else if (!autoDeleteDelay.equals(other.autoDeleteDelay))
return false;
if (autoDeleteMessageCount == null) {
if (other.autoDeleteMessageCount != null)
return false;
} else if (!autoDeleteMessageCount.equals(other.autoDeleteMessageCount))
return false;
return true;
}
}

View File

@ -50,14 +50,20 @@ public class SessionQueueQueryResponseMessage_V3 extends SessionQueueQueryRespon
private Long delayBeforeDispatch;
private Boolean autoDelete;
private Long autoDeleteDelay;
private Long autoDeleteMessageCount;
protected Integer defaultConsumerWindowSize;
public SessionQueueQueryResponseMessage_V3(final QueueQueryResult result) {
this(result.getName(), result.getAddress(), result.isDurable(), result.isTemporary(), result.getFilterString(), result.getConsumerCount(), result.getMessageCount(), result.isExists(), result.isAutoCreateQueues(), result.isAutoCreated(), result.isPurgeOnNoConsumers(), result.getRoutingType(), result.getMaxConsumers(), result.isExclusive(), result.isGroupRebalance(), result.getGroupBuckets(), result.isLastValue(), result.getLastValueKey(), result.isNonDestructive(), result.getConsumersBeforeDispatch(), result.getDelayBeforeDispatch(), result.getDefaultConsumerWindowSize());
this(result.getName(), result.getAddress(), result.isDurable(), result.isTemporary(), result.getFilterString(), result.getConsumerCount(), result.getMessageCount(), result.isExists(), result.isAutoCreateQueues(), result.isAutoCreated(), result.isPurgeOnNoConsumers(), result.getRoutingType(), result.getMaxConsumers(), result.isExclusive(), result.isGroupRebalance(), result.getGroupBuckets(), result.isLastValue(), result.getLastValueKey(), result.isNonDestructive(), result.getConsumersBeforeDispatch(), result.getDelayBeforeDispatch(), result.isAutoDelete(), result.getAutoDeleteDelay(), result.getAutoDeleteMessageCount(), result.getDefaultConsumerWindowSize());
}
public SessionQueueQueryResponseMessage_V3() {
this(null, null, false, false, null, 0, 0, false, false, false, false, RoutingType.MULTICAST, -1, null, null,null, null, null, null, null, null, null);
this(null, null, false, false, null, 0, 0, false, false, false, false, RoutingType.MULTICAST, -1, null, null,null, null, null, null, null, null, null, null, null, null);
}
private SessionQueueQueryResponseMessage_V3(final SimpleString name,
@ -81,6 +87,9 @@ public class SessionQueueQueryResponseMessage_V3 extends SessionQueueQueryRespon
final Boolean nonDestructive,
final Integer consumersBeforeDispatch,
final Long delayBeforeDispatch,
final Boolean autoDelete,
final Long autoDeleteDelay,
final Long autoDeleteMessageCount,
final Integer defaultConsumerWindowSize) {
super(SESS_QUEUEQUERY_RESP_V3);
@ -126,6 +135,12 @@ public class SessionQueueQueryResponseMessage_V3 extends SessionQueueQueryRespon
this.delayBeforeDispatch = delayBeforeDispatch;
this.autoDelete = autoDelete;
this.autoDeleteDelay = autoDeleteDelay;
this.autoDeleteMessageCount = autoDeleteMessageCount;
this.defaultConsumerWindowSize = defaultConsumerWindowSize;
}
@ -233,6 +248,18 @@ public class SessionQueueQueryResponseMessage_V3 extends SessionQueueQueryRespon
this.groupBuckets = groupBuckets;
}
public Boolean isAutoDelete() {
return autoDelete;
}
public Long getAutoDeleteDelay() {
return autoDeleteDelay;
}
public Long getAutoDeleteMessageCount() {
return autoDeleteMessageCount;
}
@Override
public void encodeRest(final ActiveMQBuffer buffer) {
super.encodeRest(buffer);
@ -249,6 +276,10 @@ public class SessionQueueQueryResponseMessage_V3 extends SessionQueueQueryRespon
BufferHelper.writeNullableLong(buffer, delayBeforeDispatch);
BufferHelper.writeNullableBoolean(buffer, groupRebalance);
BufferHelper.writeNullableInteger(buffer, groupBuckets);
BufferHelper.writeNullableBoolean(buffer, autoDelete);
BufferHelper.writeNullableLong(buffer, autoDeleteDelay);
BufferHelper.writeNullableLong(buffer, autoDeleteMessageCount);
}
@Override
@ -272,6 +303,10 @@ public class SessionQueueQueryResponseMessage_V3 extends SessionQueueQueryRespon
delayBeforeDispatch = BufferHelper.readNullableLong(buffer);
groupRebalance = BufferHelper.readNullableBoolean(buffer);
groupBuckets = BufferHelper.readNullableInteger(buffer);
autoDelete = BufferHelper.readNullableBoolean(buffer);
autoDeleteDelay = BufferHelper.readNullableLong(buffer);
autoDeleteMessageCount = BufferHelper.readNullableLong(buffer);
}
}
@ -291,6 +326,9 @@ public class SessionQueueQueryResponseMessage_V3 extends SessionQueueQueryRespon
result = prime * result + (nonDestructive == null ? 0 : nonDestructive ? 1231 : 1237);
result = prime * result + (consumersBeforeDispatch == null ? 0 : consumersBeforeDispatch.hashCode());
result = prime * result + (delayBeforeDispatch == null ? 0 : delayBeforeDispatch.hashCode());
result = prime * result + (autoDelete == null ? 0 : autoDelete.hashCode());
result = prime * result + (autoDeleteDelay == null ? 0 : autoDeleteDelay.hashCode());
result = prime * result + (autoDeleteMessageCount == null ? 0 : autoDeleteMessageCount.hashCode());
result = prime * result + ((defaultConsumerWindowSize == null) ? 0 : defaultConsumerWindowSize.hashCode());
return result;
}
@ -317,13 +355,16 @@ public class SessionQueueQueryResponseMessage_V3 extends SessionQueueQueryRespon
buff.append(", nonDestructive=" + nonDestructive);
buff.append(", consumersBeforeDispatch=" + consumersBeforeDispatch);
buff.append(", delayBeforeDispatch=" + delayBeforeDispatch);
buff.append(", autoDelete=" + autoDelete);
buff.append(", autoDeleteDelay=" + autoDeleteDelay);
buff.append(", autoDeleteMessageCount=" + autoDeleteMessageCount);
buff.append(", defaultConsumerWindowSize=" + defaultConsumerWindowSize);
return buff.toString();
}
@Override
public ClientSession.QueueQuery toQueueQuery() {
return new QueueQueryImpl(isDurable(), isTemporary(), getConsumerCount(), getMessageCount(), getFilterString(), getAddress(), getName(), isExists(), isAutoCreateQueues(), getMaxConsumers(), isAutoCreated(), isPurgeOnNoConsumers(), getRoutingType(), isExclusive(), isGroupRebalance(), getGroupBuckets(), isLastValue(), getLastValueKey(), isNonDestructive(), getConsumersBeforeDispatch(), getDelayBeforeDispatch(), getDefaultConsumerWindowSize());
return new QueueQueryImpl(isDurable(), isTemporary(), getConsumerCount(), getMessageCount(), getFilterString(), getAddress(), getName(), isExists(), isAutoCreateQueues(), getMaxConsumers(), isAutoCreated(), isPurgeOnNoConsumers(), getRoutingType(), isExclusive(), isGroupRebalance(), getGroupBuckets(), isLastValue(), getLastValueKey(), isNonDestructive(), getConsumersBeforeDispatch(), getDelayBeforeDispatch(), isAutoDelete(), getAutoDeleteDelay(), getAutoDeleteMessageCount(), getDefaultConsumerWindowSize());
}
@Override
@ -379,6 +420,21 @@ public class SessionQueueQueryResponseMessage_V3 extends SessionQueueQueryRespon
return false;
} else if (!delayBeforeDispatch.equals(other.delayBeforeDispatch))
return false;
if (autoDelete == null) {
if (other.autoDelete != null)
return false;
} else if (!autoDelete.equals(other.autoDelete))
return false;
if (autoDeleteDelay == null) {
if (other.autoDeleteDelay != null)
return false;
} else if (!autoDeleteDelay.equals(other.autoDeleteDelay))
return false;
if (autoDeleteMessageCount == null) {
if (other.autoDeleteMessageCount != null)
return false;
} else if (!autoDeleteMessageCount.equals(other.autoDeleteMessageCount))
return false;
if (defaultConsumerWindowSize == null) {
if (other.defaultConsumerWindowSize != null)
return false;

View File

@ -63,6 +63,12 @@ public class QueueQueryResult {
private Long delayBeforeDispatch;
private Boolean autoDelete;
private Long autoDeleteDelay;
private Long autoDeleteMessageCount;
private Integer defaultConsumerWindowSize;
public QueueQueryResult(final SimpleString name,
@ -86,6 +92,9 @@ public class QueueQueryResult {
final Boolean nonDestructive,
final Integer consumersBeforeDispatch,
final Long delayBeforeDispatch,
final Boolean autoDelete,
final Long autoDeleteDelay,
final Long autoDeleteMessageCount,
final Integer defaultConsumerWindowSize) {
this.durable = durable;
@ -129,6 +138,12 @@ public class QueueQueryResult {
this.delayBeforeDispatch = delayBeforeDispatch;
this.autoDelete = autoDelete;
this.autoDeleteDelay = autoDeleteDelay;
this.autoDeleteMessageCount = autoDeleteMessageCount;
this.defaultConsumerWindowSize = defaultConsumerWindowSize;
}
@ -223,4 +238,16 @@ public class QueueQueryResult {
public Integer getGroupBuckets() {
return groupBuckets;
}
public Boolean isAutoDelete() {
return autoDelete;
}
public Long getAutoDeleteDelay() {
return autoDeleteDelay;
}
public Long getAutoDeleteMessageCount() {
return autoDeleteMessageCount;
}
}

View File

@ -219,6 +219,8 @@ public final class FileConfigurationParser extends XMLConfigurationUtil {
private static final String AUTO_DELETE_QUEUES_DELAY = "auto-delete-queues-delay";
private static final String AUTO_DELETE_QUEUES_MESSAGE_COUNT = "auto-delete-queues-message-count";
private static final String CONFIG_DELETE_QUEUES = "config-delete-queues";
private static final String AUTO_CREATE_ADDRESSES = "auto-create-addresses";
@ -1061,6 +1063,10 @@ public final class FileConfigurationParser extends XMLConfigurationUtil {
long autoDeleteQueuesDelay = XMLUtil.parseLong(child);
Validators.GE_ZERO.validate(AUTO_DELETE_QUEUES_DELAY, autoDeleteQueuesDelay);
addressSettings.setAutoDeleteQueuesDelay(autoDeleteQueuesDelay);
} else if (AUTO_DELETE_QUEUES_MESSAGE_COUNT.equalsIgnoreCase(name)) {
long autoDeleteQueuesMessageCount = XMLUtil.parseLong(child);
Validators.MINUS_ONE_OR_GE_ZERO.validate(AUTO_DELETE_QUEUES_MESSAGE_COUNT, autoDeleteQueuesMessageCount);
addressSettings.setAutoDeleteQueuesMessageCount(autoDeleteQueuesMessageCount);
} else if (CONFIG_DELETE_QUEUES.equalsIgnoreCase(name)) {
String value = getTrimmedTextContent(child);
Validators.DELETION_POLICY_TYPE.validate(CONFIG_DELETE_QUEUES, value);

View File

@ -833,7 +833,11 @@ public class ActiveMQServerControlImpl extends AbstractControl implements Active
addressSettings.getDefaultLastValueKey() == null ? null : addressSettings.getDefaultLastValueKey().toString(),
addressSettings.isDefaultNonDestructive(),
addressSettings.getDefaultConsumersBeforeDispatch(),
addressSettings.getDefaultDelayBeforeDispatch(), autoCreateAddress
addressSettings.getDefaultDelayBeforeDispatch(),
addressSettings.isAutoDeleteQueues(),
addressSettings.getAutoDeleteQueuesDelay(),
addressSettings.getAutoDeleteQueuesMessageCount(),
autoCreateAddress
);
}
@ -853,6 +857,9 @@ public class ActiveMQServerControlImpl extends AbstractControl implements Active
boolean nonDestructive,
int consumersBeforeDispatch,
long delayBeforeDispatch,
boolean autoDelete,
long autoDeleteDelay,
long autoDeleteMessageCount,
boolean autoCreateAddress) throws Exception {
checkStarted();
@ -864,7 +871,7 @@ public class ActiveMQServerControlImpl extends AbstractControl implements Active
filter = new SimpleString(filterStr);
}
final Queue queue = server.createQueue(SimpleString.toSimpleString(address), RoutingType.valueOf(routingType.toUpperCase()), SimpleString.toSimpleString(name), filter, durable, false, maxConsumers, purgeOnNoConsumers, exclusive, groupRebalance, groupBuckets, lastValue, SimpleString.toSimpleString(lastValueKey), nonDestructive, consumersBeforeDispatch, delayBeforeDispatch, autoCreateAddress);
final Queue queue = server.createQueue(SimpleString.toSimpleString(address), RoutingType.valueOf(routingType.toUpperCase()), SimpleString.toSimpleString(name), filter, durable, false, maxConsumers, purgeOnNoConsumers, exclusive, groupRebalance, groupBuckets, lastValue, SimpleString.toSimpleString(lastValueKey), nonDestructive, consumersBeforeDispatch, delayBeforeDispatch, autoDelete, autoDeleteDelay, autoDeleteMessageCount, autoCreateAddress);
return QueueTextFormatter.Long.format(queue, new StringBuilder()).toString();
} catch (ActiveMQException e) {
throw new IllegalStateException(e.getMessage());

View File

@ -89,4 +89,10 @@ public interface QueueBindingInfo {
boolean isGroupRebalance();
int getGroupBuckets();
boolean isAutoDelete();
long getAutoDeleteDelay();
long getAutoDeleteMessageCount();
}

View File

@ -1297,7 +1297,7 @@ public abstract class AbstractJournalStorageManager extends CriticalComponentImp
SimpleString filterString = filter == null ? null : filter.getFilterString();
PersistentQueueBindingEncoding bindingEncoding = new PersistentQueueBindingEncoding(queue.getName(), binding.getAddress(), filterString, queue.getUser(), queue.isAutoCreated(), queue.getMaxConsumers(), queue.isPurgeOnNoConsumers(), queue.isExclusive(), queue.isGroupRebalance(), queue.getGroupBuckets(), queue.isLastValue(), queue.getLastValueKey(), queue.isNonDestructive(), queue.getConsumersBeforeDispatch(), queue.getDelayBeforeDispatch(), queue.getRoutingType().getType(), queue.isConfigurationManaged());
PersistentQueueBindingEncoding bindingEncoding = new PersistentQueueBindingEncoding(queue.getName(), binding.getAddress(), filterString, queue.getUser(), queue.isAutoCreated(), queue.getMaxConsumers(), queue.isPurgeOnNoConsumers(), queue.isExclusive(), queue.isGroupRebalance(), queue.getGroupBuckets(), queue.isLastValue(), queue.getLastValueKey(), queue.isNonDestructive(), queue.getConsumersBeforeDispatch(), queue.getDelayBeforeDispatch(), queue.isAutoDelete(), queue.getAutoDeleteDelay(), queue.getAutoDeleteMessageCount(), queue.getRoutingType().getType(), queue.isConfigurationManaged());
readLock();
try {

View File

@ -66,6 +66,12 @@ public class PersistentQueueBindingEncoding implements EncodingSupport, QueueBin
public int groupBuckets;
public boolean autoDelete;
public long autoDeleteDelay;
public long autoDeleteMessageCount;
public PersistentQueueBindingEncoding() {
}
@ -106,6 +112,12 @@ public class PersistentQueueBindingEncoding implements EncodingSupport, QueueBin
groupRebalance +
", groupBuckets=" +
groupBuckets +
", autoDelete=" +
autoDelete +
", autoDeleteDelay=" +
autoDeleteDelay +
", autoDeleteMessageCount=" +
autoDeleteMessageCount +
"]";
}
@ -124,6 +136,9 @@ public class PersistentQueueBindingEncoding implements EncodingSupport, QueueBin
final boolean nonDestructive,
final int consumersBeforeDispatch,
final long delayBeforeDispatch,
final boolean autoDelete,
final long autoDeleteDelay,
final long autoDeleteMessageCount,
final byte routingType,
final boolean configurationManaged) {
this.name = name;
@ -141,6 +156,9 @@ public class PersistentQueueBindingEncoding implements EncodingSupport, QueueBin
this.nonDestructive = nonDestructive;
this.consumersBeforeDispatch = consumersBeforeDispatch;
this.delayBeforeDispatch = delayBeforeDispatch;
this.autoDelete = autoDelete;
this.autoDeleteDelay = autoDeleteDelay;
this.autoDeleteMessageCount = autoDeleteMessageCount;
this.routingType = routingType;
this.configurationManaged = configurationManaged;
}
@ -307,6 +325,21 @@ public class PersistentQueueBindingEncoding implements EncodingSupport, QueueBin
return groupBuckets;
}
@Override
public boolean isAutoDelete() {
return autoDelete;
}
@Override
public long getAutoDeleteDelay() {
return autoDeleteDelay;
}
@Override
public long getAutoDeleteMessageCount() {
return autoDeleteMessageCount;
}
@Override
public void decode(final ActiveMQBuffer buffer) {
name = buffer.readSimpleString();
@ -383,6 +416,21 @@ public class PersistentQueueBindingEncoding implements EncodingSupport, QueueBin
} else {
groupBuckets = ActiveMQDefaultConfiguration.getDefaultGroupBuckets();
}
if (buffer.readableBytes() > 0) {
autoDelete = buffer.readBoolean();
} else {
autoDelete = ActiveMQDefaultConfiguration.getDefaultQueueAutoDelete();
}
if (buffer.readableBytes() > 0) {
autoDeleteDelay = buffer.readLong();
} else {
autoDeleteDelay = ActiveMQDefaultConfiguration.getDefaultQueueAutoDeleteDelay();
}
if (buffer.readableBytes() > 0) {
autoDeleteMessageCount = buffer.readLong();
} else {
autoDeleteMessageCount = ActiveMQDefaultConfiguration.getDefaultQueueAutoDeleteMessageCount();
}
}
@Override
@ -404,6 +452,9 @@ public class PersistentQueueBindingEncoding implements EncodingSupport, QueueBin
buffer.writeBoolean(nonDestructive);
buffer.writeBoolean(groupRebalance);
buffer.writeInt(groupBuckets);
buffer.writeBoolean(autoDelete);
buffer.writeLong(autoDeleteDelay);
buffer.writeLong(autoDeleteMessageCount);
}
@Override
@ -422,7 +473,10 @@ public class PersistentQueueBindingEncoding implements EncodingSupport, QueueBin
SimpleString.sizeofNullableString(lastValueKey) +
DataConstants.SIZE_BOOLEAN +
DataConstants.SIZE_BOOLEAN +
DataConstants.SIZE_INT;
DataConstants.SIZE_INT +
DataConstants.SIZE_BOOLEAN +
DataConstants.SIZE_LONG +
DataConstants.SIZE_LONG;
}

View File

@ -72,6 +72,7 @@ import org.apache.activemq.artemis.core.server.RoutingContext;
import org.apache.activemq.artemis.core.server.cluster.impl.MessageLoadBalancingType;
import org.apache.activemq.artemis.core.server.group.GroupingHandler;
import org.apache.activemq.artemis.core.server.impl.AddressInfo;
import org.apache.activemq.artemis.core.server.impl.QueueManagerImpl;
import org.apache.activemq.artemis.core.server.impl.RoutingContextImpl;
import org.apache.activemq.artemis.core.server.management.ManagementService;
import org.apache.activemq.artemis.core.server.management.Notification;
@ -1557,19 +1558,7 @@ public class PostOfficeImpl implements PostOffice, NotificationListener, Binding
public void run() {
// The reaper thread should be finished case the PostOffice is gone
// This is to avoid leaks on PostOffice between stops and starts
Map<SimpleString, Binding> nameMap = addressManager.getBindings();
List<Queue> queues = new ArrayList<>();
for (Binding binding : nameMap.values()) {
if (binding.getType() == BindingType.LOCAL_QUEUE) {
Queue queue = (Queue) binding.getBindable();
queues.add(queue);
}
}
for (Queue queue : queues) {
for (Queue queue : getLocalQueues()) {
try {
queue.expireReferences();
} catch (Exception e) {
@ -1591,38 +1580,9 @@ public class PostOfficeImpl implements PostOffice, NotificationListener, Binding
@Override
public void run() {
Map<SimpleString, Binding> nameMap = addressManager.getBindings();
List<Queue> queues = new ArrayList<>();
for (Binding binding : nameMap.values()) {
if (binding.getType() == BindingType.LOCAL_QUEUE) {
Queue queue = (Queue) binding.getBindable();
queues.add(queue);
}
}
for (Queue queue : queues) {
int consumerCount = queue.getConsumerCount();
long messageCount = queue.getMessageCount();
boolean autoCreated = queue.isAutoCreated();
long consumerRemovedTimestamp = queue.getConsumerRemovedTimestamp();
if (!queue.isInternalQueue() && autoCreated && messageCount == 0 && consumerCount == 0 && consumerRemovedTimestamp != -1) {
SimpleString queueName = queue.getName();
AddressSettings settings = addressSettingsRepository.getMatch(queue.getAddress().toString());
if (settings.isAutoDeleteQueues() && (System.currentTimeMillis() - consumerRemovedTimestamp >= settings.getAutoDeleteQueuesDelay())) {
if (ActiveMQServerLogger.LOGGER.isDebugEnabled()) {
ActiveMQServerLogger.LOGGER.info("deleting auto-created queue \"" + queueName + ".\" consumerCount = " + consumerCount + "; messageCount = " + messageCount + "; isAutoDeleteQueues = " + settings.isAutoDeleteQueues());
}
try {
server.destroyQueue(queueName, null, true, false, settings.isAutoDeleteAddresses(), true);
} catch (Exception e) {
ActiveMQServerLogger.LOGGER.errorRemovingAutoCreatedQueue(e, queueName);
}
}
for (Queue queue : getLocalQueues()) {
if (!queue.isInternalQueue() && QueueManagerImpl.isAutoDelete(queue) && QueueManagerImpl.consumerCountCheck(queue) && QueueManagerImpl.delayCheck(queue) && QueueManagerImpl.messageCountCheck(queue)) {
QueueManagerImpl.deleteAutoCreatedQueue(server, queue);
}
}
@ -1648,6 +1608,21 @@ public class PostOfficeImpl implements PostOffice, NotificationListener, Binding
}
}
private List<Queue> getLocalQueues() {
Map<SimpleString, Binding> nameMap = addressManager.getBindings();
List<Queue> queues = new ArrayList<>();
for (Binding binding : nameMap.values()) {
if (binding.getType() == BindingType.LOCAL_QUEUE) {
Queue queue = (Queue) binding.getBindable();
queues.add(queue);
}
}
return queues;
}
public static final class AddOperation implements TransactionOperation {
private final List<MessageReference> refs;

View File

@ -357,7 +357,8 @@ public class ServerSessionPacketHandler implements ChannelHandler {
CreateQueueMessage_V2 request = (CreateQueueMessage_V2) packet;
requiresResponse = request.isRequiresResponse();
session.createQueue(request.getAddress(), request.getQueueName(), request.getRoutingType(), request.getFilterString(), request.isTemporary(), request.isDurable(), request.getMaxConsumers(), request.isPurgeOnNoConsumers(),
request.isExclusive(), request.isGroupRebalance(), request.getGroupBuckets(), request.isLastValue(), request.getLastValueKey(), request.isNonDestructive(), request.getConsumersBeforeDispatch(), request.getDelayBeforeDispatch(), request.isAutoCreated());
request.isExclusive(), request.isGroupRebalance(), request.getGroupBuckets(), request.isLastValue(), request.getLastValueKey(), request.isNonDestructive(), request.getConsumersBeforeDispatch(), request.getDelayBeforeDispatch(),
request.isAutoDelete(), request.getAutoDeleteDelay(), request.getAutoDeleteMessageCount(), request.isAutoCreated());
if (requiresResponse) {
response = createNullResponseMessage(packet);
}
@ -381,7 +382,8 @@ public class ServerSessionPacketHandler implements ChannelHandler {
QueueQueryResult result = session.executeQueueQuery(request.getQueueName());
if (!(result.isExists() && Objects.equals(result.getAddress(), request.getAddress()) && Objects.equals(result.getFilterString(), request.getFilterString()))) {
session.createSharedQueue(request.getAddress(), request.getQueueName(), request.getRoutingType(), request.getFilterString(), request.isDurable(), request.getMaxConsumers(), request.isPurgeOnNoConsumers(),
request.isExclusive(), request.isGroupRebalance(), request.getGroupBuckets(), request.isLastValue(), request.getLastValueKey(), request.isNonDestructive(), request.getConsumersBeforeDispatch(), request.getDelayBeforeDispatch());
request.isExclusive(), request.isGroupRebalance(), request.getGroupBuckets(), request.isLastValue(), request.getLastValueKey(), request.isNonDestructive(), request.getConsumersBeforeDispatch(), request.getDelayBeforeDispatch(),
request.isAutoDelete(), request.getAutoDeleteDelay(), request.getAutoDeleteMessageCount());
}
if (requiresResponse) {
response = createNullResponseMessage(packet);

View File

@ -398,7 +398,8 @@ public interface ActiveMQServer extends ServiceComponent {
void createSharedQueue(SimpleString address, RoutingType routingType, SimpleString name, SimpleString filterString,
SimpleString user, boolean durable, int maxConsumers, boolean purgeOnNoConsumers, boolean exclusive,
boolean groupRebalance, int groupBuckets, boolean lastValue,
SimpleString lastValueKey, boolean nonDestructive, int consumersBeforeDispatch, long delayBeforeDispatch) throws Exception;
SimpleString lastValueKey, boolean nonDestructive, int consumersBeforeDispatch, long delayBeforeDispatch,
boolean autoDelete, long autoDeleteTimeout, long autoDeleteMessageCount) throws Exception;
Queue createQueue(SimpleString address, RoutingType routingType, SimpleString queueName, SimpleString filter,
boolean durable, boolean temporary) throws Exception;
@ -412,7 +413,8 @@ public interface ActiveMQServer extends ServiceComponent {
Queue createQueue(SimpleString address, RoutingType routingType, SimpleString queueName, SimpleString filter,
boolean durable, boolean temporary, int maxConsumers, boolean purgeOnNoConsumers, boolean exclusive, boolean groupRebalance, int groupBuckets,
boolean lastValue, SimpleString lastValueKey, boolean nonDestructive, int consumersBeforeDispatch, long delayBeforeDispatch, boolean autoCreateAddress) throws Exception;
boolean lastValue, SimpleString lastValueKey, boolean nonDestructive, int consumersBeforeDispatch, long delayBeforeDispatch,
boolean autoDelete, long autoDeleteDelay, long autoDeleteMessageCount, boolean autoCreateAddress) throws Exception;
Queue createQueue(SimpleString address, RoutingType routingType, SimpleString queueName, SimpleString filter,
SimpleString user, boolean durable, boolean temporary, boolean autoCreated, Integer maxConsumers,
@ -429,7 +431,7 @@ public interface ActiveMQServer extends ServiceComponent {
Queue createQueue(AddressInfo addressInfo, SimpleString queueName, SimpleString filter,
SimpleString user, boolean durable, boolean temporary, boolean autoCreated, Integer maxConsumers,
Boolean purgeOnNoConsumers, Boolean exclusive, Boolean groupRebalance, Integer groupBuckets, Boolean lastValue, SimpleString lastValueKey, Boolean nonDestructive,
Integer consumersBeforeDispatch, Long delayBeforeDispatch, boolean autoCreateAddress) throws Exception;
Integer consumersBeforeDispatch, Long delayBeforeDispatch, Boolean autoDelete, Long autoDeleteDelay, Long autoDeleteMessageCount, boolean autoCreateAddress) throws Exception;
Queue createQueue(SimpleString address, RoutingType routingType, SimpleString queueName, SimpleString filter,
SimpleString user, boolean durable, boolean temporary, boolean ignoreIfExists, boolean transientQueue,
@ -443,7 +445,7 @@ public interface ActiveMQServer extends ServiceComponent {
SimpleString user, boolean durable, boolean temporary, boolean ignoreIfExists, boolean transientQueue,
boolean autoCreated, int maxConsumers, boolean purgeOnNoConsumers, boolean exclusive, boolean groupRebalance,
int groupBuckets, boolean lastValue, SimpleString lastValueKey, boolean nonDestructive,
int consumersBeforeDispatch, long delayBeforeDispatch, boolean autoCreateAddress) throws Exception;
int consumersBeforeDispatch, long delayBeforeDispatch, boolean autoDelete, long autoDeleteDelay, long autoDeleteMessageCount, boolean autoCreateAddress) throws Exception;
@Deprecated
Queue createQueue(SimpleString address, SimpleString queueName, SimpleString filter, boolean durable, boolean temporary) throws Exception;

View File

@ -66,6 +66,12 @@ public interface Queue extends Bindable,CriticalComponent {
*/
boolean isDurableMessage();
boolean isAutoDelete();
long getAutoDeleteDelay();
long getAutoDeleteMessageCount();
boolean isTemporary();
boolean isAutoCreated();

View File

@ -48,6 +48,9 @@ public final class QueueConfig {
private final boolean configurationManaged;
private final SimpleString lastValueKey;
private final boolean nonDestructive;
private final boolean autoDelete;
private final long autoDeleteDelay;
private final long autoDeleteMessageCount;
public static final class Builder {
@ -71,6 +74,9 @@ public final class QueueConfig {
private long delayBeforeDispatch;
private boolean groupRebalance;
private int groupBuckets;
private boolean autoDelete;
private long autoDeleteDelay;
private long autoDeleteMessageCount;
private boolean configurationManaged;
private Builder(final long id, final SimpleString name) {
@ -98,6 +104,9 @@ public final class QueueConfig {
this.delayBeforeDispatch = ActiveMQDefaultConfiguration.getDefaultDelayBeforeDispatch();
this.groupRebalance = ActiveMQDefaultConfiguration.getDefaultGroupRebalance();
this.groupBuckets = ActiveMQDefaultConfiguration.getDefaultGroupBuckets();
this.autoDelete = ActiveMQDefaultConfiguration.getDefaultQueueAutoDelete();
this.autoDeleteDelay = ActiveMQDefaultConfiguration.getDefaultQueueAutoDeleteDelay();
this.autoDeleteMessageCount = ActiveMQDefaultConfiguration.getDefaultQueueAutoDeleteMessageCount();
this.configurationManaged = false;
validateState();
}
@ -190,6 +199,22 @@ public final class QueueConfig {
return this;
}
public Builder autoDelete(final boolean autoDelete) {
this.autoDelete = autoDelete;
return this;
}
public Builder autoDeleteDelay(final long autoDeleteDelay) {
this.autoDeleteDelay = autoDeleteDelay;
return this;
}
public Builder autoDeleteMessageCount(final long autoDeleteMessageCount) {
this.autoDeleteMessageCount = autoDeleteMessageCount;
return this;
}
public Builder groupRebalance(final boolean groupRebalance) {
this.groupRebalance = groupRebalance;
return this;
@ -233,7 +258,7 @@ public final class QueueConfig {
} else {
pageSubscription = null;
}
return new QueueConfig(id, address, name, filter, pageSubscription, user, durable, temporary, autoCreated, routingType, maxConsumers, exclusive, lastValue, lastValueKey, nonDestructive, consumersBeforeDispatch, delayBeforeDispatch, purgeOnNoConsumers, groupRebalance, groupBuckets, configurationManaged);
return new QueueConfig(id, address, name, filter, pageSubscription, user, durable, temporary, autoCreated, routingType, maxConsumers, exclusive, lastValue, lastValueKey, nonDestructive, consumersBeforeDispatch, delayBeforeDispatch, purgeOnNoConsumers, groupRebalance, groupBuckets, autoDelete, autoDeleteDelay, autoDeleteMessageCount, configurationManaged);
}
}
@ -286,6 +311,9 @@ public final class QueueConfig {
final boolean purgeOnNoConsumers,
final boolean groupRebalance,
final int groupBuckets,
final boolean autoDelete,
final long autoDeleteDelay,
final long autoDeleteMessageCount,
final boolean configurationManaged) {
this.id = id;
this.address = address;
@ -307,6 +335,9 @@ public final class QueueConfig {
this.delayBeforeDispatch = delayBeforeDispatch;
this.groupRebalance = groupRebalance;
this.groupBuckets = groupBuckets;
this.autoDelete = autoDelete;
this.autoDeleteDelay = autoDeleteDelay;
this.autoDeleteMessageCount = autoDeleteMessageCount;
this.configurationManaged = configurationManaged;
}
@ -394,6 +425,18 @@ public final class QueueConfig {
return configurationManaged;
}
public boolean isAutoDelete() {
return autoDelete;
}
public long getAutoDeleteDelay() {
return autoDeleteDelay;
}
public long getAutoDeleteMessageCount() {
return autoDeleteMessageCount;
}
@Override
public boolean equals(Object o) {
if (this == o)
@ -443,6 +486,12 @@ public final class QueueConfig {
return false;
if (groupBuckets != that.groupBuckets)
return false;
if (autoDelete != that.autoDelete)
return false;
if (autoDeleteDelay != that.autoDeleteDelay)
return false;
if (autoDeleteMessageCount != that.autoDeleteMessageCount)
return false;
if (configurationManaged != that.configurationManaged)
return false;
return user != null ? user.equals(that.user) : that.user == null;
@ -471,6 +520,9 @@ public final class QueueConfig {
result = 31 * result + (purgeOnNoConsumers ? 1 : 0);
result = 31 * result + (groupRebalance ? 1 : 0);
result = 31 * result + groupBuckets;
result = 31 * result + (autoDelete ? 1 : 0);
result = 31 * result + Long.hashCode(autoDeleteDelay);
result = 31 * result + Long.hashCode(autoDeleteMessageCount);
result = 31 * result + (configurationManaged ? 1 : 0);
return result;
}
@ -498,6 +550,9 @@ public final class QueueConfig {
+ ", purgeOnNoConsumers=" + purgeOnNoConsumers
+ ", groupRebalance=" + groupRebalance
+ ", groupBuckets=" + groupBuckets
+ ", autoDelete=" + autoDelete
+ ", autoDeleteDelay=" + autoDeleteDelay
+ ", autoDeleteMessageCount=" + autoDeleteMessageCount
+ ", configurationManaged=" + configurationManaged + '}';
}
}

View File

@ -188,6 +188,9 @@ public interface ServerSession extends SecurityAuth {
Boolean nonDestructive,
Integer consumersBeforeDispatch,
Long delayBeforeDispatch,
Boolean autoDelete,
Long autoDeleteDelay,
Long autoDeleteMessageCount,
boolean autoCreated) throws Exception;
Queue createQueue(SimpleString address,
@ -343,7 +346,10 @@ public interface ServerSession extends SecurityAuth {
SimpleString lastValueKey,
Boolean nonDestructive,
Integer consumersBeforeDispatch,
Long delayBeforeDispatch) throws Exception;
Long delayBeforeDispatch,
Boolean autoDelete,
Long autoDeleteDelay,
Long autoDeleteMessageCount) throws Exception;
void createSharedQueue(SimpleString address,
SimpleString name,

View File

@ -911,6 +911,9 @@ public class ActiveMQServerImpl implements ActiveMQServer {
int defaultConsumerWindowSize = addressSettings.getDefaultConsumerWindowSize();
boolean defaultGroupRebalance = addressSettings.isDefaultGroupRebalance();
int defaultGroupBuckets = addressSettings.getDefaultGroupBuckets();
boolean autoDeleteQueues = addressSettings.isAutoDeleteQueues();
long autoDeleteQueuesDelay = addressSettings.getAutoDeleteQueuesDelay();
long autoDeleteQueuesMessageCount = addressSettings.getAutoDeleteQueuesMessageCount();
SimpleString managementAddress = getManagementService() != null ? getManagementService().getManagementAddress() : null;
@ -921,12 +924,12 @@ public class ActiveMQServerImpl implements ActiveMQServer {
SimpleString filterString = filter == null ? null : filter.getFilterString();
response = new QueueQueryResult(realName, binding.getAddress(), queue.isDurable(), queue.isTemporary(), filterString, queue.getConsumerCount(), queue.getMessageCount(), autoCreateQueues, true, queue.isAutoCreated(), queue.isPurgeOnNoConsumers(), queue.getRoutingType(), queue.getMaxConsumers(), queue.isExclusive(), queue.isGroupRebalance(), queue.getGroupBuckets(), queue.isLastValue(), queue.getLastValueKey(), queue.isNonDestructive(), queue.getConsumersBeforeDispatch(), queue.getDelayBeforeDispatch(), defaultConsumerWindowSize);
response = new QueueQueryResult(realName, binding.getAddress(), queue.isDurable(), queue.isTemporary(), filterString, queue.getConsumerCount(), queue.getMessageCount(), autoCreateQueues, true, queue.isAutoCreated(), queue.isPurgeOnNoConsumers(), queue.getRoutingType(), queue.getMaxConsumers(), queue.isExclusive(), queue.isGroupRebalance(), queue.getGroupBuckets(), queue.isLastValue(), queue.getLastValueKey(), queue.isNonDestructive(), queue.getConsumersBeforeDispatch(), queue.getDelayBeforeDispatch(), queue.isAutoDelete(), queue.getAutoDeleteDelay(), queue.getAutoDeleteMessageCount(), defaultConsumerWindowSize);
} else if (realName.equals(managementAddress)) {
// make an exception for the management address (see HORNETQ-29)
response = new QueueQueryResult(realName, managementAddress, true, false, null, -1, -1, autoCreateQueues, true, false, false, RoutingType.MULTICAST, -1, false, false, null, null, null, null, null, null, defaultConsumerWindowSize);
response = new QueueQueryResult(realName, managementAddress, true, false, null, -1, -1, autoCreateQueues, true, false, false, RoutingType.MULTICAST, -1, false, false, null, null, null, null, null, null, null, null, null, defaultConsumerWindowSize);
} else {
response = new QueueQueryResult(realName, addressName, true, false, null, 0, 0, autoCreateQueues, false, false, defaultPurgeOnNoConsumers, RoutingType.MULTICAST, defaultMaxConsumers, defaultExclusiveQueue, defaultGroupRebalance, defaultGroupBuckets, defaultLastValueQueue, defaultLastValueKey, defaultNonDestructive, defaultConsumersBeforeDispatch, defaultDelayBeforeDispatch, defaultConsumerWindowSize);
response = new QueueQueryResult(realName, addressName, true, false, null, 0, 0, autoCreateQueues, false, false, defaultPurgeOnNoConsumers, RoutingType.MULTICAST, defaultMaxConsumers, defaultExclusiveQueue, defaultGroupRebalance, defaultGroupBuckets, defaultLastValueQueue, defaultLastValueKey, defaultNonDestructive, defaultConsumersBeforeDispatch, defaultDelayBeforeDispatch, autoDeleteQueues, autoDeleteQueuesDelay, autoDeleteQueuesMessageCount, defaultConsumerWindowSize);
}
return response;
@ -1721,8 +1724,11 @@ public class ActiveMQServerImpl implements ActiveMQServer {
final boolean nonDestructive,
final int consumersBeforeDispatch,
final long delayBeforeDispatch,
final boolean autoDelete,
final long autoDeleteDelay,
final long autoDeleteMessageCount,
final boolean autoCreateAddress) throws Exception {
return createQueue(address, routingType, queueName, filter, null, durable, temporary, false, false, false, maxConsumers, purgeOnNoConsumers, exclusive, groupRebalance, groupBuckets, lastValue, lastValueKey, nonDestructive, consumersBeforeDispatch, delayBeforeDispatch, autoCreateAddress);
return createQueue(address, routingType, queueName, filter, null, durable, temporary, false, false, false, maxConsumers, purgeOnNoConsumers, exclusive, groupRebalance, groupBuckets, lastValue, lastValueKey, nonDestructive, consumersBeforeDispatch, delayBeforeDispatch, autoDelete, autoDeleteDelay, autoDeleteMessageCount, autoCreateAddress);
}
@Override
@ -1744,18 +1750,18 @@ public class ActiveMQServerImpl implements ActiveMQServer {
@Override
public Queue createQueue(AddressInfo addressInfo, SimpleString queueName, SimpleString filter, SimpleString user, boolean durable, boolean temporary, boolean autoCreated, Integer maxConsumers, Boolean purgeOnNoConsumers, boolean autoCreateAddress) throws Exception {
AddressSettings as = getAddressSettingsRepository().getMatch(addressInfo == null ? queueName.toString() : addressInfo.getName().toString());
return createQueue(addressInfo, queueName, filter, user, durable, temporary, false, false, autoCreated, maxConsumers, purgeOnNoConsumers, as.isDefaultExclusiveQueue(), as.isDefaultGroupRebalance(), as.getDefaultGroupBuckets(), as.isDefaultLastValueQueue(), as.getDefaultLastValueKey(), as.isDefaultNonDestructive(), as.getDefaultConsumersBeforeDispatch(), as.getDefaultDelayBeforeDispatch(), autoCreateAddress, false);
return createQueue(addressInfo, queueName, filter, user, durable, temporary, false, false, autoCreated, maxConsumers, purgeOnNoConsumers, as.isDefaultExclusiveQueue(), as.isDefaultGroupRebalance(), as.getDefaultGroupBuckets(), as.isDefaultLastValueQueue(), as.getDefaultLastValueKey(), as.isDefaultNonDestructive(), as.getDefaultConsumersBeforeDispatch(), as.getDefaultDelayBeforeDispatch(), as.isAutoDeleteQueues(), as.getAutoDeleteQueuesDelay(), as.getAutoDeleteQueuesMessageCount(), autoCreateAddress, false);
}
@Override
public Queue createQueue(AddressInfo addressInfo, SimpleString queueName, SimpleString filter, SimpleString user, boolean durable, boolean temporary, boolean autoCreated, Integer maxConsumers, Boolean purgeOnNoConsumers, Boolean exclusive, Boolean lastValue, boolean autoCreateAddress) throws Exception {
AddressSettings as = getAddressSettingsRepository().getMatch(addressInfo == null ? queueName.toString() : addressInfo.getName().toString());
return createQueue(addressInfo, queueName, filter, user, durable, temporary, false, false, autoCreated, maxConsumers, purgeOnNoConsumers, exclusive, as.isDefaultGroupRebalance(), as.getDefaultGroupBuckets(), lastValue, as.getDefaultLastValueKey(), as.isDefaultNonDestructive(), as.getDefaultConsumersBeforeDispatch(), as.getDefaultDelayBeforeDispatch(), autoCreateAddress, false);
return createQueue(addressInfo, queueName, filter, user, durable, temporary, false, false, autoCreated, maxConsumers, purgeOnNoConsumers, exclusive, as.isDefaultGroupRebalance(), as.getDefaultGroupBuckets(), lastValue, as.getDefaultLastValueKey(), as.isDefaultNonDestructive(), as.getDefaultConsumersBeforeDispatch(), as.getDefaultDelayBeforeDispatch(), as.isAutoDeleteQueues(), as.getAutoDeleteQueuesDelay(), as.getAutoDeleteQueuesMessageCount(), autoCreateAddress, false);
}
@Override
public Queue createQueue(AddressInfo addressInfo, SimpleString queueName, SimpleString filter, SimpleString user, boolean durable, boolean temporary, boolean autoCreated, Integer maxConsumers, Boolean purgeOnNoConsumers, Boolean exclusive, Boolean groupRebalance, Integer groupBuckets, Boolean lastValue, SimpleString lastValueKey, Boolean nonDestructive, Integer consumersBeforeDispatch, Long delayBeforeDispatch, boolean autoCreateAddress) throws Exception {
return createQueue(addressInfo, queueName, filter, user, durable, temporary, false, false, autoCreated, maxConsumers, purgeOnNoConsumers, exclusive, groupRebalance, groupBuckets, lastValue, lastValueKey, nonDestructive, consumersBeforeDispatch, delayBeforeDispatch, autoCreateAddress, false);
public Queue createQueue(AddressInfo addressInfo, SimpleString queueName, SimpleString filter, SimpleString user, boolean durable, boolean temporary, boolean autoCreated, Integer maxConsumers, Boolean purgeOnNoConsumers, Boolean exclusive, Boolean groupRebalance, Integer groupBuckets, Boolean lastValue, SimpleString lastValueKey, Boolean nonDestructive, Integer consumersBeforeDispatch, Long delayBeforeDispatch, Boolean autoDelete, Long autoDeleteDelay, Long autoDeleteMessageCount, boolean autoCreateAddress) throws Exception {
return createQueue(addressInfo, queueName, filter, user, durable, temporary, false, false, autoCreated, maxConsumers, purgeOnNoConsumers, exclusive, groupRebalance, groupBuckets, lastValue, lastValueKey, nonDestructive, consumersBeforeDispatch, delayBeforeDispatch, autoDelete, autoDeleteDelay, autoDeleteMessageCount, autoCreateAddress, false);
}
@ -1764,7 +1770,7 @@ public class ActiveMQServerImpl implements ActiveMQServer {
SimpleString user, boolean durable, boolean temporary, boolean ignoreIfExists, boolean transientQueue,
boolean autoCreated, int maxConsumers, boolean purgeOnNoConsumers, boolean autoCreateAddress) throws Exception {
AddressSettings as = getAddressSettingsRepository().getMatch(address == null ? queueName.toString() : address.toString());
return createQueue(address, routingType, queueName, filter, user, durable, temporary, ignoreIfExists, transientQueue, autoCreated, maxConsumers, purgeOnNoConsumers, as.isDefaultExclusiveQueue(), as.isDefaultGroupRebalance(), as.getDefaultGroupBuckets(), as.isDefaultLastValueQueue(), as.getDefaultLastValueKey(), as.isDefaultNonDestructive(), as.getDefaultConsumersBeforeDispatch(), as.getDefaultDelayBeforeDispatch(), autoCreateAddress);
return createQueue(address, routingType, queueName, filter, user, durable, temporary, ignoreIfExists, transientQueue, autoCreated, maxConsumers, purgeOnNoConsumers, as.isDefaultExclusiveQueue(), as.isDefaultGroupRebalance(), as.getDefaultGroupBuckets(), as.isDefaultLastValueQueue(), as.getDefaultLastValueKey(), as.isDefaultNonDestructive(), as.getDefaultConsumersBeforeDispatch(), as.getDefaultDelayBeforeDispatch(), as.isAutoDeleteQueues(), as.getAutoDeleteQueuesDelay(), as.getAutoDeleteQueuesMessageCount(), autoCreateAddress);
}
@Override
@ -1772,7 +1778,7 @@ public class ActiveMQServerImpl implements ActiveMQServer {
SimpleString user, boolean durable, boolean temporary, boolean ignoreIfExists, boolean transientQueue,
boolean autoCreated, int maxConsumers, boolean purgeOnNoConsumers, boolean exclusive, boolean lastValue, boolean autoCreateAddress) throws Exception {
AddressSettings as = getAddressSettingsRepository().getMatch(address == null ? queueName.toString() : address.toString());
return createQueue(address, routingType, queueName, filter, user, durable, temporary, ignoreIfExists, transientQueue, autoCreated, maxConsumers, purgeOnNoConsumers, exclusive, as.isDefaultGroupRebalance(), as.getDefaultGroupBuckets(), lastValue, as.getDefaultLastValueKey(), as.isDefaultNonDestructive(), as.getDefaultConsumersBeforeDispatch(), as.getDefaultDelayBeforeDispatch(), autoCreateAddress);
return createQueue(address, routingType, queueName, filter, user, durable, temporary, ignoreIfExists, transientQueue, autoCreated, maxConsumers, purgeOnNoConsumers, exclusive, as.isDefaultGroupRebalance(), as.getDefaultGroupBuckets(), lastValue, as.getDefaultLastValueKey(), as.isDefaultNonDestructive(), as.getDefaultConsumersBeforeDispatch(), as.getDefaultDelayBeforeDispatch(), as.isAutoDeleteQueues(), as.getAutoDeleteQueuesDelay(), as.getAutoDeleteQueuesMessageCount(), autoCreateAddress);
}
@ -1809,7 +1815,7 @@ public class ActiveMQServerImpl implements ActiveMQServer {
boolean exclusive,
boolean lastValue) throws Exception {
AddressSettings as = getAddressSettingsRepository().getMatch(address == null ? name.toString() : address.toString());
createSharedQueue(address, routingType, name, filterString, user, durable, maxConsumers, purgeOnNoConsumers, exclusive, as.isDefaultGroupRebalance(), as.getDefaultGroupBuckets(), lastValue, as.getDefaultLastValueKey(), as.isDefaultNonDestructive(), as.getDefaultConsumersBeforeDispatch(), as.getDefaultDelayBeforeDispatch());
createSharedQueue(address, routingType, name, filterString, user, durable, maxConsumers, purgeOnNoConsumers, exclusive, as.isDefaultGroupRebalance(), as.getDefaultGroupBuckets(), lastValue, as.getDefaultLastValueKey(), as.isDefaultNonDestructive(), as.getDefaultConsumersBeforeDispatch(), as.getDefaultDelayBeforeDispatch(), as.isAutoDeleteQueues(), as.getAutoDeleteQueuesDelay(), as.getAutoDeleteQueuesMessageCount());
}
@Override
@ -1828,7 +1834,10 @@ public class ActiveMQServerImpl implements ActiveMQServer {
SimpleString lastValueKey,
boolean nonDestructive,
int consumersBeforeDispatch,
long delayBeforeDispatch) throws Exception {
long delayBeforeDispatch,
boolean autoDelete,
long autoDeleteDelay,
long autoDeleteMessageCount) throws Exception {
//force the old contract about address
if (address == null) {
throw new NullPointerException("address can't be null!");
@ -1842,7 +1851,7 @@ public class ActiveMQServerImpl implements ActiveMQServer {
}
}
final Queue queue = createQueue(address, routingType, name, filterString, user, durable, !durable, true, !durable, false, maxConsumers, purgeOnNoConsumers, exclusive, groupRebalance, groupBuckets, lastValue, lastValueKey, nonDestructive, consumersBeforeDispatch, delayBeforeDispatch, true);
final Queue queue = createQueue(address, routingType, name, filterString, user, durable, !durable, true, !durable, false, maxConsumers, purgeOnNoConsumers, exclusive, groupRebalance, groupBuckets, lastValue, lastValueKey, nonDestructive, consumersBeforeDispatch, delayBeforeDispatch, autoDelete, autoDeleteDelay, autoDeleteMessageCount, true);
if (!queue.getAddress().equals(address)) {
throw ActiveMQMessageBundle.BUNDLE.queueSubscriptionBelongsToDifferentAddress(name);
@ -1988,9 +1997,11 @@ public class ActiveMQServerImpl implements ActiveMQServer {
}
// This check is only valid if checkMessageCount == true
long messageCount = queue.getMessageCount();
if (checkMessageCount && messageCount != 0) {
throw ActiveMQMessageBundle.BUNDLE.cannotDeleteQueueWithMessages(queue.getName(), queueName, messageCount);
if (checkMessageCount && queue.getAutoDeleteMessageCount() != -1) {
long messageCount = queue.getMessageCount();
if (queue.getMessageCount() > queue.getAutoDeleteMessageCount()) {
throw ActiveMQMessageBundle.BUNDLE.cannotDeleteQueueWithMessages(queue.getName(), queueName, messageCount);
}
}
queue.deleteQueue(removeConsumers);
@ -2856,7 +2867,7 @@ public class ActiveMQServerImpl implements ActiveMQServer {
} else {
// if the address::queue doesn't exist then create it
try {
createQueue(new AddressInfo(SimpleString.toSimpleString(config.getAddress())).addRoutingType(config.getRoutingType()), queueName, SimpleString.toSimpleString(config.getFilterString()), SimpleString.toSimpleString(config.getUser()), config.isDurable(), false, false, false, false, maxConsumers, config.getPurgeOnNoConsumers(), isExclusive, groupRebalance, groupBuckets, isLastValue, lastValueKey, isNonDestructive, consumersBeforeDispatch, delayBeforeDispatch, true, true);
createQueue(new AddressInfo(SimpleString.toSimpleString(config.getAddress())).addRoutingType(config.getRoutingType()), queueName, SimpleString.toSimpleString(config.getFilterString()), SimpleString.toSimpleString(config.getUser()), config.isDurable(), false, false, false, false, maxConsumers, config.getPurgeOnNoConsumers(), isExclusive, groupRebalance, groupBuckets, isLastValue, lastValueKey, isNonDestructive, consumersBeforeDispatch, delayBeforeDispatch, as.isAutoDeleteQueues(), as.getAutoDeleteQueuesDelay(), as.getAutoDeleteQueuesMessageCount(), true, true);
} catch (ActiveMQQueueExistsException e) {
// the queue may exist on a *different* address
ActiveMQServerLogger.LOGGER.warn(e.getMessage());
@ -3054,6 +3065,9 @@ public class ActiveMQServerImpl implements ActiveMQServer {
final boolean nonDestructive,
final int consumersBeforeDispatch,
final long delayBeforeDispatch,
final boolean autoDelete,
final long autoDeleteDelay,
final long autoDeleteMessageCount,
final boolean autoCreateAddress,
final boolean configurationManaged) throws Exception {
SimpleString realQueueName = CompositeAddress.extractQueueName(queueName);
@ -3121,6 +3135,9 @@ public class ActiveMQServerImpl implements ActiveMQServer {
.nonDestructive(nonDestructive)
.consumersBeforeDispatch(consumersBeforeDispatch)
.delayBeforeDispatch(delayBeforeDispatch)
.autoDelete(autoDelete)
.autoDeleteDelay(autoDeleteDelay)
.autoDeleteMessageCount(autoDeleteMessageCount)
.configurationManaged(configurationManaged)
.build();
@ -3200,8 +3217,11 @@ public class ActiveMQServerImpl implements ActiveMQServer {
final boolean nonDestructive,
final int consumersBeforeDispatch,
final long delayBeforeDispatch,
final boolean autoDelete,
final long autoDeleteDelay,
final long autoDeleteMessageCount,
final boolean autoCreateAddress) throws Exception {
return createQueue(new AddressInfo(address).addRoutingType(routingType), queueName, filterString, user, durable, temporary, ignoreIfExists, transientQueue, autoCreated, maxConsumers, purgeOnNoConsumers, exclusive, groupRebalance, groupBuckets, lastValue, lastValueKey, nonDestructive, consumersBeforeDispatch, delayBeforeDispatch, autoCreateAddress, false);
return createQueue(new AddressInfo(address).addRoutingType(routingType), queueName, filterString, user, durable, temporary, ignoreIfExists, transientQueue, autoCreated, maxConsumers, purgeOnNoConsumers, exclusive, groupRebalance, groupBuckets, lastValue, lastValueKey, nonDestructive, consumersBeforeDispatch, delayBeforeDispatch, autoDelete, autoDeleteDelay, autoDeleteMessageCount, autoCreateAddress, false);
}
@Deprecated

View File

@ -75,6 +75,9 @@ public class LastValueQueue extends QueueImpl {
final Boolean purgeOnNoConsumers,
final SimpleString lastValueKey,
final Boolean nonDestructive,
final Boolean autoDelete,
final Long autoDeleteDelay,
final Long autoDeleteMessageCount,
final boolean configurationManaged,
final ScheduledExecutorService scheduledExecutor,
final PostOffice postOffice,
@ -83,7 +86,7 @@ public class LastValueQueue extends QueueImpl {
final ArtemisExecutor executor,
final ActiveMQServer server,
final QueueFactory factory) {
super(persistenceID, address, name, filter, pageSubscription, user, durable, temporary, autoCreated, routingType, maxConsumers, exclusive, groupRebalance, groupBuckets, nonDestructive, consumersBeforeDispatch, delayBeforeDispatch, purgeOnNoConsumers, configurationManaged, scheduledExecutor, postOffice, storageManager, addressSettingsRepository, executor, server, factory);
super(persistenceID, address, name, filter, pageSubscription, user, durable, temporary, autoCreated, routingType, maxConsumers, exclusive, groupRebalance, groupBuckets, nonDestructive, consumersBeforeDispatch, delayBeforeDispatch, purgeOnNoConsumers, autoDelete, autoDeleteDelay, autoDeleteMessageCount, configurationManaged, scheduledExecutor, postOffice, storageManager, addressSettingsRepository, executor, server, factory);
this.lastValueKey = lastValueKey;
}

View File

@ -159,6 +159,9 @@ public class PostOfficeJournalLoader implements JournalLoader {
.nonDestructive(queueBindingInfo.isNonDestructive())
.consumersBeforeDispatch(queueBindingInfo.getConsumersBeforeDispatch())
.delayBeforeDispatch(queueBindingInfo.getDelayBeforeDispatch())
.autoDelete(queueBindingInfo.isAutoDelete())
.autoDeleteDelay(queueBindingInfo.getAutoDeleteDelay())
.autoDeleteMessageCount(queueBindingInfo.getAutoDeleteMessageCount())
.routingType(RoutingType.getType(queueBindingInfo.getRoutingType()))
.configurationManaged((queueBindingInfo.isConfigurationManaged()));
final Queue queue = queueFactory.createQueueWith(queueConfigBuilder.build());

View File

@ -75,9 +75,9 @@ public class QueueFactoryImpl implements QueueFactory {
public Queue createQueueWith(final QueueConfig config) {
final Queue queue;
if (lastValueKey(config) != null) {
queue = new LastValueQueue(config.id(), config.address(), config.name(), config.filter(), config.pageSubscription(), config.user(), config.isDurable(), config.isTemporary(), config.isAutoCreated(), config.deliveryMode(), config.maxConsumers(), config.isExclusive(), config.isGroupRebalance(), config.getGroupBuckets(), config.consumersBeforeDispatch(), config.delayBeforeDispatch(), config.isPurgeOnNoConsumers(), lastValueKey(config), config.isNonDestructive(), config.isConfigurationManaged(), scheduledExecutor, postOffice, storageManager, addressSettingsRepository, executorFactory.getExecutor(), server, this);
queue = new LastValueQueue(config.id(), config.address(), config.name(), config.filter(), config.pageSubscription(), config.user(), config.isDurable(), config.isTemporary(), config.isAutoCreated(), config.deliveryMode(), config.maxConsumers(), config.isExclusive(), config.isGroupRebalance(), config.getGroupBuckets(), config.consumersBeforeDispatch(), config.delayBeforeDispatch(), config.isPurgeOnNoConsumers(), lastValueKey(config), config.isNonDestructive(), config.isAutoDelete(), config.getAutoDeleteDelay(), config.getAutoDeleteMessageCount(), config.isConfigurationManaged(), scheduledExecutor, postOffice, storageManager, addressSettingsRepository, executorFactory.getExecutor(), server, this);
} else {
queue = new QueueImpl(config.id(), config.address(), config.name(), config.filter(), config.pageSubscription(), config.user(), config.isDurable(), config.isTemporary(), config.isAutoCreated(), config.deliveryMode(), config.maxConsumers(), config.isExclusive(), config.isGroupRebalance(), config.getGroupBuckets(), config.isNonDestructive(), config.consumersBeforeDispatch(), config.delayBeforeDispatch(), config.isPurgeOnNoConsumers(), config.isConfigurationManaged(), scheduledExecutor, postOffice, storageManager, addressSettingsRepository, executorFactory.getExecutor(), server, this);
queue = new QueueImpl(config.id(), config.address(), config.name(), config.filter(), config.pageSubscription(), config.user(), config.isDurable(), config.isTemporary(), config.isAutoCreated(), config.deliveryMode(), config.maxConsumers(), config.isExclusive(), config.isGroupRebalance(), config.getGroupBuckets(), config.isNonDestructive(), config.consumersBeforeDispatch(), config.delayBeforeDispatch(), config.isPurgeOnNoConsumers(), config.isAutoDelete(), config.getAutoDeleteDelay(), config.getAutoDeleteMessageCount(), config.isConfigurationManaged(), scheduledExecutor, postOffice, storageManager, addressSettingsRepository, executorFactory.getExecutor(), server, this);
}
server.getCriticalAnalyzer().add(queue);
return queue;
@ -102,7 +102,7 @@ public class QueueFactoryImpl implements QueueFactory {
Queue queue;
if (lastValueKey(addressSettings) != null) {
queue = new LastValueQueue(persistenceID, address, name, filter, pageSubscription, user, durable, temporary, autoCreated, ActiveMQDefaultConfiguration.getDefaultRoutingType(), ActiveMQDefaultConfiguration.getDefaultMaxQueueConsumers(), ActiveMQDefaultConfiguration.getDefaultExclusive(), ActiveMQDefaultConfiguration.getDefaultGroupRebalance(), ActiveMQDefaultConfiguration.getDefaultGroupBuckets(), ActiveMQDefaultConfiguration.getDefaultConsumersBeforeDispatch(), ActiveMQDefaultConfiguration.getDefaultDelayBeforeDispatch(), ActiveMQDefaultConfiguration.getDefaultPurgeOnNoConsumers(), lastValueKey(addressSettings), ActiveMQDefaultConfiguration.getDefaultNonDestructive(), false, scheduledExecutor, postOffice, storageManager, addressSettingsRepository, executorFactory.getExecutor(), server, this);
queue = new LastValueQueue(persistenceID, address, name, filter, pageSubscription, user, durable, temporary, autoCreated, ActiveMQDefaultConfiguration.getDefaultRoutingType(), ActiveMQDefaultConfiguration.getDefaultMaxQueueConsumers(), ActiveMQDefaultConfiguration.getDefaultExclusive(), ActiveMQDefaultConfiguration.getDefaultGroupRebalance(), ActiveMQDefaultConfiguration.getDefaultGroupBuckets(), ActiveMQDefaultConfiguration.getDefaultConsumersBeforeDispatch(), ActiveMQDefaultConfiguration.getDefaultDelayBeforeDispatch(), ActiveMQDefaultConfiguration.getDefaultPurgeOnNoConsumers(), lastValueKey(addressSettings), ActiveMQDefaultConfiguration.getDefaultNonDestructive(), ActiveMQDefaultConfiguration.getDefaultQueueAutoDelete(), ActiveMQDefaultConfiguration.getDefaultQueueAutoDeleteDelay(), ActiveMQDefaultConfiguration.getDefaultQueueAutoDeleteMessageCount(),false, scheduledExecutor, postOffice, storageManager, addressSettingsRepository, executorFactory.getExecutor(), server, this);
} else {
queue = new QueueImpl(persistenceID, address, name, filter, pageSubscription, user, durable, temporary, autoCreated, scheduledExecutor, postOffice, storageManager, addressSettingsRepository, executorFactory.getExecutor(), server, this);
}

View File

@ -285,6 +285,12 @@ public class QueueImpl extends CriticalComponentImpl implements Queue {
private volatile long delayBeforeDispatch = 0;
private final boolean autoDelete;
private final long autoDeleteDelay;
private final long autoDeleteMessageCount;
private volatile boolean configurationManaged;
private volatile boolean nonDestructive;
@ -411,7 +417,7 @@ public class QueueImpl extends CriticalComponentImpl implements Queue {
final ArtemisExecutor executor,
final ActiveMQServer server,
final QueueFactory factory) {
this(id, address, name, filter, pageSubscription, user, durable, temporary, autoCreated, routingType, maxConsumers, exclusive, null, null, false, null, null, purgeOnNoConsumers, false, scheduledExecutor, postOffice, storageManager, addressSettingsRepository, executor, server, factory);
this(id, address, name, filter, pageSubscription, user, durable, temporary, autoCreated, routingType, maxConsumers, exclusive, null, null, false, null, null, purgeOnNoConsumers, null, null, null, false, scheduledExecutor, postOffice, storageManager, addressSettingsRepository, executor, server, factory);
}
public QueueImpl(final long id,
@ -432,6 +438,9 @@ public class QueueImpl extends CriticalComponentImpl implements Queue {
final Integer consumersBeforeDispatch,
final Long delayBeforeDispatch,
final Boolean purgeOnNoConsumers,
final Boolean autoDelete,
final Long autoDeleteDelay,
final Long autoDeleteMessageCount,
final boolean configurationManaged,
final ScheduledExecutorService scheduledExecutor,
final PostOffice postOffice,
@ -480,6 +489,12 @@ public class QueueImpl extends CriticalComponentImpl implements Queue {
this.groups = groupMap(this.groupBuckets);
this.autoDelete = autoDelete == null ? ActiveMQDefaultConfiguration.getDefaultQueueAutoDelete() : autoDelete;
this.autoDeleteDelay = autoDeleteDelay == null ? ActiveMQDefaultConfiguration.getDefaultQueueAutoDeleteDelay() : autoDeleteDelay;
this.autoDeleteMessageCount = autoDeleteMessageCount == null ? ActiveMQDefaultConfiguration.getDefaultQueueAutoDeleteMessageCount() : autoDeleteMessageCount;
this.configurationManaged = configurationManaged;
this.postOffice = postOffice;
@ -656,6 +671,21 @@ public class QueueImpl extends CriticalComponentImpl implements Queue {
return propertyDurable && !purgeOnNoConsumers;
}
@Override
public boolean isAutoDelete() {
return autoDelete;
}
@Override
public long getAutoDeleteDelay() {
return autoDeleteDelay;
}
@Override
public long getAutoDeleteMessageCount() {
return autoDeleteMessageCount;
}
@Override
public boolean isTemporary() {
return temporary;
@ -1980,6 +2010,7 @@ public class QueueImpl extends CriticalComponentImpl implements Queue {
if (queueDestroyed) {
return;
}
if (logger.isDebugEnabled()) {
logger.debug("Scanning for expires on " + QueueImpl.this.getName());
}

View File

@ -40,30 +40,56 @@ public class QueueManagerImpl extends ReferenceCounterUtil implements QueueManag
return;
}
AddressSettings settings = server.getAddressSettingsRepository().getMatch(queue.getAddress().toString());
if (isAutoDelete(queue) && consumerCountCheck(queue) && delayCheck(queue) && messageCountCheck(queue)) {
deleteAutoCreatedQueue(server, queue);
} else if (queue.isPurgeOnNoConsumers()) {
purge(queue);
}
}
private static void purge(Queue queue) {
long consumerCount = queue.getConsumerCount();
long messageCount = queue.getMessageCount();
if (queue.isAutoCreated() && settings.isAutoDeleteQueues() && queue.getMessageCount() == 0 && queue.getConsumerCount() == 0 && settings.getAutoDeleteQueuesDelay() == 0) {
if (ActiveMQServerLogger.LOGGER.isDebugEnabled()) {
ActiveMQServerLogger.LOGGER.debug("deleting " + (queue.isAutoCreated() ? "auto-created " : "") + "queue \"" + queueName + ".\" consumerCount = " + consumerCount + "; messageCount = " + messageCount + "; isAutoDeleteQueues = " + settings.isAutoDeleteQueues());
}
try {
server.destroyQueue(queueName, null, true, false, settings.isAutoDeleteAddresses(), true);
} catch (Exception e) {
ActiveMQServerLogger.LOGGER.errorRemovingAutoCreatedQueue(e, queueName);
}
} else if (queue.isPurgeOnNoConsumers()) {
if (ActiveMQServerLogger.LOGGER.isDebugEnabled()) {
ActiveMQServerLogger.LOGGER.debug("purging queue \"" + queueName + ".\" consumerCount = " + consumerCount + "; messageCount = " + messageCount);
}
try {
queue.deleteMatchingReferences(QueueImpl.DEFAULT_FLUSH_LIMIT, null, AckReason.KILLED);
} catch (Exception e) {
ActiveMQServerLogger.LOGGER.failedToPurgeQueue(e, queueName);
}
if (ActiveMQServerLogger.LOGGER.isDebugEnabled()) {
ActiveMQServerLogger.LOGGER.debug("purging queue \"" + queue.getName() + ".\" consumerCount = " + consumerCount + "; messageCount = " + messageCount);
}
try {
queue.deleteMatchingReferences(QueueImpl.DEFAULT_FLUSH_LIMIT, null, AckReason.KILLED);
} catch (Exception e) {
ActiveMQServerLogger.LOGGER.failedToPurgeQueue(e, queue.getName());
}
}
public static void deleteAutoCreatedQueue(ActiveMQServer server, Queue queue) {
SimpleString queueName = queue.getName();
AddressSettings settings = server.getAddressSettingsRepository().getMatch(queue.getAddress().toString());
if (ActiveMQServerLogger.LOGGER.isDebugEnabled()) {
ActiveMQServerLogger.LOGGER.info("deleting auto-created queue \"" + queueName + ".\" consumerCount = " + queue.getConsumerCount() + "; messageCount = " + queue.getMessageCount() + "; isAutoDeleteQueues = " + settings.isAutoDeleteQueues());
}
try {
server.destroyQueue(queueName, null, true, false, settings.isAutoDeleteAddresses(), true);
} catch (Exception e) {
ActiveMQServerLogger.LOGGER.errorRemovingAutoCreatedQueue(e, queueName);
}
}
public static boolean isAutoDelete(Queue queue) {
return queue.isAutoCreated() && queue.isAutoDelete();
}
public static boolean messageCountCheck(Queue queue) {
return queue.getAutoDeleteMessageCount() == -1 || queue.getMessageCount() <= queue.getAutoDeleteMessageCount();
}
public static boolean delayCheck(Queue queue) {
long consumerRemovedTimestamp = queue.getConsumerRemovedTimestamp();
return consumerRemovedTimestamp != -1 && System.currentTimeMillis() - consumerRemovedTimestamp >= queue.getAutoDeleteDelay();
}
public static boolean consumerCountCheck(Queue queue) {
return queue.getConsumerCount() == 0;
}
public QueueManagerImpl(ActiveMQServer server, SimpleString queueName) {

View File

@ -591,7 +591,7 @@ public class ServerSessionImpl implements ServerSession, FailureListener {
@Override
public Queue createQueue(AddressInfo addressInfo, SimpleString name, SimpleString filterString, boolean temporary, boolean durable) throws Exception {
AddressSettings as = server.getAddressSettingsRepository().getMatch(addressInfo.getName().toString());
return createQueue(addressInfo, name, filterString, temporary, durable, as.getDefaultMaxConsumers(), as.isDefaultPurgeOnNoConsumers(), as.isDefaultExclusiveQueue(), as.isDefaultGroupRebalance(), as.getDefaultGroupBuckets(), as.isDefaultLastValueQueue(), as.getDefaultLastValueKey(), as.isDefaultNonDestructive(), as.getDefaultConsumersBeforeDispatch(), as.getDefaultDelayBeforeDispatch(), false);
return createQueue(addressInfo, name, filterString, temporary, durable, as.getDefaultMaxConsumers(), as.isDefaultPurgeOnNoConsumers(), as.isDefaultExclusiveQueue(), as.isDefaultGroupRebalance(), as.getDefaultGroupBuckets(), as.isDefaultLastValueQueue(), as.getDefaultLastValueKey(), as.isDefaultNonDestructive(), as.getDefaultConsumersBeforeDispatch(), as.getDefaultDelayBeforeDispatch(), as.isAutoDeleteQueues(), as.getAutoDeleteQueuesDelay(), as.getAutoDeleteQueuesMessageCount(), false);
}
public Queue createQueue(final AddressInfo addressInfo,
@ -609,6 +609,9 @@ public class ServerSessionImpl implements ServerSession, FailureListener {
final boolean nonDestructive,
final int consumersBeforeDispatch,
final long delayBeforeDispatch,
final boolean autoDelete,
final long autoDeleteDelay,
final long autoDeleteMessageCount,
final boolean autoCreated) throws Exception {
final SimpleString unPrefixedName = removePrefix(name);
@ -629,7 +632,7 @@ public class ServerSessionImpl implements ServerSession, FailureListener {
server.checkQueueCreationLimit(getUsername());
Queue queue = server.createQueue(art, unPrefixedName, filterString, SimpleString.toSimpleString(getUsername()), durable, temporary, autoCreated, maxConsumers, purgeOnNoConsumers, exclusive, groupRebalance, groupBuckets, lastValue, lastValueKey, nonDestructive, consumersBeforeDispatch, delayBeforeDispatch, as.isAutoCreateAddresses());
Queue queue = server.createQueue(art, unPrefixedName, filterString, SimpleString.toSimpleString(getUsername()), durable, temporary, autoCreated, maxConsumers, purgeOnNoConsumers, exclusive, groupRebalance, groupBuckets, lastValue, lastValueKey, nonDestructive, consumersBeforeDispatch, delayBeforeDispatch, autoDelete, autoDeleteDelay, autoDeleteMessageCount, as.isAutoCreateAddresses());
if (temporary) {
// Temporary queue in core simply means the queue will be deleted if
@ -669,7 +672,7 @@ public class ServerSessionImpl implements ServerSession, FailureListener {
final boolean purgeOnNoConsumers,
final boolean autoCreated) throws Exception {
AddressSettings as = server.getAddressSettingsRepository().getMatch(address.toString());
return createQueue(new AddressInfo(address, routingType), name, filterString, temporary, durable, maxConsumers, purgeOnNoConsumers, as.isDefaultExclusiveQueue(), as.isDefaultGroupRebalance(), as.getDefaultGroupBuckets(), as.isDefaultLastValueQueue(), as.getDefaultLastValueKey(), as.isDefaultNonDestructive(), as.getDefaultConsumersBeforeDispatch(), as.getDefaultDelayBeforeDispatch(), autoCreated);
return createQueue(new AddressInfo(address, routingType), name, filterString, temporary, durable, maxConsumers, purgeOnNoConsumers, as.isDefaultExclusiveQueue(), as.isDefaultGroupRebalance(), as.getDefaultGroupBuckets(), as.isDefaultLastValueQueue(), as.getDefaultLastValueKey(), as.isDefaultNonDestructive(), as.getDefaultConsumersBeforeDispatch(), as.getDefaultDelayBeforeDispatch(), as.isAutoDeleteQueues(), as.getAutoDeleteQueuesDelay(), as.getAutoDeleteQueuesMessageCount(), autoCreated);
}
@Override
@ -684,7 +687,7 @@ public class ServerSessionImpl implements ServerSession, FailureListener {
final Boolean exclusive,
final Boolean lastValue,
final boolean autoCreated) throws Exception {
return createQueue(address, name, routingType, filterString, temporary, durable, maxConsumers, purgeOnNoConsumers, exclusive, null, null, lastValue, null, null, null, null, autoCreated);
return createQueue(address, name, routingType, filterString, temporary, durable, maxConsumers, purgeOnNoConsumers, exclusive, null, null, lastValue, null, null, null, null, null, null, null, autoCreated);
}
@Override
@ -704,8 +707,11 @@ public class ServerSessionImpl implements ServerSession, FailureListener {
final Boolean nonDestructive,
final Integer consumersBeforeDispatch,
final Long delayBeforeDispatch,
final Boolean autoDelete,
final Long autoDeleteDelay,
final Long autoDeleteMessageCount,
final boolean autoCreated) throws Exception {
if (exclusive == null || groupRebalance == null || groupBuckets == null || lastValue == null || lastValueKey == null || nonDestructive == null || consumersBeforeDispatch == null || delayBeforeDispatch == null) {
if (exclusive == null || groupRebalance == null || groupBuckets == null || lastValue == null || lastValueKey == null || nonDestructive == null || consumersBeforeDispatch == null || delayBeforeDispatch == null || autoDelete == null || autoDeleteDelay == null || autoDeleteMessageCount == null) {
AddressSettings as = server.getAddressSettingsRepository().getMatch(address.toString());
return createQueue(new AddressInfo(address, routingType), name, filterString, temporary, durable, maxConsumers, purgeOnNoConsumers,
exclusive == null ? as.isDefaultExclusiveQueue() : exclusive,
@ -716,10 +722,13 @@ public class ServerSessionImpl implements ServerSession, FailureListener {
nonDestructive == null ? as.isDefaultNonDestructive() : nonDestructive,
consumersBeforeDispatch == null ? as.getDefaultConsumersBeforeDispatch() : consumersBeforeDispatch,
delayBeforeDispatch == null ? as.getDefaultDelayBeforeDispatch() : delayBeforeDispatch,
autoDelete == null ? as.isAutoDeleteQueues() : autoDelete,
autoDeleteDelay == null ? as.getAutoDeleteQueuesDelay() : autoDeleteDelay,
autoDeleteMessageCount == null ? as.getAutoDeleteQueuesMessageCount() : autoDeleteMessageCount,
autoCreated);
} else {
return createQueue(new AddressInfo(address, routingType), name, filterString, temporary, durable, maxConsumers, purgeOnNoConsumers,
exclusive, groupRebalance, groupBuckets, lastValue, lastValueKey, nonDestructive, consumersBeforeDispatch, delayBeforeDispatch, autoCreated);
exclusive, groupRebalance, groupBuckets, lastValue, lastValueKey, nonDestructive, consumersBeforeDispatch, delayBeforeDispatch, autoDelete, autoDeleteDelay, autoDeleteMessageCount, autoCreated);
}
}
@ -738,14 +747,14 @@ public class ServerSessionImpl implements ServerSession, FailureListener {
@Override
public Queue createQueue(AddressInfo addressInfo, SimpleString name, SimpleString filterString, boolean temporary, boolean durable, boolean autoCreated) throws Exception {
AddressSettings as = server.getAddressSettingsRepository().getMatch(addressInfo.getName().toString());
return createQueue(addressInfo, name, filterString, temporary, durable, as.getDefaultMaxConsumers(), as.isDefaultPurgeOnNoConsumers(), as.isDefaultExclusiveQueue(), as.isDefaultGroupRebalance(), as.getDefaultGroupBuckets(), as.isDefaultLastValueQueue(), as.getDefaultLastValueKey(), as.isDefaultNonDestructive(), as.getDefaultConsumersBeforeDispatch(), as.getDefaultDelayBeforeDispatch(), autoCreated);
return createQueue(addressInfo, name, filterString, temporary, durable, as.getDefaultMaxConsumers(), as.isDefaultPurgeOnNoConsumers(), as.isDefaultExclusiveQueue(), as.isDefaultGroupRebalance(), as.getDefaultGroupBuckets(), as.isDefaultLastValueQueue(), as.getDefaultLastValueKey(), as.isDefaultNonDestructive(), as.getDefaultConsumersBeforeDispatch(), as.getDefaultDelayBeforeDispatch(), as.isAutoDeleteQueues(), as.getAutoDeleteQueuesDelay(), as.getAutoDeleteQueuesMessageCount(), autoCreated);
}
@Override
public Queue createQueue(AddressInfo addressInfo, SimpleString name, SimpleString filterString, boolean temporary, boolean durable, Boolean exclusive, Boolean lastValue, boolean autoCreated) throws Exception {
AddressSettings as = server.getAddressSettingsRepository().getMatch(addressInfo.getName().toString());
return createQueue(addressInfo, name, filterString, temporary, durable, as.getDefaultMaxConsumers(), as.isDefaultPurgeOnNoConsumers(),
exclusive == null ? as.isDefaultExclusiveQueue() : exclusive, as.isDefaultGroupRebalance(), as.getDefaultGroupBuckets(), lastValue == null ? as.isDefaultLastValueQueue() : lastValue, as.getDefaultLastValueKey(), as.isDefaultNonDestructive(), as.getDefaultConsumersBeforeDispatch(), as.getDefaultDelayBeforeDispatch(), autoCreated);
exclusive == null ? as.isDefaultExclusiveQueue() : exclusive, as.isDefaultGroupRebalance(), as.getDefaultGroupBuckets(), lastValue == null ? as.isDefaultLastValueQueue() : lastValue, as.getDefaultLastValueKey(), as.isDefaultNonDestructive(), as.getDefaultConsumersBeforeDispatch(), as.getDefaultDelayBeforeDispatch(), as.isAutoDeleteQueues(), as.getAutoDeleteQueuesDelay(), as.getAutoDeleteQueuesMessageCount(), autoCreated);
}
@Override
@ -784,7 +793,7 @@ public class ServerSessionImpl implements ServerSession, FailureListener {
Boolean purgeOnNoConsumers,
Boolean exclusive,
Boolean lastValue) throws Exception {
createSharedQueue(address, name, routingType, filterString, durable, maxConsumers, purgeOnNoConsumers, exclusive, null, null, lastValue, null, null, null, null);
createSharedQueue(address, name, routingType, filterString, durable, maxConsumers, purgeOnNoConsumers, exclusive, null, null, lastValue, null, null, null, null, null, null, null);
}
@Override
@ -802,7 +811,10 @@ public class ServerSessionImpl implements ServerSession, FailureListener {
SimpleString lastValueKey,
Boolean nonDestructive,
Integer consumersBeforeDispatch,
Long delayBeforeDispatch) throws Exception {
Long delayBeforeDispatch,
Boolean autoDelete,
Long autoDeleteDelay,
Long autoDeleteMessageCount) throws Exception {
address = removePrefix(address);
securityCheck(address, name, durable ? CheckType.CREATE_DURABLE_QUEUE : CheckType.CREATE_NON_DURABLE_QUEUE, this);
@ -821,7 +833,10 @@ public class ServerSessionImpl implements ServerSession, FailureListener {
lastValueKey == null ? as.getDefaultLastValueKey() : lastValueKey,
nonDestructive == null ? as.isDefaultNonDestructive() : nonDestructive,
consumersBeforeDispatch == null ? as.getDefaultConsumersBeforeDispatch() : consumersBeforeDispatch,
delayBeforeDispatch == null ? as.getDefaultDelayBeforeDispatch() : delayBeforeDispatch);
delayBeforeDispatch == null ? as.getDefaultDelayBeforeDispatch() : delayBeforeDispatch,
autoDelete == null ? as.isAutoDeleteQueues() : autoDelete,
autoDeleteDelay == null ? as.getAutoDeleteQueuesDelay() : delayBeforeDispatch,
autoDeleteMessageCount == null ? as.getAutoDeleteQueuesMessageCount() : autoDeleteMessageCount);
}
@Override

View File

@ -74,6 +74,8 @@ public class AddressSettings implements Mergeable<AddressSettings>, Serializable
public static final long DEFAULT_AUTO_DELETE_QUEUES_DELAY = 0;
public static final long DEFAULT_AUTO_DELETE_QUEUES_MESSAGE_COUNT = 0;
public static final DeletionPolicy DEFAULT_CONFIG_DELETE_QUEUES = DeletionPolicy.OFF;
public static final boolean DEFAULT_AUTO_CREATE_ADDRESSES = true;
@ -169,6 +171,8 @@ public class AddressSettings implements Mergeable<AddressSettings>, Serializable
private Long autoDeleteQueuesDelay = null;
private Long autoDeleteQueuesMessageCount = null;
private DeletionPolicy configDeleteQueues = null;
private Boolean autoCreateAddresses = null;
@ -324,6 +328,16 @@ public class AddressSettings implements Mergeable<AddressSettings>, Serializable
return this;
}
public long getAutoDeleteQueuesMessageCount() {
return autoDeleteQueuesMessageCount != null ? autoDeleteQueuesMessageCount : AddressSettings.DEFAULT_AUTO_DELETE_QUEUES_MESSAGE_COUNT;
}
public AddressSettings setAutoDeleteQueuesMessageCount(final long autoDeleteQueuesMessageCount) {
this.autoDeleteQueuesMessageCount = autoDeleteQueuesMessageCount;
return this;
}
public DeletionPolicy getConfigDeleteQueues() {
return configDeleteQueues != null ? configDeleteQueues : AddressSettings.DEFAULT_CONFIG_DELETE_QUEUES;
}
@ -768,6 +782,9 @@ public class AddressSettings implements Mergeable<AddressSettings>, Serializable
if (autoDeleteQueuesDelay == null) {
autoDeleteQueuesDelay = merged.autoDeleteQueuesDelay;
}
if (autoDeleteQueuesMessageCount == null) {
autoDeleteQueuesMessageCount = merged.autoDeleteQueuesMessageCount;
}
if (configDeleteQueues == null) {
configDeleteQueues = merged.configDeleteQueues;
}
@ -978,6 +995,10 @@ public class AddressSettings implements Mergeable<AddressSettings>, Serializable
if (buffer.readableBytes() > 0) {
defaultGroupBuckets = BufferHelper.readNullableInteger(buffer);
}
if (buffer.readableBytes() > 0) {
autoDeleteQueuesMessageCount = BufferHelper.readNullableLong(buffer);
}
}
@Override
@ -1025,7 +1046,8 @@ public class AddressSettings implements Mergeable<AddressSettings>, Serializable
BufferHelper.sizeOfNullableLong(autoDeleteQueuesDelay) +
BufferHelper.sizeOfNullableLong(autoDeleteAddressesDelay) +
BufferHelper.sizeOfNullableBoolean(defaultGroupRebalance) +
BufferHelper.sizeOfNullableInteger(defaultGroupBuckets);
BufferHelper.sizeOfNullableInteger(defaultGroupBuckets) +
BufferHelper.sizeOfNullableLong(autoDeleteQueuesMessageCount);
}
@Override
@ -1120,6 +1142,9 @@ public class AddressSettings implements Mergeable<AddressSettings>, Serializable
BufferHelper.writeNullableInteger(buffer, defaultGroupBuckets);
BufferHelper.writeNullableLong(buffer, autoDeleteQueuesMessageCount);
}
/* (non-Javadoc)
@ -1158,6 +1183,7 @@ public class AddressSettings implements Mergeable<AddressSettings>, Serializable
result = prime * result + ((autoCreateQueues == null) ? 0 : autoCreateQueues.hashCode());
result = prime * result + ((autoDeleteQueues == null) ? 0 : autoDeleteQueues.hashCode());
result = prime * result + ((autoDeleteQueuesDelay == null) ? 0 : autoDeleteQueuesDelay.hashCode());
result = prime * result + ((autoDeleteQueuesMessageCount == null) ? 0 : autoDeleteQueuesMessageCount.hashCode());
result = prime * result + ((configDeleteQueues == null) ? 0 : configDeleteQueues.hashCode());
result = prime * result + ((autoCreateAddresses == null) ? 0 : autoCreateAddresses.hashCode());
result = prime * result + ((autoDeleteAddresses == null) ? 0 : autoDeleteAddresses.hashCode());
@ -1335,6 +1361,11 @@ public class AddressSettings implements Mergeable<AddressSettings>, Serializable
return false;
} else if (!autoDeleteQueuesDelay.equals(other.autoDeleteQueuesDelay))
return false;
if (autoDeleteQueuesMessageCount == null) {
if (other.autoDeleteQueuesMessageCount != null)
return false;
} else if (!autoDeleteQueuesMessageCount.equals(other.autoDeleteQueuesMessageCount))
return false;
if (configDeleteQueues == null) {
if (other.configDeleteQueues != null)
return false;
@ -1497,6 +1528,8 @@ public class AddressSettings implements Mergeable<AddressSettings>, Serializable
autoDeleteQueues +
", autoDeleteQueuesDelay=" +
autoDeleteQueuesDelay +
", autoDeleteQueuesMessageCount=" +
autoDeleteQueuesMessageCount +
", configDeleteQueues=" +
configDeleteQueues +
", autoCreateAddresses=" +

View File

@ -2991,11 +2991,18 @@
<xsd:annotation>
<xsd:documentation>
how long to wait (in milliseconds) before deleting auto-created queues after the queue has 0
consumers and 0 messages
consumers.
</xsd:documentation>
</xsd:annotation>
</xsd:element>
<xsd:element name="auto-delete-queues-message-count" type="xsd:long" default="0" maxOccurs="1" minOccurs="0">
<xsd:annotation>
<xsd:documentation>
the message count the queue must be at or below before it can be evaluated to be auto deleted, 0 waits until empty queue (default) and -1 disables this check. </xsd:documentation>
</xsd:annotation>
</xsd:element>
<xsd:element name="config-delete-queues" default="OFF" maxOccurs="1" minOccurs="0">
<xsd:annotation>
<xsd:documentation>

View File

@ -946,6 +946,24 @@ public class ScheduledDeliveryHandlerTest extends Assert {
return false;
}
@Override
public boolean isAutoDelete() {
// no-op
return false;
}
@Override
public long getAutoDeleteDelay() {
// no-op
return -1;
}
@Override
public long getAutoDeleteMessageCount() {
// no-op
return -1;
}
@Override
public boolean isTemporary() {
return false;

View File

@ -598,6 +598,7 @@ that would be found in the `broker.xml` file.
<auto-create-queues>true</auto-create-queues>
<auto-delete-queues>true</auto-delete-queues>
<auto-delete-queues-delay>0</auto-delete-queues-delay>
<auto-delete-queues-message-count>0</auto-delete-queues-message-count>
<config-delete-queues>OFF</config-delete-queues>
<auto-create-addresses>true</auto-create-addresses>
<auto-delete-addresses>true</auto-delete-addresses>
@ -760,15 +761,30 @@ name fits the address `match`. Queues which are auto-created are durable,
non-temporary, and non-transient. Default is `true`.
`auto-delete-queues`. Whether or not the broker should automatically delete
auto-created queues when they have both 0 consumers and 0 messages. Default is
auto-created queues when they have both 0 consumers and the message count is
less than or equal to `auto-delete-queues-message-count`. Default is
`true`.
`auto-delete-queues-delay`. How long to wait (in milliseconds) before deleting
auto-created queues after the queue has 0 consumers and 0 messages. Default is
`0` (delete immediately). The broker's `address-queue-scan-period` controls
auto-created queues after the queue has 0 consumers and the message count is
less than or equal to `auto-delete-queues-message-count`.
Default is `0` (delete immediately). The broker's `address-queue-scan-period` controls
how often (in milliseconds) queues are scanned for potential deletion. Use `-1`
to disable scanning. The default scan value is `30000`.
`auto-delete-queues-message-count`. The message count that the queue must be
less than or equal to before deleting auto-created queues.
To disable message count check `-1` can be set.
Default is `0` (empty queue).
**Note:** the above auto-delete address settings can also be configured
individually at the queue level when a client auto creates the queue.
For Core API it is exposed in createQueue methods.
For Core JMS you can set it using the destination queue attributes
`my.destination?auto-delete=true&auto-delete-delay=120000&auto-delete-message-count=-1`
`config-delete-queues`. How the broker should handle queues deleted on config
reload, by delete policy: `OFF` or `FORCE`. Default is `OFF`. Read more about
[configuration reload](config-reload.md).

View File

@ -46,8 +46,8 @@ public class ConsumerDelayDispatchTest extends JMSTestBase {
@Before
public void setUp() throws Exception {
super.setUp();
server.createQueue(queueName, RoutingType.ANYCAST, queueName, null, null, true, false, false, false, false, -1, false, true, false, -1, false, null, false, 2, DELAY_BEFORE_DISPATCH, true);
server.createQueue(normalQueueName, RoutingType.ANYCAST, normalQueueName, null, null, true, false, false, false, false, -1, false, true, false, -1, false, null, false, 0, -1, true);
server.createQueue(queueName, RoutingType.ANYCAST, queueName, null, null, true, false, false, false, false, -1, false, true, false, -1, false, null, false, 2, DELAY_BEFORE_DISPATCH, false, 0, 0, true);
server.createQueue(normalQueueName, RoutingType.ANYCAST, normalQueueName, null, null, true, false, false, false, false, -1, false, true, false, -1, false, null, false, 0, -1, false, 0, 0, true);
}

View File

@ -315,7 +315,7 @@ public class GroupingTest extends JMSTestBase {
String testQueueName = getName() + "_bucket_group";
server.createQueue(SimpleString.toSimpleString(testQueueName), RoutingType.ANYCAST, SimpleString.toSimpleString(testQueueName), null, null, true, false, false, false, false, -1, false, false, false, 2, false, null, false, 0, 0, true);
server.createQueue(SimpleString.toSimpleString(testQueueName), RoutingType.ANYCAST, SimpleString.toSimpleString(testQueueName), null, null, true, false, false, false, false, -1, false, false, false, 2, false, null, false, 0, 0, false, 0, 0, true);
JMSContext ctx = addContext(getCF().createContext(JMSContext.SESSION_TRANSACTED));
@ -397,7 +397,7 @@ public class GroupingTest extends JMSTestBase {
Assume.assumeTrue("only makes sense withOUT explicit group-id", ((ActiveMQConnectionFactory) fact).getGroupID() == null);
String testQueueName = getName() + "_group_rebalance";
server.createQueue(SimpleString.toSimpleString(testQueueName), RoutingType.ANYCAST, SimpleString.toSimpleString(testQueueName), null, null, true, false, false, false, false, -1, false, false, true, -1, false, null, false, 0, 0, true);
server.createQueue(SimpleString.toSimpleString(testQueueName), RoutingType.ANYCAST, SimpleString.toSimpleString(testQueueName), null, null, true, false, false, false, false, -1, false, false, true, -1, false, null, false, 0, 0, false, 0, 0, true);
JMSContext ctx = addContext(getCF().createContext(JMSContext.SESSION_TRANSACTED));
@ -527,7 +527,7 @@ public class GroupingTest extends JMSTestBase {
Assume.assumeTrue("only makes sense withOUT explicit group-id", ((ActiveMQConnectionFactory) fact).getGroupID() == null);
String testQueueName = getName() + "_group_disable";
server.createQueue(SimpleString.toSimpleString(testQueueName), RoutingType.ANYCAST, SimpleString.toSimpleString(testQueueName), null, null, true, false, false, false, false, -1, false, false, false, 0, false, null, false, 0, 0, true);
server.createQueue(SimpleString.toSimpleString(testQueueName), RoutingType.ANYCAST, SimpleString.toSimpleString(testQueueName), null, null, true, false, false, false, false, -1, false, false, false, 0, false, null, false, 0, 0, false, 0, 0, true);
JMSContext ctx = addContext(getCF().createContext(JMSContext.SESSION_TRANSACTED));

View File

@ -0,0 +1,325 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.activemq.artemis.tests.integration.jms.client;
import javax.jms.Connection;
import javax.jms.ConnectionFactory;
import javax.jms.Message;
import javax.jms.MessageConsumer;
import javax.jms.MessageProducer;
import javax.jms.Queue;
import javax.jms.Session;
import org.apache.activemq.artemis.api.core.SimpleString;
import org.apache.activemq.artemis.core.config.Configuration;
import org.apache.activemq.artemis.core.postoffice.QueueBinding;
import org.apache.activemq.artemis.jms.client.ActiveMQDestination;
import org.apache.activemq.artemis.tests.util.JMSTestBase;
import org.junit.Before;
import org.junit.Test;
/**
* QueueAutoDeleteTest this tests that we can configure at the queue level auto-delete behaviour of auto created queues.
*/
public class QueueAutoDeleteTest extends JMSTestBase {
@Override
@Before
public void setUp() throws Exception {
super.setUp();
}
@Override
protected Configuration createDefaultConfig(boolean netty) throws Exception {
//Set scan period over aggressively so tests do not have to wait too long.
return super.createDefaultConfig(netty).setAddressQueueScanPeriod(10);
}
protected ConnectionFactory getCF() throws Exception {
return cf;
}
@Test
public void testAutoDelete() throws Exception {
ConnectionFactory fact = getCF();
Connection connection = fact.createConnection();
connection.start();
try {
Session session = connection.createSession(false, Session.CLIENT_ACKNOWLEDGE);
String testQueueName = getName();
Queue queue = session.createQueue(testQueueName + "?auto-delete=true");
ActiveMQDestination activeMQDestination = (ActiveMQDestination) queue;
assertEquals(testQueueName, queue.getQueueName());
assertEquals(true, activeMQDestination.getQueueAttributes().getAutoDelete());
MessageProducer producer = session.createProducer(queue);
producer.send(session.createTextMessage("hello1"));
producer.send(session.createTextMessage("hello2"));
QueueBinding queueBinding = (QueueBinding) server.getPostOffice().getBinding(SimpleString.toSimpleString(testQueueName));
assertTrue(queueBinding.getQueue().isAutoDelete());
assertEquals(2, queueBinding.getQueue().getMessageCount());
MessageConsumer consumer = session.createConsumer(queue);
Message message = consumer.receive(100);
assertNotNull(message);
message.acknowledge();
consumer.close();
queueBinding = (QueueBinding) server.getPostOffice().getBinding(SimpleString.toSimpleString(testQueueName));
assertEquals(1, queueBinding.getQueue().getMessageCount());
consumer = session.createConsumer(queue);
message = consumer.receive(100);
assertNotNull(message);
message.acknowledge();
consumer.close();
//Wait longer than scan period.
Thread.sleep(20);
queueBinding = (QueueBinding) server.getPostOffice().getBinding(SimpleString.toSimpleString(testQueueName));
assertNull(queueBinding);
} finally {
connection.close();
}
}
@Test
public void testAutoDeleteOff() throws Exception {
ConnectionFactory fact = getCF();
Connection connection = fact.createConnection();
connection.start();
try {
Session session = connection.createSession(false, Session.CLIENT_ACKNOWLEDGE);
String testQueueName = getName();
Queue queue = session.createQueue(testQueueName + "?auto-delete=false");
ActiveMQDestination activeMQDestination = (ActiveMQDestination) queue;
assertEquals(testQueueName, queue.getQueueName());
assertEquals(false, activeMQDestination.getQueueAttributes().getAutoDelete());
MessageProducer producer = session.createProducer(queue);
producer.send(session.createTextMessage("hello1"));
producer.send(session.createTextMessage("hello2"));
QueueBinding queueBinding = (QueueBinding) server.getPostOffice().getBinding(SimpleString.toSimpleString(testQueueName));
assertFalse(queueBinding.getQueue().isAutoDelete());
assertEquals(2, queueBinding.getQueue().getMessageCount());
MessageConsumer consumer = session.createConsumer(queue);
Message message = consumer.receive(100);
assertNotNull(message);
message.acknowledge();
consumer.close();
queueBinding = (QueueBinding) server.getPostOffice().getBinding(SimpleString.toSimpleString(testQueueName));
assertEquals(1, queueBinding.getQueue().getMessageCount());
consumer = session.createConsumer(queue);
message = consumer.receive(100);
assertNotNull(message);
message.acknowledge();
consumer.close();
//Wait longer than scan period.
Thread.sleep(20);
queueBinding = (QueueBinding) server.getPostOffice().getBinding(SimpleString.toSimpleString(testQueueName));
assertNotNull(queueBinding);
assertEquals(0, queueBinding.getQueue().getMessageCount());
} finally {
connection.close();
}
}
@Test
public void testAutoDeleteDelay() throws Exception {
ConnectionFactory fact = getCF();
Connection connection = fact.createConnection();
connection.start();
try {
Session session = connection.createSession(false, Session.CLIENT_ACKNOWLEDGE);
String testQueueName = getName();
Queue queue = session.createQueue(testQueueName + "?auto-delete=true&auto-delete-delay=500");
ActiveMQDestination activeMQDestination = (ActiveMQDestination) queue;
assertEquals(testQueueName, queue.getQueueName());
assertEquals(Long.valueOf(500), activeMQDestination.getQueueAttributes().getAutoDeleteDelay());
MessageProducer producer = session.createProducer(queue);
producer.send(session.createTextMessage("hello1"));
producer.send(session.createTextMessage("hello2"));
QueueBinding queueBinding = (QueueBinding) server.getPostOffice().getBinding(SimpleString.toSimpleString(testQueueName));
assertTrue(queueBinding.getQueue().isAutoDelete());
assertEquals(500, queueBinding.getQueue().getAutoDeleteDelay());
assertEquals(2, queueBinding.getQueue().getMessageCount());
MessageConsumer consumer = session.createConsumer(queue);
Message message = consumer.receive(100);
assertNotNull(message);
message.acknowledge();
consumer.close();
queueBinding = (QueueBinding) server.getPostOffice().getBinding(SimpleString.toSimpleString(testQueueName));
assertEquals(1, queueBinding.getQueue().getMessageCount());
consumer = session.createConsumer(queue);
message = consumer.receive(100);
assertNotNull(message);
message.acknowledge();
consumer.close();
//Wait longer than scan period, but less than delay
Thread.sleep(50);
//Check the queue has not been removed.
queueBinding = (QueueBinding) server.getPostOffice().getBinding(SimpleString.toSimpleString(testQueueName));
assertNotNull(queueBinding);
//Wait longer than auto delete delay
Thread.sleep(550);
queueBinding = (QueueBinding) server.getPostOffice().getBinding(SimpleString.toSimpleString(testQueueName));
assertNull(queueBinding);
} finally {
connection.close();
}
}
@Test
public void testAutoDeleteMessageCount() throws Exception {
ConnectionFactory fact = getCF();
Connection connection = fact.createConnection();
connection.start();
try {
Session session = connection.createSession(false, Session.CLIENT_ACKNOWLEDGE);
String testQueueName = getName();
Queue queue = session.createQueue(testQueueName + "?auto-delete=true&auto-delete-message-count=1");
ActiveMQDestination activeMQDestination = (ActiveMQDestination) queue;
assertEquals(testQueueName, queue.getQueueName());
assertEquals(Long.valueOf(1), activeMQDestination.getQueueAttributes().getAutoDeleteMessageCount());
MessageProducer producer = session.createProducer(queue);
producer.send(session.createTextMessage("hello1"));
producer.send(session.createTextMessage("hello2"));
QueueBinding queueBinding = (QueueBinding) server.getPostOffice().getBinding(SimpleString.toSimpleString(testQueueName));
assertTrue(queueBinding.getQueue().isAutoDelete());
assertEquals(1, queueBinding.getQueue().getAutoDeleteMessageCount());
assertEquals(2, queueBinding.getQueue().getMessageCount());
MessageConsumer consumer = session.createConsumer(queue);
Message message = consumer.receive(100);
assertNotNull(message);
message.acknowledge();
consumer.close();
//Wait longer than scan period
Thread.sleep(20);
queueBinding = (QueueBinding) server.getPostOffice().getBinding(SimpleString.toSimpleString(testQueueName));
assertNull(queueBinding);
} finally {
connection.close();
}
}
@Test
public void testAutoDeleteMessageCountDisabled() throws Exception {
ConnectionFactory fact = getCF();
Connection connection = fact.createConnection();
connection.start();
try {
Session session = connection.createSession(false, Session.CLIENT_ACKNOWLEDGE);
String testQueueName = getName();
Queue queue = session.createQueue(testQueueName + "?auto-delete=true&auto-delete-message-count=-1");
ActiveMQDestination activeMQDestination = (ActiveMQDestination) queue;
assertEquals(testQueueName, queue.getQueueName());
assertEquals(Long.valueOf(-1), activeMQDestination.getQueueAttributes().getAutoDeleteMessageCount());
MessageProducer producer = session.createProducer(queue);
for (int i = 0; i < 100; i++) {
producer.send(session.createTextMessage("hello" + i));
}
QueueBinding queueBinding = (QueueBinding) server.getPostOffice().getBinding(SimpleString.toSimpleString(testQueueName));
assertEquals(100, queueBinding.getQueue().getMessageCount());
assertTrue(queueBinding.getQueue().isAutoDelete());
assertEquals(-1, queueBinding.getQueue().getAutoDeleteMessageCount());
MessageConsumer consumer = session.createConsumer(queue);
Message message = consumer.receive(100);
assertNotNull(message);
message.acknowledge();
consumer.close();
//Wait longer than scan period
Thread.sleep(20);
queueBinding = (QueueBinding) server.getPostOffice().getBinding(SimpleString.toSimpleString(testQueueName));
assertNull(queueBinding);
} finally {
connection.close();
}
}
}

View File

@ -205,7 +205,7 @@ public class ActiveMQServerControlUsingCoreTest extends ActiveMQServerControlTes
}
@Override
public String createQueue(String address, String routingType, String name, String filter, boolean durable, int maxConsumers, boolean purgeOnNoConsumers, boolean exclusive, boolean groupRebalance, int groupBuckets, boolean lastValue, String lastValueKey, boolean nonDestructive, int consumersBeforeDispatch, long delayBeforeDispatch, boolean autoCreateAddress) throws Exception {
public String createQueue(String address, String routingType, String name, String filter, boolean durable, int maxConsumers, boolean purgeOnNoConsumers, boolean exclusive, boolean groupRebalance, int groupBuckets, boolean lastValue, String lastValueKey, boolean nonDestructive, int consumersBeforeDispatch, long delayBeforeDispatch, boolean autoDelete, long autoDeleteDelay, long autoDeleteMessageCount, boolean autoCreateAddress) throws Exception {
return (String) proxy.invokeOperation("createQueue", address, routingType, name, filter, durable, maxConsumers, purgeOnNoConsumers, exclusive, groupRebalance, groupBuckets, lastValue, lastValueKey, nonDestructive, consumersBeforeDispatch, delayBeforeDispatch, autoCreateAddress);
}

View File

@ -115,7 +115,7 @@ public class QueueConfigRestartTest extends ActiveMQTestBase {
SimpleString address = new SimpleString("test.address");
SimpleString queue = new SimpleString("test.queue");
server.createQueue(address, RoutingType.MULTICAST, queue, null, null, true, false, false, false,false, 10, true, true, false, -1, true, null, false, consumersBeforeDispatch, -1, true);
server.createQueue(address, RoutingType.MULTICAST, queue, null, null, true, false, false, false,false, 10, true, true, false, -1, true, null, false, consumersBeforeDispatch, -1, false, 0, 0, true);
QueueBinding queueBinding1 = (QueueBinding)server.getPostOffice().getBinding(queue);
Assert.assertEquals(consumersBeforeDispatch, queueBinding1.getQueue().getConsumersBeforeDispatch());
@ -138,7 +138,7 @@ public class QueueConfigRestartTest extends ActiveMQTestBase {
SimpleString address = new SimpleString("test.address");
SimpleString queue = new SimpleString("test.queue");
server.createQueue(address, RoutingType.MULTICAST, queue, null, null, true, false, false, false,false, 10, true, true, false, -1, true, null, false,0, delayBeforeDispatch, true);
server.createQueue(address, RoutingType.MULTICAST, queue, null, null, true, false, false, false,false, 10, true, true, false, -1, true, null, false,0, delayBeforeDispatch, false, 0, 0, true);
QueueBinding queueBinding1 = (QueueBinding)server.getPostOffice().getBinding(queue);
Assert.assertEquals(delayBeforeDispatch, queueBinding1.getQueue().getDelayBeforeDispatch());

View File

@ -626,6 +626,24 @@ public class FakeQueue extends CriticalComponentImpl implements Queue {
return false;
}
@Override
public boolean isAutoDelete() {
// no-op
return false;
}
@Override
public long getAutoDeleteDelay() {
// no-op
return -1;
}
@Override
public long getAutoDeleteMessageCount() {
// no-op
return -1;
}
@Override
public boolean isTemporary() {
// no-op