diff --git a/artemis-commons/src/main/java/org/apache/activemq/artemis/api/core/ActiveMQExceptionType.java b/artemis-commons/src/main/java/org/apache/activemq/artemis/api/core/ActiveMQExceptionType.java index 752574ae51..785dac375b 100644 --- a/artemis-commons/src/main/java/org/apache/activemq/artemis/api/core/ActiveMQExceptionType.java +++ b/artemis-commons/src/main/java/org/apache/activemq/artemis/api/core/ActiveMQExceptionType.java @@ -213,7 +213,25 @@ public enum ActiveMQExceptionType { } }, - NOT_IMPLEMTNED_EXCEPTION(213); + NOT_IMPLEMTNED_EXCEPTION(213), + MAX_CONSUMER_LIMIT_EXCEEDED(214) { + @Override + public ActiveMQException createException(String msg) { + return new ActiveMQQueueMaxConsumerLimitReached(msg); + } + }, + UNEXPECTED_ROUTING_TYPE_FOR_ADDRESS(215) { + @Override + public ActiveMQException createException(String msg) { + return new ActiveMQUnexpectedRoutingTypeForAddress(msg); + } + }, + INVALID_QUEUE_CONFIGURATION(216) { + @Override + public ActiveMQException createException(String msg) { + return new ActiveMQInvalidQueueConfiguration(msg); + } + }; private static final Map TYPE_MAP; diff --git a/artemis-commons/src/main/java/org/apache/activemq/artemis/api/core/ActiveMQInvalidQueueConfiguration.java b/artemis-commons/src/main/java/org/apache/activemq/artemis/api/core/ActiveMQInvalidQueueConfiguration.java new file mode 100644 index 0000000000..521a266263 --- /dev/null +++ b/artemis-commons/src/main/java/org/apache/activemq/artemis/api/core/ActiveMQInvalidQueueConfiguration.java @@ -0,0 +1,31 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.activemq.artemis.api.core; + +/** + * An operation failed because a queue exists on the server. + */ +public final class ActiveMQInvalidQueueConfiguration extends ActiveMQException { + + public ActiveMQInvalidQueueConfiguration() { + super(ActiveMQExceptionType.INVALID_QUEUE_CONFIGURATION); + } + + public ActiveMQInvalidQueueConfiguration(String msg) { + super(ActiveMQExceptionType.INVALID_QUEUE_CONFIGURATION, msg); + } +} diff --git a/artemis-commons/src/main/java/org/apache/activemq/artemis/api/core/ActiveMQQueueMaxConsumerLimitReached.java b/artemis-commons/src/main/java/org/apache/activemq/artemis/api/core/ActiveMQQueueMaxConsumerLimitReached.java new file mode 100644 index 0000000000..0577e08c97 --- /dev/null +++ b/artemis-commons/src/main/java/org/apache/activemq/artemis/api/core/ActiveMQQueueMaxConsumerLimitReached.java @@ -0,0 +1,31 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.activemq.artemis.api.core; + +/** + * An operation failed because a queue exists on the server. + */ +public final class ActiveMQQueueMaxConsumerLimitReached extends ActiveMQException { + + public ActiveMQQueueMaxConsumerLimitReached() { + super(ActiveMQExceptionType.MAX_CONSUMER_LIMIT_EXCEEDED); + } + + public ActiveMQQueueMaxConsumerLimitReached(String msg) { + super(ActiveMQExceptionType.MAX_CONSUMER_LIMIT_EXCEEDED, msg); + } +} diff --git a/artemis-commons/src/main/java/org/apache/activemq/artemis/api/core/ActiveMQUnexpectedRoutingTypeForAddress.java b/artemis-commons/src/main/java/org/apache/activemq/artemis/api/core/ActiveMQUnexpectedRoutingTypeForAddress.java new file mode 100644 index 0000000000..1bd7ecd402 --- /dev/null +++ b/artemis-commons/src/main/java/org/apache/activemq/artemis/api/core/ActiveMQUnexpectedRoutingTypeForAddress.java @@ -0,0 +1,31 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.activemq.artemis.api.core; + +/** + * An operation failed because a queue exists on the server. + */ +public final class ActiveMQUnexpectedRoutingTypeForAddress extends ActiveMQException { + + public ActiveMQUnexpectedRoutingTypeForAddress() { + super(ActiveMQExceptionType.MAX_CONSUMER_LIMIT_EXCEEDED); + } + + public ActiveMQUnexpectedRoutingTypeForAddress(String msg) { + super(ActiveMQExceptionType.MAX_CONSUMER_LIMIT_EXCEEDED, msg); + } +} diff --git a/artemis-journal/src/main/java/org/apache/activemq/artemis/core/journal/impl/JournalImpl.java b/artemis-journal/src/main/java/org/apache/activemq/artemis/core/journal/impl/JournalImpl.java index 1a5b2b9cbc..db615f82d0 100644 --- a/artemis-journal/src/main/java/org/apache/activemq/artemis/core/journal/impl/JournalImpl.java +++ b/artemis-journal/src/main/java/org/apache/activemq/artemis/core/journal/impl/JournalImpl.java @@ -48,8 +48,8 @@ import java.util.concurrent.locks.ReentrantReadWriteLock; import org.apache.activemq.artemis.api.core.ActiveMQBuffer; import org.apache.activemq.artemis.api.core.ActiveMQBuffers; -import org.apache.activemq.artemis.api.core.Pair; import org.apache.activemq.artemis.api.core.ActiveMQExceptionType; +import org.apache.activemq.artemis.api.core.Pair; import org.apache.activemq.artemis.core.io.IOCallback; import org.apache.activemq.artemis.core.io.SequentialFile; import org.apache.activemq.artemis.core.io.SequentialFileFactory; @@ -866,7 +866,7 @@ public class JournalImpl extends JournalBase implements TestableJournal, Journal } private static SimpleFuture newSyncAndCallbackResult(boolean sync, IOCompletion callback) { - return (sync && callback == null) ? new SimpleFuture<>() : null; + return (sync && callback == null) ? new SimpleFuture() : null; } @Override @@ -2226,7 +2226,7 @@ public class JournalImpl extends JournalBase implements TestableJournal, Journal } }); - threadPool = new ThreadPoolExecutor(0,Integer.MAX_VALUE, 60L,TimeUnit.SECONDS, new SynchronousQueue<>(), factory); + threadPool = new ThreadPoolExecutor(0, Integer.MAX_VALUE, 60L,TimeUnit.SECONDS, new SynchronousQueue(), factory); ioExecutorFactory = new OrderedExecutorFactory(threadPool); } else { ioExecutorFactory = providedIOThreadPool; diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/management/impl/ActiveMQServerControlImpl.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/management/impl/ActiveMQServerControlImpl.java index b0e8b9b97b..fcbf15ca3a 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/management/impl/ActiveMQServerControlImpl.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/management/impl/ActiveMQServerControlImpl.java @@ -573,7 +573,6 @@ public class ActiveMQServerControlImpl extends AbstractControl implements Active SimpleString filter = filterStr == null ? null : new SimpleString(filterStr); clearIO(); try { - server.deployQueue(SimpleString.toSimpleString(address), new SimpleString(name), filter, durable, false); } finally { blockOnIO(); diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/QueueBindingInfo.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/QueueBindingInfo.java index 8c80a8a520..4d435c610f 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/QueueBindingInfo.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/QueueBindingInfo.java @@ -46,4 +46,11 @@ public interface QueueBindingInfo { List getQueueStatusEncodings(); + int getMaxConsumers(); + + void setMaxConsumers(int maxConsumers); + + boolean isDeleteOnNoConsumers(); + + void setDeleteOnNoConsumers(); } diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/journal/AbstractJournalStorageManager.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/journal/AbstractJournalStorageManager.java index af32dda4d3..af506c13cb 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/journal/AbstractJournalStorageManager.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/journal/AbstractJournalStorageManager.java @@ -1266,6 +1266,7 @@ public abstract class AbstractJournalStorageManager implements StorageManager { } } + @Override public void addAddressBinding(final long tx, final AddressInfo addressInfo) throws Exception { PersistentAddressBindingEncoding bindingEncoding = new PersistentAddressBindingEncoding(addressInfo.getName(), addressInfo.getRoutingType()); diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/journal/codec/PersistentQueueBindingEncoding.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/journal/codec/PersistentQueueBindingEncoding.java index 039460cfb1..169cd7df85 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/journal/codec/PersistentQueueBindingEncoding.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/journal/codec/PersistentQueueBindingEncoding.java @@ -41,6 +41,10 @@ public class PersistentQueueBindingEncoding implements EncodingSupport, QueueBin public List queueStatusEncodings; + public int maxConsumers; + + public boolean deleteOnNoConsumers; + public PersistentQueueBindingEncoding() { } @@ -57,6 +61,10 @@ public class PersistentQueueBindingEncoding implements EncodingSupport, QueueBin user + ", autoCreated=" + autoCreated + + ", maxConsumers=" + + maxConsumers + + ", deleteOnNoConsumers=" + + deleteOnNoConsumers + "]"; } @@ -124,6 +132,26 @@ public class PersistentQueueBindingEncoding implements EncodingSupport, QueueBin return queueStatusEncodings; } + @Override + public int getMaxConsumers() { + return 0; + } + + @Override + public void setMaxConsumers(int maxConsumers) { + + } + + @Override + public boolean isDeleteOnNoConsumers() { + return false; + } + + @Override + public void setDeleteOnNoConsumers() { + + } + @Override public void decode(final ActiveMQBuffer buffer) { name = buffer.readSimpleString(); @@ -144,6 +172,14 @@ public class PersistentQueueBindingEncoding implements EncodingSupport, QueueBin } autoCreated = buffer.readBoolean(); + + if (buffer.readableBytes() > 0) { + maxConsumers = buffer.readInt(); + deleteOnNoConsumers = buffer.readBoolean(); + } else { + maxConsumers = -1; + deleteOnNoConsumers = false; + } } @Override @@ -153,13 +189,17 @@ public class PersistentQueueBindingEncoding implements EncodingSupport, QueueBin buffer.writeNullableSimpleString(filterString); buffer.writeNullableSimpleString(createMetadata()); buffer.writeBoolean(autoCreated); + buffer.writeInt(maxConsumers); + buffer.writeBoolean(deleteOnNoConsumers); } @Override public int getEncodeSize() { return SimpleString.sizeofString(name) + SimpleString.sizeofString(address) + SimpleString.sizeofNullableString(filterString) + DataConstants.SIZE_BOOLEAN + - SimpleString.sizeofNullableString(createMetadata()); + SimpleString.sizeofNullableString(createMetadata()) + + DataConstants.SIZE_INT + + DataConstants.SIZE_BOOLEAN; } private SimpleString createMetadata() { diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/protocol/core/ServerSessionPacketHandler.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/protocol/core/ServerSessionPacketHandler.java index b52534cb37..2a45f29618 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/protocol/core/ServerSessionPacketHandler.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/protocol/core/ServerSessionPacketHandler.java @@ -24,6 +24,7 @@ import org.apache.activemq.artemis.api.core.ActiveMQException; import org.apache.activemq.artemis.api.core.ActiveMQExceptionType; import org.apache.activemq.artemis.api.core.ActiveMQIOErrorException; import org.apache.activemq.artemis.api.core.ActiveMQInternalErrorException; +import org.apache.activemq.artemis.api.core.ActiveMQQueueMaxConsumerLimitReached; import org.apache.activemq.artemis.core.exception.ActiveMQXAException; import org.apache.activemq.artemis.core.io.IOCallback; import org.apache.activemq.artemis.core.persistence.StorageManager; @@ -494,6 +495,13 @@ public class ServerSessionPacketHandler implements ChannelHandler { } else { ActiveMQServerLogger.LOGGER.caughtXaException(e); } + } catch (ActiveMQQueueMaxConsumerLimitReached e) { + if (requiresResponse) { + logger.debug("Sending exception to client", e); + response = new ActiveMQExceptionMessage(e); + } else { + ActiveMQServerLogger.LOGGER.caughtException(e); + } } catch (ActiveMQException e) { if (requiresResponse) { logger.debug("Sending exception to client", e); diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/ActiveMQMessageBundle.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/ActiveMQMessageBundle.java index f22873bc6b..6d8cf309aa 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/ActiveMQMessageBundle.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/ActiveMQMessageBundle.java @@ -29,16 +29,20 @@ import org.apache.activemq.artemis.api.core.ActiveMQIllegalStateException; import org.apache.activemq.artemis.api.core.ActiveMQIncompatibleClientServerException; import org.apache.activemq.artemis.api.core.ActiveMQInternalErrorException; import org.apache.activemq.artemis.api.core.ActiveMQInvalidFilterExpressionException; +import org.apache.activemq.artemis.api.core.ActiveMQInvalidQueueConfiguration; import org.apache.activemq.artemis.api.core.ActiveMQInvalidTransientQueueUseException; import org.apache.activemq.artemis.api.core.ActiveMQNonExistentQueueException; import org.apache.activemq.artemis.api.core.ActiveMQQueueExistsException; +import org.apache.activemq.artemis.api.core.ActiveMQQueueMaxConsumerLimitReached; import org.apache.activemq.artemis.api.core.ActiveMQSecurityException; import org.apache.activemq.artemis.api.core.ActiveMQSessionCreationException; +import org.apache.activemq.artemis.api.core.ActiveMQUnexpectedRoutingTypeForAddress; import org.apache.activemq.artemis.api.core.DiscoveryGroupConfiguration; import org.apache.activemq.artemis.api.core.SimpleString; import org.apache.activemq.artemis.core.postoffice.Binding; import org.apache.activemq.artemis.core.protocol.core.impl.wireformat.ReplicationSyncFileMessage; import org.apache.activemq.artemis.core.security.CheckType; +import org.apache.activemq.artemis.core.server.impl.AddressInfo; import org.jboss.logging.Messages; import org.jboss.logging.annotations.Cause; import org.jboss.logging.annotations.Message; @@ -378,4 +382,12 @@ public interface ActiveMQMessageBundle { @Message(id = 119119, value = "Disk Capacity is Low, cannot produce more messages.") ActiveMQIOErrorException diskBeyondLimit(); + @Message(id = 119200, value = "Maximum Consumer Limit Reached on Queue:(address={0},queue={1})", format = Message.Format.MESSAGE_FORMAT) + ActiveMQQueueMaxConsumerLimitReached maxConsumerLimitReachedForQueue(SimpleString address, SimpleString queueName); + + @Message(id = 119201, value = "Expected Routing Type {1} but found {2} for address {0}", format = Message.Format.MESSAGE_FORMAT) + ActiveMQUnexpectedRoutingTypeForAddress unexpectedRoutingTypeForAddress(SimpleString address, AddressInfo.RoutingType expectedRoutingType, AddressInfo.RoutingType actualRoutingType); + + @Message(id = 119202, value = "Invalid Queue Configuration for Queue {0}, Address {1}. Expected {2} to be {3} but was {4}", format = Message.Format.MESSAGE_FORMAT) + ActiveMQInvalidQueueConfiguration invalidQueueConfiguration(SimpleString address, SimpleString queueName, String queuePropertyName, Object expectedValue, Object actualValue); } diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/ActiveMQServer.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/ActiveMQServer.java index f0e2336145..00b04c31b6 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/ActiveMQServer.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/ActiveMQServer.java @@ -307,6 +307,14 @@ public interface ActiveMQServer extends ActiveMQComponent { boolean durable, boolean temporary) throws Exception; + Queue createQueue(SimpleString address, + SimpleString queueName, + SimpleString filterString, + boolean durable, + boolean temporary, + Integer maxConsumers, + Boolean deleteOnNoConsumers) throws Exception; + Queue createQueue(SimpleString address, SimpleString queueName, SimpleString filter, @@ -314,6 +322,15 @@ public interface ActiveMQServer extends ActiveMQComponent { boolean durable, boolean temporary) throws Exception; + Queue createQueue(SimpleString address, + SimpleString queueName, + SimpleString filter, + SimpleString user, + boolean durable, + boolean temporary, + Integer maxConsumers, + Boolean deleteOnNoConsumers) throws Exception; + Queue createQueue(SimpleString address, SimpleString queueName, SimpleString filter, @@ -322,6 +339,16 @@ public interface ActiveMQServer extends ActiveMQComponent { boolean temporary, boolean autoCreated) throws Exception; + Queue createQueue(SimpleString address, + SimpleString queueName, + SimpleString filter, + SimpleString user, + boolean durable, + boolean temporary, + boolean autoCreated, + Integer maxConsumers, + Boolean deleteOnNoConsumers) throws Exception; + Queue deployQueue(SimpleString address, SimpleString queueName, SimpleString filterString, @@ -341,6 +368,15 @@ public interface ActiveMQServer extends ActiveMQComponent { QueueQueryResult queueQuery(SimpleString name) throws Exception; + Queue deployQueue(SimpleString address, + SimpleString resourceName, + SimpleString filterString, + boolean durable, + boolean temporary, + boolean autoCreated, + Integer maxConsumers, + Boolean deleteOnNoConsumers) throws Exception; + void destroyQueue(SimpleString queueName) throws Exception; void destroyQueue(SimpleString queueName, SecurityAuth session) throws Exception; @@ -392,10 +428,22 @@ public interface ActiveMQServer extends ActiveMQComponent { AddressInfo getAddressInfo(SimpleString address); + Queue createQueue(SimpleString addressName, + SimpleString queueName, + SimpleString filterString, + SimpleString user, + boolean durable, + boolean temporary, + boolean ignoreIfExists, + boolean transientQueue, + boolean autoCreated, + Integer maxConsumers, + Boolean deleteOnNoConsumers) throws Exception; + /* - * add a ProtocolManagerFactory to be used. Note if @see Configuration#isResolveProtocols is tur then this factory will - * replace any factories with the same protocol - * */ + * add a ProtocolManagerFactory to be used. Note if @see Configuration#isResolveProtocols is tur then this factory will + * replace any factories with the same protocol + * */ void addProtocolManagerFactory(ProtocolManagerFactory factory); /* diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/Queue.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/Queue.java index 52cd2f04c4..2b845d5860 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/Queue.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/Queue.java @@ -46,6 +46,10 @@ public interface Queue extends Bindable { boolean isAutoCreated(); + boolean isDeleteOnNoConsumers(); + + int getMaxConsumers(); + void addConsumer(Consumer consumer) throws Exception; void removeConsumer(Consumer consumer); diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/QueueConfig.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/QueueConfig.java index f750f6ca10..3b7ed710b1 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/QueueConfig.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/QueueConfig.java @@ -33,6 +33,8 @@ public final class QueueConfig { private final boolean durable; private final boolean temporary; private final boolean autoCreated; + private final int maxConsumers; + private final boolean deleteOnNoConsumers; public static final class Builder { @@ -45,6 +47,8 @@ public final class QueueConfig { private boolean durable; private boolean temporary; private boolean autoCreated; + private int maxConsumers; + private boolean deleteOnNoConsumers; private Builder(final long id, final SimpleString name) { this(id, name, name); @@ -60,6 +64,8 @@ public final class QueueConfig { this.durable = true; this.temporary = false; this.autoCreated = true; + this.maxConsumers = -1; + this.deleteOnNoConsumers = false; validateState(); } @@ -106,6 +112,16 @@ public final class QueueConfig { return this; } + public Builder maxConsumers(final int maxConsumers) { + this.maxConsumers = maxConsumers; + return this; + } + + public Builder deleteOnNoConsumers(final boolean deleteOnNoConsumers) { + this.deleteOnNoConsumers = deleteOnNoConsumers; + return this; + } + /** * Returns a new {@link QueueConfig} using the parameters configured on the {@link Builder}. *
@@ -127,7 +143,7 @@ public final class QueueConfig { } else { pageSubscription = null; } - return new QueueConfig(id, address, name, filter, pageSubscription, user, durable, temporary, autoCreated); + return new QueueConfig(id, address, name, filter, pageSubscription, user, durable, temporary, autoCreated, maxConsumers, deleteOnNoConsumers); } } @@ -168,7 +184,9 @@ public final class QueueConfig { final SimpleString user, final boolean durable, final boolean temporary, - final boolean autoCreated) { + final boolean autoCreated, + final int maxConsumers, + final boolean deleteOnNoConsumers) { this.id = id; this.address = address; this.name = name; @@ -178,6 +196,8 @@ public final class QueueConfig { this.durable = durable; this.temporary = temporary; this.autoCreated = autoCreated; + this.deleteOnNoConsumers = deleteOnNoConsumers; + this.maxConsumers = maxConsumers; } public long id() { @@ -216,6 +236,14 @@ public final class QueueConfig { return autoCreated; } + public boolean isDeleteOnNoConsumers() { + return deleteOnNoConsumers; + } + + public int maxConsumers() { + return maxConsumers; + } + @Override public boolean equals(Object o) { if (this == o) @@ -241,6 +269,10 @@ public final class QueueConfig { return false; if (pageSubscription != null ? !pageSubscription.equals(that.pageSubscription) : that.pageSubscription != null) return false; + if (maxConsumers != that.maxConsumers) + return false; + if (deleteOnNoConsumers != that.deleteOnNoConsumers) + return false; return user != null ? user.equals(that.user) : that.user == null; } @@ -256,11 +288,24 @@ public final class QueueConfig { result = 31 * result + (durable ? 1 : 0); result = 31 * result + (temporary ? 1 : 0); result = 31 * result + (autoCreated ? 1 : 0); + result = 31 * result + maxConsumers; + result = 31 * result + (deleteOnNoConsumers ? 1 : 0); return result; } @Override public String toString() { - return "QueueConfig{" + "id=" + id + ", address=" + address + ", name=" + name + ", filter=" + filter + ", pageSubscription=" + pageSubscription + ", user=" + user + ", durable=" + durable + ", temporary=" + temporary + ", autoCreated=" + autoCreated + '}'; + return "QueueConfig{" + + "id=" + id + + ", address=" + address + + ", name=" + name + + ", filter=" + filter + + ", pageSubscription=" + pageSubscription + + ", user=" + user + + ", durable=" + durable + + ", temporary=" + temporary + + ", autoCreated=" + autoCreated + + ", maxConsumers=" + maxConsumers + + ", deleteOnNoConsumers=" + deleteOnNoConsumers + '}'; } } diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/ServerSession.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/ServerSession.java index 0df506047d..ab3898c282 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/ServerSession.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/ServerSession.java @@ -185,6 +185,14 @@ public interface ServerSession extends SecurityAuth { boolean isClosed(); + Queue createQueue(SimpleString address, + SimpleString name, + SimpleString filterString, + boolean temporary, + boolean durable, + Integer maxConsumers, + Boolean deleteOnNoConsumers) throws Exception; + void createSharedQueue(SimpleString address, SimpleString name, boolean durable, diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ActiveMQServerImpl.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ActiveMQServerImpl.java index 7ad4635694..3578ce3f5d 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ActiveMQServerImpl.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ActiveMQServerImpl.java @@ -1499,6 +1499,17 @@ public class ActiveMQServerImpl implements ActiveMQServer { return createQueue(address, queueName, filterString, null, durable, temporary, false, false, false); } + @Override + public Queue createQueue(final SimpleString address, + final SimpleString queueName, + final SimpleString filterString, + final boolean durable, + final boolean temporary, + final Integer maxConsumers, + final Boolean deleteOnNoConsumers) throws Exception { + return createQueue(address, queueName, filterString, null, durable, temporary, false, false, false, maxConsumers, deleteOnNoConsumers); + } + @Override public Queue createQueue(final SimpleString address, final SimpleString queueName, @@ -1509,6 +1520,18 @@ public class ActiveMQServerImpl implements ActiveMQServer { return createQueue(address, queueName, filterString, user, durable, temporary, false, false, false); } + @Override + public Queue createQueue(SimpleString address, + SimpleString queueName, + SimpleString filter, + SimpleString user, + boolean durable, + boolean temporary, + Integer maxConsumers, + Boolean deleteOnNoConsumers) throws Exception { + return createQueue(address, queueName, filter, user, durable, temporary, false, false, false, maxConsumers, deleteOnNoConsumers); + } + @Override public Queue createQueue(final SimpleString address, final SimpleString queueName, @@ -1520,6 +1543,19 @@ public class ActiveMQServerImpl implements ActiveMQServer { return createQueue(address, queueName, filterString, user, durable, temporary, false, false, autoCreated); } + @Override + public Queue createQueue(SimpleString address, + SimpleString queueName, + SimpleString filter, + SimpleString user, + boolean durable, + boolean temporary, + boolean autoCreated, + Integer maxConsumers, + Boolean deleteOnNoConsumers) throws Exception { + return createQueue(address, queueName, filter, user, durable, temporary, false, false, autoCreated, maxConsumers, deleteOnNoConsumers); + } + @Override public void createSharedQueue(final SimpleString address, final SimpleString name, @@ -1579,6 +1615,18 @@ public class ActiveMQServerImpl implements ActiveMQServer { final boolean durable, final boolean temporary, final boolean autoCreated) throws Exception { + return deployQueue(address, resourceName, filterString, durable, temporary, autoCreated, null, null); + } + + @Override + public Queue deployQueue(final SimpleString address, + final SimpleString resourceName, + final SimpleString filterString, + final boolean durable, + final boolean temporary, + final boolean autoCreated, + final Integer maxConsumers, + final Boolean deleteOnNoConsumers) throws Exception { if (resourceName.toString().toLowerCase().startsWith("jms.topic")) { ActiveMQServerLogger.LOGGER.deployTopic(resourceName); @@ -1586,7 +1634,7 @@ public class ActiveMQServerImpl implements ActiveMQServer { ActiveMQServerLogger.LOGGER.deployQueue(resourceName); } - return createQueue(address, resourceName, filterString, null, durable, temporary, true, false, autoCreated); + return createQueue(address, resourceName, filterString, null, durable, temporary, true, false, autoCreated, maxConsumers, deleteOnNoConsumers); } @Override @@ -2171,9 +2219,17 @@ public class ActiveMQServerImpl implements ActiveMQServer { private void deployQueuesFromListCoreQueueConfiguration(List queues) throws Exception { for (CoreQueueConfiguration config : queues) { - deployQueue(SimpleString.toSimpleString(config.getAddress()), SimpleString.toSimpleString(config.getName()), SimpleString.toSimpleString(config.getFilterString()), config.isDurable(), false); + deployQueue(SimpleString.toSimpleString(config.getAddress()), + SimpleString.toSimpleString(config.getName()), + SimpleString.toSimpleString(config.getFilterString()), + config.isDurable(), + false, + false, + config.getMaxConsumers(), + config.getDeleteOnNoConsumers()); } } + private void deployQueuesFromConfiguration() throws Exception { deployQueuesFromListCoreQueueConfiguration(configuration.getQueueConfigurations()); } @@ -2297,6 +2353,31 @@ public class ActiveMQServerImpl implements ActiveMQServer { final boolean ignoreIfExists, final boolean transientQueue, final boolean autoCreated) throws Exception { + return createQueue(addressName, + queueName, + filterString, + user, + durable, + temporary, + ignoreIfExists, + transientQueue, + autoCreated, + null, + null); + } + + @Override + public Queue createQueue(final SimpleString addressName, + final SimpleString queueName, + final SimpleString filterString, + final SimpleString user, + final boolean durable, + final boolean temporary, + final boolean ignoreIfExists, + final boolean transientQueue, + final boolean autoCreated, + final Integer maxConsumers, + final Boolean deleteOnNoConsumers) throws Exception { final QueueBinding binding = (QueueBinding) postOffice.getBinding(queueName); if (binding != null) { @@ -2313,22 +2394,40 @@ public class ActiveMQServerImpl implements ActiveMQServer { final long queueID = storageManager.generateID(); final QueueConfig.Builder queueConfigBuilder; + final SimpleString address; if (addressName == null) { queueConfigBuilder = QueueConfig.builderWith(queueID, queueName); + address = queueName; } else { queueConfigBuilder = QueueConfig.builderWith(queueID, queueName, addressName); + address = addressName; } - final QueueConfig queueConfig = queueConfigBuilder.filter(filter).pagingManager(pagingManager).user(user).durable(durable).temporary(temporary).autoCreated(autoCreated).build(); + + + AddressInfo defaultAddressInfo = new AddressInfo(address); + // FIXME This boils down to a putIfAbsent (avoids race). This should be reflected in the API. + AddressInfo info = postOffice.addAddressInfo(defaultAddressInfo); + + boolean addressExists = true; + if (info == null) { + info = defaultAddressInfo; + addressExists = false; + } + + final boolean isDeleteOnNoConsumers = deleteOnNoConsumers == null ? info.isDefaultDeleteOnNoConsumers() : deleteOnNoConsumers; + final int noMaxConsumers = maxConsumers == null ? info.getDefaultMaxConsumers() : maxConsumers; + + final QueueConfig queueConfig = queueConfigBuilder.filter(filter) + .pagingManager(pagingManager) + .user(user) + .durable(durable) + .temporary(temporary) + .autoCreated(autoCreated) + .deleteOnNoConsumers(isDeleteOnNoConsumers) + .maxConsumers(noMaxConsumers) + .build(); final Queue queue = queueFactory.createQueueWith(queueConfig); - boolean addressAlreadyExists = true; - - if (postOffice.getAddressInfo(queue.getAddress()) == null) { - postOffice.addAddressInfo(new AddressInfo(queue.getAddress()) - .setRoutingType(AddressInfo.RoutingType.MULTICAST)); - addressAlreadyExists = false; - } - if (transientQueue) { queue.setConsumersRefCount(new TransientQueueManagerImpl(this, queue.getName())); } else if (queue.isAutoCreated()) { @@ -2338,10 +2437,10 @@ public class ActiveMQServerImpl implements ActiveMQServer { final QueueBinding localQueueBinding = new LocalQueueBinding(getAddressInfo(queue.getAddress()), queue, nodeManager.getNodeId()); if (queue.isDurable()) { - storageManager.addQueueBinding(txID, localQueueBinding); - if (!addressAlreadyExists) { - storageManager.addAddressBinding(txID, getAddressInfo(queue.getAddress())); + if (!addressExists) { + storageManager.addAddressBinding(txID, getAddressInfo(address)); } + storageManager.addQueueBinding(txID, localQueueBinding); } try { diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/AddressInfo.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/AddressInfo.java index 4e982c419b..7c71c1fa27 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/AddressInfo.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/AddressInfo.java @@ -16,6 +16,7 @@ */ package org.apache.activemq.artemis.core.server.impl; +import org.apache.activemq.artemis.api.config.ActiveMQDefaultConfiguration; import org.apache.activemq.artemis.api.core.SimpleString; public class AddressInfo { @@ -24,9 +25,9 @@ public class AddressInfo { private RoutingType routingType = RoutingType.MULTICAST; - private boolean defaultDeleteOnNoConsumers; + private boolean defaultDeleteOnNoConsumers = ActiveMQDefaultConfiguration.getDefaultDeleteQueueOnNoConsumers(); - private int defaultMaxConsumers; + private int defaultMaxConsumers = ActiveMQDefaultConfiguration.getDefaultMaxQueueConsumers(); public AddressInfo(SimpleString name) { this.name = name; diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/LastValueQueue.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/LastValueQueue.java index 453f588fee..a4fa5dc0af 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/LastValueQueue.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/LastValueQueue.java @@ -56,12 +56,14 @@ public class LastValueQueue extends QueueImpl { final boolean durable, final boolean temporary, final boolean autoCreated, + final Integer maxConsumers, + final Boolean deleteOnNoConsumers, final ScheduledExecutorService scheduledExecutor, final PostOffice postOffice, final StorageManager storageManager, final HierarchicalRepository addressSettingsRepository, final Executor executor) { - super(persistenceID, address, name, filter, pageSubscription, user, durable, temporary, autoCreated, scheduledExecutor, postOffice, storageManager, addressSettingsRepository, executor); + super(persistenceID, address, name, filter, pageSubscription, user, durable, temporary, autoCreated, maxConsumers, deleteOnNoConsumers, scheduledExecutor, postOffice, storageManager, addressSettingsRepository, executor); new Exception("LastValueQeue " + this).toString(); } diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/PostOfficeJournalLoader.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/PostOfficeJournalLoader.java index 4e89e8a4c8..6f4cf03440 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/PostOfficeJournalLoader.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/PostOfficeJournalLoader.java @@ -143,7 +143,13 @@ public class PostOfficeJournalLoader implements JournalLoader { } else { queueConfigBuilder = QueueConfig.builderWith(queueBindingInfo.getId(), queueBindingInfo.getQueueName(), queueBindingInfo.getAddress()); } - queueConfigBuilder.filter(filter).pagingManager(pagingManager).user(queueBindingInfo.getUser()).durable(true).temporary(false).autoCreated(queueBindingInfo.isAutoCreated()); + queueConfigBuilder.filter(filter).pagingManager(pagingManager) + .user(queueBindingInfo.getUser()) + .durable(true) + .temporary(false) + .autoCreated(queueBindingInfo.isAutoCreated()) + .deleteOnNoConsumers(queueBindingInfo.isDeleteOnNoConsumers()) + .maxConsumers(queueBindingInfo.getMaxConsumers()); final Queue queue = queueFactory.createQueueWith(queueConfigBuilder.build()); if (queue.isAutoCreated()) { queue.setConsumersRefCount(new AutoCreatedQueueManagerImpl(((PostOfficeImpl) postOffice).getServer().getJMSQueueDeleter(), queueBindingInfo.getQueueName())); diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/QueueFactoryImpl.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/QueueFactoryImpl.java index 367855392d..bcc7c79ead 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/QueueFactoryImpl.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/QueueFactoryImpl.java @@ -75,9 +75,9 @@ public class QueueFactoryImpl implements QueueFactory { final AddressSettings addressSettings = addressSettingsRepository.getMatch(config.address().toString()); final Queue queue; if (addressSettings.isLastValueQueue()) { - queue = new LastValueQueue(config.id(), config.address(), config.name(), config.filter(), config.pageSubscription(), config.user(), config.isDurable(), config.isTemporary(), config.isAutoCreated(), scheduledExecutor, postOffice, storageManager, addressSettingsRepository, executorFactory.getExecutor()); + queue = new LastValueQueue(config.id(), config.address(), config.name(), config.filter(), config.pageSubscription(), config.user(), config.isDurable(), config.isTemporary(), config.isAutoCreated(), config.maxConsumers(), config.isDeleteOnNoConsumers(), scheduledExecutor, postOffice, storageManager, addressSettingsRepository, executorFactory.getExecutor()); } else { - queue = new QueueImpl(config.id(), config.address(), config.name(), config.filter(), config.pageSubscription(), config.user(), config.isDurable(), config.isTemporary(), config.isAutoCreated(), scheduledExecutor, postOffice, storageManager, addressSettingsRepository, executorFactory.getExecutor()); + queue = new QueueImpl(config.id(), config.address(), config.name(), config.filter(), config.pageSubscription(), config.user(), config.isDurable(), config.isTemporary(), config.isAutoCreated(), config.maxConsumers(), config.isDeleteOnNoConsumers(), scheduledExecutor, postOffice, storageManager, addressSettingsRepository, executorFactory.getExecutor()); } return queue; } @@ -101,7 +101,7 @@ public class QueueFactoryImpl implements QueueFactory { Queue queue; if (addressSettings.isLastValueQueue()) { - queue = new LastValueQueue(persistenceID, address, name, filter, pageSubscription, user, durable, temporary, autoCreated, scheduledExecutor, postOffice, storageManager, addressSettingsRepository, executorFactory.getExecutor()); + queue = new LastValueQueue(persistenceID, address, name, filter, pageSubscription, user, durable, temporary, autoCreated, null, null, scheduledExecutor, postOffice, storageManager, addressSettingsRepository, executorFactory.getExecutor()); } else { queue = new QueueImpl(persistenceID, address, name, filter, pageSubscription, user, durable, temporary, autoCreated, scheduledExecutor, postOffice, storageManager, addressSettingsRepository, executorFactory.getExecutor()); } diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/QueueImpl.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/QueueImpl.java index e01c81e2de..7e68382b13 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/QueueImpl.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/QueueImpl.java @@ -238,6 +238,14 @@ public class QueueImpl implements Queue { private SlowConsumerReaperRunnable slowConsumerReaperRunnable; + private int maxConsumers; + + private boolean deleteOnNoConsumers; + + private final AddressInfo addressInfo; + + private final AtomicInteger noConsumers = new AtomicInteger(0); + /** * This is to avoid multi-thread races on calculating direct delivery, * to guarantee ordering will be always be correct @@ -334,10 +342,32 @@ public class QueueImpl implements Queue { final StorageManager storageManager, final HierarchicalRepository addressSettingsRepository, final Executor executor) { + this(id, address, name, filter, pageSubscription, user, durable, temporary, autoCreated, null, null, scheduledExecutor, postOffice, storageManager, addressSettingsRepository, executor); + } + + public QueueImpl(final long id, + final SimpleString address, + final SimpleString name, + final Filter filter, + final PageSubscription pageSubscription, + final SimpleString user, + final boolean durable, + final boolean temporary, + final boolean autoCreated, + final Integer maxConsumers, + final Boolean deleteOnNoConsumers, + final ScheduledExecutorService scheduledExecutor, + final PostOffice postOffice, + final StorageManager storageManager, + final HierarchicalRepository addressSettingsRepository, + final Executor executor) { + this.id = id; this.address = address; + this.addressInfo = postOffice.getAddressInfo(address); + this.name = name; this.filter = filter; @@ -350,6 +380,10 @@ public class QueueImpl implements Queue { this.autoCreated = autoCreated; + this.maxConsumers = maxConsumers == null ? addressInfo.getDefaultMaxConsumers() : maxConsumers; + + this.deleteOnNoConsumers = deleteOnNoConsumers == null ? addressInfo.isDefaultDeleteOnNoConsumers() : deleteOnNoConsumers; + this.postOffice = postOffice; this.storageManager = storageManager; @@ -436,6 +470,16 @@ public class QueueImpl implements Queue { return autoCreated; } + @Override + public boolean isDeleteOnNoConsumers() { + return deleteOnNoConsumers; + } + + @Override + public int getMaxConsumers() { + return maxConsumers; + } + @Override public SimpleString getName() { return name; @@ -709,6 +753,11 @@ public class QueueImpl implements Queue { } synchronized (this) { + + if (maxConsumers != -1 && noConsumers.get() >= maxConsumers) { + throw ActiveMQMessageBundle.BUNDLE.maxConsumerLimitReachedForQueue(address, name); + } + flushDeliveriesInTransit(); consumersChanged = true; @@ -722,6 +771,8 @@ public class QueueImpl implements Queue { if (refCountForConsumers != null) { refCountForConsumers.increment(); } + + noConsumers.incrementAndGet(); } } @@ -770,6 +821,14 @@ public class QueueImpl implements Queue { if (refCountForConsumers != null) { refCountForConsumers.decrement(); } + + if (noConsumers.decrementAndGet() == 0 && deleteOnNoConsumers) { + try { + deleteQueue(); + } catch (Exception e) { + logger.error("Error deleting queue on no consumers. " + this.toString(), e); + } + } } } @@ -1361,6 +1420,7 @@ public class QueueImpl implements Queue { @Override public void deleteQueue(boolean removeConsumers) throws Exception { synchronized (this) { + if (this.queueDestroyed) return; this.queueDestroyed = true; } diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ServerConsumerImpl.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ServerConsumerImpl.java index 98a9c8480a..389b07efdc 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ServerConsumerImpl.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ServerConsumerImpl.java @@ -205,6 +205,7 @@ public class ServerConsumerImpl implements ServerConsumer, ReadyListener { this.creationTime = System.currentTimeMillis(); + if (browseOnly) { browserDeliverer = new BrowserDeliverer(messageQueue.browserIterator()); } else { diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ServerSessionImpl.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ServerSessionImpl.java index 627b20190e..83193292d2 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ServerSessionImpl.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ServerSessionImpl.java @@ -501,6 +501,17 @@ public class ServerSessionImpl implements ServerSession, FailureListener { final SimpleString filterString, final boolean temporary, final boolean durable) throws Exception { + return createQueue(address, name, filterString, temporary, durable, null, null); + } + + @Override + public Queue createQueue(final SimpleString address, + final SimpleString name, + final SimpleString filterString, + final boolean temporary, + final boolean durable, + final Integer maxConsumers, + final Boolean deleteOnNoConsumers) throws Exception { if (durable) { // make sure the user has privileges to create this queue securityCheck(address, CheckType.CREATE_DURABLE_QUEUE, this); @@ -514,9 +525,9 @@ public class ServerSessionImpl implements ServerSession, FailureListener { // any non-temporary JMS destination created via this method should be marked as auto-created if (!temporary && ((address.toString().startsWith(ResourceNames.JMS_QUEUE) && address.equals(name)) || address.toString().startsWith(ResourceNames.JMS_TOPIC))) { - queue = server.createQueue(address, name, filterString, SimpleString.toSimpleString(getUsername()), durable, temporary, true); + queue = server.createQueue(address, name, filterString, SimpleString.toSimpleString(getUsername()), durable, temporary, true, maxConsumers, deleteOnNoConsumers); } else { - queue = server.createQueue(address, name, filterString, SimpleString.toSimpleString(getUsername()), durable, temporary); + queue = server.createQueue(address, name, filterString, SimpleString.toSimpleString(getUsername()), durable, temporary, maxConsumers, deleteOnNoConsumers); } if (temporary) { diff --git a/artemis-server/src/test/java/org/apache/activemq/artemis/core/server/impl/ScheduledDeliveryHandlerTest.java b/artemis-server/src/test/java/org/apache/activemq/artemis/core/server/impl/ScheduledDeliveryHandlerTest.java index 55a287ad86..11b11ab2f6 100644 --- a/artemis-server/src/test/java/org/apache/activemq/artemis/core/server/impl/ScheduledDeliveryHandlerTest.java +++ b/artemis-server/src/test/java/org/apache/activemq/artemis/core/server/impl/ScheduledDeliveryHandlerTest.java @@ -900,6 +900,16 @@ public class ScheduledDeliveryHandlerTest extends Assert { return false; } + @Override + public boolean isDeleteOnNoConsumers() { + return false; + } + + @Override + public int getMaxConsumers() { + return -1; + } + @Override public void addConsumer(Consumer consumer) throws Exception { diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/addressing/AddressingTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/addressing/AddressingTest.java index 03739e9ac9..c2004e76d7 100644 --- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/addressing/AddressingTest.java +++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/addressing/AddressingTest.java @@ -19,8 +19,10 @@ package org.apache.activemq.artemis.tests.integration.addressing; import java.util.ArrayList; import java.util.Arrays; import java.util.List; +import java.util.UUID; import java.util.concurrent.TimeUnit; +import org.apache.activemq.artemis.api.core.ActiveMQQueueMaxConsumerLimitReached; import org.apache.activemq.artemis.api.core.SimpleString; import org.apache.activemq.artemis.api.core.TransportConfiguration; import org.apache.activemq.artemis.api.core.client.ActiveMQClient; @@ -156,7 +158,7 @@ public class AddressingTest extends ActiveMQTestBase { ClientConsumer consumer1 = session.createConsumer(q1.getName()); ClientConsumer consumer2 = session.createConsumer(q2.getName()); ClientConsumer consumer3 = session.createConsumer(q3.getName()); - List consumers = new ArrayList<>(Arrays.asList(new ClientConsumer[] {consumer1, consumer2, consumer3})); + List consumers = new ArrayList<>(Arrays.asList(new ClientConsumer[]{consumer1, consumer2, consumer3})); List messages = new ArrayList<>(); messages.add("Message1"); @@ -187,8 +189,6 @@ public class AddressingTest extends ActiveMQTestBase { assertEquals(0, count); } - - @Test public void testMulticastRoutingBackwardsCompat() throws Exception { @@ -222,34 +222,102 @@ public class AddressingTest extends ActiveMQTestBase { } } - @Ignore @Test - public void testDeleteQueueOnNoConsumersTrue() { - fail("Not Implemented"); + public void testDeleteQueueOnNoConsumersTrue() throws Exception { + + SimpleString address = new SimpleString("test.address"); + SimpleString queueName = SimpleString.toSimpleString(UUID.randomUUID().toString()); + // For each address, create 2 Queues with the same address, assert both queues receive message + boolean deleteOnNoConsumers = true; + Queue q1 = server.createQueue(address, queueName, null, true, false, null, deleteOnNoConsumers); + + ClientSession session = sessionFactory.createSession(); + session.start(); + + ClientConsumer consumer1 = session.createConsumer(q1.getName()); + consumer1.close(); + + assertFalse(server.queueQuery(queueName).isExists()); + } + + @Test + public void testDeleteQueueOnNoConsumersFalse() throws Exception { + SimpleString address = new SimpleString("test.address"); + SimpleString queueName = SimpleString.toSimpleString(UUID.randomUUID().toString()); + // For each address, create 2 Queues with the same address, assert both queues receive message + boolean deleteOnNoConsumers = false; + Queue q1 = server.createQueue(address, queueName, null, true, false, null, deleteOnNoConsumers); + + ClientSession session = sessionFactory.createSession(); + session.start(); + + ClientConsumer consumer1 = session.createConsumer(q1.getName()); + consumer1.close(); + + assertTrue(server.queueQuery(queueName).isExists()); + } + + @Test + public void testLimitOnMaxConsumers() throws Exception { + SimpleString address = new SimpleString("test.address"); + SimpleString queueName = SimpleString.toSimpleString(UUID.randomUUID().toString()); + // For each address, create 2 Queues with the same address, assert both queues receive message + boolean deleteOnNoConsumers = false; + Queue q1 = server.createQueue(address, queueName, null, true, false, 0, deleteOnNoConsumers); + + Exception expectedException = null; + String expectedMessage = "Maximum Consumer Limit Reached on Queue"; + try { + ClientSession session = sessionFactory.createSession(); + session.start(); + + session.createConsumer(q1.getName()); + } catch (ActiveMQQueueMaxConsumerLimitReached e) { + expectedException = e; + } + + assertNotNull(expectedException); + assertTrue(expectedException.getMessage().contains(expectedMessage)); + assertTrue(expectedException.getMessage().contains(address)); + assertTrue(expectedException.getMessage().contains(queueName)); } @Ignore @Test - public void testDeleteQueueOnNoConsumersFalse() { - fail("Not Implemented"); + public void testUnlimitedMaxConsumers() throws Exception { + int noConsumers = 50; + SimpleString address = new SimpleString("test.address"); + SimpleString queueName = SimpleString.toSimpleString(UUID.randomUUID().toString()); + // For each address, create 2 Queues with the same address, assert both queues receive message + boolean deleteOnNoConsumers = false; + Queue q1 = server.createQueue(address, queueName, null, true, false, -1, deleteOnNoConsumers); + + ClientSession session = sessionFactory.createSession(); + session.start(); + + for (int i = 0; i < noConsumers; i++) { + session.createConsumer(q1.getName()); + } } @Ignore @Test - public void testLimitOnMaxConsumers() { - fail("Not Implemented"); - } + public void testDefaultMaxConsumersFromAddress() throws Exception { + int noConsumers = 50; + SimpleString address = new SimpleString("test.address"); + SimpleString queueName = SimpleString.toSimpleString(UUID.randomUUID().toString()); + // For each address, create 2 Queues with the same address, assert both queues receive message + boolean deleteOnNoConsumers = false; + AddressInfo addressInfo = new AddressInfo(address); + addressInfo.setDefaultMaxConsumers(0); + Queue q1 = server.createQueue(address, queueName, null, true, false, null, deleteOnNoConsumers); - @Ignore - @Test - public void testUnlimitedMaxConsumers() { - fail("Not Implemented"); - } + ClientSession session = sessionFactory.createSession(); + session.start(); - @Ignore - @Test - public void testDefaultMaxConsumersFromAddress() { - fail("Not Implemented"); + for (int i = 0; i < noConsumers; i++) { + session.createConsumer(q1.getName()); + } } @Ignore diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/client/HangConsumerTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/client/HangConsumerTest.java index 2fd5915549..124ece37d7 100644 --- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/client/HangConsumerTest.java +++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/client/HangConsumerTest.java @@ -224,12 +224,14 @@ public class HangConsumerTest extends ActiveMQTestBase { final boolean durable, final boolean temporary, final boolean autoCreated, + final Integer maxConsumers, + final Boolean deleteOnNoConsumers, final ScheduledExecutorService scheduledExecutor, final PostOffice postOffice, final StorageManager storageManager, final HierarchicalRepository addressSettingsRepository, final Executor executor) { - super(id, address, name, filter, pageSubscription, user, durable, temporary, autoCreated, scheduledExecutor, postOffice, storageManager, addressSettingsRepository, executor); + super(id, address, name, filter, pageSubscription, user, durable, temporary, autoCreated, maxConsumers, deleteOnNoConsumers, scheduledExecutor, postOffice, storageManager, addressSettingsRepository, executor); } @Override @@ -256,7 +258,7 @@ public class HangConsumerTest extends ActiveMQTestBase { @Override public Queue createQueueWith(final QueueConfig config) { - queue = new MyQueueWithBlocking(config.id(), config.address(), config.name(), config.filter(), config.user(), config.pageSubscription(), config.isDurable(), config.isTemporary(), config.isAutoCreated(), scheduledExecutor, postOffice, storageManager, addressSettingsRepository, executorFactory.getExecutor()); + queue = new MyQueueWithBlocking(config.id(), config.address(), config.name(), config.filter(), config.user(), config.pageSubscription(), config.isDurable(), config.isTemporary(), config.isAutoCreated(), config.maxConsumers(), config.isDeleteOnNoConsumers(), scheduledExecutor, postOffice, storageManager, addressSettingsRepository, executorFactory.getExecutor()); return queue; } @@ -271,7 +273,7 @@ public class HangConsumerTest extends ActiveMQTestBase { final boolean durable, final boolean temporary, final boolean autoCreated) { - queue = new MyQueueWithBlocking(persistenceID, address, name, filter, user, pageSubscription, durable, temporary, autoCreated, scheduledExecutor, postOffice, storageManager, addressSettingsRepository, executorFactory.getExecutor()); + queue = new MyQueueWithBlocking(persistenceID, address, name, filter, user, pageSubscription, durable, temporary, autoCreated, null, null, scheduledExecutor, postOffice, storageManager, addressSettingsRepository, executorFactory.getExecutor()); return queue; } diff --git a/tests/unit-tests/src/test/java/org/apache/activemq/artemis/tests/unit/core/postoffice/impl/FakeQueue.java b/tests/unit-tests/src/test/java/org/apache/activemq/artemis/tests/unit/core/postoffice/impl/FakeQueue.java index 9a20d70b64..ef5c05ecfb 100644 --- a/tests/unit-tests/src/test/java/org/apache/activemq/artemis/tests/unit/core/postoffice/impl/FakeQueue.java +++ b/tests/unit-tests/src/test/java/org/apache/activemq/artemis/tests/unit/core/postoffice/impl/FakeQueue.java @@ -442,6 +442,16 @@ public class FakeQueue implements Queue { return false; } + @Override + public boolean isDeleteOnNoConsumers() { + return false; + } + + @Override + public int getMaxConsumers() { + return -1; + } + @Override public LinkedListIterator iterator() { // no-op