This closes #2300
This commit is contained in:
commit
4507d7783f
|
@ -222,6 +222,12 @@ public interface QueueControl {
|
|||
@Attribute(desc = "delete this queue when the last consumer disconnects")
|
||||
boolean isPurgeOnNoConsumers();
|
||||
|
||||
/**
|
||||
*
|
||||
*/
|
||||
@Attribute(desc = "is this queue managed by configuration (broker.xml)")
|
||||
boolean isConfigurationManaged();
|
||||
|
||||
/**
|
||||
*
|
||||
*/
|
||||
|
|
|
@ -489,6 +489,18 @@ public class QueueControlImpl extends AbstractControl implements QueueControl {
|
|||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public boolean isConfigurationManaged() {
|
||||
checkStarted();
|
||||
|
||||
clearIO();
|
||||
try {
|
||||
return queue.isConfigurationManaged();
|
||||
} finally {
|
||||
blockOnIO();
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public boolean isExclusive() {
|
||||
checkStarted();
|
||||
|
|
|
@ -40,6 +40,10 @@ public interface QueueBindingInfo {
|
|||
|
||||
boolean isAutoCreated();
|
||||
|
||||
boolean isConfigurationManaged();
|
||||
|
||||
void setConfigurationManaged(boolean configurationManaged);
|
||||
|
||||
SimpleString getUser();
|
||||
|
||||
void addQueueStatusEncoding(QueueStatusEncoding status);
|
||||
|
|
|
@ -1290,7 +1290,7 @@ public abstract class AbstractJournalStorageManager extends CriticalComponentImp
|
|||
|
||||
SimpleString filterString = filter == null ? null : filter.getFilterString();
|
||||
|
||||
PersistentQueueBindingEncoding bindingEncoding = new PersistentQueueBindingEncoding(queue.getName(), binding.getAddress(), filterString, queue.getUser(), queue.isAutoCreated(), queue.getMaxConsumers(), queue.isPurgeOnNoConsumers(), queue.isExclusive(), queue.isLastValue(), queue.getConsumersBeforeDispatch(), queue.getDelayBeforeDispatch(), queue.getRoutingType().getType());
|
||||
PersistentQueueBindingEncoding bindingEncoding = new PersistentQueueBindingEncoding(queue.getName(), binding.getAddress(), filterString, queue.getUser(), queue.isAutoCreated(), queue.getMaxConsumers(), queue.isPurgeOnNoConsumers(), queue.isExclusive(), queue.isLastValue(), queue.getConsumersBeforeDispatch(), queue.getDelayBeforeDispatch(), queue.getRoutingType().getType(), queue.isConfigurationManaged());
|
||||
|
||||
readLock();
|
||||
try {
|
||||
|
|
|
@ -56,6 +56,8 @@ public class PersistentQueueBindingEncoding implements EncodingSupport, QueueBin
|
|||
|
||||
public byte routingType;
|
||||
|
||||
public boolean configurationManaged;
|
||||
|
||||
public PersistentQueueBindingEncoding() {
|
||||
}
|
||||
|
||||
|
@ -86,6 +88,8 @@ public class PersistentQueueBindingEncoding implements EncodingSupport, QueueBin
|
|||
delayBeforeDispatch +
|
||||
", routingType=" +
|
||||
routingType +
|
||||
", configurationManaged=" +
|
||||
configurationManaged +
|
||||
"]";
|
||||
}
|
||||
|
||||
|
@ -100,7 +104,8 @@ public class PersistentQueueBindingEncoding implements EncodingSupport, QueueBin
|
|||
final boolean lastValue,
|
||||
final int consumersBeforeDispatch,
|
||||
final long delayBeforeDispatch,
|
||||
final byte routingType) {
|
||||
final byte routingType,
|
||||
final boolean configurationManaged) {
|
||||
this.name = name;
|
||||
this.address = address;
|
||||
this.filterString = filterString;
|
||||
|
@ -113,6 +118,7 @@ public class PersistentQueueBindingEncoding implements EncodingSupport, QueueBin
|
|||
this.consumersBeforeDispatch = consumersBeforeDispatch;
|
||||
this.delayBeforeDispatch = delayBeforeDispatch;
|
||||
this.routingType = routingType;
|
||||
this.configurationManaged = configurationManaged;
|
||||
}
|
||||
|
||||
@Override
|
||||
|
@ -154,6 +160,16 @@ public class PersistentQueueBindingEncoding implements EncodingSupport, QueueBin
|
|||
return autoCreated;
|
||||
}
|
||||
|
||||
@Override
|
||||
public boolean isConfigurationManaged() {
|
||||
return configurationManaged;
|
||||
}
|
||||
|
||||
@Override
|
||||
public void setConfigurationManaged(boolean configurationManaged) {
|
||||
this.configurationManaged = configurationManaged;
|
||||
}
|
||||
|
||||
@Override
|
||||
public void addQueueStatusEncoding(QueueStatusEncoding status) {
|
||||
if (queueStatusEncodings == null) {
|
||||
|
@ -288,6 +304,11 @@ public class PersistentQueueBindingEncoding implements EncodingSupport, QueueBin
|
|||
} else {
|
||||
delayBeforeDispatch = ActiveMQDefaultConfiguration.getDefaultDelayBeforeDispatch();
|
||||
}
|
||||
if (buffer.readableBytes() > 0) {
|
||||
configurationManaged = buffer.readBoolean();
|
||||
} else {
|
||||
configurationManaged = false;
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
|
@ -304,6 +325,7 @@ public class PersistentQueueBindingEncoding implements EncodingSupport, QueueBin
|
|||
buffer.writeBoolean(lastValue);
|
||||
buffer.writeInt(consumersBeforeDispatch);
|
||||
buffer.writeLong(delayBeforeDispatch);
|
||||
buffer.writeBoolean(configurationManaged);
|
||||
}
|
||||
|
||||
@Override
|
||||
|
@ -317,7 +339,8 @@ public class PersistentQueueBindingEncoding implements EncodingSupport, QueueBin
|
|||
DataConstants.SIZE_BOOLEAN +
|
||||
DataConstants.SIZE_BOOLEAN +
|
||||
DataConstants.SIZE_INT +
|
||||
DataConstants.SIZE_LONG;
|
||||
DataConstants.SIZE_LONG +
|
||||
DataConstants.SIZE_BOOLEAN;
|
||||
}
|
||||
|
||||
private SimpleString createMetadata() {
|
||||
|
|
|
@ -73,7 +73,8 @@ public interface PostOffice extends ActiveMQComponent {
|
|||
Boolean exclusive,
|
||||
Integer consumersBeforeDispatch,
|
||||
Long delayBeforeDispatch,
|
||||
SimpleString user) throws Exception;
|
||||
SimpleString user,
|
||||
Boolean configurationManaged) throws Exception;
|
||||
|
||||
List<Queue> listQueuesForAddress(SimpleString address) throws Exception;
|
||||
|
||||
|
|
|
@ -471,7 +471,8 @@ public class PostOfficeImpl implements PostOffice, NotificationListener, Binding
|
|||
Boolean exclusive,
|
||||
Integer consumersBeforeDispatch,
|
||||
Long delayBeforeDispatch,
|
||||
SimpleString user) throws Exception {
|
||||
SimpleString user,
|
||||
Boolean configurationManaged) throws Exception {
|
||||
synchronized (addressLock) {
|
||||
final QueueBinding queueBinding = (QueueBinding) addressManager.getBinding(name);
|
||||
if (queueBinding == null) {
|
||||
|
@ -527,6 +528,10 @@ public class PostOfficeImpl implements PostOffice, NotificationListener, Binding
|
|||
changed = true;
|
||||
queue.setFilter(filter);
|
||||
}
|
||||
if (configurationManaged != null && !configurationManaged.equals(queue.isConfigurationManaged())) {
|
||||
changed = true;
|
||||
queue.setConfigurationManaged(configurationManaged);
|
||||
}
|
||||
if (logger.isDebugEnabled()) {
|
||||
if (user == null && queue.getUser() != null) {
|
||||
logger.debug("Ignoring updating Queue to a NULL user");
|
||||
|
|
|
@ -94,6 +94,10 @@ public interface Queue extends Bindable,CriticalComponent {
|
|||
|
||||
void setMaxConsumer(int maxConsumers);
|
||||
|
||||
boolean isConfigurationManaged();
|
||||
|
||||
void setConfigurationManaged(boolean configurationManaged);
|
||||
|
||||
void addConsumer(Consumer consumer) throws Exception;
|
||||
|
||||
void removeConsumer(Consumer consumer);
|
||||
|
|
|
@ -42,6 +42,7 @@ public final class QueueConfig {
|
|||
private final boolean purgeOnNoConsumers;
|
||||
private final int consumersBeforeDispatch;
|
||||
private final long delayBeforeDispatch;
|
||||
private final boolean configurationManaged;
|
||||
|
||||
public static final class Builder {
|
||||
|
||||
|
@ -61,6 +62,7 @@ public final class QueueConfig {
|
|||
private boolean purgeOnNoConsumers;
|
||||
private int consumersBeforeDispatch;
|
||||
private long delayBeforeDispatch;
|
||||
private boolean configurationManaged;
|
||||
|
||||
private Builder(final long id, final SimpleString name) {
|
||||
this(id, name, name);
|
||||
|
@ -83,6 +85,7 @@ public final class QueueConfig {
|
|||
this.purgeOnNoConsumers = ActiveMQDefaultConfiguration.getDefaultPurgeOnNoConsumers();
|
||||
this.consumersBeforeDispatch = ActiveMQDefaultConfiguration.getDefaultConsumersBeforeDispatch();
|
||||
this.delayBeforeDispatch = ActiveMQDefaultConfiguration.getDefaultDelayBeforeDispatch();
|
||||
this.configurationManaged = false;
|
||||
validateState();
|
||||
}
|
||||
|
||||
|
@ -99,6 +102,11 @@ public final class QueueConfig {
|
|||
}
|
||||
}
|
||||
|
||||
public Builder configurationManaged(final boolean configurationManaged) {
|
||||
this.configurationManaged = configurationManaged;
|
||||
return this;
|
||||
}
|
||||
|
||||
public Builder filter(final Filter filter) {
|
||||
this.filter = filter;
|
||||
return this;
|
||||
|
@ -185,7 +193,7 @@ public final class QueueConfig {
|
|||
} else {
|
||||
pageSubscription = null;
|
||||
}
|
||||
return new QueueConfig(id, address, name, filter, pageSubscription, user, durable, temporary, autoCreated, routingType, maxConsumers, exclusive, lastValue, consumersBeforeDispatch, delayBeforeDispatch, purgeOnNoConsumers);
|
||||
return new QueueConfig(id, address, name, filter, pageSubscription, user, durable, temporary, autoCreated, routingType, maxConsumers, exclusive, lastValue, consumersBeforeDispatch, delayBeforeDispatch, purgeOnNoConsumers, configurationManaged);
|
||||
}
|
||||
|
||||
}
|
||||
|
@ -233,7 +241,8 @@ public final class QueueConfig {
|
|||
final boolean lastValue,
|
||||
final int consumersBeforeDispatch,
|
||||
final long delayBeforeDispatch,
|
||||
final boolean purgeOnNoConsumers) {
|
||||
final boolean purgeOnNoConsumers,
|
||||
final boolean configurationManaged) {
|
||||
this.id = id;
|
||||
this.address = address;
|
||||
this.name = name;
|
||||
|
@ -250,6 +259,7 @@ public final class QueueConfig {
|
|||
this.maxConsumers = maxConsumers;
|
||||
this.consumersBeforeDispatch = consumersBeforeDispatch;
|
||||
this.delayBeforeDispatch = delayBeforeDispatch;
|
||||
this.configurationManaged = configurationManaged;
|
||||
}
|
||||
|
||||
public long id() {
|
||||
|
@ -316,6 +326,10 @@ public final class QueueConfig {
|
|||
return delayBeforeDispatch;
|
||||
}
|
||||
|
||||
public boolean isConfigurationManaged() {
|
||||
return configurationManaged;
|
||||
}
|
||||
|
||||
@Override
|
||||
public boolean equals(Object o) {
|
||||
if (this == o)
|
||||
|
@ -357,6 +371,8 @@ public final class QueueConfig {
|
|||
return false;
|
||||
if (purgeOnNoConsumers != that.purgeOnNoConsumers)
|
||||
return false;
|
||||
if (configurationManaged != that.configurationManaged)
|
||||
return false;
|
||||
return user != null ? user.equals(that.user) : that.user == null;
|
||||
|
||||
}
|
||||
|
@ -379,6 +395,7 @@ public final class QueueConfig {
|
|||
result = 31 * result + consumersBeforeDispatch;
|
||||
result = 31 * result + Long.hashCode(delayBeforeDispatch);
|
||||
result = 31 * result + (purgeOnNoConsumers ? 1 : 0);
|
||||
result = 31 * result + (configurationManaged ? 1 : 0);
|
||||
return result;
|
||||
}
|
||||
|
||||
|
@ -400,6 +417,7 @@ public final class QueueConfig {
|
|||
+ ", lastValue=" + lastValue
|
||||
+ ", consumersBeforeDispatch=" + consumersBeforeDispatch
|
||||
+ ", delayBeforeDispatch=" + delayBeforeDispatch
|
||||
+ ", purgeOnNoConsumers=" + purgeOnNoConsumers + '}';
|
||||
+ ", purgeOnNoConsumers=" + purgeOnNoConsumers
|
||||
+ ", configurationManaged=" + configurationManaged + '}';
|
||||
}
|
||||
}
|
||||
|
|
|
@ -2729,7 +2729,7 @@ public class ActiveMQServerImpl implements ActiveMQServer {
|
|||
}
|
||||
|
||||
private List<Queue> listConfiguredQueues(SimpleString address) throws Exception {
|
||||
return listQueues(address).stream().filter(queue -> !queue.isAutoCreated() && !queue.isInternalQueue()).collect(Collectors.toList());
|
||||
return listQueues(address).stream().filter(queue -> queue.isConfigurationManaged()).collect(Collectors.toList());
|
||||
}
|
||||
|
||||
private List<Queue> listQueues(SimpleString address) throws Exception {
|
||||
|
@ -2794,7 +2794,7 @@ public class ActiveMQServerImpl implements ActiveMQServer {
|
|||
} else {
|
||||
// if the address::queue doesn't exist then create it
|
||||
try {
|
||||
createQueue(SimpleString.toSimpleString(config.getAddress()), config.getRoutingType(), queueName, SimpleString.toSimpleString(config.getFilterString()), SimpleString.toSimpleString(config.getUser()), config.isDurable(), false, false, false, false, maxConsumers, config.getPurgeOnNoConsumers(), isExclusive, isLastValue, consumersBeforeDispatch, delayBeforeDispatch, true);
|
||||
createQueue(SimpleString.toSimpleString(config.getAddress()), config.getRoutingType(), queueName, SimpleString.toSimpleString(config.getFilterString()), SimpleString.toSimpleString(config.getUser()), config.isDurable(), false, false, false, false, maxConsumers, config.getPurgeOnNoConsumers(), isExclusive, isLastValue, consumersBeforeDispatch, delayBeforeDispatch, true, true);
|
||||
} catch (ActiveMQQueueExistsException e) {
|
||||
// the queue may exist on a *different* address
|
||||
ActiveMQServerLogger.LOGGER.warn(e.getMessage());
|
||||
|
@ -3117,6 +3117,27 @@ public class ActiveMQServerImpl implements ActiveMQServer {
|
|||
final int consumersBeforeDispatch,
|
||||
final long delayBeforeDispatch,
|
||||
final boolean autoCreateAddress) throws Exception {
|
||||
return createQueue(address, routingType, queueName, filterString, user, durable, temporary, ignoreIfExists, transientQueue, autoCreated, maxConsumers, purgeOnNoConsumers, exclusive, lastValue, consumersBeforeDispatch, delayBeforeDispatch, autoCreateAddress, false);
|
||||
}
|
||||
|
||||
private Queue createQueue(final SimpleString address,
|
||||
final RoutingType routingType,
|
||||
final SimpleString queueName,
|
||||
final SimpleString filterString,
|
||||
final SimpleString user,
|
||||
final boolean durable,
|
||||
final boolean temporary,
|
||||
final boolean ignoreIfExists,
|
||||
final boolean transientQueue,
|
||||
final boolean autoCreated,
|
||||
final int maxConsumers,
|
||||
final boolean purgeOnNoConsumers,
|
||||
final boolean exclusive,
|
||||
final boolean lastValue,
|
||||
final int consumersBeforeDispatch,
|
||||
final long delayBeforeDispatch,
|
||||
final boolean autoCreateAddress,
|
||||
final boolean configurationManaged) throws Exception {
|
||||
|
||||
final QueueBinding binding = (QueueBinding) postOffice.getBinding(queueName);
|
||||
if (binding != null) {
|
||||
|
@ -3170,6 +3191,7 @@ public class ActiveMQServerImpl implements ActiveMQServer {
|
|||
.lastValue(lastValue)
|
||||
.consumersBeforeDispatch(consumersBeforeDispatch)
|
||||
.delayBeforeDispatch(delayBeforeDispatch)
|
||||
.configurationManaged(configurationManaged)
|
||||
.build();
|
||||
|
||||
if (hasBrokerQueuePlugins()) {
|
||||
|
@ -3265,8 +3287,21 @@ public class ActiveMQServerImpl implements ActiveMQServer {
|
|||
Integer consumersBeforeDispatch,
|
||||
Long delayBeforeDispatch,
|
||||
String user) throws Exception {
|
||||
return updateQueue(name, routingType, filterString, maxConsumers, purgeOnNoConsumers, exclusive, consumersBeforeDispatch, delayBeforeDispatch, user, null);
|
||||
}
|
||||
|
||||
private Queue updateQueue(String name,
|
||||
RoutingType routingType,
|
||||
String filterString,
|
||||
Integer maxConsumers,
|
||||
Boolean purgeOnNoConsumers,
|
||||
Boolean exclusive,
|
||||
Integer consumersBeforeDispatch,
|
||||
Long delayBeforeDispatch,
|
||||
String user,
|
||||
Boolean configurationManaged) throws Exception {
|
||||
final Filter filter = FilterImpl.createFilter(filterString);
|
||||
final QueueBinding queueBinding = this.postOffice.updateQueue(new SimpleString(name), routingType, filter, maxConsumers, purgeOnNoConsumers, exclusive, consumersBeforeDispatch, delayBeforeDispatch, SimpleString.toSimpleString(user));
|
||||
final QueueBinding queueBinding = this.postOffice.updateQueue(new SimpleString(name), routingType, filter, maxConsumers, purgeOnNoConsumers, exclusive, consumersBeforeDispatch, delayBeforeDispatch, SimpleString.toSimpleString(user), configurationManaged);
|
||||
if (queueBinding != null) {
|
||||
final Queue queue = queueBinding.getQueue();
|
||||
return queue;
|
||||
|
|
|
@ -66,6 +66,7 @@ public class LastValueQueue extends QueueImpl {
|
|||
final Integer consumersBeforeDispatch,
|
||||
final Long delayBeforeDispatch,
|
||||
final Boolean purgeOnNoConsumers,
|
||||
final boolean configurationManaged,
|
||||
final ScheduledExecutorService scheduledExecutor,
|
||||
final PostOffice postOffice,
|
||||
final StorageManager storageManager,
|
||||
|
@ -73,7 +74,7 @@ public class LastValueQueue extends QueueImpl {
|
|||
final ArtemisExecutor executor,
|
||||
final ActiveMQServer server,
|
||||
final QueueFactory factory) {
|
||||
super(persistenceID, address, name, filter, pageSubscription, user, durable, temporary, autoCreated, routingType, maxConsumers, exclusive, consumersBeforeDispatch, delayBeforeDispatch, purgeOnNoConsumers, scheduledExecutor, postOffice, storageManager, addressSettingsRepository, executor, server, factory);
|
||||
super(persistenceID, address, name, filter, pageSubscription, user, durable, temporary, autoCreated, routingType, maxConsumers, exclusive, consumersBeforeDispatch, delayBeforeDispatch, purgeOnNoConsumers, configurationManaged, scheduledExecutor, postOffice, storageManager, addressSettingsRepository, executor, server, factory);
|
||||
}
|
||||
|
||||
@Override
|
||||
|
|
|
@ -155,7 +155,8 @@ public class PostOfficeJournalLoader implements JournalLoader {
|
|||
.lastValue(queueBindingInfo.isLastValue())
|
||||
.consumersBeforeDispatch(queueBindingInfo.getConsumersBeforeDispatch())
|
||||
.delayBeforeDispatch(queueBindingInfo.getDelayBeforeDispatch())
|
||||
.routingType(RoutingType.getType(queueBindingInfo.getRoutingType()));
|
||||
.routingType(RoutingType.getType(queueBindingInfo.getRoutingType()))
|
||||
.configurationManaged((queueBindingInfo.isConfigurationManaged()));
|
||||
final Queue queue = queueFactory.createQueueWith(queueConfigBuilder.build());
|
||||
queue.setConsumersRefCount(new QueueManagerImpl(((PostOfficeImpl)postOffice).getServer(), queueBindingInfo.getQueueName()));
|
||||
|
||||
|
|
|
@ -74,11 +74,10 @@ public class QueueFactoryImpl implements QueueFactory {
|
|||
public Queue createQueueWith(final QueueConfig config) {
|
||||
final Queue queue;
|
||||
if (config.isLastValue()) {
|
||||
queue = new LastValueQueue(config.id(), config.address(), config.name(), config.filter(), config.pageSubscription(), config.user(), config.isDurable(), config.isTemporary(), config.isAutoCreated(), config.deliveryMode(), config.maxConsumers(), config.isExclusive(), config.consumersBeforeDispatch(), config.delayBeforeDispatch(), config.isPurgeOnNoConsumers(), scheduledExecutor, postOffice, storageManager, addressSettingsRepository, executorFactory.getExecutor(), server, this);
|
||||
queue = new LastValueQueue(config.id(), config.address(), config.name(), config.filter(), config.pageSubscription(), config.user(), config.isDurable(), config.isTemporary(), config.isAutoCreated(), config.deliveryMode(), config.maxConsumers(), config.isExclusive(), config.consumersBeforeDispatch(), config.delayBeforeDispatch(), config.isPurgeOnNoConsumers(), config.isConfigurationManaged(), scheduledExecutor, postOffice, storageManager, addressSettingsRepository, executorFactory.getExecutor(), server, this);
|
||||
} else {
|
||||
queue = new QueueImpl(config.id(), config.address(), config.name(), config.filter(), config.pageSubscription(), config.user(), config.isDurable(), config.isTemporary(), config.isAutoCreated(), config.deliveryMode(), config.maxConsumers(), config.isExclusive(), config.consumersBeforeDispatch(), config.delayBeforeDispatch(), config.isPurgeOnNoConsumers(), scheduledExecutor, postOffice, storageManager, addressSettingsRepository, executorFactory.getExecutor(), server, this);
|
||||
queue = new QueueImpl(config.id(), config.address(), config.name(), config.filter(), config.pageSubscription(), config.user(), config.isDurable(), config.isTemporary(), config.isAutoCreated(), config.deliveryMode(), config.maxConsumers(), config.isExclusive(), config.consumersBeforeDispatch(), config.delayBeforeDispatch(), config.isPurgeOnNoConsumers(), config.isConfigurationManaged(), scheduledExecutor, postOffice, storageManager, addressSettingsRepository, executorFactory.getExecutor(), server, this);
|
||||
}
|
||||
|
||||
server.getCriticalAnalyzer().add(queue);
|
||||
return queue;
|
||||
}
|
||||
|
@ -102,7 +101,7 @@ public class QueueFactoryImpl implements QueueFactory {
|
|||
|
||||
Queue queue;
|
||||
if (addressSettings.isDefaultLastValueQueue()) {
|
||||
queue = new LastValueQueue(persistenceID, address, name, filter, pageSubscription, user, durable, temporary, autoCreated, ActiveMQDefaultConfiguration.getDefaultRoutingType(), ActiveMQDefaultConfiguration.getDefaultMaxQueueConsumers(), ActiveMQDefaultConfiguration.getDefaultExclusive(), ActiveMQDefaultConfiguration.getDefaultConsumersBeforeDispatch(), ActiveMQDefaultConfiguration.getDefaultDelayBeforeDispatch(), ActiveMQDefaultConfiguration.getDefaultPurgeOnNoConsumers(), scheduledExecutor, postOffice, storageManager, addressSettingsRepository, executorFactory.getExecutor(), server, this);
|
||||
queue = new LastValueQueue(persistenceID, address, name, filter, pageSubscription, user, durable, temporary, autoCreated, ActiveMQDefaultConfiguration.getDefaultRoutingType(), ActiveMQDefaultConfiguration.getDefaultMaxQueueConsumers(), ActiveMQDefaultConfiguration.getDefaultExclusive(), ActiveMQDefaultConfiguration.getDefaultConsumersBeforeDispatch(), ActiveMQDefaultConfiguration.getDefaultDelayBeforeDispatch(), ActiveMQDefaultConfiguration.getDefaultPurgeOnNoConsumers(), false, scheduledExecutor, postOffice, storageManager, addressSettingsRepository, executorFactory.getExecutor(), server, this);
|
||||
} else {
|
||||
queue = new QueueImpl(persistenceID, address, name, filter, pageSubscription, user, durable, temporary, autoCreated, scheduledExecutor, postOffice, storageManager, addressSettingsRepository, executorFactory.getExecutor(), server, this);
|
||||
}
|
||||
|
|
|
@ -279,9 +279,11 @@ public class QueueImpl extends CriticalComponentImpl implements Queue {
|
|||
|
||||
public volatile long dispatchStartTime = -1;
|
||||
|
||||
private int consumersBeforeDispatch = 0;
|
||||
private volatile int consumersBeforeDispatch = 0;
|
||||
|
||||
private long delayBeforeDispatch = 0;
|
||||
private volatile long delayBeforeDispatch = 0;
|
||||
|
||||
private volatile boolean configurationManaged;
|
||||
|
||||
|
||||
/**
|
||||
|
@ -429,7 +431,7 @@ public class QueueImpl extends CriticalComponentImpl implements Queue {
|
|||
final ArtemisExecutor executor,
|
||||
final ActiveMQServer server,
|
||||
final QueueFactory factory) {
|
||||
this(id, address, name, filter, pageSubscription, user, durable, temporary, autoCreated, routingType, maxConsumers, exclusive, null, null, purgeOnNoConsumers, scheduledExecutor, postOffice, storageManager, addressSettingsRepository, executor, server, factory);
|
||||
this(id, address, name, filter, pageSubscription, user, durable, temporary, autoCreated, routingType, maxConsumers, exclusive, null, null, purgeOnNoConsumers, false, scheduledExecutor, postOffice, storageManager, addressSettingsRepository, executor, server, factory);
|
||||
}
|
||||
|
||||
public QueueImpl(final long id,
|
||||
|
@ -447,6 +449,7 @@ public class QueueImpl extends CriticalComponentImpl implements Queue {
|
|||
final Integer consumersBeforeDispatch,
|
||||
final Long delayBeforeDispatch,
|
||||
final Boolean purgeOnNoConsumers,
|
||||
final boolean configurationManaged,
|
||||
final ScheduledExecutorService scheduledExecutor,
|
||||
final PostOffice postOffice,
|
||||
final StorageManager storageManager,
|
||||
|
@ -486,6 +489,8 @@ public class QueueImpl extends CriticalComponentImpl implements Queue {
|
|||
|
||||
this.delayBeforeDispatch = delayBeforeDispatch == null ? ActiveMQDefaultConfiguration.getDefaultDelayBeforeDispatch() : delayBeforeDispatch;
|
||||
|
||||
this.configurationManaged = configurationManaged;
|
||||
|
||||
this.postOffice = postOffice;
|
||||
|
||||
this.storageManager = storageManager;
|
||||
|
@ -662,6 +667,16 @@ public class QueueImpl extends CriticalComponentImpl implements Queue {
|
|||
this.maxConsumers = maxConsumers;
|
||||
}
|
||||
|
||||
@Override
|
||||
public boolean isConfigurationManaged() {
|
||||
return configurationManaged;
|
||||
}
|
||||
|
||||
@Override
|
||||
public synchronized void setConfigurationManaged(boolean configurationManaged) {
|
||||
this.configurationManaged = configurationManaged;
|
||||
}
|
||||
|
||||
@Override
|
||||
public SimpleString getName() {
|
||||
return name;
|
||||
|
|
|
@ -833,6 +833,16 @@ public class ScheduledDeliveryHandlerTest extends Assert {
|
|||
|
||||
}
|
||||
|
||||
@Override
|
||||
public boolean isConfigurationManaged() {
|
||||
return false;
|
||||
}
|
||||
|
||||
@Override
|
||||
public void setConfigurationManaged(boolean configurationManaged) {
|
||||
|
||||
}
|
||||
|
||||
@Override
|
||||
public void recheckRefCount(OperationContext context) {
|
||||
}
|
||||
|
|
|
@ -320,8 +320,15 @@ public class RedeployTest extends ActiveMQTestBase {
|
|||
|
||||
embeddedActiveMQ.getActiveMQServer().getReloadManager().setTick(tick);
|
||||
|
||||
ConnectionFactory connectionFactory = new ActiveMQConnectionFactory();
|
||||
try (JMSContext jmsContext = connectionFactory.createContext()) {
|
||||
jmsContext.createSharedDurableConsumer(jmsContext.createTopic("config_test_consumer_created_queues"),"mySub").receive(100);
|
||||
}
|
||||
|
||||
try {
|
||||
latch.await(10, TimeUnit.SECONDS);
|
||||
Assert.assertTrue(listQueuesNamesForAddress(embeddedActiveMQ, "config_test_consumer_created_queues").contains("mySub"));
|
||||
|
||||
Assert.assertNotNull(getAddressInfo(embeddedActiveMQ, "config_test_address_removal_no_queue"));
|
||||
Assert.assertNotNull(getAddressInfo(embeddedActiveMQ, "config_test_address_removal"));
|
||||
Assert.assertNotNull(getAddressInfo(embeddedActiveMQ, "config_test_queue_removal"));
|
||||
|
@ -344,6 +351,9 @@ public class RedeployTest extends ActiveMQTestBase {
|
|||
embeddedActiveMQ.getActiveMQServer().getReloadManager().setTick(tick);
|
||||
latch.await(10, TimeUnit.SECONDS);
|
||||
|
||||
//Ensure queues created by clients (NOT by broker.xml are not removed when we reload).
|
||||
Assert.assertTrue(listQueuesNamesForAddress(embeddedActiveMQ, "config_test_consumer_created_queues").contains("mySub"));
|
||||
|
||||
Assert.assertNull(getAddressInfo(embeddedActiveMQ, "config_test_address_removal_no_queue"));
|
||||
Assert.assertNull(getAddressInfo(embeddedActiveMQ, "config_test_address_removal"));
|
||||
Assert.assertNotNull(getAddressInfo(embeddedActiveMQ, "config_test_queue_removal"));
|
||||
|
|
|
@ -146,6 +146,11 @@ public class QueueControlUsingCoreTest extends QueueControlTest {
|
|||
return (Boolean) proxy.retrieveAttributeValue("purgeOnNoConsumers");
|
||||
}
|
||||
|
||||
@Override
|
||||
public boolean isConfigurationManaged() {
|
||||
return (Boolean) proxy.retrieveAttributeValue("configurationManaged");
|
||||
}
|
||||
|
||||
@Override
|
||||
public boolean isExclusive() {
|
||||
return (Boolean) proxy.retrieveAttributeValue("exclusive");
|
||||
|
|
|
@ -117,6 +117,10 @@ under the License.
|
|||
</wildcard-addresses>
|
||||
|
||||
<addresses>
|
||||
<address name="config_test_consumer_created_queues">
|
||||
<multicast>
|
||||
</multicast>
|
||||
</address>
|
||||
<address name="config_test_queue_removal">
|
||||
<multicast>
|
||||
<queue name="config_test_queue_removal_queue_1"/>
|
||||
|
|
|
@ -120,6 +120,10 @@ under the License.
|
|||
</wildcard-addresses>
|
||||
|
||||
<addresses>
|
||||
<address name="config_test_consumer_created_queues">
|
||||
<multicast>
|
||||
</multicast>
|
||||
</address>
|
||||
<address name="config_test_queue_removal">
|
||||
<multicast>
|
||||
<queue name="config_test_queue_removal_queue_1"/>
|
||||
|
|
|
@ -105,6 +105,16 @@ public class FakeQueue extends CriticalComponentImpl implements Queue {
|
|||
|
||||
}
|
||||
|
||||
@Override
|
||||
public boolean isConfigurationManaged() {
|
||||
return false;
|
||||
}
|
||||
|
||||
@Override
|
||||
public void setConfigurationManaged(boolean configurationManaged) {
|
||||
|
||||
}
|
||||
|
||||
@Override
|
||||
public boolean isInternalQueue() {
|
||||
// no-op
|
||||
|
|
|
@ -53,7 +53,8 @@ public class FakePostOffice implements PostOffice {
|
|||
Boolean exclusive,
|
||||
Integer consumersBeforeDispatch,
|
||||
Long delayBeforeDispatch,
|
||||
SimpleString user) throws Exception {
|
||||
SimpleString user,
|
||||
Boolean configurationManaged) throws Exception {
|
||||
return null;
|
||||
}
|
||||
|
||||
|
|
Loading…
Reference in New Issue