ARTEMIS-2787 - Add ability to disable and enable a queue

Add feature
Add tests
Add docs
Add missing bits noticed in ring-size
Address comments
This commit is contained in:
Michael Pearce 2020-06-01 12:53:00 +01:00 committed by Clebert Suconic
parent d0eabde447
commit 99f6c7bf20
38 changed files with 586 additions and 23 deletions

View File

@ -41,6 +41,7 @@ public class QueueAttributes implements Serializable {
public static final String AUTO_DELETE_DELAY = "auto-delete-delay";
public static final String AUTO_DELETE_MESSAGE_COUNT = "auto-delete-message-count";
public static final String RING_SIZE = "ring-size";
public static final String ENABLED = "enabled";
private RoutingType routingType;
private SimpleString filterString;
@ -61,6 +62,7 @@ public class QueueAttributes implements Serializable {
private Long autoDeleteDelay;
private Long autoDeleteMessageCount;
private Long ringSize;
private Boolean enabled;
public void set(String key, String value) {
@ -103,6 +105,8 @@ public class QueueAttributes implements Serializable {
setAutoDeleteMessageCount(Long.valueOf(value));
} else if (key.equals(RING_SIZE)) {
setRingSize(Long.valueOf(value));
} else if (key.equals(ENABLED)) {
setEnabled(Boolean.valueOf(value));
}
}
}
@ -113,6 +117,7 @@ public class QueueAttributes implements Serializable {
.setRoutingType(this.getRoutingType())
.setExclusive(this.getExclusive())
.setRingSize(this.getRingSize())
.setEnabled(this.isEnabled())
.setGroupRebalance(this.getGroupRebalance())
.setNonDestructive(this.getNonDestructive())
.setLastValue(this.getLastValue())
@ -139,6 +144,7 @@ public class QueueAttributes implements Serializable {
.setRoutingType(queueConfiguration.getRoutingType())
.setExclusive(queueConfiguration.isExclusive())
.setRingSize(queueConfiguration.getRingSize())
.setEnabled(queueConfiguration.isEnabled())
.setGroupRebalance(queueConfiguration.isGroupRebalance())
.setNonDestructive(queueConfiguration.isNonDestructive())
.setLastValue(queueConfiguration.isLastValue())
@ -327,4 +333,13 @@ public class QueueAttributes implements Serializable {
this.ringSize = ringSize;
return this;
}
public Boolean isEnabled() {
return enabled;
}
public QueueAttributes setEnabled(Boolean enabled) {
this.enabled = enabled;
return this;
}
}

View File

@ -62,6 +62,7 @@ public class QueueConfiguration implements Serializable {
public static final String LAST_VALUE_KEY = "last-value-key";
public static final String NON_DESTRUCTIVE = "non-destructive";
public static final String PURGE_ON_NO_CONSUMERS = "purge-on-no-consumers";
public static final String ENABLED = "enabled";
public static final String CONSUMERS_BEFORE_DISPATCH = "consumers-before-dispatch";
public static final String DELAY_BEFORE_DISPATCH = "delay-before-dispatch";
public static final String CONSUMER_PRIORITY = "consumer-priority";
@ -92,6 +93,7 @@ public class QueueConfiguration implements Serializable {
private SimpleString lastValueKey;
private Boolean nonDestructive;
private Boolean purgeOnNoConsumers;
private Boolean enabled;
private Integer consumersBeforeDispatch;
private Long delayBeforeDispatch;
private Integer consumerPriority;
@ -202,6 +204,8 @@ public class QueueConfiguration implements Serializable {
setNonDestructive(Boolean.valueOf(value));
} else if (key.equals(PURGE_ON_NO_CONSUMERS)) {
setPurgeOnNoConsumers(Boolean.valueOf(value));
} else if (key.equals(ENABLED)) {
setEnabled(Boolean.valueOf(value));
} else if (key.equals(CONSUMERS_BEFORE_DISPATCH)) {
setConsumersBeforeDispatch(Integer.valueOf(value));
} else if (key.equals(DELAY_BEFORE_DISPATCH)) {
@ -409,6 +413,16 @@ public class QueueConfiguration implements Serializable {
return this;
}
public Boolean isEnabled() {
return enabled;
}
public QueueConfiguration setEnabled(Boolean enabled) {
this.enabled = enabled;
return this;
}
public Integer getConsumersBeforeDispatch() {
return consumersBeforeDispatch;
}
@ -634,6 +648,9 @@ public class QueueConfiguration implements Serializable {
if (isPurgeOnNoConsumers() != null) {
builder.add(PURGE_ON_NO_CONSUMERS, isPurgeOnNoConsumers());
}
if (isEnabled() != null) {
builder.add(ENABLED, isEnabled());
}
if (getConsumersBeforeDispatch() != null) {
builder.add(CONSUMERS_BEFORE_DISPATCH, getConsumersBeforeDispatch());
}
@ -741,6 +758,8 @@ public class QueueConfiguration implements Serializable {
return false;
if (!Objects.equals(purgeOnNoConsumers, that.purgeOnNoConsumers))
return false;
if (!Objects.equals(enabled, that.enabled))
return false;
if (!Objects.equals(consumersBeforeDispatch, that.consumersBeforeDispatch))
return false;
if (!Objects.equals(delayBeforeDispatch, that.delayBeforeDispatch))
@ -789,6 +808,7 @@ public class QueueConfiguration implements Serializable {
result = 31 * result + Objects.hashCode(lastValueKey);
result = 31 * result + Objects.hashCode(nonDestructive);
result = 31 * result + Objects.hashCode(purgeOnNoConsumers);
result = 31 * result + Objects.hashCode(enabled);
result = 31 * result + Objects.hashCode(consumersBeforeDispatch);
result = 31 * result + Objects.hashCode(delayBeforeDispatch);
result = 31 * result + Objects.hashCode(consumerPriority);
@ -824,6 +844,7 @@ public class QueueConfiguration implements Serializable {
+ ", lastValueKey=" + lastValueKey
+ ", nonDestructive=" + nonDestructive
+ ", purgeOnNoConsumers=" + purgeOnNoConsumers
+ ", enabled=" + enabled
+ ", consumersBeforeDispatch=" + consumersBeforeDispatch
+ ", delayBeforeDispatch=" + delayBeforeDispatch
+ ", consumerPriority=" + consumerPriority

View File

@ -2670,4 +2670,30 @@ public interface AuditLogger extends BasicLogger {
@LogMessage(level = Logger.Level.INFO)
@Message(id = 601727, value = "User {0} is updating a divert on target resource: {1} {2}", format = Message.Format.MESSAGE_FORMAT)
void updateDivert(String user, Object source, Object... args);
static void isEnabled(Object source) {
LOGGER.isEnabled(getCaller(), source);
}
@LogMessage(level = Logger.Level.INFO)
@Message(id = 601728, value = "User {0} is getting enabled property on target resource: {1} {2}", format = Message.Format.MESSAGE_FORMAT)
void isEnabled(String user, Object source, Object... args);
static void disable(Object source, Object... args) {
LOGGER.disable(getCaller(), source, arrayToString(args));
}
@LogMessage(level = Logger.Level.INFO)
@Message(id = 601729, value = "User {0} is disabling on target resource: {1} {2}", format = Message.Format.MESSAGE_FORMAT)
void disable(String user, Object source, Object... args);
static void enable(Object source) {
LOGGER.resume(getCaller(), source);
}
@LogMessage(level = Logger.Level.INFO)
@Message(id = 601730, value = "User {0} is enabling on target resource: {1} {2}", format = Message.Format.MESSAGE_FORMAT)
void enable(String user, Object source, Object... args);
}

View File

@ -514,6 +514,8 @@ public final class ActiveMQDefaultConfiguration {
public static final boolean DEFAULT_PURGE_ON_NO_CONSUMERS = false;
public static final boolean DEFAULT_ENABLED = true;
public static final boolean DEFAULT_QUEUE_AUTO_DELETE = true;
public static final boolean DEFAULT_CREATED_QUEUE_AUTO_DELETE = false;
@ -1453,6 +1455,10 @@ public final class ActiveMQDefaultConfiguration {
return DEFAULT_PURGE_ON_NO_CONSUMERS;
}
public static boolean getDefaultEnabled() {
return DEFAULT_ENABLED;
}
public static boolean getDefaultQueueAutoDelete(boolean autoCreated) {
return autoCreated ? getDefaultQueueAutoDelete() : getDefaultCreatedQueueAutoDelete();
}

View File

@ -173,6 +173,8 @@ public interface ClientSession extends XAResource, AutoCloseable {
Long getAutoDeleteMessageCount();
Long getRingSize();
Boolean isEnabled();
}
// Lifecycle operations ------------------------------------------

View File

@ -257,9 +257,27 @@ public interface QueueControl {
/**
*
*/
@Attribute(desc = "delete this queue when the last consumer disconnects")
@Attribute(desc = "purge this queue when the last consumer disconnects")
boolean isPurgeOnNoConsumers();
/**
*
*/
@Attribute(desc = "if the queue is enabled, default it is enabled, when disabled messages will not be routed to the queue")
boolean isEnabled();
/**
* Enables the queue. Messages are now routed to this queue.
*/
@Operation(desc = "Enables routing of messages to the Queue", impact = MBeanOperationInfo.ACTION)
void enable() throws Exception;
/**
* Enables the queue. Messages are not routed to this queue.
*/
@Operation(desc = "Disables routing of messages to the Queue", impact = MBeanOperationInfo.ACTION)
void disable() throws Exception;
/**
*
*/
@ -595,6 +613,7 @@ public interface QueueControl {
@Operation(desc = "Resumes delivery of queued messages and gets the queue out of paused state. It will also affected the state of a persisted pause.", impact = MBeanOperationInfo.ACTION)
void resume() throws Exception;
@Operation(desc = "List all the existent consumers on the Queue")
String listConsumersAsJSON() throws Exception;

View File

@ -74,6 +74,8 @@ public class QueueQueryImpl implements ClientSession.QueueQuery {
private final Long ringSize;
private final Boolean enabled;
private final Integer defaultConsumerWindowSize;
@ -160,7 +162,7 @@ public class QueueQueryImpl implements ClientSession.QueueQuery {
final Long autoDeleteDelay,
final Long autoDeleteMessageCount,
final Integer defaultConsumerWindowSize) {
this(durable, temporary, consumerCount, messageCount, filterString, address, name, exists, autoCreateQueues, maxConsumers, autoCreated, purgeOnNoConsumers, routingType, exclusive, groupRebalance, groupBuckets, null, lastValue, lastValueKey, nonDestructive, consumersBeforeDispatch, delayBeforeDispatch, autoDelete, autoDeleteDelay, autoDeleteMessageCount, defaultConsumerWindowSize, null);
this(durable, temporary, consumerCount, messageCount, filterString, address, name, exists, autoCreateQueues, maxConsumers, autoCreated, purgeOnNoConsumers, routingType, exclusive, groupRebalance, groupBuckets, null, lastValue, lastValueKey, nonDestructive, consumersBeforeDispatch, delayBeforeDispatch, autoDelete, autoDeleteDelay, autoDeleteMessageCount, defaultConsumerWindowSize, null, null);
}
public QueueQueryImpl(final boolean durable,
@ -189,7 +191,8 @@ public class QueueQueryImpl implements ClientSession.QueueQuery {
final Long autoDeleteDelay,
final Long autoDeleteMessageCount,
final Integer defaultConsumerWindowSize,
final Long ringSize) {
final Long ringSize,
final Boolean enabled) {
this.durable = durable;
this.temporary = temporary;
this.consumerCount = consumerCount;
@ -217,6 +220,7 @@ public class QueueQueryImpl implements ClientSession.QueueQuery {
this.autoDeleteMessageCount = autoDeleteMessageCount;
this.defaultConsumerWindowSize = defaultConsumerWindowSize;
this.ringSize = ringSize;
this.enabled = enabled;
}
@Override
@ -353,5 +357,10 @@ public class QueueQueryImpl implements ClientSession.QueueQuery {
public Long getRingSize() {
return ringSize;
}
@Override
public Boolean isEnabled() {
return enabled;
}
}

View File

@ -905,7 +905,7 @@ public class ActiveMQSessionContext extends SessionContext {
// We try to recreate any non-durable or auto-created queues, since they might not be there on failover/reconnect.
// This allows e.g. JMS non durable subs and temporary queues to continue to be used after failover/reconnection
if (!queueInfo.isDurable() || queueInfo.isAutoCreated()) {
CreateQueueMessage_V2 createQueueRequest = new CreateQueueMessage_V2(queueInfo.getAddress(), queueInfo.getName(), queueInfo.getRoutingType(), queueInfo.getFilterString(), queueInfo.isDurable(), queueInfo.isTemporary(), queueInfo.getMaxConsumers(), queueInfo.isPurgeOnNoConsumers(), queueInfo.isAutoCreated(), false, queueInfo.isExclusive(), queueInfo.isGroupRebalance(), queueInfo.getGroupBuckets(), queueInfo.getGroupFirstKey(), queueInfo.isLastValue(), queueInfo.getLastValueKey(), queueInfo.isNonDestructive(), queueInfo.getConsumersBeforeDispatch(), queueInfo.getDelayBeforeDispatch(), queueInfo.isAutoDelete(), queueInfo.getAutoDeleteDelay(), queueInfo.getAutoDeleteMessageCount(), queueInfo.getRingSize());
CreateQueueMessage_V2 createQueueRequest = new CreateQueueMessage_V2(queueInfo.getAddress(), queueInfo.getName(), queueInfo.getRoutingType(), queueInfo.getFilterString(), queueInfo.isDurable(), queueInfo.isTemporary(), queueInfo.getMaxConsumers(), queueInfo.isPurgeOnNoConsumers(), queueInfo.isAutoCreated(), false, queueInfo.isExclusive(), queueInfo.isGroupRebalance(), queueInfo.getGroupBuckets(), queueInfo.getGroupFirstKey(), queueInfo.isLastValue(), queueInfo.getLastValueKey(), queueInfo.isNonDestructive(), queueInfo.getConsumersBeforeDispatch(), queueInfo.getDelayBeforeDispatch(), queueInfo.isAutoDelete(), queueInfo.getAutoDeleteDelay(), queueInfo.getAutoDeleteMessageCount(), queueInfo.getRingSize(), queueInfo.isEnabled());
sendPacketWithoutLock(sessionChannel, createQueueRequest);
}

View File

@ -59,6 +59,8 @@ public class CreateQueueMessage_V2 extends CreateQueueMessage {
private Long ringSize;
private Boolean enabled;
@Deprecated
public CreateQueueMessage_V2(final SimpleString address,
final SimpleString queueName,
@ -89,7 +91,8 @@ public class CreateQueueMessage_V2 extends CreateQueueMessage {
queueAttributes.getAutoDelete(),
queueAttributes.getAutoDeleteDelay(),
queueAttributes.getAutoDeleteMessageCount(),
queueAttributes.getRingSize()
queueAttributes.getRingSize(),
queueAttributes.isEnabled()
);
}
@ -118,7 +121,8 @@ public class CreateQueueMessage_V2 extends CreateQueueMessage {
queueConfiguration.isAutoDelete(),
queueConfiguration.getAutoDeleteDelay(),
queueConfiguration.getAutoDeleteMessageCount(),
queueConfiguration.getRingSize()
queueConfiguration.getRingSize(),
queueConfiguration.isEnabled()
);
}
@ -144,7 +148,8 @@ public class CreateQueueMessage_V2 extends CreateQueueMessage {
final Boolean autoDelete,
final Long autoDeleteDelay,
final Long autoDeleteMessageCount,
final Long ringSize) {
final Long ringSize,
final Boolean enabled) {
this();
this.address = address;
@ -170,6 +175,7 @@ public class CreateQueueMessage_V2 extends CreateQueueMessage {
this.autoDeleteDelay = autoDeleteDelay;
this.autoDeleteMessageCount = autoDeleteMessageCount;
this.ringSize = ringSize;
this.enabled = enabled;
}
public CreateQueueMessage_V2() {
@ -200,7 +206,8 @@ public class CreateQueueMessage_V2 extends CreateQueueMessage {
.setAutoDeleteMessageCount(autoDeleteMessageCount)
.setTemporary(temporary)
.setAutoCreated(autoCreated)
.setRingSize(ringSize);
.setRingSize(ringSize)
.setEnabled(enabled);
}
@Override
@ -223,6 +230,7 @@ public class CreateQueueMessage_V2 extends CreateQueueMessage {
buff.append(", autoDeleteDelay=" + autoDeleteDelay);
buff.append(", autoDeleteMessageCount=" + autoDeleteMessageCount);
buff.append(", ringSize=" + ringSize);
buff.append(", enabled=" + enabled);
buff.append("]");
return buff.toString();
@ -364,6 +372,14 @@ public class CreateQueueMessage_V2 extends CreateQueueMessage {
this.ringSize = ringSize;
}
public Boolean isEnabled() {
return enabled;
}
public void setEnabled(Boolean enabled) {
this.enabled = enabled;
}
@Override
public void encodeRest(final ActiveMQBuffer buffer) {
super.encodeRest(buffer);
@ -384,6 +400,7 @@ public class CreateQueueMessage_V2 extends CreateQueueMessage {
BufferHelper.writeNullableLong(buffer, autoDeleteMessageCount);
buffer.writeNullableSimpleString(groupFirstKey);
BufferHelper.writeNullableLong(buffer, ringSize);
BufferHelper.writeNullableBoolean(buffer, enabled);
}
@Override
@ -414,6 +431,9 @@ public class CreateQueueMessage_V2 extends CreateQueueMessage {
if (buffer.readableBytes() > 0) {
ringSize = BufferHelper.readNullableLong(buffer);
}
if (buffer.readableBytes() > 0) {
enabled = BufferHelper.readNullableBoolean(buffer);
}
}
@Override
@ -437,6 +457,7 @@ public class CreateQueueMessage_V2 extends CreateQueueMessage {
result = prime * result + (autoDeleteDelay == null ? 0 : autoDeleteDelay.hashCode());
result = prime * result + (autoDeleteMessageCount == null ? 0 : autoDeleteMessageCount.hashCode());
result = prime * result + (ringSize == null ? 0 : ringSize.hashCode());
result = prime * result + (enabled ? 1231 : 1237);
return result;
}
@ -520,6 +541,11 @@ public class CreateQueueMessage_V2 extends CreateQueueMessage {
return false;
} else if (!ringSize.equals(other.ringSize))
return false;
if (enabled == null) {
if (other.enabled != null)
return false;
} else if (!enabled.equals(other.enabled))
return false;
if (routingType == null) {
if (other.routingType != null)
return false;

View File

@ -39,6 +39,8 @@ public class CreateSharedQueueMessage_V2 extends CreateSharedQueueMessage {
private Boolean autoDelete;
private Long autoDeleteDelay;
private Long autoDeleteMessageCount;
private Long ringSize;
private Boolean enabled;
public CreateSharedQueueMessage_V2(final QueueConfiguration queueConfiguration, boolean requiresResponse) {
this(
@ -61,6 +63,8 @@ public class CreateSharedQueueMessage_V2 extends CreateSharedQueueMessage {
queueConfiguration.isAutoDelete(),
queueConfiguration.getAutoDeleteDelay(),
queueConfiguration.getAutoDeleteMessageCount(),
queueConfiguration.getRingSize(),
queueConfiguration.isEnabled(),
requiresResponse
);
}
@ -84,6 +88,8 @@ public class CreateSharedQueueMessage_V2 extends CreateSharedQueueMessage {
final Boolean autoDelete,
final Long autoDeleteDelay,
final Long autoDeleteMessageCount,
final Long ringSize,
final Boolean enabled,
final boolean requiresResponse) {
this();
@ -106,6 +112,8 @@ public class CreateSharedQueueMessage_V2 extends CreateSharedQueueMessage {
this.autoDelete = autoDelete;
this.autoDeleteDelay = autoDeleteDelay;
this.autoDeleteMessageCount = autoDeleteMessageCount;
this.ringSize = ringSize;
this.enabled = enabled;
this.requiresResponse = requiresResponse;
}
@ -233,6 +241,22 @@ public class CreateSharedQueueMessage_V2 extends CreateSharedQueueMessage {
this.autoDeleteMessageCount = autoDeleteMessageCount;
}
public Long getRingSize() {
return ringSize;
}
public void setRingSize(Long ringSize) {
this.ringSize = ringSize;
}
public Boolean isEnabled() {
return enabled;
}
public void setEnabled(Boolean enabled) {
this.enabled = enabled;
}
public QueueConfiguration toQueueConfiguration() {
return new QueueConfiguration(queueName)
.setAddress(address)
@ -252,7 +276,9 @@ public class CreateSharedQueueMessage_V2 extends CreateSharedQueueMessage {
.setLastValueKey(lastValueKey)
.setAutoDelete(autoDelete)
.setAutoDeleteDelay(autoDeleteDelay)
.setAutoDeleteMessageCount(autoDeleteMessageCount);
.setAutoDeleteMessageCount(autoDeleteMessageCount)
.setRingSize(ringSize)
.setEnabled(enabled);
}
@Override
@ -277,6 +303,8 @@ public class CreateSharedQueueMessage_V2 extends CreateSharedQueueMessage {
buff.append(", autoDelete=" + autoDelete);
buff.append(", autoDeleteDelay=" + autoDeleteDelay);
buff.append(", autoDeleteMessageCount=" + autoDeleteMessageCount);
buff.append(", ringSize=" + ringSize);
buff.append(", enabled=" + enabled);
buff.append(", requiresResponse=" + requiresResponse);
buff.append("]");
return buff.toString();
@ -304,6 +332,8 @@ public class CreateSharedQueueMessage_V2 extends CreateSharedQueueMessage {
BufferHelper.writeNullableLong(buffer, autoDeleteDelay);
BufferHelper.writeNullableLong(buffer, autoDeleteMessageCount);
buffer.writeNullableSimpleString(groupFirstKey);
BufferHelper.writeNullableLong(buffer, ringSize);
BufferHelper.writeNullableBoolean(buffer, enabled);
}
@Override
@ -334,6 +364,12 @@ public class CreateSharedQueueMessage_V2 extends CreateSharedQueueMessage {
if (buffer.readableBytes() > 0) {
groupFirstKey = buffer.readNullableSimpleString();
}
if (buffer.readableBytes() > 0) {
ringSize = buffer.readNullableLong();
}
if (buffer.readableBytes() > 0) {
enabled = buffer.readNullableBoolean();
}
}
@Override
@ -360,7 +396,8 @@ public class CreateSharedQueueMessage_V2 extends CreateSharedQueueMessage {
result = prime * result + (autoDelete == null ? 0 : autoDelete.hashCode());
result = prime * result + (autoDeleteDelay == null ? 0 : autoDeleteDelay.hashCode());
result = prime * result + (autoDeleteMessageCount == null ? 0 : autoDeleteMessageCount.hashCode());
result = prime * result + (ringSize == null ? 0 : ringSize.hashCode());
result = prime * result + (enabled ? 1231 : 1237);
return result;
}
@ -464,6 +501,16 @@ public class CreateSharedQueueMessage_V2 extends CreateSharedQueueMessage {
return false;
} else if (!autoDeleteMessageCount.equals(other.autoDeleteMessageCount))
return false;
if (ringSize == null) {
if (other.ringSize != null)
return false;
} else if (!ringSize.equals(other.ringSize))
return false;
if (enabled == null) {
if (other.enabled != null)
return false;
} else if (!enabled.equals(other.enabled))
return false;
return true;
}
}

View File

@ -62,12 +62,14 @@ public class SessionQueueQueryResponseMessage_V3 extends SessionQueueQueryRespon
private Long ringSize;
private Boolean enabled;
public SessionQueueQueryResponseMessage_V3(final QueueQueryResult result) {
this(result.getName(), result.getAddress(), result.isDurable(), result.isTemporary(), result.getFilterString(), result.getConsumerCount(), result.getMessageCount(), result.isExists(), result.isAutoCreateQueues(), result.isAutoCreated(), result.isPurgeOnNoConsumers(), result.getRoutingType(), result.getMaxConsumers(), result.isExclusive(), result.isGroupRebalance(), result.getGroupBuckets(), result.getGroupFirstKey(), result.isLastValue(), result.getLastValueKey(), result.isNonDestructive(), result.getConsumersBeforeDispatch(), result.getDelayBeforeDispatch(), result.isAutoDelete(), result.getAutoDeleteDelay(), result.getAutoDeleteMessageCount(), result.getDefaultConsumerWindowSize());
this(result.getName(), result.getAddress(), result.isDurable(), result.isTemporary(), result.getFilterString(), result.getConsumerCount(), result.getMessageCount(), result.isExists(), result.isAutoCreateQueues(), result.isAutoCreated(), result.isPurgeOnNoConsumers(), result.getRoutingType(), result.getMaxConsumers(), result.isExclusive(), result.isGroupRebalance(), result.getGroupBuckets(), result.getGroupFirstKey(), result.isLastValue(), result.getLastValueKey(), result.isNonDestructive(), result.getConsumersBeforeDispatch(), result.getDelayBeforeDispatch(), result.isAutoDelete(), result.getAutoDeleteDelay(), result.getAutoDeleteMessageCount(), result.getDefaultConsumerWindowSize(), result.getRingSize(), result.isEnabled());
}
public SessionQueueQueryResponseMessage_V3() {
this(null, null, false, false, null, 0, 0, false, false, false, false, RoutingType.MULTICAST, -1, null, null,null, null, null, null, null, null, null, null, null, null, null);
this(null, null, false, false, null, 0, 0, false, false, false, false, RoutingType.MULTICAST, -1, null, null,null, null, null, null, null, null, null, null, null, null, null, null, null);
}
private SessionQueueQueryResponseMessage_V3(final SimpleString name,
@ -95,7 +97,9 @@ public class SessionQueueQueryResponseMessage_V3 extends SessionQueueQueryRespon
final Boolean autoDelete,
final Long autoDeleteDelay,
final Long autoDeleteMessageCount,
final Integer defaultConsumerWindowSize) {
final Integer defaultConsumerWindowSize,
final Long ringSize,
final Boolean enabled) {
super(SESS_QUEUEQUERY_RESP_V3);
this.durable = durable;
@ -149,6 +153,10 @@ public class SessionQueueQueryResponseMessage_V3 extends SessionQueueQueryRespon
this.autoDeleteMessageCount = autoDeleteMessageCount;
this.defaultConsumerWindowSize = defaultConsumerWindowSize;
this.ringSize = ringSize;
this.enabled = enabled;
}
public boolean isAutoCreated() {
@ -279,6 +287,18 @@ public class SessionQueueQueryResponseMessage_V3 extends SessionQueueQueryRespon
return ringSize;
}
public void setRingSize(Long ringSize) {
this.ringSize = ringSize;
}
public Boolean isEnabled() {
return enabled;
}
public void setEnabled(Boolean enabled) {
this.enabled = enabled;
}
@Override
public void encodeRest(final ActiveMQBuffer buffer) {
super.encodeRest(buffer);
@ -300,6 +320,7 @@ public class SessionQueueQueryResponseMessage_V3 extends SessionQueueQueryRespon
BufferHelper.writeNullableLong(buffer, autoDeleteMessageCount);
buffer.writeNullableSimpleString(groupFirstKey);
BufferHelper.writeNullableLong(buffer, ringSize);
BufferHelper.writeNullableBoolean(buffer, enabled);
}
@ -334,6 +355,9 @@ public class SessionQueueQueryResponseMessage_V3 extends SessionQueueQueryRespon
if (buffer.readableBytes() > 0) {
ringSize = BufferHelper.readNullableLong(buffer);
}
if (buffer.readableBytes() > 0) {
enabled = BufferHelper.readNullableBoolean(buffer);
}
}
@Override
@ -358,6 +382,7 @@ public class SessionQueueQueryResponseMessage_V3 extends SessionQueueQueryRespon
result = prime * result + (autoDeleteMessageCount == null ? 0 : autoDeleteMessageCount.hashCode());
result = prime * result + ((defaultConsumerWindowSize == null) ? 0 : defaultConsumerWindowSize.hashCode());
result = prime * result + (ringSize == null ? 0 : ringSize.hashCode());
result = prime * result + (enabled == null ? 0 : enabled ? 1231 : 1237);
return result;
}
@ -389,12 +414,13 @@ public class SessionQueueQueryResponseMessage_V3 extends SessionQueueQueryRespon
buff.append(", autoDeleteMessageCount=" + autoDeleteMessageCount);
buff.append(", defaultConsumerWindowSize=" + defaultConsumerWindowSize);
buff.append(", ringSize=" + ringSize);
buff.append(", enabled=" + enabled);
return buff.toString();
}
@Override
public ClientSession.QueueQuery toQueueQuery() {
return new QueueQueryImpl(isDurable(), isTemporary(), getConsumerCount(), getMessageCount(), getFilterString(), getAddress(), getName(), isExists(), isAutoCreateQueues(), getMaxConsumers(), isAutoCreated(), isPurgeOnNoConsumers(), getRoutingType(), isExclusive(), isGroupRebalance(), getGroupBuckets(), getGroupFirstKey(), isLastValue(), getLastValueKey(), isNonDestructive(), getConsumersBeforeDispatch(), getDelayBeforeDispatch(), isAutoDelete(), getAutoDeleteDelay(), getAutoDeleteMessageCount(), getDefaultConsumerWindowSize(), getRingSize());
return new QueueQueryImpl(isDurable(), isTemporary(), getConsumerCount(), getMessageCount(), getFilterString(), getAddress(), getName(), isExists(), isAutoCreateQueues(), getMaxConsumers(), isAutoCreated(), isPurgeOnNoConsumers(), getRoutingType(), isExclusive(), isGroupRebalance(), getGroupBuckets(), getGroupFirstKey(), isLastValue(), getLastValueKey(), isNonDestructive(), getConsumersBeforeDispatch(), getDelayBeforeDispatch(), isAutoDelete(), getAutoDeleteDelay(), getAutoDeleteMessageCount(), getDefaultConsumerWindowSize(), getRingSize(), isEnabled());
}
@Override
@ -475,6 +501,11 @@ public class SessionQueueQueryResponseMessage_V3 extends SessionQueueQueryRespon
return false;
} else if (!ringSize.equals(other.ringSize))
return false;
if (enabled == enabled) {
if (other.ringSize != null)
return false;
} else if (!enabled.equals(other.ringSize))
return false;
if (defaultConsumerWindowSize == null) {
if (other.defaultConsumerWindowSize != null)
return false;

View File

@ -75,6 +75,8 @@ public class QueueQueryResult {
private Long ringSize;
private Boolean enabled;
public QueueQueryResult(final SimpleString name,
final SimpleString address,
final boolean durable,
@ -101,7 +103,8 @@ public class QueueQueryResult {
final Long autoDeleteDelay,
final Long autoDeleteMessageCount,
final Integer defaultConsumerWindowSize,
final Long ringSize) {
final Long ringSize,
final Boolean enabled) {
this.durable = durable;
this.temporary = temporary;
@ -155,6 +158,8 @@ public class QueueQueryResult {
this.defaultConsumerWindowSize = defaultConsumerWindowSize;
this.ringSize = ringSize;
this.enabled = enabled;
}
public boolean isExists() {
@ -268,4 +273,8 @@ public class QueueQueryResult {
public Long getRingSize() {
return ringSize;
}
public Boolean isEnabled() {
return enabled;
}
}

View File

@ -57,6 +57,8 @@ public class CoreQueueConfiguration implements Serializable {
private Long delayBeforeDispatch;
private Boolean enabled;
private Long ringSize = ActiveMQDefaultConfiguration.getDefaultRingSize();
private Boolean purgeOnNoConsumers = ActiveMQDefaultConfiguration.getDefaultPurgeOnNoConsumers();
@ -126,6 +128,10 @@ public class CoreQueueConfiguration implements Serializable {
return ringSize;
}
public Boolean isEnabled() {
return enabled;
}
public QueueConfiguration toQueueConfiguration() {
return new QueueConfiguration(this.getName())
.setAddress(this.getAddress())
@ -144,7 +150,8 @@ public class CoreQueueConfiguration implements Serializable {
.setGroupBuckets(this.getGroupBuckets())
.setGroupFirstKey(this.getGroupFirstKey())
.setUser(this.getUser())
.setLastValueKey(this.getLastValueKey());
.setLastValueKey(this.getLastValueKey())
.setEnabled(this.isEnabled());
}
public static CoreQueueConfiguration fromQueueConfiguration(QueueConfiguration queueConfiguration) {
@ -165,6 +172,7 @@ public class CoreQueueConfiguration implements Serializable {
.setConsumersBeforeDispatch(queueConfiguration.getConsumersBeforeDispatch())
.setDelayBeforeDispatch(queueConfiguration.getDelayBeforeDispatch())
.setRingSize(queueConfiguration.getRingSize() != null ? queueConfiguration.getRingSize() : ActiveMQDefaultConfiguration.getDefaultRingSize())
.setEnabled(queueConfiguration.isEnabled() != null ? queueConfiguration.isEnabled() : ActiveMQDefaultConfiguration.getDefaultEnabled())
.setPurgeOnNoConsumers(queueConfiguration.isPurgeOnNoConsumers() != null ? queueConfiguration.isPurgeOnNoConsumers() : ActiveMQDefaultConfiguration.getDefaultPurgeOnNoConsumers())
.setRoutingType(queueConfiguration.getRoutingType() != null ? queueConfiguration.getRoutingType() : ActiveMQDefaultConfiguration.getDefaultRoutingType());
}
@ -233,6 +241,14 @@ public class CoreQueueConfiguration implements Serializable {
return this;
}
/**
* @param enabled for this queue, default is true
*/
public CoreQueueConfiguration setEnabled(Boolean enabled) {
this.enabled = enabled;
return this;
}
/**
* @param purgeOnNoConsumers delete this queue when consumer count reaches 0, default is false
*/
@ -321,6 +337,8 @@ public class CoreQueueConfiguration implements Serializable {
result = prime * result + ((consumersBeforeDispatch == null) ? 0 : consumersBeforeDispatch.hashCode());
result = prime * result + ((delayBeforeDispatch == null) ? 0 : delayBeforeDispatch.hashCode());
result = prime * result + ((routingType == null) ? 0 : routingType.hashCode());
result = prime * result + ((ringSize == null) ? 0 : ringSize.hashCode());
result = prime * result + ((enabled == null) ? 0 : enabled.hashCode());
return result;
}
@ -361,6 +379,18 @@ public class CoreQueueConfiguration implements Serializable {
} else if (!purgeOnNoConsumers.equals(other.purgeOnNoConsumers)) {
return false;
}
if (ringSize == null) {
if (other.ringSize != null)
return false;
} else if (!ringSize.equals(other.ringSize)) {
return false;
}
if (enabled == null) {
if (other.enabled != null)
return false;
} else if (!enabled.equals(other.enabled)) {
return false;
}
if (exclusive == null) {
if (other.exclusive != null)
return false;
@ -447,6 +477,8 @@ public class CoreQueueConfiguration implements Serializable {
", nonDestructive=" + nonDestructive +
", consumersBeforeDispatch=" + consumersBeforeDispatch +
", delayBeforeDispatch=" + delayBeforeDispatch +
", ringSize=" + ringSize +
", enabled=" + enabled +
"]";
}
}

View File

@ -1299,6 +1299,7 @@ public final class FileConfigurationParser extends XMLConfigurationUtil {
Boolean nonDestructive = null;
Integer consumersBeforeDispatch = null;
Long delayBeforeDispatch = null;
Boolean enabled = null;
Long ringSize = ActiveMQDefaultConfiguration.getDefaultRingSize();
NamedNodeMap attributes = node.getAttributes();
@ -1327,6 +1328,8 @@ public final class FileConfigurationParser extends XMLConfigurationUtil {
consumersBeforeDispatch = Integer.parseInt(item.getNodeValue());
} else if (item.getNodeName().equals("delay-before-dispatch")) {
delayBeforeDispatch = Long.parseLong(item.getNodeValue());
} else if (item.getNodeName().equals("enabled")) {
enabled = Boolean.parseBoolean(item.getNodeValue());
} else if (item.getNodeName().equals("ring-size")) {
ringSize = Long.parseLong(item.getNodeValue());
}
@ -1363,6 +1366,7 @@ public final class FileConfigurationParser extends XMLConfigurationUtil {
.setNonDestructive(nonDestructive)
.setConsumersBeforeDispatch(consumersBeforeDispatch)
.setDelayBeforeDispatch(delayBeforeDispatch)
.setEnabled(enabled)
.setRingSize(ringSize);
}

View File

@ -618,6 +618,52 @@ public class QueueControlImpl extends AbstractControl implements QueueControl {
}
}
@Override
public void disable() throws Exception {
if (AuditLogger.isEnabled()) {
AuditLogger.disable(queue);
}
checkStarted();
clearIO();
try {
server.getPostOffice().updateQueue(queue.getQueueConfiguration().setEnabled(false));
} finally {
blockOnIO();
}
}
@Override
public void enable() throws Exception {
if (AuditLogger.isEnabled()) {
AuditLogger.enable(queue);
}
checkStarted();
clearIO();
try {
server.getPostOffice().updateQueue(queue.getQueueConfiguration().setEnabled(true));
} finally {
blockOnIO();
}
}
@Override
public boolean isEnabled() {
if (AuditLogger.isEnabled()) {
AuditLogger.isEnabled(queue);
}
checkStarted();
clearIO();
try {
return queue.isEnabled();
} finally {
blockOnIO();
}
}
@Override
public boolean isConfigurationManaged() {
if (AuditLogger.isEnabled()) {

View File

@ -58,6 +58,10 @@ public interface QueueBindingInfo {
void setPurgeOnNoConsumers(boolean purgeOnNoConsumers);
boolean isEnabled();
void setEnabled(boolean enabled);
boolean isExclusive();
void setExclusive(boolean exclusive);

View File

@ -1307,7 +1307,7 @@ public abstract class AbstractJournalStorageManager extends CriticalComponentImp
SimpleString filterString = filter == null ? null : filter.getFilterString();
PersistentQueueBindingEncoding bindingEncoding = new PersistentQueueBindingEncoding(queue.getName(), binding.getAddress(), filterString, queue.getUser(), queue.isAutoCreated(), queue.getMaxConsumers(), queue.isPurgeOnNoConsumers(), queue.isExclusive(), queue.isGroupRebalance(), queue.getGroupBuckets(), queue.getGroupFirstKey(), queue.isLastValue(), queue.getLastValueKey(), queue.isNonDestructive(), queue.getConsumersBeforeDispatch(), queue.getDelayBeforeDispatch(), queue.isAutoDelete(), queue.getAutoDeleteDelay(), queue.getAutoDeleteMessageCount(), queue.getRoutingType().getType(), queue.isConfigurationManaged(), queue.getRingSize());
PersistentQueueBindingEncoding bindingEncoding = new PersistentQueueBindingEncoding(queue.getName(), binding.getAddress(), filterString, queue.getUser(), queue.isAutoCreated(), queue.getMaxConsumers(), queue.isPurgeOnNoConsumers(), queue.isEnabled(), queue.isExclusive(), queue.isGroupRebalance(), queue.getGroupBuckets(), queue.getGroupFirstKey(), queue.isLastValue(), queue.getLastValueKey(), queue.isNonDestructive(), queue.getConsumersBeforeDispatch(), queue.getDelayBeforeDispatch(), queue.isAutoDelete(), queue.getAutoDeleteDelay(), queue.getAutoDeleteMessageCount(), queue.getRoutingType().getType(), queue.isConfigurationManaged(), queue.getRingSize());
readLock();
try {

View File

@ -46,6 +46,8 @@ public class PersistentQueueBindingEncoding implements EncodingSupport, QueueBin
public boolean purgeOnNoConsumers;
public boolean enabled;
public boolean exclusive;
public boolean lastValue;
@ -96,6 +98,8 @@ public class PersistentQueueBindingEncoding implements EncodingSupport, QueueBin
maxConsumers +
", purgeOnNoConsumers=" +
purgeOnNoConsumers +
", enabled=" +
enabled +
", exclusive=" +
exclusive +
", lastValue=" +
@ -134,6 +138,7 @@ public class PersistentQueueBindingEncoding implements EncodingSupport, QueueBin
final boolean autoCreated,
final int maxConsumers,
final boolean purgeOnNoConsumers,
final boolean enabled,
final boolean exclusive,
final boolean groupRebalance,
final int groupBuckets,
@ -156,6 +161,7 @@ public class PersistentQueueBindingEncoding implements EncodingSupport, QueueBin
this.autoCreated = autoCreated;
this.maxConsumers = maxConsumers;
this.purgeOnNoConsumers = purgeOnNoConsumers;
this.enabled = enabled;
this.exclusive = exclusive;
this.groupRebalance = groupRebalance;
this.groupBuckets = groupBuckets;
@ -255,6 +261,16 @@ public class PersistentQueueBindingEncoding implements EncodingSupport, QueueBin
this.purgeOnNoConsumers = purgeOnNoConsumers;
}
@Override
public boolean isEnabled() {
return enabled;
}
@Override
public void setEnabled(boolean enabled) {
this.enabled = enabled;
}
@Override
public boolean isExclusive() {
return exclusive;
@ -461,6 +477,11 @@ public class PersistentQueueBindingEncoding implements EncodingSupport, QueueBin
} else {
ringSize = ActiveMQDefaultConfiguration.getDefaultRingSize();
}
if (buffer.readableBytes() > 0) {
enabled = buffer.readBoolean();
} else {
enabled = ActiveMQDefaultConfiguration.getDefaultEnabled();
}
}
@Override
@ -487,6 +508,7 @@ public class PersistentQueueBindingEncoding implements EncodingSupport, QueueBin
buffer.writeLong(autoDeleteMessageCount);
buffer.writeNullableSimpleString(groupFirstKey);
buffer.writeLong(ringSize);
buffer.writeBoolean(enabled);
}
@Override
@ -510,7 +532,8 @@ public class PersistentQueueBindingEncoding implements EncodingSupport, QueueBin
DataConstants.SIZE_LONG +
DataConstants.SIZE_LONG +
SimpleString.sizeofNullableString(groupFirstKey) +
DataConstants.SIZE_LONG;
DataConstants.SIZE_LONG +
DataConstants.SIZE_BOOLEAN;
}
private SimpleString createMetadata() {

View File

@ -660,6 +660,10 @@ public class PostOfficeImpl implements PostOffice, NotificationListener, Binding
changed = true;
queue.setPurgeOnNoConsumers(queueConfiguration.isPurgeOnNoConsumers());
}
if (queueConfiguration.isEnabled() != null && queue.isEnabled() != queueConfiguration.isEnabled().booleanValue()) {
changed = true;
queue.setEnabled(queueConfiguration.isEnabled());
}
if (queueConfiguration.isExclusive() != null && queue.isExclusive() != queueConfiguration.isExclusive().booleanValue()) {
changed = true;
queue.setExclusive(queueConfiguration.isExclusive());

View File

@ -94,6 +94,10 @@ public interface Queue extends Bindable,CriticalComponent {
void setPurgeOnNoConsumers(boolean value);
boolean isEnabled();
void setEnabled(boolean value);
int getConsumersBeforeDispatch();
void setConsumersBeforeDispatch(int consumersBeforeDispatch);

View File

@ -971,6 +971,8 @@ public class ActiveMQServerImpl implements ActiveMQServer {
SimpleString defaultGroupFirstKey = addressSettings.getDefaultGroupFirstKey();
long autoDeleteQueuesDelay = addressSettings.getAutoDeleteQueuesDelay();
long autoDeleteQueuesMessageCount = addressSettings.getAutoDeleteQueuesMessageCount();
long defaultRingSize = addressSettings.getDefaultRingSize();
boolean defaultEnabled = ActiveMQDefaultConfiguration.getDefaultEnabled();
SimpleString managementAddress = getManagementService() != null ? getManagementService().getManagementAddress() : null;
@ -981,12 +983,12 @@ public class ActiveMQServerImpl implements ActiveMQServer {
SimpleString filterString = filter == null ? null : filter.getFilterString();
response = new QueueQueryResult(realName, binding.getAddress(), queue.isDurable(), queue.isTemporary(), filterString, queue.getConsumerCount(), queue.getMessageCount(), autoCreateQueues, true, queue.isAutoCreated(), queue.isPurgeOnNoConsumers(), queue.getRoutingType(), queue.getMaxConsumers(), queue.isExclusive(), queue.isGroupRebalance(), queue.getGroupBuckets(), queue.getGroupFirstKey(), queue.isLastValue(), queue.getLastValueKey(), queue.isNonDestructive(), queue.getConsumersBeforeDispatch(), queue.getDelayBeforeDispatch(), queue.isAutoDelete(), queue.getAutoDeleteDelay(), queue.getAutoDeleteMessageCount(), defaultConsumerWindowSize, queue.getRingSize());
response = new QueueQueryResult(realName, binding.getAddress(), queue.isDurable(), queue.isTemporary(), filterString, queue.getConsumerCount(), queue.getMessageCount(), autoCreateQueues, true, queue.isAutoCreated(), queue.isPurgeOnNoConsumers(), queue.getRoutingType(), queue.getMaxConsumers(), queue.isExclusive(), queue.isGroupRebalance(), queue.getGroupBuckets(), queue.getGroupFirstKey(), queue.isLastValue(), queue.getLastValueKey(), queue.isNonDestructive(), queue.getConsumersBeforeDispatch(), queue.getDelayBeforeDispatch(), queue.isAutoDelete(), queue.getAutoDeleteDelay(), queue.getAutoDeleteMessageCount(), defaultConsumerWindowSize, queue.getRingSize(), queue.isEnabled());
} else if (realName.equals(managementAddress)) {
// make an exception for the management address (see HORNETQ-29)
response = new QueueQueryResult(realName, managementAddress, true, false, null, -1, -1, autoCreateQueues, true, false, false, RoutingType.MULTICAST, -1, false, false, null, null, null,null, null, null, null, null, null, null, defaultConsumerWindowSize, null);
response = new QueueQueryResult(realName, managementAddress, true, false, null, -1, -1, autoCreateQueues, true, false, false, RoutingType.MULTICAST, -1, false, false, null, null, null,null, null, null, null, null, null, null, defaultConsumerWindowSize, null, null);
} else {
response = new QueueQueryResult(realName, addressName, true, false, null, 0, 0, autoCreateQueues, false, false, defaultPurgeOnNoConsumers, RoutingType.MULTICAST, defaultMaxConsumers, defaultExclusiveQueue, defaultGroupRebalance, defaultGroupBuckets, defaultGroupFirstKey, defaultLastValueQueue, defaultLastValueKey, defaultNonDestructive, defaultConsumersBeforeDispatch, defaultDelayBeforeDispatch, isAutoDelete(false, addressSettings), autoDeleteQueuesDelay, autoDeleteQueuesMessageCount, defaultConsumerWindowSize, ActiveMQDefaultConfiguration.getDefaultRingSize());
response = new QueueQueryResult(realName, addressName, true, false, null, 0, 0, autoCreateQueues, false, false, defaultPurgeOnNoConsumers, RoutingType.MULTICAST, defaultMaxConsumers, defaultExclusiveQueue, defaultGroupRebalance, defaultGroupBuckets, defaultGroupFirstKey, defaultLastValueQueue, defaultLastValueKey, defaultNonDestructive, defaultConsumersBeforeDispatch, defaultDelayBeforeDispatch, isAutoDelete(false, addressSettings), autoDeleteQueuesDelay, autoDeleteQueuesMessageCount, defaultConsumerWindowSize, defaultRingSize, defaultEnabled);
}
return response;

View File

@ -143,6 +143,7 @@ public class PostOfficeJournalLoader implements JournalLoader {
.setTemporary(false)
.setAutoCreated(queueBindingInfo.isAutoCreated())
.setPurgeOnNoConsumers(queueBindingInfo.isPurgeOnNoConsumers())
.setEnabled(queueBindingInfo.isEnabled())
.setMaxConsumers(queueBindingInfo.getMaxConsumers())
.setExclusive(queueBindingInfo.isExclusive())
.setGroupRebalance(queueBindingInfo.isGroupRebalance())

View File

@ -17,6 +17,7 @@
package org.apache.activemq.artemis.core.server.impl;
import org.apache.activemq.artemis.api.config.ActiveMQDefaultConfiguration;
import org.apache.activemq.artemis.api.core.QueueConfiguration;
import org.apache.activemq.artemis.core.settings.impl.AddressSettings;
@ -37,11 +38,12 @@ public class QueueConfigurationUtils {
config.setRoutingType(config.getRoutingType() == null ? as.getDefaultQueueRoutingType() : config.getRoutingType());
config.setPurgeOnNoConsumers(config.isPurgeOnNoConsumers() == null ? as.isDefaultPurgeOnNoConsumers() : config.isPurgeOnNoConsumers());
config.setAutoCreateAddress(config.isAutoCreateAddress() == null ? as.isAutoCreateAddresses() : config.isAutoCreateAddress());
// set the default auto-delete
config.setAutoDelete(config.isAutoDelete() == null ? (config.isAutoCreated() && as.isAutoDeleteQueues()) || (!config.isAutoCreated() && as.isAutoDeleteCreatedQueues()) : config.isAutoDelete());
config.setAutoDeleteDelay(config.getAutoDeleteDelay() == null ? as.getAutoDeleteQueuesDelay() : config.getAutoDeleteDelay());
config.setAutoDeleteMessageCount(config.getAutoDeleteMessageCount() == null ? as.getAutoDeleteQueuesMessageCount() : config.getAutoDeleteMessageCount());
config.setEnabled(config.isEnabled() == null ? ActiveMQDefaultConfiguration.getDefaultEnabled() : config.isEnabled());
}
}

View File

@ -296,6 +296,8 @@ public class QueueImpl extends CriticalComponentImpl implements Queue {
private volatile boolean purgeOnNoConsumers;
private volatile boolean enabled;
private final AddressInfo addressInfo;
private volatile RoutingType routingType;
@ -633,6 +635,8 @@ public class QueueImpl extends CriticalComponentImpl implements Queue {
this.purgeOnNoConsumers = queueConfiguration.isPurgeOnNoConsumers() == null ? ActiveMQDefaultConfiguration.getDefaultPurgeOnNoConsumers() : queueConfiguration.isPurgeOnNoConsumers();
this.enabled = queueConfiguration.isEnabled() == null ? ActiveMQDefaultConfiguration.getDefaultEnabled() : queueConfiguration.isEnabled();
this.consumersBeforeDispatch = queueConfiguration.getConsumersBeforeDispatch() == null ? ActiveMQDefaultConfiguration.getDefaultConsumersBeforeDispatch() : queueConfiguration.getConsumersBeforeDispatch();
this.delayBeforeDispatch = queueConfiguration.getDelayBeforeDispatch() == null ? ActiveMQDefaultConfiguration.getDefaultDelayBeforeDispatch() : queueConfiguration.getDelayBeforeDispatch();
@ -797,6 +801,10 @@ public class QueueImpl extends CriticalComponentImpl implements Queue {
@Override
public void route(final Message message, final RoutingContext context) throws Exception {
if (!enabled) {
context.setReusable(false);
return;
}
if (purgeOnNoConsumers) {
context.setReusable(false);
if (getConsumerCount() == 0) {
@ -869,6 +877,16 @@ public class QueueImpl extends CriticalComponentImpl implements Queue {
this.purgeOnNoConsumers = value;
}
@Override
public boolean isEnabled() {
return enabled;
}
@Override
public synchronized void setEnabled(boolean value) {
this.enabled = value;
}
@Override
public int getMaxConsumers() {
return maxConsumers;

View File

@ -3789,6 +3789,7 @@
<xsd:attribute name="consumers-before-dispatch" type="xsd:int" use="optional"/>
<xsd:attribute name="delay-before-dispatch" type="xsd:long" use="optional"/>
<xsd:attribute name="ring-size" type="xsd:long" use="optional"/>
<xsd:attribute name="enabled" type="xsd:boolean" use="optional"/>
<xsd:attributeGroup ref="xml:specialAttrs"/>
</xsd:complexType>

View File

@ -50,6 +50,7 @@ public class QueueBindingEncodingTest extends Assert {
final byte routingType = RandomUtil.randomByte();
final boolean configurationManaged = RandomUtil.randomBoolean();
final long ringSize = RandomUtil.randomLong();
final boolean enabled = RandomUtil.randomBoolean();
PersistentQueueBindingEncoding encoding = new PersistentQueueBindingEncoding(name,
address,
@ -58,6 +59,7 @@ public class QueueBindingEncodingTest extends Assert {
autoCreated,
maxConsumers,
purgeOnNoConsumers,
enabled,
exclusive,
groupRebalance,
groupBuckets,
@ -87,6 +89,7 @@ public class QueueBindingEncodingTest extends Assert {
assertEquals(autoCreated, decoding.isAutoCreated());
assertEquals(maxConsumers, decoding.getMaxConsumers());
assertEquals(purgeOnNoConsumers, decoding.isPurgeOnNoConsumers());
assertEquals(enabled, decoding.isEnabled());
assertEquals(exclusive, decoding.isExclusive());
assertEquals(groupRebalance, decoding.isGroupRebalance());
assertEquals(groupBuckets, decoding.getGroupBuckets());

View File

@ -817,6 +817,16 @@ public class ScheduledDeliveryHandlerTest extends Assert {
}
@Override
public boolean isEnabled() {
return false;
}
@Override
public void setEnabled(boolean value) {
}
@Override
public PagingStore getPagingStore() {
return null;

View File

@ -3573,6 +3573,7 @@
<xsd:attribute name="consumers-before-dispatch" type="xsd:int" use="optional"/>
<xsd:attribute name="delay-before-dispatch" type="xsd:long" use="optional"/>
<xsd:attribute name="ring-size" type="xsd:long" use="optional"/>
<xsd:attribute name="enabled" type="xsd:boolean" use="optional"/>
<xsd:attributeGroup ref="xml:specialAttrs"/>
</xsd:complexType>

View File

@ -527,6 +527,34 @@ Open the file `<broker-instance>/etc/broker.xml` for editing.
</addresses>
```
#### Disabled Queue
If a user requires to statically configure a queue and disable routing to it,
for example where a queue needs to be defined so a consumer can bind,
but you want to disable message routing to it for the time being.
Or you need to stop message flow to the queue to allow investigation keeping the consumer bound,
but dont wish to have further messages routed to the queue to avoid message build up.
When **enabled** is set to **true** the queue will have messages routed to it. (default)
When **enabled** is set to **false** the queue will NOT have messages routed to it.
Open the file `<broker-instance>/etc/broker.xml` for editing.
```xml
<addresses>
<address name="foo.bar">
<multicast>
<queue name="orders1" enabled="false"/>
</multicast>
</address>
</addresses>
```
Warning: Disabling all the queues on an address means that any message sent to that address will be silently dropped.
## Protocol Managers
A "protocol manager" maps protocol-specific concepts down to the core

View File

@ -211,6 +211,15 @@ a given property.)
The `QueueControl` can pause and resume the underlying queue. When a queue is
paused, it will receive messages but will not deliver them. When it's resumed,
it'll begin delivering the queued messages, if any.
- Disabling and Enabling Queues
The `QueueControl` can disable and enable the underlying queue. When a queue is
disabled, it will not longer have messages routed to it. When it's enabled,
it'll begin having messages routed to it again.
This is useful where you may need to disable message routing to a queue but wish to keep consumers active
to investigate issues, without causing further message build up in the queue.
#### Other Resources Management

View File

@ -268,6 +268,62 @@ public class AddressingTest extends ActiveMQTestBase {
Wait.assertEquals(1, server.locateQueue(queueName)::getMessageCount);
}
@Test
public void testQueueEnabledDisabled() throws Exception {
SimpleString address = new SimpleString("test.address");
SimpleString defaultQueue = SimpleString.toSimpleString(UUID.randomUUID().toString());
SimpleString enabledQueue = SimpleString.toSimpleString(UUID.randomUUID().toString());
SimpleString disabledQueue = SimpleString.toSimpleString(UUID.randomUUID().toString());
//Validate default is enabled, and check that queues enabled receive messages and disabled do not on same address.
server.createQueue(new QueueConfiguration(defaultQueue).setAddress(address).setRoutingType(RoutingType.MULTICAST));
server.createQueue(new QueueConfiguration(enabledQueue).setAddress(address).setRoutingType(RoutingType.MULTICAST).setEnabled(true));
server.createQueue(new QueueConfiguration(disabledQueue).setAddress(address).setRoutingType(RoutingType.MULTICAST).setEnabled(false));
assertNotNull(server.locateQueue(defaultQueue));
assertNotNull(server.locateQueue(enabledQueue));
assertNotNull(server.locateQueue(disabledQueue));
ClientSession session = sessionFactory.createSession();
ClientProducer producer = session.createProducer(address);
producer.send(session.createMessage(true));
assertNotNull(server.locateQueue(defaultQueue));
assertNotNull(server.locateQueue(enabledQueue));
assertNotNull(server.locateQueue(disabledQueue));
Wait.assertEquals(1, server.locateQueue(defaultQueue)::getMessageCount);
Wait.assertEquals(1, server.locateQueue(enabledQueue)::getMessageCount);
Wait.assertEquals(0, server.locateQueue(disabledQueue)::getMessageCount);
//Update Queue Disable All
server.updateQueue(new QueueConfiguration(defaultQueue).setAddress(address).setRoutingType(RoutingType.MULTICAST).setEnabled(false));
server.updateQueue(new QueueConfiguration(enabledQueue).setAddress(address).setRoutingType(RoutingType.MULTICAST).setEnabled(false));
server.updateQueue(new QueueConfiguration(disabledQueue).setAddress(address).setRoutingType(RoutingType.MULTICAST).setEnabled(false));
producer.send(session.createMessage(true));
Wait.assertEquals(1, server.locateQueue(defaultQueue)::getMessageCount);
Wait.assertEquals(1, server.locateQueue(enabledQueue)::getMessageCount);
Wait.assertEquals(0, server.locateQueue(disabledQueue)::getMessageCount);
//Update Queue Enable All
server.updateQueue(new QueueConfiguration(defaultQueue).setAddress(address).setRoutingType(RoutingType.MULTICAST).setEnabled(true));
server.updateQueue(new QueueConfiguration(enabledQueue).setAddress(address).setRoutingType(RoutingType.MULTICAST).setEnabled(true));
server.updateQueue(new QueueConfiguration(disabledQueue).setAddress(address).setRoutingType(RoutingType.MULTICAST).setEnabled(true));
producer.send(session.createMessage(true));
Wait.assertEquals(2, server.locateQueue(defaultQueue)::getMessageCount);
Wait.assertEquals(2, server.locateQueue(enabledQueue)::getMessageCount);
Wait.assertEquals(1, server.locateQueue(disabledQueue)::getMessageCount);
}
@Test
public void testLimitOnMaxConsumers() throws Exception {
SimpleString address = new SimpleString("test.address");

View File

@ -670,7 +670,27 @@ public class SessionTest extends ActiveMQTestBase {
assertTrue(result.isPurgeOnNoConsumers());
assertTrue(result.isExclusive());
assertTrue(result.isLastValue());
assertTrue(result.isEnabled());
server.destroyQueue(queueName);
}
{
if (!legacyCreateQueue) {
clientSession.createQueue(new QueueConfiguration(queueName).setAddress(addressName).setRoutingType(RoutingType.ANYCAST).setFilterString("filter").setAutoCreated(true).setMaxConsumers(0).setPurgeOnNoConsumers(true).setExclusive(true).setLastValue(true).setEnabled(false));
Queue result = server.locateQueue(queueName);
assertEquals(addressName, result.getAddress());
assertEquals(queueName, result.getName());
assertEquals(RoutingType.ANYCAST, result.getRoutingType());
assertEquals("filter", result.getFilter().getFilterString().toString());
assertTrue(result.isDurable());
assertTrue(result.isAutoCreated());
assertEquals(0, result.getMaxConsumers());
assertTrue(result.isPurgeOnNoConsumers());
assertTrue(result.isExclusive());
assertTrue(result.isLastValue());
assertFalse(result.isEnabled());
server.destroyQueue(queueName);
}
}
}
}

View File

@ -568,6 +568,10 @@ public class RedeployTest extends ActiveMQTestBase {
Assert.assertTrue(listQueuesNamesForAddress(embeddedActiveMQ, "config_test_queue_change").contains("config_test_queue_change_queue"));
Assert.assertEquals(10, getQueue(embeddedActiveMQ, "config_test_queue_change_queue").getMaxConsumers());
Assert.assertEquals(false, getQueue(embeddedActiveMQ, "config_test_queue_change_queue").isPurgeOnNoConsumers());
Assert.assertEquals(true, getQueue(embeddedActiveMQ, "config_test_queue_change_queue").isEnabled());
Assert.assertEquals(true, getQueue(embeddedActiveMQ, "config_test_queue_change_queue_defaults").isPurgeOnNoConsumers());
Assert.assertEquals(false, getQueue(embeddedActiveMQ, "config_test_queue_change_queue_defaults").isEnabled());
Files.copy(url2.openStream(), brokerXML, StandardCopyOption.REPLACE_EXISTING);
brokerXML.toFile().setLastModified(System.currentTimeMillis() + 1000);
@ -594,6 +598,11 @@ public class RedeployTest extends ActiveMQTestBase {
Assert.assertTrue(listQueuesNamesForAddress(embeddedActiveMQ, "config_test_queue_change").contains("config_test_queue_change_queue"));
Assert.assertEquals(1, getQueue(embeddedActiveMQ, "config_test_queue_change_queue").getMaxConsumers());
Assert.assertEquals(true, getQueue(embeddedActiveMQ, "config_test_queue_change_queue").isPurgeOnNoConsumers());
Assert.assertEquals(false, getQueue(embeddedActiveMQ, "config_test_queue_change_queue").isEnabled());
Assert.assertEquals(false, getQueue(embeddedActiveMQ, "config_test_queue_change_queue_defaults").isPurgeOnNoConsumers());
Assert.assertEquals(true, getQueue(embeddedActiveMQ, "config_test_queue_change_queue_defaults").isEnabled());
} finally {
embeddedActiveMQ.stop();
}
@ -621,7 +630,10 @@ public class RedeployTest extends ActiveMQTestBase {
Assert.assertTrue(listQueuesNamesForAddress(embeddedActiveMQ, "config_test_queue_change").contains("config_test_queue_change_queue"));
Assert.assertEquals(1, getQueue(embeddedActiveMQ, "config_test_queue_change_queue").getMaxConsumers());
Assert.assertEquals(true, getQueue(embeddedActiveMQ, "config_test_queue_change_queue").isPurgeOnNoConsumers());
Assert.assertEquals(false, getQueue(embeddedActiveMQ, "config_test_queue_change_queue").isEnabled());
Assert.assertEquals(false, getQueue(embeddedActiveMQ, "config_test_queue_change_queue_defaults").isPurgeOnNoConsumers());
Assert.assertEquals(true, getQueue(embeddedActiveMQ, "config_test_queue_change_queue_defaults").isEnabled());
} finally {
embeddedActiveMQ.stop();
}

View File

@ -180,6 +180,21 @@ public class QueueControlUsingCoreTest extends QueueControlTest {
return (Boolean) proxy.retrieveAttributeValue("purgeOnNoConsumers");
}
@Override
public boolean isEnabled() {
return (Boolean) proxy.retrieveAttributeValue("isEnabled");
}
@Override
public void enable() throws Exception {
proxy.invokeOperation("enable");
}
@Override
public void disable() throws Exception {
proxy.invokeOperation("disable");
}
@Override
public boolean isConfigurationManaged() {
return (Boolean) proxy.retrieveAttributeValue("configurationManaged");

View File

@ -173,6 +173,50 @@ public class QueueConfigRestartTest extends ActiveMQTestBase {
QueueBinding queueBinding2 = (QueueBinding)server.getPostOffice().getBinding(queue);
Assert.assertTrue(queueBinding2.getQueue().isPurgeOnNoConsumers());
}
@Test
public void testQueueConfigEnabledAndRestart() throws Exception {
ActiveMQServer server = createServer(true);
server.start();
SimpleString address = new SimpleString("test.address");
SimpleString queue = new SimpleString("test.queue");
server.createQueue(new QueueConfiguration(queue).setAddress(address).setEnabled(true));
QueueBinding queueBinding1 = (QueueBinding)server.getPostOffice().getBinding(queue);
Assert.assertTrue(queueBinding1.getQueue().isEnabled());
server.stop();
server.start();
QueueBinding queueBinding2 = (QueueBinding)server.getPostOffice().getBinding(queue);
Assert.assertTrue(queueBinding2.getQueue().isEnabled());
}
@Test
public void testQueueConfigDisabledAndRestart() throws Exception {
ActiveMQServer server = createServer(true);
server.start();
SimpleString address = new SimpleString("test.address");
SimpleString queue = new SimpleString("test.queue");
server.createQueue(new QueueConfiguration(queue).setAddress(address).setEnabled(false));
QueueBinding queueBinding1 = (QueueBinding)server.getPostOffice().getBinding(queue);
Assert.assertFalse(queueBinding1.getQueue().isEnabled());
server.stop();
server.start();
QueueBinding queueBinding2 = (QueueBinding)server.getPostOffice().getBinding(queue);
Assert.assertFalse(queueBinding2.getQueue().isEnabled());
}
// Package protected ---------------------------------------------
// Protected -----------------------------------------------------

View File

@ -147,7 +147,9 @@ under the License.
</address>
<address name="config_test_queue_change">
<multicast>
<queue name="config_test_queue_change_queue" max-consumers="1" purge-on-no-consumers="true" />
<queue name="config_test_queue_change_queue" max-consumers="1" purge-on-no-consumers="true" enabled="false" />
<!-- checks that when not set, values are set back to defaults -->
<queue name="config_test_queue_change_queue_defaults" />
</multicast>
</address>
</addresses>

View File

@ -161,6 +161,7 @@ under the License.
<address name="config_test_queue_change">
<multicast>
<queue name="config_test_queue_change_queue" max-consumers="10" purge-on-no-consumers="false" />
<queue name="config_test_queue_change_queue_defaults" purge-on-no-consumers="true" enabled="false" />
</multicast>
</address>
</addresses>

View File

@ -50,6 +50,16 @@ public class FakeQueue extends CriticalComponentImpl implements Queue {
}
@Override
public boolean isEnabled() {
return false;
}
@Override
public void setEnabled(boolean value) {
}
@Override
public PagingStore getPagingStore() {
return null;