ARTEMIS-780 Implement MaxConsumers and DeleteOnNoConsumers

This commit is contained in:
Martyn Taylor 2016-12-09 17:46:41 +00:00
parent 18c6d3035f
commit 89e6ec36bb
27 changed files with 611 additions and 58 deletions

View File

@ -213,7 +213,25 @@ public enum ActiveMQExceptionType {
}
},
NOT_IMPLEMTNED_EXCEPTION(213);
NOT_IMPLEMTNED_EXCEPTION(213),
MAX_CONSUMER_LIMIT_EXCEEDED(214) {
@Override
public ActiveMQException createException(String msg) {
return new ActiveMQQueueMaxConsumerLimitReached(msg);
}
},
UNEXPECTED_ROUTING_TYPE_FOR_ADDRESS(215) {
@Override
public ActiveMQException createException(String msg) {
return new ActiveMQUnexpectedRoutingTypeForAddress(msg);
}
},
INVALID_QUEUE_CONFIGURATION(216) {
@Override
public ActiveMQException createException(String msg) {
return new ActiveMQInvalidQueueConfiguration(msg);
}
};
private static final Map<Integer, ActiveMQExceptionType> TYPE_MAP;

View File

@ -0,0 +1,31 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.activemq.artemis.api.core;
/**
* An operation failed because a queue exists on the server.
*/
public final class ActiveMQInvalidQueueConfiguration extends ActiveMQException {
public ActiveMQInvalidQueueConfiguration() {
super(ActiveMQExceptionType.INVALID_QUEUE_CONFIGURATION);
}
public ActiveMQInvalidQueueConfiguration(String msg) {
super(ActiveMQExceptionType.INVALID_QUEUE_CONFIGURATION, msg);
}
}

View File

@ -0,0 +1,31 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.activemq.artemis.api.core;
/**
* An operation failed because a queue exists on the server.
*/
public final class ActiveMQQueueMaxConsumerLimitReached extends ActiveMQException {
public ActiveMQQueueMaxConsumerLimitReached() {
super(ActiveMQExceptionType.MAX_CONSUMER_LIMIT_EXCEEDED);
}
public ActiveMQQueueMaxConsumerLimitReached(String msg) {
super(ActiveMQExceptionType.MAX_CONSUMER_LIMIT_EXCEEDED, msg);
}
}

View File

@ -0,0 +1,31 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.activemq.artemis.api.core;
/**
* An operation failed because a queue exists on the server.
*/
public final class ActiveMQUnexpectedRoutingTypeForAddress extends ActiveMQException {
public ActiveMQUnexpectedRoutingTypeForAddress() {
super(ActiveMQExceptionType.MAX_CONSUMER_LIMIT_EXCEEDED);
}
public ActiveMQUnexpectedRoutingTypeForAddress(String msg) {
super(ActiveMQExceptionType.MAX_CONSUMER_LIMIT_EXCEEDED, msg);
}
}

View File

@ -48,8 +48,8 @@ import java.util.concurrent.locks.ReentrantReadWriteLock;
import org.apache.activemq.artemis.api.core.ActiveMQBuffer;
import org.apache.activemq.artemis.api.core.ActiveMQBuffers;
import org.apache.activemq.artemis.api.core.Pair;
import org.apache.activemq.artemis.api.core.ActiveMQExceptionType;
import org.apache.activemq.artemis.api.core.Pair;
import org.apache.activemq.artemis.core.io.IOCallback;
import org.apache.activemq.artemis.core.io.SequentialFile;
import org.apache.activemq.artemis.core.io.SequentialFileFactory;
@ -866,7 +866,7 @@ public class JournalImpl extends JournalBase implements TestableJournal, Journal
}
private static SimpleFuture<Boolean> newSyncAndCallbackResult(boolean sync, IOCompletion callback) {
return (sync && callback == null) ? new SimpleFuture<>() : null;
return (sync && callback == null) ? new SimpleFuture<Boolean>() : null;
}
@Override
@ -2226,7 +2226,7 @@ public class JournalImpl extends JournalBase implements TestableJournal, Journal
}
});
threadPool = new ThreadPoolExecutor(0,Integer.MAX_VALUE, 60L,TimeUnit.SECONDS, new SynchronousQueue<>(), factory);
threadPool = new ThreadPoolExecutor(0, Integer.MAX_VALUE, 60L,TimeUnit.SECONDS, new SynchronousQueue(), factory);
ioExecutorFactory = new OrderedExecutorFactory(threadPool);
} else {
ioExecutorFactory = providedIOThreadPool;

View File

@ -573,7 +573,6 @@ public class ActiveMQServerControlImpl extends AbstractControl implements Active
SimpleString filter = filterStr == null ? null : new SimpleString(filterStr);
clearIO();
try {
server.deployQueue(SimpleString.toSimpleString(address), new SimpleString(name), filter, durable, false);
} finally {
blockOnIO();

View File

@ -46,4 +46,11 @@ public interface QueueBindingInfo {
List<QueueStatusEncoding> getQueueStatusEncodings();
int getMaxConsumers();
void setMaxConsumers(int maxConsumers);
boolean isDeleteOnNoConsumers();
void setDeleteOnNoConsumers();
}

View File

@ -1266,6 +1266,7 @@ public abstract class AbstractJournalStorageManager implements StorageManager {
}
}
@Override
public void addAddressBinding(final long tx, final AddressInfo addressInfo) throws Exception {
PersistentAddressBindingEncoding bindingEncoding = new PersistentAddressBindingEncoding(addressInfo.getName(), addressInfo.getRoutingType());

View File

@ -41,6 +41,10 @@ public class PersistentQueueBindingEncoding implements EncodingSupport, QueueBin
public List<QueueStatusEncoding> queueStatusEncodings;
public int maxConsumers;
public boolean deleteOnNoConsumers;
public PersistentQueueBindingEncoding() {
}
@ -57,6 +61,10 @@ public class PersistentQueueBindingEncoding implements EncodingSupport, QueueBin
user +
", autoCreated=" +
autoCreated +
", maxConsumers=" +
maxConsumers +
", deleteOnNoConsumers=" +
deleteOnNoConsumers +
"]";
}
@ -124,6 +132,26 @@ public class PersistentQueueBindingEncoding implements EncodingSupport, QueueBin
return queueStatusEncodings;
}
@Override
public int getMaxConsumers() {
return 0;
}
@Override
public void setMaxConsumers(int maxConsumers) {
}
@Override
public boolean isDeleteOnNoConsumers() {
return false;
}
@Override
public void setDeleteOnNoConsumers() {
}
@Override
public void decode(final ActiveMQBuffer buffer) {
name = buffer.readSimpleString();
@ -144,6 +172,14 @@ public class PersistentQueueBindingEncoding implements EncodingSupport, QueueBin
}
autoCreated = buffer.readBoolean();
if (buffer.readableBytes() > 0) {
maxConsumers = buffer.readInt();
deleteOnNoConsumers = buffer.readBoolean();
} else {
maxConsumers = -1;
deleteOnNoConsumers = false;
}
}
@Override
@ -153,13 +189,17 @@ public class PersistentQueueBindingEncoding implements EncodingSupport, QueueBin
buffer.writeNullableSimpleString(filterString);
buffer.writeNullableSimpleString(createMetadata());
buffer.writeBoolean(autoCreated);
buffer.writeInt(maxConsumers);
buffer.writeBoolean(deleteOnNoConsumers);
}
@Override
public int getEncodeSize() {
return SimpleString.sizeofString(name) + SimpleString.sizeofString(address) +
SimpleString.sizeofNullableString(filterString) + DataConstants.SIZE_BOOLEAN +
SimpleString.sizeofNullableString(createMetadata());
SimpleString.sizeofNullableString(createMetadata()) +
DataConstants.SIZE_INT +
DataConstants.SIZE_BOOLEAN;
}
private SimpleString createMetadata() {

View File

@ -24,6 +24,7 @@ import org.apache.activemq.artemis.api.core.ActiveMQException;
import org.apache.activemq.artemis.api.core.ActiveMQExceptionType;
import org.apache.activemq.artemis.api.core.ActiveMQIOErrorException;
import org.apache.activemq.artemis.api.core.ActiveMQInternalErrorException;
import org.apache.activemq.artemis.api.core.ActiveMQQueueMaxConsumerLimitReached;
import org.apache.activemq.artemis.core.exception.ActiveMQXAException;
import org.apache.activemq.artemis.core.io.IOCallback;
import org.apache.activemq.artemis.core.persistence.StorageManager;
@ -494,6 +495,13 @@ public class ServerSessionPacketHandler implements ChannelHandler {
} else {
ActiveMQServerLogger.LOGGER.caughtXaException(e);
}
} catch (ActiveMQQueueMaxConsumerLimitReached e) {
if (requiresResponse) {
logger.debug("Sending exception to client", e);
response = new ActiveMQExceptionMessage(e);
} else {
ActiveMQServerLogger.LOGGER.caughtException(e);
}
} catch (ActiveMQException e) {
if (requiresResponse) {
logger.debug("Sending exception to client", e);

View File

@ -29,16 +29,20 @@ import org.apache.activemq.artemis.api.core.ActiveMQIllegalStateException;
import org.apache.activemq.artemis.api.core.ActiveMQIncompatibleClientServerException;
import org.apache.activemq.artemis.api.core.ActiveMQInternalErrorException;
import org.apache.activemq.artemis.api.core.ActiveMQInvalidFilterExpressionException;
import org.apache.activemq.artemis.api.core.ActiveMQInvalidQueueConfiguration;
import org.apache.activemq.artemis.api.core.ActiveMQInvalidTransientQueueUseException;
import org.apache.activemq.artemis.api.core.ActiveMQNonExistentQueueException;
import org.apache.activemq.artemis.api.core.ActiveMQQueueExistsException;
import org.apache.activemq.artemis.api.core.ActiveMQQueueMaxConsumerLimitReached;
import org.apache.activemq.artemis.api.core.ActiveMQSecurityException;
import org.apache.activemq.artemis.api.core.ActiveMQSessionCreationException;
import org.apache.activemq.artemis.api.core.ActiveMQUnexpectedRoutingTypeForAddress;
import org.apache.activemq.artemis.api.core.DiscoveryGroupConfiguration;
import org.apache.activemq.artemis.api.core.SimpleString;
import org.apache.activemq.artemis.core.postoffice.Binding;
import org.apache.activemq.artemis.core.protocol.core.impl.wireformat.ReplicationSyncFileMessage;
import org.apache.activemq.artemis.core.security.CheckType;
import org.apache.activemq.artemis.core.server.impl.AddressInfo;
import org.jboss.logging.Messages;
import org.jboss.logging.annotations.Cause;
import org.jboss.logging.annotations.Message;
@ -378,4 +382,12 @@ public interface ActiveMQMessageBundle {
@Message(id = 119119, value = "Disk Capacity is Low, cannot produce more messages.")
ActiveMQIOErrorException diskBeyondLimit();
@Message(id = 119200, value = "Maximum Consumer Limit Reached on Queue:(address={0},queue={1})", format = Message.Format.MESSAGE_FORMAT)
ActiveMQQueueMaxConsumerLimitReached maxConsumerLimitReachedForQueue(SimpleString address, SimpleString queueName);
@Message(id = 119201, value = "Expected Routing Type {1} but found {2} for address {0}", format = Message.Format.MESSAGE_FORMAT)
ActiveMQUnexpectedRoutingTypeForAddress unexpectedRoutingTypeForAddress(SimpleString address, AddressInfo.RoutingType expectedRoutingType, AddressInfo.RoutingType actualRoutingType);
@Message(id = 119202, value = "Invalid Queue Configuration for Queue {0}, Address {1}. Expected {2} to be {3} but was {4}", format = Message.Format.MESSAGE_FORMAT)
ActiveMQInvalidQueueConfiguration invalidQueueConfiguration(SimpleString address, SimpleString queueName, String queuePropertyName, Object expectedValue, Object actualValue);
}

View File

@ -307,6 +307,14 @@ public interface ActiveMQServer extends ActiveMQComponent {
boolean durable,
boolean temporary) throws Exception;
Queue createQueue(SimpleString address,
SimpleString queueName,
SimpleString filterString,
boolean durable,
boolean temporary,
Integer maxConsumers,
Boolean deleteOnNoConsumers) throws Exception;
Queue createQueue(SimpleString address,
SimpleString queueName,
SimpleString filter,
@ -314,6 +322,15 @@ public interface ActiveMQServer extends ActiveMQComponent {
boolean durable,
boolean temporary) throws Exception;
Queue createQueue(SimpleString address,
SimpleString queueName,
SimpleString filter,
SimpleString user,
boolean durable,
boolean temporary,
Integer maxConsumers,
Boolean deleteOnNoConsumers) throws Exception;
Queue createQueue(SimpleString address,
SimpleString queueName,
SimpleString filter,
@ -322,6 +339,16 @@ public interface ActiveMQServer extends ActiveMQComponent {
boolean temporary,
boolean autoCreated) throws Exception;
Queue createQueue(SimpleString address,
SimpleString queueName,
SimpleString filter,
SimpleString user,
boolean durable,
boolean temporary,
boolean autoCreated,
Integer maxConsumers,
Boolean deleteOnNoConsumers) throws Exception;
Queue deployQueue(SimpleString address,
SimpleString queueName,
SimpleString filterString,
@ -341,6 +368,15 @@ public interface ActiveMQServer extends ActiveMQComponent {
QueueQueryResult queueQuery(SimpleString name) throws Exception;
Queue deployQueue(SimpleString address,
SimpleString resourceName,
SimpleString filterString,
boolean durable,
boolean temporary,
boolean autoCreated,
Integer maxConsumers,
Boolean deleteOnNoConsumers) throws Exception;
void destroyQueue(SimpleString queueName) throws Exception;
void destroyQueue(SimpleString queueName, SecurityAuth session) throws Exception;
@ -392,10 +428,22 @@ public interface ActiveMQServer extends ActiveMQComponent {
AddressInfo getAddressInfo(SimpleString address);
Queue createQueue(SimpleString addressName,
SimpleString queueName,
SimpleString filterString,
SimpleString user,
boolean durable,
boolean temporary,
boolean ignoreIfExists,
boolean transientQueue,
boolean autoCreated,
Integer maxConsumers,
Boolean deleteOnNoConsumers) throws Exception;
/*
* add a ProtocolManagerFactory to be used. Note if @see Configuration#isResolveProtocols is tur then this factory will
* replace any factories with the same protocol
* */
* add a ProtocolManagerFactory to be used. Note if @see Configuration#isResolveProtocols is tur then this factory will
* replace any factories with the same protocol
* */
void addProtocolManagerFactory(ProtocolManagerFactory factory);
/*

View File

@ -46,6 +46,10 @@ public interface Queue extends Bindable {
boolean isAutoCreated();
boolean isDeleteOnNoConsumers();
int getMaxConsumers();
void addConsumer(Consumer consumer) throws Exception;
void removeConsumer(Consumer consumer);

View File

@ -33,6 +33,8 @@ public final class QueueConfig {
private final boolean durable;
private final boolean temporary;
private final boolean autoCreated;
private final int maxConsumers;
private final boolean deleteOnNoConsumers;
public static final class Builder {
@ -45,6 +47,8 @@ public final class QueueConfig {
private boolean durable;
private boolean temporary;
private boolean autoCreated;
private int maxConsumers;
private boolean deleteOnNoConsumers;
private Builder(final long id, final SimpleString name) {
this(id, name, name);
@ -60,6 +64,8 @@ public final class QueueConfig {
this.durable = true;
this.temporary = false;
this.autoCreated = true;
this.maxConsumers = -1;
this.deleteOnNoConsumers = false;
validateState();
}
@ -106,6 +112,16 @@ public final class QueueConfig {
return this;
}
public Builder maxConsumers(final int maxConsumers) {
this.maxConsumers = maxConsumers;
return this;
}
public Builder deleteOnNoConsumers(final boolean deleteOnNoConsumers) {
this.deleteOnNoConsumers = deleteOnNoConsumers;
return this;
}
/**
* Returns a new {@link QueueConfig} using the parameters configured on the {@link Builder}.
* <br>
@ -127,7 +143,7 @@ public final class QueueConfig {
} else {
pageSubscription = null;
}
return new QueueConfig(id, address, name, filter, pageSubscription, user, durable, temporary, autoCreated);
return new QueueConfig(id, address, name, filter, pageSubscription, user, durable, temporary, autoCreated, maxConsumers, deleteOnNoConsumers);
}
}
@ -168,7 +184,9 @@ public final class QueueConfig {
final SimpleString user,
final boolean durable,
final boolean temporary,
final boolean autoCreated) {
final boolean autoCreated,
final int maxConsumers,
final boolean deleteOnNoConsumers) {
this.id = id;
this.address = address;
this.name = name;
@ -178,6 +196,8 @@ public final class QueueConfig {
this.durable = durable;
this.temporary = temporary;
this.autoCreated = autoCreated;
this.deleteOnNoConsumers = deleteOnNoConsumers;
this.maxConsumers = maxConsumers;
}
public long id() {
@ -216,6 +236,14 @@ public final class QueueConfig {
return autoCreated;
}
public boolean isDeleteOnNoConsumers() {
return deleteOnNoConsumers;
}
public int maxConsumers() {
return maxConsumers;
}
@Override
public boolean equals(Object o) {
if (this == o)
@ -241,6 +269,10 @@ public final class QueueConfig {
return false;
if (pageSubscription != null ? !pageSubscription.equals(that.pageSubscription) : that.pageSubscription != null)
return false;
if (maxConsumers != that.maxConsumers)
return false;
if (deleteOnNoConsumers != that.deleteOnNoConsumers)
return false;
return user != null ? user.equals(that.user) : that.user == null;
}
@ -256,11 +288,24 @@ public final class QueueConfig {
result = 31 * result + (durable ? 1 : 0);
result = 31 * result + (temporary ? 1 : 0);
result = 31 * result + (autoCreated ? 1 : 0);
result = 31 * result + maxConsumers;
result = 31 * result + (deleteOnNoConsumers ? 1 : 0);
return result;
}
@Override
public String toString() {
return "QueueConfig{" + "id=" + id + ", address=" + address + ", name=" + name + ", filter=" + filter + ", pageSubscription=" + pageSubscription + ", user=" + user + ", durable=" + durable + ", temporary=" + temporary + ", autoCreated=" + autoCreated + '}';
return "QueueConfig{"
+ "id=" + id
+ ", address=" + address
+ ", name=" + name
+ ", filter=" + filter
+ ", pageSubscription=" + pageSubscription
+ ", user=" + user
+ ", durable=" + durable
+ ", temporary=" + temporary
+ ", autoCreated=" + autoCreated
+ ", maxConsumers=" + maxConsumers
+ ", deleteOnNoConsumers=" + deleteOnNoConsumers + '}';
}
}

View File

@ -185,6 +185,14 @@ public interface ServerSession extends SecurityAuth {
boolean isClosed();
Queue createQueue(SimpleString address,
SimpleString name,
SimpleString filterString,
boolean temporary,
boolean durable,
Integer maxConsumers,
Boolean deleteOnNoConsumers) throws Exception;
void createSharedQueue(SimpleString address,
SimpleString name,
boolean durable,

View File

@ -1499,6 +1499,17 @@ public class ActiveMQServerImpl implements ActiveMQServer {
return createQueue(address, queueName, filterString, null, durable, temporary, false, false, false);
}
@Override
public Queue createQueue(final SimpleString address,
final SimpleString queueName,
final SimpleString filterString,
final boolean durable,
final boolean temporary,
final Integer maxConsumers,
final Boolean deleteOnNoConsumers) throws Exception {
return createQueue(address, queueName, filterString, null, durable, temporary, false, false, false, maxConsumers, deleteOnNoConsumers);
}
@Override
public Queue createQueue(final SimpleString address,
final SimpleString queueName,
@ -1509,6 +1520,18 @@ public class ActiveMQServerImpl implements ActiveMQServer {
return createQueue(address, queueName, filterString, user, durable, temporary, false, false, false);
}
@Override
public Queue createQueue(SimpleString address,
SimpleString queueName,
SimpleString filter,
SimpleString user,
boolean durable,
boolean temporary,
Integer maxConsumers,
Boolean deleteOnNoConsumers) throws Exception {
return createQueue(address, queueName, filter, user, durable, temporary, false, false, false, maxConsumers, deleteOnNoConsumers);
}
@Override
public Queue createQueue(final SimpleString address,
final SimpleString queueName,
@ -1520,6 +1543,19 @@ public class ActiveMQServerImpl implements ActiveMQServer {
return createQueue(address, queueName, filterString, user, durable, temporary, false, false, autoCreated);
}
@Override
public Queue createQueue(SimpleString address,
SimpleString queueName,
SimpleString filter,
SimpleString user,
boolean durable,
boolean temporary,
boolean autoCreated,
Integer maxConsumers,
Boolean deleteOnNoConsumers) throws Exception {
return createQueue(address, queueName, filter, user, durable, temporary, false, false, autoCreated, maxConsumers, deleteOnNoConsumers);
}
@Override
public void createSharedQueue(final SimpleString address,
final SimpleString name,
@ -1579,6 +1615,18 @@ public class ActiveMQServerImpl implements ActiveMQServer {
final boolean durable,
final boolean temporary,
final boolean autoCreated) throws Exception {
return deployQueue(address, resourceName, filterString, durable, temporary, autoCreated, null, null);
}
@Override
public Queue deployQueue(final SimpleString address,
final SimpleString resourceName,
final SimpleString filterString,
final boolean durable,
final boolean temporary,
final boolean autoCreated,
final Integer maxConsumers,
final Boolean deleteOnNoConsumers) throws Exception {
if (resourceName.toString().toLowerCase().startsWith("jms.topic")) {
ActiveMQServerLogger.LOGGER.deployTopic(resourceName);
@ -1586,7 +1634,7 @@ public class ActiveMQServerImpl implements ActiveMQServer {
ActiveMQServerLogger.LOGGER.deployQueue(resourceName);
}
return createQueue(address, resourceName, filterString, null, durable, temporary, true, false, autoCreated);
return createQueue(address, resourceName, filterString, null, durable, temporary, true, false, autoCreated, maxConsumers, deleteOnNoConsumers);
}
@Override
@ -2171,9 +2219,17 @@ public class ActiveMQServerImpl implements ActiveMQServer {
private void deployQueuesFromListCoreQueueConfiguration(List<CoreQueueConfiguration> queues) throws Exception {
for (CoreQueueConfiguration config : queues) {
deployQueue(SimpleString.toSimpleString(config.getAddress()), SimpleString.toSimpleString(config.getName()), SimpleString.toSimpleString(config.getFilterString()), config.isDurable(), false);
deployQueue(SimpleString.toSimpleString(config.getAddress()),
SimpleString.toSimpleString(config.getName()),
SimpleString.toSimpleString(config.getFilterString()),
config.isDurable(),
false,
false,
config.getMaxConsumers(),
config.getDeleteOnNoConsumers());
}
}
private void deployQueuesFromConfiguration() throws Exception {
deployQueuesFromListCoreQueueConfiguration(configuration.getQueueConfigurations());
}
@ -2297,6 +2353,31 @@ public class ActiveMQServerImpl implements ActiveMQServer {
final boolean ignoreIfExists,
final boolean transientQueue,
final boolean autoCreated) throws Exception {
return createQueue(addressName,
queueName,
filterString,
user,
durable,
temporary,
ignoreIfExists,
transientQueue,
autoCreated,
null,
null);
}
@Override
public Queue createQueue(final SimpleString addressName,
final SimpleString queueName,
final SimpleString filterString,
final SimpleString user,
final boolean durable,
final boolean temporary,
final boolean ignoreIfExists,
final boolean transientQueue,
final boolean autoCreated,
final Integer maxConsumers,
final Boolean deleteOnNoConsumers) throws Exception {
final QueueBinding binding = (QueueBinding) postOffice.getBinding(queueName);
if (binding != null) {
@ -2313,22 +2394,40 @@ public class ActiveMQServerImpl implements ActiveMQServer {
final long queueID = storageManager.generateID();
final QueueConfig.Builder queueConfigBuilder;
final SimpleString address;
if (addressName == null) {
queueConfigBuilder = QueueConfig.builderWith(queueID, queueName);
address = queueName;
} else {
queueConfigBuilder = QueueConfig.builderWith(queueID, queueName, addressName);
address = addressName;
}
final QueueConfig queueConfig = queueConfigBuilder.filter(filter).pagingManager(pagingManager).user(user).durable(durable).temporary(temporary).autoCreated(autoCreated).build();
AddressInfo defaultAddressInfo = new AddressInfo(address);
// FIXME This boils down to a putIfAbsent (avoids race). This should be reflected in the API.
AddressInfo info = postOffice.addAddressInfo(defaultAddressInfo);
boolean addressExists = true;
if (info == null) {
info = defaultAddressInfo;
addressExists = false;
}
final boolean isDeleteOnNoConsumers = deleteOnNoConsumers == null ? info.isDefaultDeleteOnNoConsumers() : deleteOnNoConsumers;
final int noMaxConsumers = maxConsumers == null ? info.getDefaultMaxConsumers() : maxConsumers;
final QueueConfig queueConfig = queueConfigBuilder.filter(filter)
.pagingManager(pagingManager)
.user(user)
.durable(durable)
.temporary(temporary)
.autoCreated(autoCreated)
.deleteOnNoConsumers(isDeleteOnNoConsumers)
.maxConsumers(noMaxConsumers)
.build();
final Queue queue = queueFactory.createQueueWith(queueConfig);
boolean addressAlreadyExists = true;
if (postOffice.getAddressInfo(queue.getAddress()) == null) {
postOffice.addAddressInfo(new AddressInfo(queue.getAddress())
.setRoutingType(AddressInfo.RoutingType.MULTICAST));
addressAlreadyExists = false;
}
if (transientQueue) {
queue.setConsumersRefCount(new TransientQueueManagerImpl(this, queue.getName()));
} else if (queue.isAutoCreated()) {
@ -2338,10 +2437,10 @@ public class ActiveMQServerImpl implements ActiveMQServer {
final QueueBinding localQueueBinding = new LocalQueueBinding(getAddressInfo(queue.getAddress()), queue, nodeManager.getNodeId());
if (queue.isDurable()) {
storageManager.addQueueBinding(txID, localQueueBinding);
if (!addressAlreadyExists) {
storageManager.addAddressBinding(txID, getAddressInfo(queue.getAddress()));
if (!addressExists) {
storageManager.addAddressBinding(txID, getAddressInfo(address));
}
storageManager.addQueueBinding(txID, localQueueBinding);
}
try {

View File

@ -16,6 +16,7 @@
*/
package org.apache.activemq.artemis.core.server.impl;
import org.apache.activemq.artemis.api.config.ActiveMQDefaultConfiguration;
import org.apache.activemq.artemis.api.core.SimpleString;
public class AddressInfo {
@ -24,9 +25,9 @@ public class AddressInfo {
private RoutingType routingType = RoutingType.MULTICAST;
private boolean defaultDeleteOnNoConsumers;
private boolean defaultDeleteOnNoConsumers = ActiveMQDefaultConfiguration.getDefaultDeleteQueueOnNoConsumers();
private int defaultMaxConsumers;
private int defaultMaxConsumers = ActiveMQDefaultConfiguration.getDefaultMaxQueueConsumers();
public AddressInfo(SimpleString name) {
this.name = name;

View File

@ -56,12 +56,14 @@ public class LastValueQueue extends QueueImpl {
final boolean durable,
final boolean temporary,
final boolean autoCreated,
final Integer maxConsumers,
final Boolean deleteOnNoConsumers,
final ScheduledExecutorService scheduledExecutor,
final PostOffice postOffice,
final StorageManager storageManager,
final HierarchicalRepository<AddressSettings> addressSettingsRepository,
final Executor executor) {
super(persistenceID, address, name, filter, pageSubscription, user, durable, temporary, autoCreated, scheduledExecutor, postOffice, storageManager, addressSettingsRepository, executor);
super(persistenceID, address, name, filter, pageSubscription, user, durable, temporary, autoCreated, maxConsumers, deleteOnNoConsumers, scheduledExecutor, postOffice, storageManager, addressSettingsRepository, executor);
new Exception("LastValueQeue " + this).toString();
}

View File

@ -143,7 +143,13 @@ public class PostOfficeJournalLoader implements JournalLoader {
} else {
queueConfigBuilder = QueueConfig.builderWith(queueBindingInfo.getId(), queueBindingInfo.getQueueName(), queueBindingInfo.getAddress());
}
queueConfigBuilder.filter(filter).pagingManager(pagingManager).user(queueBindingInfo.getUser()).durable(true).temporary(false).autoCreated(queueBindingInfo.isAutoCreated());
queueConfigBuilder.filter(filter).pagingManager(pagingManager)
.user(queueBindingInfo.getUser())
.durable(true)
.temporary(false)
.autoCreated(queueBindingInfo.isAutoCreated())
.deleteOnNoConsumers(queueBindingInfo.isDeleteOnNoConsumers())
.maxConsumers(queueBindingInfo.getMaxConsumers());
final Queue queue = queueFactory.createQueueWith(queueConfigBuilder.build());
if (queue.isAutoCreated()) {
queue.setConsumersRefCount(new AutoCreatedQueueManagerImpl(((PostOfficeImpl) postOffice).getServer().getJMSQueueDeleter(), queueBindingInfo.getQueueName()));

View File

@ -75,9 +75,9 @@ public class QueueFactoryImpl implements QueueFactory {
final AddressSettings addressSettings = addressSettingsRepository.getMatch(config.address().toString());
final Queue queue;
if (addressSettings.isLastValueQueue()) {
queue = new LastValueQueue(config.id(), config.address(), config.name(), config.filter(), config.pageSubscription(), config.user(), config.isDurable(), config.isTemporary(), config.isAutoCreated(), scheduledExecutor, postOffice, storageManager, addressSettingsRepository, executorFactory.getExecutor());
queue = new LastValueQueue(config.id(), config.address(), config.name(), config.filter(), config.pageSubscription(), config.user(), config.isDurable(), config.isTemporary(), config.isAutoCreated(), config.maxConsumers(), config.isDeleteOnNoConsumers(), scheduledExecutor, postOffice, storageManager, addressSettingsRepository, executorFactory.getExecutor());
} else {
queue = new QueueImpl(config.id(), config.address(), config.name(), config.filter(), config.pageSubscription(), config.user(), config.isDurable(), config.isTemporary(), config.isAutoCreated(), scheduledExecutor, postOffice, storageManager, addressSettingsRepository, executorFactory.getExecutor());
queue = new QueueImpl(config.id(), config.address(), config.name(), config.filter(), config.pageSubscription(), config.user(), config.isDurable(), config.isTemporary(), config.isAutoCreated(), config.maxConsumers(), config.isDeleteOnNoConsumers(), scheduledExecutor, postOffice, storageManager, addressSettingsRepository, executorFactory.getExecutor());
}
return queue;
}
@ -101,7 +101,7 @@ public class QueueFactoryImpl implements QueueFactory {
Queue queue;
if (addressSettings.isLastValueQueue()) {
queue = new LastValueQueue(persistenceID, address, name, filter, pageSubscription, user, durable, temporary, autoCreated, scheduledExecutor, postOffice, storageManager, addressSettingsRepository, executorFactory.getExecutor());
queue = new LastValueQueue(persistenceID, address, name, filter, pageSubscription, user, durable, temporary, autoCreated, null, null, scheduledExecutor, postOffice, storageManager, addressSettingsRepository, executorFactory.getExecutor());
} else {
queue = new QueueImpl(persistenceID, address, name, filter, pageSubscription, user, durable, temporary, autoCreated, scheduledExecutor, postOffice, storageManager, addressSettingsRepository, executorFactory.getExecutor());
}

View File

@ -238,6 +238,14 @@ public class QueueImpl implements Queue {
private SlowConsumerReaperRunnable slowConsumerReaperRunnable;
private int maxConsumers;
private boolean deleteOnNoConsumers;
private final AddressInfo addressInfo;
private final AtomicInteger noConsumers = new AtomicInteger(0);
/**
* This is to avoid multi-thread races on calculating direct delivery,
* to guarantee ordering will be always be correct
@ -334,10 +342,32 @@ public class QueueImpl implements Queue {
final StorageManager storageManager,
final HierarchicalRepository<AddressSettings> addressSettingsRepository,
final Executor executor) {
this(id, address, name, filter, pageSubscription, user, durable, temporary, autoCreated, null, null, scheduledExecutor, postOffice, storageManager, addressSettingsRepository, executor);
}
public QueueImpl(final long id,
final SimpleString address,
final SimpleString name,
final Filter filter,
final PageSubscription pageSubscription,
final SimpleString user,
final boolean durable,
final boolean temporary,
final boolean autoCreated,
final Integer maxConsumers,
final Boolean deleteOnNoConsumers,
final ScheduledExecutorService scheduledExecutor,
final PostOffice postOffice,
final StorageManager storageManager,
final HierarchicalRepository<AddressSettings> addressSettingsRepository,
final Executor executor) {
this.id = id;
this.address = address;
this.addressInfo = postOffice.getAddressInfo(address);
this.name = name;
this.filter = filter;
@ -350,6 +380,10 @@ public class QueueImpl implements Queue {
this.autoCreated = autoCreated;
this.maxConsumers = maxConsumers == null ? addressInfo.getDefaultMaxConsumers() : maxConsumers;
this.deleteOnNoConsumers = deleteOnNoConsumers == null ? addressInfo.isDefaultDeleteOnNoConsumers() : deleteOnNoConsumers;
this.postOffice = postOffice;
this.storageManager = storageManager;
@ -436,6 +470,16 @@ public class QueueImpl implements Queue {
return autoCreated;
}
@Override
public boolean isDeleteOnNoConsumers() {
return deleteOnNoConsumers;
}
@Override
public int getMaxConsumers() {
return maxConsumers;
}
@Override
public SimpleString getName() {
return name;
@ -709,6 +753,11 @@ public class QueueImpl implements Queue {
}
synchronized (this) {
if (maxConsumers != -1 && noConsumers.get() >= maxConsumers) {
throw ActiveMQMessageBundle.BUNDLE.maxConsumerLimitReachedForQueue(address, name);
}
flushDeliveriesInTransit();
consumersChanged = true;
@ -722,6 +771,8 @@ public class QueueImpl implements Queue {
if (refCountForConsumers != null) {
refCountForConsumers.increment();
}
noConsumers.incrementAndGet();
}
}
@ -770,6 +821,14 @@ public class QueueImpl implements Queue {
if (refCountForConsumers != null) {
refCountForConsumers.decrement();
}
if (noConsumers.decrementAndGet() == 0 && deleteOnNoConsumers) {
try {
deleteQueue();
} catch (Exception e) {
logger.error("Error deleting queue on no consumers. " + this.toString(), e);
}
}
}
}
@ -1361,6 +1420,7 @@ public class QueueImpl implements Queue {
@Override
public void deleteQueue(boolean removeConsumers) throws Exception {
synchronized (this) {
if (this.queueDestroyed) return;
this.queueDestroyed = true;
}

View File

@ -205,6 +205,7 @@ public class ServerConsumerImpl implements ServerConsumer, ReadyListener {
this.creationTime = System.currentTimeMillis();
if (browseOnly) {
browserDeliverer = new BrowserDeliverer(messageQueue.browserIterator());
} else {

View File

@ -501,6 +501,17 @@ public class ServerSessionImpl implements ServerSession, FailureListener {
final SimpleString filterString,
final boolean temporary,
final boolean durable) throws Exception {
return createQueue(address, name, filterString, temporary, durable, null, null);
}
@Override
public Queue createQueue(final SimpleString address,
final SimpleString name,
final SimpleString filterString,
final boolean temporary,
final boolean durable,
final Integer maxConsumers,
final Boolean deleteOnNoConsumers) throws Exception {
if (durable) {
// make sure the user has privileges to create this queue
securityCheck(address, CheckType.CREATE_DURABLE_QUEUE, this);
@ -514,9 +525,9 @@ public class ServerSessionImpl implements ServerSession, FailureListener {
// any non-temporary JMS destination created via this method should be marked as auto-created
if (!temporary && ((address.toString().startsWith(ResourceNames.JMS_QUEUE) && address.equals(name)) || address.toString().startsWith(ResourceNames.JMS_TOPIC))) {
queue = server.createQueue(address, name, filterString, SimpleString.toSimpleString(getUsername()), durable, temporary, true);
queue = server.createQueue(address, name, filterString, SimpleString.toSimpleString(getUsername()), durable, temporary, true, maxConsumers, deleteOnNoConsumers);
} else {
queue = server.createQueue(address, name, filterString, SimpleString.toSimpleString(getUsername()), durable, temporary);
queue = server.createQueue(address, name, filterString, SimpleString.toSimpleString(getUsername()), durable, temporary, maxConsumers, deleteOnNoConsumers);
}
if (temporary) {

View File

@ -900,6 +900,16 @@ public class ScheduledDeliveryHandlerTest extends Assert {
return false;
}
@Override
public boolean isDeleteOnNoConsumers() {
return false;
}
@Override
public int getMaxConsumers() {
return -1;
}
@Override
public void addConsumer(Consumer consumer) throws Exception {

View File

@ -19,8 +19,10 @@ package org.apache.activemq.artemis.tests.integration.addressing;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.UUID;
import java.util.concurrent.TimeUnit;
import org.apache.activemq.artemis.api.core.ActiveMQQueueMaxConsumerLimitReached;
import org.apache.activemq.artemis.api.core.SimpleString;
import org.apache.activemq.artemis.api.core.TransportConfiguration;
import org.apache.activemq.artemis.api.core.client.ActiveMQClient;
@ -156,7 +158,7 @@ public class AddressingTest extends ActiveMQTestBase {
ClientConsumer consumer1 = session.createConsumer(q1.getName());
ClientConsumer consumer2 = session.createConsumer(q2.getName());
ClientConsumer consumer3 = session.createConsumer(q3.getName());
List<ClientConsumer> consumers = new ArrayList<>(Arrays.asList(new ClientConsumer[] {consumer1, consumer2, consumer3}));
List<ClientConsumer> consumers = new ArrayList<>(Arrays.asList(new ClientConsumer[]{consumer1, consumer2, consumer3}));
List<String> messages = new ArrayList<>();
messages.add("Message1");
@ -187,8 +189,6 @@ public class AddressingTest extends ActiveMQTestBase {
assertEquals(0, count);
}
@Test
public void testMulticastRoutingBackwardsCompat() throws Exception {
@ -222,34 +222,102 @@ public class AddressingTest extends ActiveMQTestBase {
}
}
@Ignore
@Test
public void testDeleteQueueOnNoConsumersTrue() {
fail("Not Implemented");
public void testDeleteQueueOnNoConsumersTrue() throws Exception {
SimpleString address = new SimpleString("test.address");
SimpleString queueName = SimpleString.toSimpleString(UUID.randomUUID().toString());
// For each address, create 2 Queues with the same address, assert both queues receive message
boolean deleteOnNoConsumers = true;
Queue q1 = server.createQueue(address, queueName, null, true, false, null, deleteOnNoConsumers);
ClientSession session = sessionFactory.createSession();
session.start();
ClientConsumer consumer1 = session.createConsumer(q1.getName());
consumer1.close();
assertFalse(server.queueQuery(queueName).isExists());
}
@Test
public void testDeleteQueueOnNoConsumersFalse() throws Exception {
SimpleString address = new SimpleString("test.address");
SimpleString queueName = SimpleString.toSimpleString(UUID.randomUUID().toString());
// For each address, create 2 Queues with the same address, assert both queues receive message
boolean deleteOnNoConsumers = false;
Queue q1 = server.createQueue(address, queueName, null, true, false, null, deleteOnNoConsumers);
ClientSession session = sessionFactory.createSession();
session.start();
ClientConsumer consumer1 = session.createConsumer(q1.getName());
consumer1.close();
assertTrue(server.queueQuery(queueName).isExists());
}
@Test
public void testLimitOnMaxConsumers() throws Exception {
SimpleString address = new SimpleString("test.address");
SimpleString queueName = SimpleString.toSimpleString(UUID.randomUUID().toString());
// For each address, create 2 Queues with the same address, assert both queues receive message
boolean deleteOnNoConsumers = false;
Queue q1 = server.createQueue(address, queueName, null, true, false, 0, deleteOnNoConsumers);
Exception expectedException = null;
String expectedMessage = "Maximum Consumer Limit Reached on Queue";
try {
ClientSession session = sessionFactory.createSession();
session.start();
session.createConsumer(q1.getName());
} catch (ActiveMQQueueMaxConsumerLimitReached e) {
expectedException = e;
}
assertNotNull(expectedException);
assertTrue(expectedException.getMessage().contains(expectedMessage));
assertTrue(expectedException.getMessage().contains(address));
assertTrue(expectedException.getMessage().contains(queueName));
}
@Ignore
@Test
public void testDeleteQueueOnNoConsumersFalse() {
fail("Not Implemented");
public void testUnlimitedMaxConsumers() throws Exception {
int noConsumers = 50;
SimpleString address = new SimpleString("test.address");
SimpleString queueName = SimpleString.toSimpleString(UUID.randomUUID().toString());
// For each address, create 2 Queues with the same address, assert both queues receive message
boolean deleteOnNoConsumers = false;
Queue q1 = server.createQueue(address, queueName, null, true, false, -1, deleteOnNoConsumers);
ClientSession session = sessionFactory.createSession();
session.start();
for (int i = 0; i < noConsumers; i++) {
session.createConsumer(q1.getName());
}
}
@Ignore
@Test
public void testLimitOnMaxConsumers() {
fail("Not Implemented");
}
public void testDefaultMaxConsumersFromAddress() throws Exception {
int noConsumers = 50;
SimpleString address = new SimpleString("test.address");
SimpleString queueName = SimpleString.toSimpleString(UUID.randomUUID().toString());
// For each address, create 2 Queues with the same address, assert both queues receive message
boolean deleteOnNoConsumers = false;
AddressInfo addressInfo = new AddressInfo(address);
addressInfo.setDefaultMaxConsumers(0);
Queue q1 = server.createQueue(address, queueName, null, true, false, null, deleteOnNoConsumers);
@Ignore
@Test
public void testUnlimitedMaxConsumers() {
fail("Not Implemented");
}
ClientSession session = sessionFactory.createSession();
session.start();
@Ignore
@Test
public void testDefaultMaxConsumersFromAddress() {
fail("Not Implemented");
for (int i = 0; i < noConsumers; i++) {
session.createConsumer(q1.getName());
}
}
@Ignore

View File

@ -224,12 +224,14 @@ public class HangConsumerTest extends ActiveMQTestBase {
final boolean durable,
final boolean temporary,
final boolean autoCreated,
final Integer maxConsumers,
final Boolean deleteOnNoConsumers,
final ScheduledExecutorService scheduledExecutor,
final PostOffice postOffice,
final StorageManager storageManager,
final HierarchicalRepository<AddressSettings> addressSettingsRepository,
final Executor executor) {
super(id, address, name, filter, pageSubscription, user, durable, temporary, autoCreated, scheduledExecutor, postOffice, storageManager, addressSettingsRepository, executor);
super(id, address, name, filter, pageSubscription, user, durable, temporary, autoCreated, maxConsumers, deleteOnNoConsumers, scheduledExecutor, postOffice, storageManager, addressSettingsRepository, executor);
}
@Override
@ -256,7 +258,7 @@ public class HangConsumerTest extends ActiveMQTestBase {
@Override
public Queue createQueueWith(final QueueConfig config) {
queue = new MyQueueWithBlocking(config.id(), config.address(), config.name(), config.filter(), config.user(), config.pageSubscription(), config.isDurable(), config.isTemporary(), config.isAutoCreated(), scheduledExecutor, postOffice, storageManager, addressSettingsRepository, executorFactory.getExecutor());
queue = new MyQueueWithBlocking(config.id(), config.address(), config.name(), config.filter(), config.user(), config.pageSubscription(), config.isDurable(), config.isTemporary(), config.isAutoCreated(), config.maxConsumers(), config.isDeleteOnNoConsumers(), scheduledExecutor, postOffice, storageManager, addressSettingsRepository, executorFactory.getExecutor());
return queue;
}
@ -271,7 +273,7 @@ public class HangConsumerTest extends ActiveMQTestBase {
final boolean durable,
final boolean temporary,
final boolean autoCreated) {
queue = new MyQueueWithBlocking(persistenceID, address, name, filter, user, pageSubscription, durable, temporary, autoCreated, scheduledExecutor, postOffice, storageManager, addressSettingsRepository, executorFactory.getExecutor());
queue = new MyQueueWithBlocking(persistenceID, address, name, filter, user, pageSubscription, durable, temporary, autoCreated, null, null, scheduledExecutor, postOffice, storageManager, addressSettingsRepository, executorFactory.getExecutor());
return queue;
}

View File

@ -442,6 +442,16 @@ public class FakeQueue implements Queue {
return false;
}
@Override
public boolean isDeleteOnNoConsumers() {
return false;
}
@Override
public int getMaxConsumers() {
return -1;
}
@Override
public LinkedListIterator<MessageReference> iterator() {
// no-op