From e293d80f08de58e4ce2bafd9d63c1a4185e079a9 Mon Sep 17 00:00:00 2001 From: jbertram Date: Tue, 6 Jan 2015 13:34:38 -0600 Subject: [PATCH] ACTIVEMQ6-13 auto-create/auto-delete jms queues Implements a new feature for the broker whereby it may automatically create and delete JMS queues which are not explicitly defined through the management API or file-based configuration. A JMS queue is created in response to a sent message or connected consumer. The queue may subsequently be deleted when it no longer has any messages and consumers. Auto-creation and auto-deletion can both be turned on/off via address-setting. --- .../api/core/client/ClientSession.java | 12 + .../management/ActiveMQServerControl.java | 4 +- .../core/management/AddressSettingsInfo.java | 24 +- .../core/client/impl/AddressQueryImpl.java | 11 +- .../core/client/impl/QueueQueryImpl.java | 20 + .../core/impl/ActiveMQSessionContext.java | 16 +- .../core/protocol/core/impl/ChannelImpl.java | 4 + .../protocol/core/impl/PacketDecoder.java | 14 + .../core/protocol/core/impl/PacketImpl.java | 4 + .../SessionBindingQueryResponseMessage.java | 9 +- ...SessionBindingQueryResponseMessage_V2.java | 90 ++++ .../SessionQueueQueryResponseMessage.java | 21 +- .../SessionQueueQueryResponseMessage_V2.java | 130 ++++++ .../core/server/QueueQueryResult.java | 22 +- .../resources/activemq-version.properties | 2 +- .../jms/client/ActiveMQMessageProducer.java | 9 +- .../activemq/jms/client/ActiveMQSession.java | 37 +- .../impl/FileConfigurationParser.java | 12 + .../impl/ActiveMQServerControlImpl.java | 8 +- .../core/persistence/QueueBindingInfo.java | 2 + .../impl/journal/JournalStorageManager.java | 20 +- .../core/ServerSessionPacketHandler.java | 30 +- .../activemq/core/server/ActiveMQServer.java | 7 + .../core/server/ActiveMQServerLogger.java | 4 + .../core/server/AutoCreatedQueueManager.java | 29 ++ .../core/server/BindingQueryResult.java | 11 +- .../apache/activemq/core/server/Queue.java | 4 +- .../activemq/core/server/QueueFactory.java | 3 +- .../core/server/impl/ActiveMQServerImpl.java | 27 +- .../impl/AutoCreatedQueueManagerImpl.java | 93 +++++ .../core/server/impl/LastValueQueue.java | 2 + .../server/impl/PostOfficeJournalLoader.java | 9 +- .../core/server/impl/QueueFactoryImpl.java | 5 +- .../activemq/core/server/impl/QueueImpl.java | 17 +- .../core/server/impl/ServerSessionImpl.java | 30 +- .../core/settings/impl/AddressSettings.java | 70 +++- .../schema/activemq-configuration.xsd | 17 + .../config/impl/FileConfigurationTest.java | 4 + .../impl/ScheduledDeliveryHandlerTest.java | 9 +- .../core/settings/AddressSettingsTest.java | 3 +- .../ConfigurationTest-full-config.xml | 4 + docs/user-manual/en/queue-attributes.md | 12 + pom.xml | 2 +- .../concurrent/server/impl/QueueTest.java | 1 + .../client/AutoCreateJmsQueueTest.java | 187 +++++++++ .../client/AutoDeleteJmsQueueTest.java | 84 ++++ .../integration/client/HangConsumerTest.java | 8 +- .../client/InterruptedLargeMessageTest.java | 6 +- .../integration/client/PagingOrderTest.java | 8 +- .../jms/client/TopicCleanupTest.java | 2 +- .../jms/jms2client/NonExistentQueueTest.java | 2 +- .../management/ActiveMQServerControlTest.java | 22 +- .../ActiveMQServerControlUsingCoreTest.java | 8 +- .../persistence/ExportFormatTest.java | 60 +-- .../activemq/jms/tests/SessionTest.java | 9 + .../core/server/impl/QueueImplTest.java | 3 + .../unit/core/postoffice/impl/FakeQueue.java | 9 +- .../unit/core/server/impl/QueueImplTest.java | 389 ++++-------------- .../server/impl/fakes/FakeQueueFactory.java | 4 +- 59 files changed, 1228 insertions(+), 437 deletions(-) create mode 100644 activemq-core-client/src/main/java/org/apache/activemq/core/protocol/core/impl/wireformat/SessionBindingQueryResponseMessage_V2.java create mode 100644 activemq-core-client/src/main/java/org/apache/activemq/core/protocol/core/impl/wireformat/SessionQueueQueryResponseMessage_V2.java create mode 100644 activemq-server/src/main/java/org/apache/activemq/core/server/AutoCreatedQueueManager.java create mode 100644 activemq-server/src/main/java/org/apache/activemq/core/server/impl/AutoCreatedQueueManagerImpl.java create mode 100644 tests/integration-tests/src/test/java/org/apache/activemq/tests/integration/client/AutoCreateJmsQueueTest.java create mode 100644 tests/integration-tests/src/test/java/org/apache/activemq/tests/integration/client/AutoDeleteJmsQueueTest.java diff --git a/activemq-core-client/src/main/java/org/apache/activemq/api/core/client/ClientSession.java b/activemq-core-client/src/main/java/org/apache/activemq/api/core/client/ClientSession.java index 11c579157e..75116fc38a 100644 --- a/activemq-core-client/src/main/java/org/apache/activemq/api/core/client/ClientSession.java +++ b/activemq-core-client/src/main/java/org/apache/activemq/api/core/client/ClientSession.java @@ -48,6 +48,12 @@ public interface ClientSession extends XAResource, AutoCloseable * Returns the names of the queues bound to the binding. */ List getQueueNames(); + + /** + * Returns true if auto-creation for this address is enabled and if the address queried is for a JMS + * queue, false else. + */ + boolean isAutoCreateJmsQueues(); } /** @@ -81,6 +87,12 @@ public interface ClientSession extends XAResource, AutoCloseable */ boolean isDurable(); + /** + * Returns true if auto-creation for this queue is enabled and if the queue queried is a JMS queue, + * false else. + */ + boolean isAutoCreateJmsQueues(); + /** * Returns the number of consumers attached to the queue. */ diff --git a/activemq-core-client/src/main/java/org/apache/activemq/api/core/management/ActiveMQServerControl.java b/activemq-core-client/src/main/java/org/apache/activemq/api/core/management/ActiveMQServerControl.java index f33b109d79..6c2b64e658 100644 --- a/activemq-core-client/src/main/java/org/apache/activemq/api/core/management/ActiveMQServerControl.java +++ b/activemq-core-client/src/main/java/org/apache/activemq/api/core/management/ActiveMQServerControl.java @@ -565,7 +565,9 @@ public interface ActiveMQServerControl @Parameter(desc = "the policy to use when the address is full", name = "addressFullMessagePolicy") String addressFullMessagePolicy, @Parameter(desc = "when a consumer falls below this threshold in terms of messages consumed per second it will be considered 'slow'", name = "slowConsumerThreshold") long slowConsumerThreshold, @Parameter(desc = "how often (in seconds) to check for slow consumers", name = "slowConsumerCheckPeriod") long slowConsumerCheckPeriod, - @Parameter(desc = "the policy to use when a slow consumer is detected", name = "slowConsumerPolicy") String slowConsumerPolicy) throws Exception; + @Parameter(desc = "the policy to use when a slow consumer is detected", name = "slowConsumerPolicy") String slowConsumerPolicy, + @Parameter(desc = "allow queues to be created automatically", name = "autoCreateJmsQueues") boolean autoCreateJmsQueues, + @Parameter(desc = "allow auto-created queues to be deleted automatically", name = "autoDeleteJmsQueues") boolean autoDeleteJmsQueues) throws Exception; void removeAddressSettings(String addressMatch) throws Exception; diff --git a/activemq-core-client/src/main/java/org/apache/activemq/api/core/management/AddressSettingsInfo.java b/activemq-core-client/src/main/java/org/apache/activemq/api/core/management/AddressSettingsInfo.java index c60d46a6b3..bc7f82acee 100644 --- a/activemq-core-client/src/main/java/org/apache/activemq/api/core/management/AddressSettingsInfo.java +++ b/activemq-core-client/src/main/java/org/apache/activemq/api/core/management/AddressSettingsInfo.java @@ -60,6 +60,10 @@ public final class AddressSettingsInfo private final String slowConsumerPolicy; + private final boolean autoCreateJmsQueues; + + private final boolean autoDeleteJmsQueues; + // Static -------------------------------------------------------- public static AddressSettingsInfo from(final String jsonString) throws Exception @@ -80,7 +84,9 @@ public final class AddressSettingsInfo object.getBoolean("sendToDLAOnNoRoute"), object.getLong("slowConsumerThreshold"), object.getLong("slowConsumerCheckPeriod"), - object.getString("slowConsumerPolicy")); + object.getString("slowConsumerPolicy"), + object.getBoolean("autoCreateJmsQueues"), + object.getBoolean("autoDeleteJmsQueues")); } // Constructors -------------------------------------------------- @@ -100,7 +106,9 @@ public final class AddressSettingsInfo boolean sendToDLAOnNoRoute, long slowConsumerThreshold, long slowConsumerCheckPeriod, - String slowConsumerPolicy) + String slowConsumerPolicy, + boolean autoCreateJmsQueues, + boolean autoDeleteJmsQueues) { this.addressFullMessagePolicy = addressFullMessagePolicy; this.maxSizeBytes = maxSizeBytes; @@ -118,6 +126,8 @@ public final class AddressSettingsInfo this.slowConsumerThreshold = slowConsumerThreshold; this.slowConsumerCheckPeriod = slowConsumerCheckPeriod; this.slowConsumerPolicy = slowConsumerPolicy; + this.autoCreateJmsQueues = autoCreateJmsQueues; + this.autoDeleteJmsQueues = autoDeleteJmsQueues; } // Public -------------------------------------------------------- @@ -206,5 +216,15 @@ public final class AddressSettingsInfo { return slowConsumerPolicy; } + + public boolean isAutoCreateJmsQueues() + { + return autoCreateJmsQueues; + } + + public boolean isAutoDeleteJmsQueues() + { + return autoDeleteJmsQueues; + } } diff --git a/activemq-core-client/src/main/java/org/apache/activemq/core/client/impl/AddressQueryImpl.java b/activemq-core-client/src/main/java/org/apache/activemq/core/client/impl/AddressQueryImpl.java index 53ed6c7e6b..6c7c8a3a05 100644 --- a/activemq-core-client/src/main/java/org/apache/activemq/core/client/impl/AddressQueryImpl.java +++ b/activemq-core-client/src/main/java/org/apache/activemq/core/client/impl/AddressQueryImpl.java @@ -24,15 +24,17 @@ import org.apache.activemq.api.core.client.ClientSession; public class AddressQueryImpl implements ClientSession.AddressQuery, ClientSession.BindingQuery { - private final boolean exists; private final ArrayList queueNames; - public AddressQueryImpl(final boolean exists, final List queueNames) + private final boolean autoCreateJmsQueues; + + public AddressQueryImpl(final boolean exists, final List queueNames, final boolean autoCreateJmsQueues) { this.exists = exists; this.queueNames = new ArrayList(queueNames); + this.autoCreateJmsQueues = autoCreateJmsQueues; } public List getQueueNames() @@ -44,4 +46,9 @@ public class AddressQueryImpl implements ClientSession.AddressQuery, ClientSessi { return exists; } + + public boolean isAutoCreateJmsQueues() + { + return autoCreateJmsQueues; + } } diff --git a/activemq-core-client/src/main/java/org/apache/activemq/core/client/impl/QueueQueryImpl.java b/activemq-core-client/src/main/java/org/apache/activemq/core/client/impl/QueueQueryImpl.java index 5d948d81d7..a330bfb2f5 100644 --- a/activemq-core-client/src/main/java/org/apache/activemq/core/client/impl/QueueQueryImpl.java +++ b/activemq-core-client/src/main/java/org/apache/activemq/core/client/impl/QueueQueryImpl.java @@ -38,6 +38,8 @@ public class QueueQueryImpl implements ClientSession.QueueQuery private final SimpleString name; + private final boolean autoCreateJmsQueues; + public QueueQueryImpl(final boolean durable, final boolean temporary, final int consumerCount, @@ -47,7 +49,19 @@ public class QueueQueryImpl implements ClientSession.QueueQuery final SimpleString name, final boolean exists) { + this(durable, temporary, consumerCount, messageCount, filterString, address, name, exists, false); + } + public QueueQueryImpl(final boolean durable, + final boolean temporary, + final int consumerCount, + final long messageCount, + final SimpleString filterString, + final SimpleString address, + final SimpleString name, + final boolean exists, + final boolean autoCreateJmsQueues) + { this.durable = durable; this.temporary = temporary; this.consumerCount = consumerCount; @@ -56,6 +70,7 @@ public class QueueQueryImpl implements ClientSession.QueueQuery this.address = address; this.name = name; this.exists = exists; + this.autoCreateJmsQueues = autoCreateJmsQueues; } public SimpleString getName() @@ -88,6 +103,11 @@ public class QueueQueryImpl implements ClientSession.QueueQuery return durable; } + public boolean isAutoCreateJmsQueues() + { + return autoCreateJmsQueues; + } + public boolean isTemporary() { return temporary; diff --git a/activemq-core-client/src/main/java/org/apache/activemq/core/protocol/core/impl/ActiveMQSessionContext.java b/activemq-core-client/src/main/java/org/apache/activemq/core/protocol/core/impl/ActiveMQSessionContext.java index 4519b76ef0..17632c4088 100644 --- a/activemq-core-client/src/main/java/org/apache/activemq/core/protocol/core/impl/ActiveMQSessionContext.java +++ b/activemq-core-client/src/main/java/org/apache/activemq/core/protocol/core/impl/ActiveMQSessionContext.java @@ -60,7 +60,7 @@ import org.apache.activemq.core.protocol.core.impl.wireformat.RollbackMessage; import org.apache.activemq.core.protocol.core.impl.wireformat.SessionAcknowledgeMessage; import org.apache.activemq.core.protocol.core.impl.wireformat.SessionAddMetaDataMessageV2; import org.apache.activemq.core.protocol.core.impl.wireformat.SessionBindingQueryMessage; -import org.apache.activemq.core.protocol.core.impl.wireformat.SessionBindingQueryResponseMessage; +import org.apache.activemq.core.protocol.core.impl.wireformat.SessionBindingQueryResponseMessage_V2; import org.apache.activemq.core.protocol.core.impl.wireformat.SessionCloseMessage; import org.apache.activemq.core.protocol.core.impl.wireformat.SessionConsumerCloseMessage; import org.apache.activemq.core.protocol.core.impl.wireformat.SessionConsumerFlowCreditMessage; @@ -72,7 +72,7 @@ import org.apache.activemq.core.protocol.core.impl.wireformat.SessionIndividualA import org.apache.activemq.core.protocol.core.impl.wireformat.SessionProducerCreditsFailMessage; import org.apache.activemq.core.protocol.core.impl.wireformat.SessionProducerCreditsMessage; import org.apache.activemq.core.protocol.core.impl.wireformat.SessionQueueQueryMessage; -import org.apache.activemq.core.protocol.core.impl.wireformat.SessionQueueQueryResponseMessage; +import org.apache.activemq.core.protocol.core.impl.wireformat.SessionQueueQueryResponseMessage_V2; import org.apache.activemq.core.protocol.core.impl.wireformat.SessionReceiveContinuationMessage; import org.apache.activemq.core.protocol.core.impl.wireformat.SessionReceiveLargeMessage; import org.apache.activemq.core.protocol.core.impl.wireformat.SessionReceiveMessage; @@ -231,13 +231,11 @@ public class ActiveMQSessionContext extends SessionContext public ClientSession.QueueQuery queueQuery(final SimpleString queueName) throws ActiveMQException { SessionQueueQueryMessage request = new SessionQueueQueryMessage(queueName); - SessionQueueQueryResponseMessage response = (SessionQueueQueryResponseMessage) sessionChannel.sendBlocking(request, PacketImpl.SESS_QUEUEQUERY_RESP); + SessionQueueQueryResponseMessage_V2 response = (SessionQueueQueryResponseMessage_V2) sessionChannel.sendBlocking(request, PacketImpl.SESS_QUEUEQUERY_RESP_V2); return response.toQueueQuery(); - } - public ClientConsumerInternal createConsumer(SimpleString queueName, SimpleString filterString, int windowSize, int maxRate, int ackBatchSize, boolean browseOnly, Executor executor, Executor flowControlExecutor) throws ActiveMQException @@ -252,7 +250,7 @@ public class ActiveMQSessionContext extends SessionContext browseOnly, true); - SessionQueueQueryResponseMessage queueInfo = (SessionQueueQueryResponseMessage) sessionChannel.sendBlocking(request, PacketImpl.SESS_QUEUEQUERY_RESP); + SessionQueueQueryResponseMessage_V2 queueInfo = (SessionQueueQueryResponseMessage_V2) sessionChannel.sendBlocking(request, PacketImpl.SESS_QUEUEQUERY_RESP_V2); // The actual windows size that gets used is determined by the user since // could be overridden on the queue settings @@ -283,10 +281,10 @@ public class ActiveMQSessionContext extends SessionContext public ClientSession.AddressQuery addressQuery(final SimpleString address) throws ActiveMQException { - SessionBindingQueryResponseMessage response = - (SessionBindingQueryResponseMessage) sessionChannel.sendBlocking(new SessionBindingQueryMessage(address), PacketImpl.SESS_BINDINGQUERY_RESP); + SessionBindingQueryResponseMessage_V2 response = + (SessionBindingQueryResponseMessage_V2) sessionChannel.sendBlocking(new SessionBindingQueryMessage(address), PacketImpl.SESS_BINDINGQUERY_RESP_V2); - return new AddressQueryImpl(response.isExists(), response.getQueueNames()); + return new AddressQueryImpl(response.isExists(), response.getQueueNames(), response.isAutoCreateJmsQueues()); } diff --git a/activemq-core-client/src/main/java/org/apache/activemq/core/protocol/core/impl/ChannelImpl.java b/activemq-core-client/src/main/java/org/apache/activemq/core/protocol/core/impl/ChannelImpl.java index 71e7ced21f..579ff23674 100644 --- a/activemq-core-client/src/main/java/org/apache/activemq/core/protocol/core/impl/ChannelImpl.java +++ b/activemq-core-client/src/main/java/org/apache/activemq/core/protocol/core/impl/ChannelImpl.java @@ -161,6 +161,10 @@ public final class ChannelImpl implements Channel return version >= 125; case PacketImpl.DISCONNECT_V2: return version >= 125; + case PacketImpl.SESS_QUEUEQUERY_RESP_V2: + return version >= 126; + case PacketImpl.SESS_BINDINGQUERY_RESP_V2: + return version >= 126; default: return true; } diff --git a/activemq-core-client/src/main/java/org/apache/activemq/core/protocol/core/impl/PacketDecoder.java b/activemq-core-client/src/main/java/org/apache/activemq/core/protocol/core/impl/PacketDecoder.java index 44745730b9..cbb94c3004 100644 --- a/activemq-core-client/src/main/java/org/apache/activemq/core/protocol/core/impl/PacketDecoder.java +++ b/activemq-core-client/src/main/java/org/apache/activemq/core/protocol/core/impl/PacketDecoder.java @@ -39,6 +39,7 @@ import static org.apache.activemq.core.protocol.core.impl.PacketImpl.SESS_ADD_ME import static org.apache.activemq.core.protocol.core.impl.PacketImpl.SESS_ADD_METADATA2; import static org.apache.activemq.core.protocol.core.impl.PacketImpl.SESS_BINDINGQUERY; import static org.apache.activemq.core.protocol.core.impl.PacketImpl.SESS_BINDINGQUERY_RESP; +import static org.apache.activemq.core.protocol.core.impl.PacketImpl.SESS_BINDINGQUERY_RESP_V2; import static org.apache.activemq.core.protocol.core.impl.PacketImpl.SESS_CLOSE; import static org.apache.activemq.core.protocol.core.impl.PacketImpl.SESS_COMMIT; import static org.apache.activemq.core.protocol.core.impl.PacketImpl.SESS_CONSUMER_CLOSE; @@ -52,6 +53,7 @@ import static org.apache.activemq.core.protocol.core.impl.PacketImpl.SESS_PRODUC import static org.apache.activemq.core.protocol.core.impl.PacketImpl.SESS_PRODUCER_REQUEST_CREDITS; import static org.apache.activemq.core.protocol.core.impl.PacketImpl.SESS_QUEUEQUERY; import static org.apache.activemq.core.protocol.core.impl.PacketImpl.SESS_QUEUEQUERY_RESP; +import static org.apache.activemq.core.protocol.core.impl.PacketImpl.SESS_QUEUEQUERY_RESP_V2; import static org.apache.activemq.core.protocol.core.impl.PacketImpl.SESS_RECEIVE_CONTINUATION; import static org.apache.activemq.core.protocol.core.impl.PacketImpl.SESS_ROLLBACK; import static org.apache.activemq.core.protocol.core.impl.PacketImpl.SESS_SEND_CONTINUATION; @@ -107,6 +109,7 @@ import org.apache.activemq.core.protocol.core.impl.wireformat.SessionAddMetaData import org.apache.activemq.core.protocol.core.impl.wireformat.SessionAddMetaDataMessageV2; import org.apache.activemq.core.protocol.core.impl.wireformat.SessionBindingQueryMessage; import org.apache.activemq.core.protocol.core.impl.wireformat.SessionBindingQueryResponseMessage; +import org.apache.activemq.core.protocol.core.impl.wireformat.SessionBindingQueryResponseMessage_V2; import org.apache.activemq.core.protocol.core.impl.wireformat.SessionCloseMessage; import org.apache.activemq.core.protocol.core.impl.wireformat.SessionCommitMessage; import org.apache.activemq.core.protocol.core.impl.wireformat.SessionConsumerCloseMessage; @@ -120,6 +123,7 @@ import org.apache.activemq.core.protocol.core.impl.wireformat.SessionProducerCre import org.apache.activemq.core.protocol.core.impl.wireformat.SessionProducerCreditsMessage; import org.apache.activemq.core.protocol.core.impl.wireformat.SessionQueueQueryMessage; import org.apache.activemq.core.protocol.core.impl.wireformat.SessionQueueQueryResponseMessage; +import org.apache.activemq.core.protocol.core.impl.wireformat.SessionQueueQueryResponseMessage_V2; import org.apache.activemq.core.protocol.core.impl.wireformat.SessionReceiveContinuationMessage; import org.apache.activemq.core.protocol.core.impl.wireformat.SessionRequestProducerCreditsMessage; import org.apache.activemq.core.protocol.core.impl.wireformat.SessionSendContinuationMessage; @@ -251,6 +255,11 @@ public abstract class PacketDecoder implements Serializable packet = new SessionQueueQueryResponseMessage(); break; } + case SESS_QUEUEQUERY_RESP_V2: + { + packet = new SessionQueueQueryResponseMessage_V2(); + break; + } case CREATE_QUEUE: { packet = new CreateQueueMessage(); @@ -276,6 +285,11 @@ public abstract class PacketDecoder implements Serializable packet = new SessionBindingQueryResponseMessage(); break; } + case SESS_BINDINGQUERY_RESP_V2: + { + packet = new SessionBindingQueryResponseMessage_V2(); + break; + } case SESS_XA_START: { packet = new SessionXAStartMessage(); diff --git a/activemq-core-client/src/main/java/org/apache/activemq/core/protocol/core/impl/PacketImpl.java b/activemq-core-client/src/main/java/org/apache/activemq/core/protocol/core/impl/PacketImpl.java index 71e6415c5a..e2bdc28a56 100644 --- a/activemq-core-client/src/main/java/org/apache/activemq/core/protocol/core/impl/PacketImpl.java +++ b/activemq-core-client/src/main/java/org/apache/activemq/core/protocol/core/impl/PacketImpl.java @@ -246,6 +246,10 @@ public class PacketImpl implements Packet public static final byte SCALEDOWN_ANNOUNCEMENT = -6; + public static final byte SESS_QUEUEQUERY_RESP_V2 = -7; + + public static final byte SESS_BINDINGQUERY_RESP_V2 = -8; + // Static -------------------------------------------------------- public PacketImpl(final byte type) diff --git a/activemq-core-client/src/main/java/org/apache/activemq/core/protocol/core/impl/wireformat/SessionBindingQueryResponseMessage.java b/activemq-core-client/src/main/java/org/apache/activemq/core/protocol/core/impl/wireformat/SessionBindingQueryResponseMessage.java index 2f56abd236..dc56506d64 100644 --- a/activemq-core-client/src/main/java/org/apache/activemq/core/protocol/core/impl/wireformat/SessionBindingQueryResponseMessage.java +++ b/activemq-core-client/src/main/java/org/apache/activemq/core/protocol/core/impl/wireformat/SessionBindingQueryResponseMessage.java @@ -32,9 +32,9 @@ import org.apache.activemq.core.protocol.core.impl.PacketImpl; */ public class SessionBindingQueryResponseMessage extends PacketImpl { - private boolean exists; + protected boolean exists; - private List queueNames; + protected List queueNames; public SessionBindingQueryResponseMessage(final boolean exists, final List queueNames) { @@ -50,6 +50,11 @@ public class SessionBindingQueryResponseMessage extends PacketImpl super(SESS_BINDINGQUERY_RESP); } + public SessionBindingQueryResponseMessage(byte v2) + { + super(v2); + } + @Override public boolean isResponse() { diff --git a/activemq-core-client/src/main/java/org/apache/activemq/core/protocol/core/impl/wireformat/SessionBindingQueryResponseMessage_V2.java b/activemq-core-client/src/main/java/org/apache/activemq/core/protocol/core/impl/wireformat/SessionBindingQueryResponseMessage_V2.java new file mode 100644 index 0000000000..2a9e440770 --- /dev/null +++ b/activemq-core-client/src/main/java/org/apache/activemq/core/protocol/core/impl/wireformat/SessionBindingQueryResponseMessage_V2.java @@ -0,0 +1,90 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.activemq.core.protocol.core.impl.wireformat; + +import java.util.List; + +import org.apache.activemq.api.core.ActiveMQBuffer; +import org.apache.activemq.api.core.SimpleString; + +/** + * @author Justin Bertram + * + */ +public class SessionBindingQueryResponseMessage_V2 extends SessionBindingQueryResponseMessage +{ + private boolean autoCreateJmsQueues; + + public SessionBindingQueryResponseMessage_V2(final boolean exists, final List queueNames, final boolean autoCreateJmsQueues) + { + super(SESS_BINDINGQUERY_RESP_V2); + + this.exists = exists; + + this.queueNames = queueNames; + + this.autoCreateJmsQueues = autoCreateJmsQueues; + } + + public SessionBindingQueryResponseMessage_V2() + { + super(SESS_BINDINGQUERY_RESP_V2); + } + + public boolean isAutoCreateJmsQueues() + { + return autoCreateJmsQueues; + } + + @Override + public void encodeRest(final ActiveMQBuffer buffer) + { + super.encodeRest(buffer); + buffer.writeBoolean(autoCreateJmsQueues); + } + + @Override + public void decodeRest(final ActiveMQBuffer buffer) + { + super.decodeRest(buffer); + autoCreateJmsQueues = buffer.readBoolean(); + } + + @Override + public int hashCode() + { + final int prime = 31; + int result = super.hashCode(); + result = prime * result + (autoCreateJmsQueues ? 1231 : 1237); + return result; + } + + @Override + public boolean equals(Object obj) + { + if (this == obj) + return true; + if (!super.equals(obj)) + return false; + if (!(obj instanceof SessionBindingQueryResponseMessage_V2)) + return false; + SessionBindingQueryResponseMessage_V2 other = (SessionBindingQueryResponseMessage_V2)obj; + if (autoCreateJmsQueues != other.autoCreateJmsQueues) + return false; + return true; + } +} diff --git a/activemq-core-client/src/main/java/org/apache/activemq/core/protocol/core/impl/wireformat/SessionQueueQueryResponseMessage.java b/activemq-core-client/src/main/java/org/apache/activemq/core/protocol/core/impl/wireformat/SessionQueueQueryResponseMessage.java index 43ae4df566..d7d031a14f 100644 --- a/activemq-core-client/src/main/java/org/apache/activemq/core/protocol/core/impl/wireformat/SessionQueueQueryResponseMessage.java +++ b/activemq-core-client/src/main/java/org/apache/activemq/core/protocol/core/impl/wireformat/SessionQueueQueryResponseMessage.java @@ -32,21 +32,21 @@ import org.apache.activemq.core.server.QueueQueryResult; */ public class SessionQueueQueryResponseMessage extends PacketImpl { - private SimpleString name; + protected SimpleString name; - private boolean exists; + protected boolean exists; - private boolean durable; + protected boolean durable; - private int consumerCount; + protected int consumerCount; - private long messageCount; + protected long messageCount; - private SimpleString filterString; + protected SimpleString filterString; - private SimpleString address; + protected SimpleString address; - private boolean temporary; + protected boolean temporary; public SessionQueueQueryResponseMessage(final QueueQueryResult result) { @@ -59,6 +59,11 @@ public class SessionQueueQueryResponseMessage extends PacketImpl this(null, null, false, false, null, 0, 0, false); } + public SessionQueueQueryResponseMessage(byte v2) + { + super(v2); + } + private SessionQueueQueryResponseMessage(final SimpleString name, final SimpleString address, final boolean durable, diff --git a/activemq-core-client/src/main/java/org/apache/activemq/core/protocol/core/impl/wireformat/SessionQueueQueryResponseMessage_V2.java b/activemq-core-client/src/main/java/org/apache/activemq/core/protocol/core/impl/wireformat/SessionQueueQueryResponseMessage_V2.java new file mode 100644 index 0000000000..4c6b47439c --- /dev/null +++ b/activemq-core-client/src/main/java/org/apache/activemq/core/protocol/core/impl/wireformat/SessionQueueQueryResponseMessage_V2.java @@ -0,0 +1,130 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.activemq.core.protocol.core.impl.wireformat; + +import org.apache.activemq.api.core.ActiveMQBuffer; +import org.apache.activemq.api.core.SimpleString; +import org.apache.activemq.api.core.client.ClientSession; +import org.apache.activemq.core.client.impl.QueueQueryImpl; +import org.apache.activemq.core.server.QueueQueryResult; + +/** + * @author Justin Bertram + * + */ +public class SessionQueueQueryResponseMessage_V2 extends SessionQueueQueryResponseMessage +{ + private boolean autoCreationEnabled; + + public SessionQueueQueryResponseMessage_V2(final QueueQueryResult result) + { + this(result.getName(), result.getAddress(), result.isDurable(), result.isTemporary(), + result.getFilterString(), result.getConsumerCount(), result.getMessageCount(), result.isExists(), result.isAutoCreateJmsQueues()); + } + + public SessionQueueQueryResponseMessage_V2() + { + this(null, null, false, false, null, 0, 0, false, false); + } + + private SessionQueueQueryResponseMessage_V2(final SimpleString name, + final SimpleString address, + final boolean durable, + final boolean temporary, + final SimpleString filterString, + final int consumerCount, + final long messageCount, + final boolean exists, + final boolean autoCreationEnabled) + { + super(SESS_QUEUEQUERY_RESP_V2); + + this.durable = durable; + + this.temporary = temporary; + + this.consumerCount = consumerCount; + + this.messageCount = messageCount; + + this.filterString = filterString; + + this.address = address; + + this.name = name; + + this.exists = exists; + + this.autoCreationEnabled = autoCreationEnabled; + } + + public boolean isAutoCreationEnabled() + { + return autoCreationEnabled; + } + + @Override + public void encodeRest(final ActiveMQBuffer buffer) + { + super.encodeRest(buffer); + buffer.writeBoolean(autoCreationEnabled); + } + + @Override + public void decodeRest(final ActiveMQBuffer buffer) + { + super.decodeRest(buffer); + autoCreationEnabled = buffer.readBoolean(); + } + + @Override + public int hashCode() + { + final int prime = 31; + int result = super.hashCode(); + result = prime * result + (autoCreationEnabled ? 1231 : 1237); + return result; + } + + public ClientSession.QueueQuery toQueueQuery() + { + return new QueueQueryImpl(isDurable(), + isTemporary(), + getConsumerCount(), + getMessageCount(), + getFilterString(), + getAddress(), + getName(), + isExists(), + isAutoCreationEnabled()); + } + + @Override + public boolean equals(Object obj) + { + if (this == obj) + return true; + if (!super.equals(obj)) + return false; + if (!(obj instanceof SessionQueueQueryResponseMessage_V2)) + return false; + SessionQueueQueryResponseMessage_V2 other = (SessionQueueQueryResponseMessage_V2)obj; + if (autoCreationEnabled != other.autoCreationEnabled) + return false; + return true; + } +} diff --git a/activemq-core-client/src/main/java/org/apache/activemq/core/server/QueueQueryResult.java b/activemq-core-client/src/main/java/org/apache/activemq/core/server/QueueQueryResult.java index 9650fff490..3b7cbf6f1c 100644 --- a/activemq-core-client/src/main/java/org/apache/activemq/core/server/QueueQueryResult.java +++ b/activemq-core-client/src/main/java/org/apache/activemq/core/server/QueueQueryResult.java @@ -43,29 +43,28 @@ public class QueueQueryResult private boolean temporary; + private boolean autoCreateJmsQueues; + public QueueQueryResult(final SimpleString name, final SimpleString address, final boolean durable, final boolean temporary, final SimpleString filterString, final int consumerCount, - final long messageCount) + final long messageCount, + final boolean autoCreateJmsQueues) { - this(name, address, durable, temporary, filterString, consumerCount, messageCount, true); + this(name, address, durable, temporary, filterString, consumerCount, messageCount, autoCreateJmsQueues, true); } - public QueueQueryResult() - { - this(null, null, false, false, null, 0, 0, false); - } - - private QueueQueryResult(final SimpleString name, + public QueueQueryResult(final SimpleString name, final SimpleString address, final boolean durable, final boolean temporary, final SimpleString filterString, final int consumerCount, final long messageCount, + final boolean autoCreateJmsQueues, final boolean exists) { this.durable = durable; @@ -82,6 +81,8 @@ public class QueueQueryResult this.name = name; + this.autoCreateJmsQueues = autoCreateJmsQueues; + this.exists = exists; } @@ -125,4 +126,9 @@ public class QueueQueryResult return temporary; } + public boolean isAutoCreateJmsQueues() + { + return autoCreateJmsQueues; + } + } diff --git a/activemq-core-client/src/main/resources/activemq-version.properties b/activemq-core-client/src/main/resources/activemq-version.properties index f09128221d..8dc05e58a4 100644 --- a/activemq-core-client/src/main/resources/activemq-version.properties +++ b/activemq-core-client/src/main/resources/activemq-version.properties @@ -21,4 +21,4 @@ activemq.version.microVersion=${activemq.version.microVersion} activemq.version.incrementingVersion=${activemq.version.incrementingVersion} activemq.version.versionSuffix=${activemq.version.versionSuffix} activemq.version.versionTag=${activemq.version.versionTag} -activemq.version.compatibleVersionList=121,122,123,124,125 +activemq.version.compatibleVersionList=121,122,123,124,125,126 diff --git a/activemq-jms-client/src/main/java/org/apache/activemq/jms/client/ActiveMQMessageProducer.java b/activemq-jms-client/src/main/java/org/apache/activemq/jms/client/ActiveMQMessageProducer.java index 167d601a97..772b83615d 100644 --- a/activemq-jms-client/src/main/java/org/apache/activemq/jms/client/ActiveMQMessageProducer.java +++ b/activemq-jms-client/src/main/java/org/apache/activemq/jms/client/ActiveMQMessageProducer.java @@ -415,7 +415,14 @@ public class ActiveMQMessageProducer implements MessageProducer, QueueSender, To ClientSession.AddressQuery query = clientSession.addressQuery(address); if (!query.isExists()) { - throw new InvalidDestinationException("Destination " + address + " does not exist"); + if (query.isAutoCreateJmsQueues()) + { + clientSession.createQueue(address, address, true); + } + else + { + throw new InvalidDestinationException("Destination " + address + " does not exist"); + } } else { diff --git a/activemq-jms-client/src/main/java/org/apache/activemq/jms/client/ActiveMQSession.java b/activemq-jms-client/src/main/java/org/apache/activemq/jms/client/ActiveMQSession.java index bb79174a47..7725ba81d4 100644 --- a/activemq-jms-client/src/main/java/org/apache/activemq/jms/client/ActiveMQSession.java +++ b/activemq-jms-client/src/main/java/org/apache/activemq/jms/client/ActiveMQSession.java @@ -326,7 +326,14 @@ public class ActiveMQSession implements QueueSession, TopicSession if (!response.isExists()) { - throw new InvalidDestinationException("Destination " + jbd.getName() + " does not exist"); + if (response.isAutoCreateJmsQueues()) + { + session.createQueue(jbd.getSimpleAddress(), jbd.getSimpleAddress(), true); + } + else + { + throw new InvalidDestinationException("Destination " + jbd.getName() + " does not exist"); + } } connection.addKnownDestination(jbd.getSimpleAddress()); @@ -730,7 +737,14 @@ public class ActiveMQSession implements QueueSession, TopicSession if (!response.isExists()) { - throw new InvalidDestinationException("Queue " + dest.getName() + " does not exist"); + if (response.isAutoCreateJmsQueues()) + { + session.createQueue(dest.getSimpleAddress(), dest.getSimpleAddress(), true); + } + else + { + throw new InvalidDestinationException("Destination " + dest.getName() + " does not exist"); + } } connection.addKnownDestination(dest.getSimpleAddress()); @@ -902,10 +916,17 @@ public class ActiveMQSession implements QueueSession, TopicSession try { - AddressQuery message = session.addressQuery(new SimpleString(jbq.getAddress())); - if (!message.isExists()) + AddressQuery response = session.addressQuery(new SimpleString(jbq.getAddress())); + if (!response.isExists()) { - throw new InvalidDestinationException(jbq.getAddress() + " does not exist"); + if (response.isAutoCreateJmsQueues()) + { + session.createQueue(jbq.getSimpleAddress(), jbq.getSimpleAddress(), true); + } + else + { + throw new InvalidDestinationException("Destination " + jbq.getName() + " does not exist"); + } } } catch (ActiveMQException e) @@ -1239,13 +1260,13 @@ public class ActiveMQSession implements QueueSession, TopicSession QueueQuery response = session.queueQuery(queue.getSimpleAddress()); - if (response.isExists()) + if (!response.isExists() && !response.isAutoCreateJmsQueues()) { - return queue; + return null; } else { - return null; + return queue; } } diff --git a/activemq-server/src/main/java/org/apache/activemq/core/deployers/impl/FileConfigurationParser.java b/activemq-server/src/main/java/org/apache/activemq/core/deployers/impl/FileConfigurationParser.java index be2b2de1a1..f396da3c3d 100644 --- a/activemq-server/src/main/java/org/apache/activemq/core/deployers/impl/FileConfigurationParser.java +++ b/activemq-server/src/main/java/org/apache/activemq/core/deployers/impl/FileConfigurationParser.java @@ -152,6 +152,10 @@ public final class FileConfigurationParser extends XMLConfigurationUtil private static final String SLOW_CONSUMER_POLICY_NODE_NAME = "slow-consumer-policy"; + private static final String AUTO_CREATE_JMS_QUEUES = "auto-create-jms-queues"; + + private static final String AUTO_DELETE_JMS_QUEUES = "auto-delete-jms-queues"; + // Attributes ---------------------------------------------------- private boolean validateAIO = false; @@ -1139,6 +1143,14 @@ public final class FileConfigurationParser extends XMLConfigurationUtil SlowConsumerPolicy policy = Enum.valueOf(SlowConsumerPolicy.class, value); addressSettings.setSlowConsumerPolicy(policy); } + else if (AUTO_CREATE_JMS_QUEUES.equalsIgnoreCase(name)) + { + addressSettings.setAutoCreateJmsQueues(XMLUtil.parseBoolean(child)); + } + else if (AUTO_DELETE_JMS_QUEUES.equalsIgnoreCase(name)) + { + addressSettings.setAutoDeleteJmsQueues(XMLUtil.parseBoolean(child)); + } } return setting; } diff --git a/activemq-server/src/main/java/org/apache/activemq/core/management/impl/ActiveMQServerControlImpl.java b/activemq-server/src/main/java/org/apache/activemq/core/management/impl/ActiveMQServerControlImpl.java index 16f9fa00bb..b899f092bc 100644 --- a/activemq-server/src/main/java/org/apache/activemq/core/management/impl/ActiveMQServerControlImpl.java +++ b/activemq-server/src/main/java/org/apache/activemq/core/management/impl/ActiveMQServerControlImpl.java @@ -1635,6 +1635,8 @@ public class ActiveMQServerControlImpl extends AbstractControl implements Active policy = addressSettings.getSlowConsumerPolicy() == SlowConsumerPolicy.NOTIFY ? "NOTIFY" : "KILL"; settings.put("slowConsumerPolicy", policy); + settings.put("autoCreateJmsQueues", addressSettings.isAutoCreateJmsQueues()); + settings.put("autoDeleteJmsQueues", addressSettings.isAutoDeleteJmsQueues()); JSONObject jsonObject = new JSONObject(settings); return jsonObject.toString(); @@ -1658,7 +1660,9 @@ public class ActiveMQServerControlImpl extends AbstractControl implements Active final String addressFullMessagePolicy, final long slowConsumerThreshold, final long slowConsumerCheckPeriod, - final String slowConsumerPolicy) throws Exception + final String slowConsumerPolicy, + final boolean autoCreateJmsQueues, + final boolean autoDeleteJmsQueues) throws Exception { checkStarted(); @@ -1721,6 +1725,8 @@ public class ActiveMQServerControlImpl extends AbstractControl implements Active { addressSettings.setSlowConsumerPolicy(SlowConsumerPolicy.KILL); } + addressSettings.setAutoCreateJmsQueues(autoCreateJmsQueues); + addressSettings.setAutoDeleteJmsQueues(autoDeleteJmsQueues); server.getAddressSettingsRepository().addMatch(address, addressSettings); storageManager.storeAddressSetting(new PersistedAddressSetting(new SimpleString(address), addressSettings)); diff --git a/activemq-server/src/main/java/org/apache/activemq/core/persistence/QueueBindingInfo.java b/activemq-server/src/main/java/org/apache/activemq/core/persistence/QueueBindingInfo.java index 69885b08c2..a4ecf2259d 100644 --- a/activemq-server/src/main/java/org/apache/activemq/core/persistence/QueueBindingInfo.java +++ b/activemq-server/src/main/java/org/apache/activemq/core/persistence/QueueBindingInfo.java @@ -43,4 +43,6 @@ public interface QueueBindingInfo SimpleString getFilterString(); + boolean isAutoCreated(); + } diff --git a/activemq-server/src/main/java/org/apache/activemq/core/persistence/impl/journal/JournalStorageManager.java b/activemq-server/src/main/java/org/apache/activemq/core/persistence/impl/journal/JournalStorageManager.java index 204411831b..82b135c70b 100644 --- a/activemq-server/src/main/java/org/apache/activemq/core/persistence/impl/journal/JournalStorageManager.java +++ b/activemq-server/src/main/java/org/apache/activemq/core/persistence/impl/journal/JournalStorageManager.java @@ -2014,7 +2014,8 @@ public class JournalStorageManager implements StorageManager PersistentQueueBindingEncoding bindingEncoding = new PersistentQueueBindingEncoding(queue.getName(), binding.getAddress(), - filterString); + filterString, + queue.isAutoCreated()); readLock(); try @@ -3027,6 +3028,8 @@ public class JournalStorageManager implements StorageManager public SimpleString filterString; + public boolean autoCreated; + public PersistentQueueBindingEncoding() { } @@ -3041,16 +3044,20 @@ public class JournalStorageManager implements StorageManager address + ", filterString=" + filterString + + ", autoCreated=" + + autoCreated + "]"; } public PersistentQueueBindingEncoding(final SimpleString name, final SimpleString address, - final SimpleString filterString) + final SimpleString filterString, + final boolean autoCreated) { this.name = name; this.address = address; this.filterString = filterString; + this.autoCreated = autoCreated; } public long getId() @@ -3083,11 +3090,17 @@ public class JournalStorageManager implements StorageManager return name; } + public boolean isAutoCreated() + { + return autoCreated; + } + public void decode(final ActiveMQBuffer buffer) { name = buffer.readSimpleString(); address = buffer.readSimpleString(); filterString = buffer.readNullableSimpleString(); + autoCreated = buffer.readBoolean(); } public void encode(final ActiveMQBuffer buffer) @@ -3095,12 +3108,13 @@ public class JournalStorageManager implements StorageManager buffer.writeSimpleString(name); buffer.writeSimpleString(address); buffer.writeNullableSimpleString(filterString); + buffer.writeBoolean(autoCreated); } public int getEncodeSize() { return SimpleString.sizeofString(name) + SimpleString.sizeofString(address) + - SimpleString.sizeofNullableString(filterString); + SimpleString.sizeofNullableString(filterString) + DataConstants.SIZE_BOOLEAN; } } diff --git a/activemq-server/src/main/java/org/apache/activemq/core/protocol/core/ServerSessionPacketHandler.java b/activemq-server/src/main/java/org/apache/activemq/core/protocol/core/ServerSessionPacketHandler.java index 65db0866e8..529137a5ff 100644 --- a/activemq-server/src/main/java/org/apache/activemq/core/protocol/core/ServerSessionPacketHandler.java +++ b/activemq-server/src/main/java/org/apache/activemq/core/protocol/core/ServerSessionPacketHandler.java @@ -72,6 +72,7 @@ import org.apache.activemq.core.protocol.core.impl.wireformat.SessionAddMetaData import org.apache.activemq.core.protocol.core.impl.wireformat.SessionAddMetaDataMessageV2; import org.apache.activemq.core.protocol.core.impl.wireformat.SessionBindingQueryMessage; import org.apache.activemq.core.protocol.core.impl.wireformat.SessionBindingQueryResponseMessage; +import org.apache.activemq.core.protocol.core.impl.wireformat.SessionBindingQueryResponseMessage_V2; import org.apache.activemq.core.protocol.core.impl.wireformat.SessionConsumerCloseMessage; import org.apache.activemq.core.protocol.core.impl.wireformat.SessionConsumerFlowCreditMessage; import org.apache.activemq.core.protocol.core.impl.wireformat.SessionCreateConsumerMessage; @@ -81,6 +82,7 @@ import org.apache.activemq.core.protocol.core.impl.wireformat.SessionForceConsum import org.apache.activemq.core.protocol.core.impl.wireformat.SessionIndividualAcknowledgeMessage; import org.apache.activemq.core.protocol.core.impl.wireformat.SessionQueueQueryMessage; import org.apache.activemq.core.protocol.core.impl.wireformat.SessionQueueQueryResponseMessage; +import org.apache.activemq.core.protocol.core.impl.wireformat.SessionQueueQueryResponseMessage_V2; import org.apache.activemq.core.protocol.core.impl.wireformat.SessionRequestProducerCreditsMessage; import org.apache.activemq.core.protocol.core.impl.wireformat.SessionSendContinuationMessage; import org.apache.activemq.core.protocol.core.impl.wireformat.SessionSendLargeMessage; @@ -230,7 +232,15 @@ public class ServerSessionPacketHandler implements ChannelHandler { // We send back queue information on the queue as a response- this allows the queue to // be automatically recreated on failover - response = new SessionQueueQueryResponseMessage(session.executeQueueQuery(request.getQueueName())); + QueueQueryResult queueQueryResult = session.executeQueueQuery(request.getQueueName()); + if (channel.supports(PacketImpl.SESS_QUEUEQUERY_RESP_V2)) + { + response = new SessionQueueQueryResponseMessage_V2(queueQueryResult); + } + else + { + response = new SessionQueueQueryResponseMessage(queueQueryResult); + } } break; @@ -277,7 +287,14 @@ public class ServerSessionPacketHandler implements ChannelHandler requiresResponse = true; SessionQueueQueryMessage request = (SessionQueueQueryMessage)packet; QueueQueryResult result = session.executeQueueQuery(request.getQueueName()); - response = new SessionQueueQueryResponseMessage(result); + if (channel.supports(PacketImpl.SESS_QUEUEQUERY_RESP_V2)) + { + response = new SessionQueueQueryResponseMessage_V2(result); + } + else + { + response = new SessionQueueQueryResponseMessage(result); + } break; } case SESS_BINDINGQUERY: @@ -285,7 +302,14 @@ public class ServerSessionPacketHandler implements ChannelHandler requiresResponse = true; SessionBindingQueryMessage request = (SessionBindingQueryMessage)packet; BindingQueryResult result = session.executeBindingQuery(request.getAddress()); - response = new SessionBindingQueryResponseMessage(result.isExists(), result.getQueueNames()); + if (channel.supports(PacketImpl.SESS_BINDINGQUERY_RESP_V2)) + { + response = new SessionBindingQueryResponseMessage_V2(result.isExists(), result.getQueueNames(), result.isAutoCreateJmsQueues()); + } + else + { + response = new SessionBindingQueryResponseMessage(result.isExists(), result.getQueueNames()); + } break; } case SESS_ACKNOWLEDGE: diff --git a/activemq-server/src/main/java/org/apache/activemq/core/server/ActiveMQServer.java b/activemq-server/src/main/java/org/apache/activemq/core/server/ActiveMQServer.java index 2866f2ef5e..2054be14bc 100644 --- a/activemq-server/src/main/java/org/apache/activemq/core/server/ActiveMQServer.java +++ b/activemq-server/src/main/java/org/apache/activemq/core/server/ActiveMQServer.java @@ -177,6 +177,13 @@ public interface ActiveMQServer extends ActiveMQComponent boolean durable, boolean temporary) throws Exception; + Queue createQueue(SimpleString address, + SimpleString queueName, + SimpleString filter, + boolean durable, + boolean temporary, + boolean autoCreated) throws Exception; + Queue deployQueue(SimpleString address, SimpleString queueName, SimpleString filterString, diff --git a/activemq-server/src/main/java/org/apache/activemq/core/server/ActiveMQServerLogger.java b/activemq-server/src/main/java/org/apache/activemq/core/server/ActiveMQServerLogger.java index 469b75142c..7d6889dc60 100644 --- a/activemq-server/src/main/java/org/apache/activemq/core/server/ActiveMQServerLogger.java +++ b/activemq-server/src/main/java/org/apache/activemq/core/server/ActiveMQServerLogger.java @@ -1366,4 +1366,8 @@ public interface ActiveMQServerLogger extends BasicLogger @Message(id = 224064, value = "Setting <{0}> is invalid with this HA Policy Configuration. Please use exclusively or remove. Ignoring <{0}> value.", format = Message.Format.MESSAGE_FORMAT) void incompatibleWithHAPolicyChosen(String parameter); + @LogMessage(level = Logger.Level.ERROR) + @Message(id = 224065, value = "Failed to remove auto-created queue {0}", format = Message.Format.MESSAGE_FORMAT) + void errorRemovingAutoCreatedQueue(@Cause Exception e, SimpleString bindingName); + } diff --git a/activemq-server/src/main/java/org/apache/activemq/core/server/AutoCreatedQueueManager.java b/activemq-server/src/main/java/org/apache/activemq/core/server/AutoCreatedQueueManager.java new file mode 100644 index 0000000000..03d4d8888d --- /dev/null +++ b/activemq-server/src/main/java/org/apache/activemq/core/server/AutoCreatedQueueManager.java @@ -0,0 +1,29 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.activemq.core.server; + +import org.apache.activemq.api.core.SimpleString; +import org.apache.activemq.utils.ReferenceCounter; + +/** + * @author Clebert Suconic + */ + +public interface AutoCreatedQueueManager extends ReferenceCounter +{ + SimpleString getQueueName(); +} diff --git a/activemq-server/src/main/java/org/apache/activemq/core/server/BindingQueryResult.java b/activemq-server/src/main/java/org/apache/activemq/core/server/BindingQueryResult.java index 9d2255f84a..33339cfd4e 100644 --- a/activemq-server/src/main/java/org/apache/activemq/core/server/BindingQueryResult.java +++ b/activemq-server/src/main/java/org/apache/activemq/core/server/BindingQueryResult.java @@ -33,11 +33,15 @@ public class BindingQueryResult private List queueNames; - public BindingQueryResult(final boolean exists, final List queueNames) + private boolean autoCreateJmsQueues; + + public BindingQueryResult(final boolean exists, final List queueNames, final boolean autoCreateJmsQueues) { this.exists = exists; this.queueNames = queueNames; + + this.autoCreateJmsQueues = autoCreateJmsQueues; } public boolean isExists() @@ -45,6 +49,11 @@ public class BindingQueryResult return exists; } + public boolean isAutoCreateJmsQueues() + { + return autoCreateJmsQueues; + } + public List getQueueNames() { return queueNames; diff --git a/activemq-server/src/main/java/org/apache/activemq/core/server/Queue.java b/activemq-server/src/main/java/org/apache/activemq/core/server/Queue.java index 4759df0f71..e3d2d2c1c3 100644 --- a/activemq-server/src/main/java/org/apache/activemq/core/server/Queue.java +++ b/activemq-server/src/main/java/org/apache/activemq/core/server/Queue.java @@ -49,6 +49,8 @@ public interface Queue extends Bindable boolean isTemporary(); + boolean isAutoCreated(); + void addConsumer(Consumer consumer) throws Exception; void removeConsumer(Consumer consumer); @@ -62,7 +64,7 @@ public interface Queue extends Bindable * on shared subscriptions where the queue needs to be deleted when all the * consumers are closed. */ - void setConsumersRefCount(ActiveMQServer server); + void setConsumersRefCount(ReferenceCounter referenceCounter); ReferenceCounter getConsumersRefCount(); diff --git a/activemq-server/src/main/java/org/apache/activemq/core/server/QueueFactory.java b/activemq-server/src/main/java/org/apache/activemq/core/server/QueueFactory.java index 12cef0a29d..8e9e7ed0ce 100644 --- a/activemq-server/src/main/java/org/apache/activemq/core/server/QueueFactory.java +++ b/activemq-server/src/main/java/org/apache/activemq/core/server/QueueFactory.java @@ -39,7 +39,8 @@ public interface QueueFactory Filter filter, PageSubscription pageSubscription, boolean durable, - boolean temporary); + boolean temporary, + boolean autoCreated); /** * This is required for delete-all-reference to work correctly with paging diff --git a/activemq-server/src/main/java/org/apache/activemq/core/server/impl/ActiveMQServerImpl.java b/activemq-server/src/main/java/org/apache/activemq/core/server/impl/ActiveMQServerImpl.java index bba4729cf6..fd412d9af5 100644 --- a/activemq-server/src/main/java/org/apache/activemq/core/server/impl/ActiveMQServerImpl.java +++ b/activemq-server/src/main/java/org/apache/activemq/core/server/impl/ActiveMQServerImpl.java @@ -1193,9 +1193,18 @@ public class ActiveMQServerImpl implements ActiveMQServer final boolean durable, final boolean temporary) throws Exception { - return createQueue(address, queueName, filterString, durable, temporary, false, false); + return createQueue(address, queueName, filterString, durable, temporary, false, false, false); } + public Queue createQueue(final SimpleString address, + final SimpleString queueName, + final SimpleString filterString, + final boolean durable, + final boolean temporary, + final boolean autoCreated) throws Exception + { + return createQueue(address, queueName, filterString, durable, temporary, false, false, autoCreated); + } /** * Creates a transient queue. A queue that will exist as long as there are consumers. @@ -1214,7 +1223,7 @@ public class ActiveMQServerImpl implements ActiveMQServer final SimpleString filterString, boolean durable) throws Exception { - Queue queue = createQueue(address, name, filterString, durable, !durable, true, !durable); + Queue queue = createQueue(address, name, filterString, durable, !durable, true, !durable, false); if (!queue.getAddress().equals(address)) { @@ -1263,7 +1272,7 @@ public class ActiveMQServerImpl implements ActiveMQServer { ActiveMQServerLogger.LOGGER.deployQueue(queueName); - return createQueue(address, queueName, filterString, durable, temporary, true, false); + return createQueue(address, queueName, filterString, durable, temporary, true, false, false); } public void destroyQueue(final SimpleString queueName) throws Exception @@ -1981,7 +1990,8 @@ public class ActiveMQServerImpl implements ActiveMQServer final boolean durable, final boolean temporary, final boolean ignoreIfExists, - final boolean transientQueue) throws Exception + final boolean transientQueue, + final boolean autoCreated) throws Exception { QueueBinding binding = (QueueBinding) postOffice.getBinding(queueName); @@ -2021,11 +2031,16 @@ public class ActiveMQServerImpl implements ActiveMQServer filter, pageSubscription, durable, - temporary); + temporary, + autoCreated); if (transientQueue) { - queue.setConsumersRefCount(this); + queue.setConsumersRefCount(new TransientQueueManagerImpl(this, queueName)); + } + else if (autoCreated) + { + queue.setConsumersRefCount(new AutoCreatedQueueManagerImpl(this, queueName)); } binding = new LocalQueueBinding(address, queue, nodeManager.getNodeId()); diff --git a/activemq-server/src/main/java/org/apache/activemq/core/server/impl/AutoCreatedQueueManagerImpl.java b/activemq-server/src/main/java/org/apache/activemq/core/server/impl/AutoCreatedQueueManagerImpl.java new file mode 100644 index 0000000000..5bcd2c66b1 --- /dev/null +++ b/activemq-server/src/main/java/org/apache/activemq/core/server/impl/AutoCreatedQueueManagerImpl.java @@ -0,0 +1,93 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.activemq.core.server.impl; + +import org.apache.activemq.api.core.SimpleString; +import org.apache.activemq.core.server.ActiveMQServer; +import org.apache.activemq.core.server.ActiveMQServerLogger; +import org.apache.activemq.core.server.AutoCreatedQueueManager; +import org.apache.activemq.core.server.Queue; +import org.apache.activemq.utils.ReferenceCounterUtil; + +/** + * @author Clebert Suconic + */ + +public class AutoCreatedQueueManagerImpl implements AutoCreatedQueueManager +{ + private final SimpleString queueName; + + private final ActiveMQServer server; + + private final Runnable runnable = new Runnable() + { + public void run() + { + try + { + Queue queue = server.locateQueue(queueName); + long consumerCount = queue.getConsumerCount(); + long messageCount = queue.getMessageCount(); + + if (server.locateQueue(queueName).getMessageCount() == 0) + { + if (ActiveMQServerLogger.LOGGER.isDebugEnabled()) + { + ActiveMQServerLogger.LOGGER.debug("deleting auto-created queue \"" + queueName + "\" because consumerCount = " + consumerCount + " and messageCount = " + messageCount); + } + + server.destroyQueue(queueName, null, false); + } + else if (ActiveMQServerLogger.LOGGER.isDebugEnabled()) + { + ActiveMQServerLogger.LOGGER.debug("NOT deleting auto-created queue \"" + queueName + "\" because consumerCount = " + consumerCount + " and messageCount = " + messageCount); + } + } + catch (Exception e) + { + ActiveMQServerLogger.LOGGER.errorRemovingAutoCreatedQueue(e, queueName); + } + } + }; + + private final ReferenceCounterUtil referenceCounterUtil = new ReferenceCounterUtil(runnable); + + public AutoCreatedQueueManagerImpl(ActiveMQServer server, SimpleString queueName) + { + this.server = server; + + this.queueName = queueName; + } + + @Override + public int increment() + { + return referenceCounterUtil.increment(); + } + + @Override + public int decrement() + { + return referenceCounterUtil.decrement(); + } + + @Override + public SimpleString getQueueName() + { + return queueName; + } +} diff --git a/activemq-server/src/main/java/org/apache/activemq/core/server/impl/LastValueQueue.java b/activemq-server/src/main/java/org/apache/activemq/core/server/impl/LastValueQueue.java index bd86f98b0d..c796d2d331 100644 --- a/activemq-server/src/main/java/org/apache/activemq/core/server/impl/LastValueQueue.java +++ b/activemq-server/src/main/java/org/apache/activemq/core/server/impl/LastValueQueue.java @@ -55,6 +55,7 @@ public class LastValueQueue extends QueueImpl final PageSubscription pageSubscription, final boolean durable, final boolean temporary, + final boolean autoCreated, final ScheduledExecutorService scheduledExecutor, final PostOffice postOffice, final StorageManager storageManager, @@ -68,6 +69,7 @@ public class LastValueQueue extends QueueImpl pageSubscription, durable, temporary, + autoCreated, scheduledExecutor, postOffice, storageManager, diff --git a/activemq-server/src/main/java/org/apache/activemq/core/server/impl/PostOfficeJournalLoader.java b/activemq-server/src/main/java/org/apache/activemq/core/server/impl/PostOfficeJournalLoader.java index 7fcf169060..90a035ee7c 100644 --- a/activemq-server/src/main/java/org/apache/activemq/core/server/impl/PostOfficeJournalLoader.java +++ b/activemq-server/src/main/java/org/apache/activemq/core/server/impl/PostOfficeJournalLoader.java @@ -46,6 +46,7 @@ import org.apache.activemq.core.postoffice.Binding; import org.apache.activemq.core.postoffice.DuplicateIDCache; import org.apache.activemq.core.postoffice.PostOffice; import org.apache.activemq.core.postoffice.impl.LocalQueueBinding; +import org.apache.activemq.core.postoffice.impl.PostOfficeImpl; import org.apache.activemq.core.server.ActiveMQServerLogger; import org.apache.activemq.core.server.MessageReference; import org.apache.activemq.core.server.NodeManager; @@ -155,7 +156,13 @@ public class PostOfficeJournalLoader implements JournalLoader filter, subscription, true, - false); + false, + queueBindingInfo.isAutoCreated()); + + if (queueBindingInfo.isAutoCreated()) + { + queue.setConsumersRefCount(new AutoCreatedQueueManagerImpl(((PostOfficeImpl)postOffice).getServer(), queueBindingInfo.getQueueName())); + } Binding binding = new LocalQueueBinding(queueBindingInfo.getAddress(), queue, nodeManager.getNodeId()); diff --git a/activemq-server/src/main/java/org/apache/activemq/core/server/impl/QueueFactoryImpl.java b/activemq-server/src/main/java/org/apache/activemq/core/server/impl/QueueFactoryImpl.java index ff5fdd87e2..104ce4ffcd 100644 --- a/activemq-server/src/main/java/org/apache/activemq/core/server/impl/QueueFactoryImpl.java +++ b/activemq-server/src/main/java/org/apache/activemq/core/server/impl/QueueFactoryImpl.java @@ -75,7 +75,8 @@ public class QueueFactoryImpl implements QueueFactory final Filter filter, final PageSubscription pageSubscription, final boolean durable, - final boolean temporary) + final boolean temporary, + final boolean autoCreated) { AddressSettings addressSettings = addressSettingsRepository.getMatch(address.toString()); @@ -89,6 +90,7 @@ public class QueueFactoryImpl implements QueueFactory pageSubscription, durable, temporary, + autoCreated, scheduledExecutor, postOffice, storageManager, @@ -104,6 +106,7 @@ public class QueueFactoryImpl implements QueueFactory pageSubscription, durable, temporary, + autoCreated, scheduledExecutor, postOffice, storageManager, diff --git a/activemq-server/src/main/java/org/apache/activemq/core/server/impl/QueueImpl.java b/activemq-server/src/main/java/org/apache/activemq/core/server/impl/QueueImpl.java index e127fe6037..90d424c15e 100644 --- a/activemq-server/src/main/java/org/apache/activemq/core/server/impl/QueueImpl.java +++ b/activemq-server/src/main/java/org/apache/activemq/core/server/impl/QueueImpl.java @@ -61,7 +61,6 @@ import org.apache.activemq.core.server.ActiveMQServerLogger; import org.apache.activemq.core.server.Consumer; import org.apache.activemq.core.server.HandleStatus; import org.apache.activemq.core.server.ActiveMQMessageBundle; -import org.apache.activemq.core.server.ActiveMQServer; import org.apache.activemq.core.server.MessageReference; import org.apache.activemq.core.server.Queue; import org.apache.activemq.core.server.RoutingContext; @@ -131,6 +130,8 @@ public class QueueImpl implements Queue private final boolean temporary; + private final boolean autoCreated; + private final PostOffice postOffice; private volatile boolean queueDestroyed = false; @@ -315,6 +316,7 @@ public class QueueImpl implements Queue final Filter filter, final boolean durable, final boolean temporary, + final boolean autoCreated, final ScheduledExecutorService scheduledExecutor, final PostOffice postOffice, final StorageManager storageManager, @@ -328,6 +330,7 @@ public class QueueImpl implements Queue null, durable, temporary, + autoCreated, scheduledExecutor, postOffice, storageManager, @@ -342,6 +345,7 @@ public class QueueImpl implements Queue final PageSubscription pageSubscription, final boolean durable, final boolean temporary, + final boolean autoCreated, final ScheduledExecutorService scheduledExecutor, final PostOffice postOffice, final StorageManager storageManager, @@ -362,6 +366,8 @@ public class QueueImpl implements Queue this.temporary = temporary; + this.autoCreated = autoCreated; + this.postOffice = postOffice; this.storageManager = storageManager; @@ -425,11 +431,11 @@ public class QueueImpl implements Queue } // Queue implementation ---------------------------------------------------------------------------------------- - public synchronized void setConsumersRefCount(final ActiveMQServer server) + public synchronized void setConsumersRefCount(final ReferenceCounter referenceCounter) { if (refCountForConsumers == null) { - this.refCountForConsumers = new TransientQueueManagerImpl(server, this.name); + this.refCountForConsumers = referenceCounter; } } @@ -449,6 +455,11 @@ public class QueueImpl implements Queue return temporary; } + public boolean isAutoCreated() + { + return autoCreated; + } + public SimpleString getName() { return name; diff --git a/activemq-server/src/main/java/org/apache/activemq/core/server/impl/ServerSessionImpl.java b/activemq-server/src/main/java/org/apache/activemq/core/server/impl/ServerSessionImpl.java index 553747b836..379079584e 100644 --- a/activemq-server/src/main/java/org/apache/activemq/core/server/impl/ServerSessionImpl.java +++ b/activemq-server/src/main/java/org/apache/activemq/core/server/impl/ServerSessionImpl.java @@ -37,6 +37,7 @@ import org.apache.activemq.api.core.Pair; import org.apache.activemq.api.core.SimpleString; import org.apache.activemq.api.core.management.CoreNotificationType; import org.apache.activemq.api.core.management.ManagementHelper; +import org.apache.activemq.api.core.management.ResourceNames; import org.apache.activemq.core.client.impl.ClientMessageImpl; import org.apache.activemq.core.exception.ActiveMQXAException; import org.apache.activemq.core.filter.Filter; @@ -549,7 +550,15 @@ public class ServerSessionImpl implements ServerSession, FailureListener securityStore.check(address, CheckType.CREATE_NON_DURABLE_QUEUE, this); } - Queue queue = server.createQueue(address, name, filterString, durable, temporary); + // any non-temporary JMS queue created via this method should be marked as auto-created + if (!temporary && address.toString().startsWith(ResourceNames.JMS_QUEUE) && address.equals(name)) + { + server.createQueue(address, name, filterString, durable, temporary, true); + } + else + { + server.createQueue(address, name, filterString, durable, temporary); + } if (temporary) { @@ -676,6 +685,8 @@ public class ServerSessionImpl implements ServerSession, FailureListener public QueueQueryResult executeQueueQuery(final SimpleString name) throws Exception { + boolean autoCreateJmsQueues = name.toString().startsWith(ResourceNames.JMS_QUEUE) && server.getAddressSettingsRepository().getMatch(name.toString()).isAutoCreateJmsQueues(); + if (name == null) { throw ActiveMQMessageBundle.BUNDLE.queueNameIsNull(); @@ -699,16 +710,21 @@ public class ServerSessionImpl implements ServerSession, FailureListener queue.isTemporary(), filterString, queue.getConsumerCount(), - queue.getMessageCount()); + queue.getMessageCount(), + autoCreateJmsQueues); } // make an exception for the management address (see HORNETQ-29) else if (name.equals(managementAddress)) { - response = new QueueQueryResult(name, managementAddress, true, false, null, -1, -1); + response = new QueueQueryResult(name, managementAddress, true, false, null, -1, -1, autoCreateJmsQueues); + } + else if (autoCreateJmsQueues) + { + response = new QueueQueryResult(name, name, true, false, null, 0, 0, true, false); } else { - response = new QueueQueryResult(); + response = new QueueQueryResult(null, null, false, false, null, 0, 0, false, false); } return response; @@ -716,6 +732,8 @@ public class ServerSessionImpl implements ServerSession, FailureListener public BindingQueryResult executeBindingQuery(final SimpleString address) throws Exception { + boolean autoCreateJmsQueues = address.toString().startsWith(ResourceNames.JMS_QUEUE) && server.getAddressSettingsRepository().getMatch(address.toString()).isAutoCreateJmsQueues(); + if (address == null) { throw ActiveMQMessageBundle.BUNDLE.addressIsNull(); @@ -726,7 +744,7 @@ public class ServerSessionImpl implements ServerSession, FailureListener // make an exception for the management address (see HORNETQ-29) if (address.equals(managementAddress)) { - return new BindingQueryResult(true, names); + return new BindingQueryResult(true, names, autoCreateJmsQueues); } Bindings bindings = postOffice.getMatchingBindings(address); @@ -739,7 +757,7 @@ public class ServerSessionImpl implements ServerSession, FailureListener } } - return new BindingQueryResult(!names.isEmpty(), names); + return new BindingQueryResult(!names.isEmpty(), names, autoCreateJmsQueues); } public void forceConsumerDelivery(final long consumerID, final long sequence) throws Exception diff --git a/activemq-server/src/main/java/org/apache/activemq/core/settings/impl/AddressSettings.java b/activemq-server/src/main/java/org/apache/activemq/core/settings/impl/AddressSettings.java index 0d3a32e4f1..5a8689b75e 100644 --- a/activemq-server/src/main/java/org/apache/activemq/core/settings/impl/AddressSettings.java +++ b/activemq-server/src/main/java/org/apache/activemq/core/settings/impl/AddressSettings.java @@ -56,6 +56,10 @@ public class AddressSettings implements Mergeable, Serializable public static final boolean DEFAULT_LAST_VALUE_QUEUE = false; + public static final boolean DEFAULT_AUTO_CREATE_QUEUES = true; + + public static final boolean DEFAULT_AUTO_DELETE_QUEUES = true; + public static final long DEFAULT_REDISTRIBUTION_DELAY = -1; public static final long DEFAULT_EXPIRY_DELAY = -1; @@ -106,6 +110,10 @@ public class AddressSettings implements Mergeable, Serializable private SlowConsumerPolicy slowConsumerPolicy = null; + private Boolean autoCreateJmsQueues = null; + + private Boolean autoDeleteJmsQueues = null; + public AddressSettings(AddressSettings other) { this.addressFullMessagePolicy = other.addressFullMessagePolicy; @@ -127,6 +135,8 @@ public class AddressSettings implements Mergeable, Serializable this.slowConsumerThreshold = other.slowConsumerThreshold; this.slowConsumerCheckPeriod = other.slowConsumerCheckPeriod; this.slowConsumerPolicy = other.slowConsumerPolicy; + this.autoCreateJmsQueues = other.autoCreateJmsQueues; + this.autoDeleteJmsQueues = other.autoDeleteJmsQueues; } public AddressSettings() @@ -134,6 +144,26 @@ public class AddressSettings implements Mergeable, Serializable } + public boolean isAutoCreateJmsQueues() + { + return autoCreateJmsQueues != null ? autoCreateJmsQueues : AddressSettings.DEFAULT_AUTO_CREATE_QUEUES; + } + + public void setAutoCreateJmsQueues(final boolean autoCreateJmsQueues) + { + this.autoCreateJmsQueues = autoCreateJmsQueues; + } + + public boolean isAutoDeleteJmsQueues() + { + return autoDeleteJmsQueues != null ? autoDeleteJmsQueues : AddressSettings.DEFAULT_AUTO_DELETE_QUEUES; + } + + public void setAutoDeleteJmsQueues(final boolean autoDeleteJmsQueues) + { + this.autoDeleteJmsQueues = autoDeleteJmsQueues; + } + public boolean isLastValueQueue() { return lastValueQueue != null ? lastValueQueue : AddressSettings.DEFAULT_LAST_VALUE_QUEUE; @@ -398,6 +428,14 @@ public class AddressSettings implements Mergeable, Serializable { slowConsumerPolicy = merged.slowConsumerPolicy; } + if (autoCreateJmsQueues == null) + { + autoCreateJmsQueues = merged.autoCreateJmsQueues; + } + if (autoDeleteJmsQueues == null) + { + autoDeleteJmsQueues = merged.autoDeleteJmsQueues; + } } @Override @@ -458,6 +496,10 @@ public class AddressSettings implements Mergeable, Serializable { slowConsumerPolicy = null; } + + autoCreateJmsQueues = BufferHelper.readNullableBoolean(buffer); + + autoDeleteJmsQueues = BufferHelper.readNullableBoolean(buffer); } @Override @@ -482,7 +524,9 @@ public class AddressSettings implements Mergeable, Serializable BufferHelper.sizeOfNullableBoolean(sendToDLAOnNoRoute) + BufferHelper.sizeOfNullableLong(slowConsumerCheckPeriod) + BufferHelper.sizeOfNullableLong(slowConsumerThreshold) + - BufferHelper.sizeOfNullableSimpleString(slowConsumerPolicy != null ? slowConsumerPolicy.toString() : null); + BufferHelper.sizeOfNullableSimpleString(slowConsumerPolicy != null ? slowConsumerPolicy.toString() : null) + + BufferHelper.sizeOfNullableBoolean(autoCreateJmsQueues) + + BufferHelper.sizeOfNullableBoolean(autoDeleteJmsQueues); } @Override @@ -526,6 +570,10 @@ public class AddressSettings implements Mergeable, Serializable BufferHelper.writeNullableLong(buffer, slowConsumerCheckPeriod); buffer.writeNullableSimpleString(slowConsumerPolicy != null ? new SimpleString(slowConsumerPolicy.toString()) : null); + + BufferHelper.writeNullableBoolean(buffer, autoCreateJmsQueues); + + BufferHelper.writeNullableBoolean(buffer, autoDeleteJmsQueues); } /* (non-Javadoc) @@ -556,6 +604,8 @@ public class AddressSettings implements Mergeable, Serializable result = prime * result + ((slowConsumerThreshold == null) ? 0 : slowConsumerThreshold.hashCode()); result = prime * result + ((slowConsumerCheckPeriod == null) ? 0 : slowConsumerCheckPeriod.hashCode()); result = prime * result + ((slowConsumerPolicy == null) ? 0 : slowConsumerPolicy.hashCode()); + result = prime * result + ((autoCreateJmsQueues == null) ? 0 : autoCreateJmsQueues.hashCode()); + result = prime * result + ((autoDeleteJmsQueues == null) ? 0 : autoDeleteJmsQueues.hashCode()); return result; } @@ -705,6 +755,20 @@ public class AddressSettings implements Mergeable, Serializable } else if (!slowConsumerPolicy.equals(other.slowConsumerPolicy)) return false; + if (autoCreateJmsQueues == null) + { + if (other.autoCreateJmsQueues != null) + return false; + } + else if (!autoCreateJmsQueues.equals(other.autoCreateJmsQueues)) + return false; + if (autoDeleteJmsQueues == null) + { + if (other.autoDeleteJmsQueues != null) + return false; + } + else if (!autoDeleteJmsQueues.equals(other.autoDeleteJmsQueues)) + return false; return true; } @@ -751,6 +815,10 @@ public class AddressSettings implements Mergeable, Serializable slowConsumerCheckPeriod + ", slowConsumerPolicy=" + slowConsumerPolicy + + ", autoCreateJmsQueues=" + + autoCreateJmsQueues + + ", autoDeleteJmsQueues=" + + autoDeleteJmsQueues + "]"; } } diff --git a/activemq-server/src/main/resources/schema/activemq-configuration.xsd b/activemq-server/src/main/resources/schema/activemq-configuration.xsd index 7ca0468d32..bc6bea3ed8 100644 --- a/activemq-server/src/main/resources/schema/activemq-configuration.xsd +++ b/activemq-server/src/main/resources/schema/activemq-configuration.xsd @@ -2258,6 +2258,23 @@ + + + + + whether or not to automatically create JMS queues when a producer sends or a consumer connects to a + queue + + + + + + + + whether or not to delete auto-created JMS queues when the queue has 0 consumers and 0 messages + + + diff --git a/activemq-server/src/test/java/org/apache/activemq/core/config/impl/FileConfigurationTest.java b/activemq-server/src/test/java/org/apache/activemq/core/config/impl/FileConfigurationTest.java index a892c03413..bf10266679 100644 --- a/activemq-server/src/test/java/org/apache/activemq/core/config/impl/FileConfigurationTest.java +++ b/activemq-server/src/test/java/org/apache/activemq/core/config/impl/FileConfigurationTest.java @@ -297,6 +297,8 @@ public class FileConfigurationTest extends ConfigurationImplTest assertEquals(10, conf.getAddressesSettings().get("a1").getSlowConsumerThreshold()); assertEquals(5, conf.getAddressesSettings().get("a1").getSlowConsumerCheckPeriod()); assertEquals(SlowConsumerPolicy.NOTIFY, conf.getAddressesSettings().get("a1").getSlowConsumerPolicy()); + assertEquals(true, conf.getAddressesSettings().get("a1").isAutoCreateJmsQueues()); + assertEquals(true, conf.getAddressesSettings().get("a1").isAutoDeleteJmsQueues()); assertEquals("a2.1", conf.getAddressesSettings().get("a2").getDeadLetterAddress().toString()); assertEquals("a2.2", conf.getAddressesSettings().get("a2").getExpiryAddress().toString()); @@ -308,6 +310,8 @@ public class FileConfigurationTest extends ConfigurationImplTest assertEquals(20, conf.getAddressesSettings().get("a2").getSlowConsumerThreshold()); assertEquals(15, conf.getAddressesSettings().get("a2").getSlowConsumerCheckPeriod()); assertEquals(SlowConsumerPolicy.KILL, conf.getAddressesSettings().get("a2").getSlowConsumerPolicy()); + assertEquals(false, conf.getAddressesSettings().get("a2").isAutoCreateJmsQueues()); + assertEquals(false, conf.getAddressesSettings().get("a2").isAutoDeleteJmsQueues()); assertEquals(2, conf.getQueueConfigurations().size()); diff --git a/activemq-server/src/test/java/org/apache/activemq/core/server/impl/ScheduledDeliveryHandlerTest.java b/activemq-server/src/test/java/org/apache/activemq/core/server/impl/ScheduledDeliveryHandlerTest.java index abb6e86d64..7b85924e42 100644 --- a/activemq-server/src/test/java/org/apache/activemq/core/server/impl/ScheduledDeliveryHandlerTest.java +++ b/activemq-server/src/test/java/org/apache/activemq/core/server/impl/ScheduledDeliveryHandlerTest.java @@ -42,7 +42,6 @@ import org.apache.activemq.core.message.BodyEncoder; import org.apache.activemq.core.paging.PagingStore; import org.apache.activemq.core.paging.cursor.PageSubscription; import org.apache.activemq.core.server.Consumer; -import org.apache.activemq.core.server.ActiveMQServer; import org.apache.activemq.core.server.MessageReference; import org.apache.activemq.core.server.Queue; import org.apache.activemq.core.server.RoutingContext; @@ -1053,6 +1052,12 @@ public class ScheduledDeliveryHandlerTest extends Assert return false; } + @Override + public boolean isAutoCreated() + { + return false; + } + @Override public void addConsumer(Consumer consumer) throws Exception { @@ -1072,7 +1077,7 @@ public class ScheduledDeliveryHandlerTest extends Assert } @Override - public void setConsumersRefCount(ActiveMQServer server) + public void setConsumersRefCount(ReferenceCounter referenceCounter) { } diff --git a/activemq-server/src/test/java/org/apache/activemq/core/settings/AddressSettingsTest.java b/activemq-server/src/test/java/org/apache/activemq/core/settings/AddressSettingsTest.java index 2d71668f27..42324203db 100644 --- a/activemq-server/src/test/java/org/apache/activemq/core/settings/AddressSettingsTest.java +++ b/activemq-server/src/test/java/org/apache/activemq/core/settings/AddressSettingsTest.java @@ -46,7 +46,8 @@ public class AddressSettingsTest extends UnitTestCase Assert.assertEquals(AddressSettings.DEFAULT_SLOW_CONSUMER_THRESHOLD, addressSettings.getSlowConsumerThreshold()); Assert.assertEquals(AddressSettings.DEFAULT_SLOW_CONSUMER_CHECK_PERIOD, addressSettings.getSlowConsumerCheckPeriod()); Assert.assertEquals(AddressSettings.DEFAULT_SLOW_CONSUMER_POLICY, addressSettings.getSlowConsumerPolicy()); - + Assert.assertEquals(AddressSettings.DEFAULT_AUTO_CREATE_QUEUES, addressSettings.isAutoCreateJmsQueues()); + Assert.assertEquals(AddressSettings.DEFAULT_AUTO_DELETE_QUEUES, addressSettings.isAutoDeleteJmsQueues()); } @Test diff --git a/activemq-server/src/test/resources/ConfigurationTest-full-config.xml b/activemq-server/src/test/resources/ConfigurationTest-full-config.xml index 9180f93fab..3f2ab446fc 100644 --- a/activemq-server/src/test/resources/ConfigurationTest-full-config.xml +++ b/activemq-server/src/test/resources/ConfigurationTest-full-config.xml @@ -271,6 +271,8 @@ 10 5 NOTIFY + true + true a2.1 @@ -283,6 +285,8 @@ 20 15 KILL + false + false diff --git a/docs/user-manual/en/queue-attributes.md b/docs/user-manual/en/queue-attributes.md index d53467350e..9892e69991 100644 --- a/docs/user-manual/en/queue-attributes.md +++ b/docs/user-manual/en/queue-attributes.md @@ -95,6 +95,8 @@ entry that would be found in the `activemq-configuration.xml` file. -1 NOTIFY 5 + true + true @@ -176,3 +178,13 @@ on this notification. `slow-consumer-check-period`. How often to check for slow consumers on a particular queue. Measured in minutes. Default is 5. See ? for more information about slow consumer detection. + +`auto-create-jms-queues`. Whether or not the broker should automatically +create a JMS queue when a JMS message is sent to a queue whose name fits +the address `match` (remember, a JMS queue is just a core queue which has +the same address and queue name) or a JMS consumer tries to connect to a +queue whose name fits the address `match`. Queues which are auto-created +are durable, non-temporary, and non-transient. + +`auto-delete-jms-queues`. Whether or not to the broker should automatically +delete auto-created JMS queues when they have both 0 consumers and 0 messages. \ No newline at end of file diff --git a/pom.xml b/pom.xml index 4f3f440150..0f861f7805 100644 --- a/pom.xml +++ b/pom.xml @@ -49,7 +49,7 @@ 6 0 0 - 125,124,123,122 + 126,125,124,123,122 SNAPSHOT SNAPSHOT diff --git a/tests/concurrent-tests/src/test/java/org/apache/activemq/tests/concurrent/server/impl/QueueTest.java b/tests/concurrent-tests/src/test/java/org/apache/activemq/tests/concurrent/server/impl/QueueTest.java index 5ba28b5ee4..f0039ff0c8 100644 --- a/tests/concurrent-tests/src/test/java/org/apache/activemq/tests/concurrent/server/impl/QueueTest.java +++ b/tests/concurrent-tests/src/test/java/org/apache/activemq/tests/concurrent/server/impl/QueueTest.java @@ -77,6 +77,7 @@ public class QueueTest extends UnitTestCase null, null, false, + false, false); FakeConsumer consumer = new FakeConsumer(); diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/tests/integration/client/AutoCreateJmsQueueTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/tests/integration/client/AutoCreateJmsQueueTest.java new file mode 100644 index 0000000000..f76d631675 --- /dev/null +++ b/tests/integration-tests/src/test/java/org/apache/activemq/tests/integration/client/AutoCreateJmsQueueTest.java @@ -0,0 +1,187 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.activemq.tests.integration.client; + +import javax.jms.Connection; +import javax.jms.InvalidDestinationException; +import javax.jms.JMSSecurityException; +import javax.jms.Message; +import javax.jms.MessageConsumer; +import javax.jms.MessageProducer; +import javax.jms.Session; +import javax.jms.TextMessage; + +import java.util.HashSet; +import java.util.Set; + +import org.apache.activemq.api.core.SimpleString; +import org.apache.activemq.api.jms.ActiveMQJMSClient; +import org.apache.activemq.core.security.Role; +import org.apache.activemq.core.server.Queue; +import org.apache.activemq.tests.util.JMSTestBase; +import org.junit.Assert; +import org.junit.Before; +import org.junit.Test; + +/** + * @author Justin Bertram + */ +public class AutoCreateJmsQueueTest extends JMSTestBase +{ + @Test + public void testAutoCreateOnSendToQueue() throws Exception + { + Connection connection = cf.createConnection(); + Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); + + javax.jms.Queue queue = ActiveMQJMSClient.createQueue("test"); + + MessageProducer producer = session.createProducer(queue); + + final int numMessages = 100; + + for (int i = 0; i < numMessages; i++) + { + TextMessage mess = session.createTextMessage("msg" + i); + producer.send(mess); + } + + producer.close(); + + MessageConsumer messageConsumer = session.createConsumer(queue); + connection.start(); + + for (int i = 0; i < numMessages; i++) + { + Message m = messageConsumer.receive(5000); + Assert.assertNotNull(m); + } + + connection.close(); + } + + @Test + public void testAutoCreateOnSendToQueueSecurity() throws Exception + { + server.getSecurityManager().addUser("guest", "guest"); + server.getSecurityManager().setDefaultUser("guest"); + server.getSecurityManager().addRole("guest", "rejectAll"); + Role role = new Role("rejectAll", false, false, false, false, false, false, false); + Set roles = new HashSet(); + roles.add(role); + server.getSecurityRepository().addMatch("#", roles); + Connection connection = cf.createConnection(); + Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); + + javax.jms.Queue queue = ActiveMQJMSClient.createQueue("test"); + + try + { + MessageProducer producer = session.createProducer(queue); + Assert.fail("Creating a producer here should throw a JMSSecurityException"); + } + catch (Exception e) + { + Assert.assertTrue(e instanceof JMSSecurityException); + } + + connection.close(); + } + + @Test + public void testAutoCreateOnSendToTopic() throws Exception + { + Connection connection = cf.createConnection(); + Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); + + javax.jms.Topic topic = ActiveMQJMSClient.createTopic("test"); + + try + { + MessageProducer producer = session.createProducer(topic); + Assert.fail("Creating a producer here should throw an exception"); + } + catch (Exception e) + { + Assert.assertTrue(e instanceof InvalidDestinationException); + } + + connection.close(); + } + + @Test + public void testAutoCreateOnConsumeFromQueue() throws Exception + { + Connection connection = null; + connection = cf.createConnection(); + Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); + + javax.jms.Queue queue = ActiveMQJMSClient.createQueue("test"); + + MessageConsumer messageConsumer = session.createConsumer(queue); + connection.start(); + + Message m = messageConsumer.receive(500); + Assert.assertNull(m); + + Queue q = (Queue) server.getPostOffice().getBinding(new SimpleString("jms.queue.test")).getBindable(); + Assert.assertEquals(0, q.getMessageCount()); + Assert.assertEquals(0, q.getMessagesAdded()); + connection.close(); + } + + @Test + public void testAutoCreateOnConsumeFromTopic() throws Exception + { + Connection connection = null; + connection = cf.createConnection(); + Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); + + javax.jms.Topic topic = ActiveMQJMSClient.createTopic("test"); + + try + { + MessageConsumer messageConsumer = session.createConsumer(topic); + Assert.fail("Creating a consumer here should throw an exception"); + } + catch (Exception e) + { + Assert.assertTrue(e instanceof InvalidDestinationException); + } + + connection.close(); + } + + @Before + @Override + public void setUp() throws Exception + { + super.setUp(); + server.getSecurityManager().addUser("guest", "guest"); + server.getSecurityManager().setDefaultUser("guest"); + server.getSecurityManager().addRole("guest", "allowAll"); + Role role = new Role("allowAll", true, true, true, true, true, true, true); + Set roles = new HashSet(); + roles.add(role); + server.getSecurityRepository().addMatch("#", roles); + } + + protected boolean useSecurity() + { + return true; + } +} diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/tests/integration/client/AutoDeleteJmsQueueTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/tests/integration/client/AutoDeleteJmsQueueTest.java new file mode 100644 index 0000000000..14cacf77a7 --- /dev/null +++ b/tests/integration-tests/src/test/java/org/apache/activemq/tests/integration/client/AutoDeleteJmsQueueTest.java @@ -0,0 +1,84 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.activemq.tests.integration.client; + +import javax.jms.Connection; +import javax.jms.Message; +import javax.jms.MessageConsumer; +import javax.jms.MessageProducer; +import javax.jms.Session; +import javax.jms.TextMessage; + +import org.apache.activemq.api.core.SimpleString; +import org.apache.activemq.api.jms.ActiveMQJMSClient; +import org.apache.activemq.core.server.Queue; +import org.apache.activemq.tests.util.JMSTestBase; +import org.junit.Assert; +import org.junit.Test; + +/** + * @author Justin Bertram + */ +public class AutoDeleteJmsQueueTest extends JMSTestBase +{ + @Test + public void testAutoDelete() throws Exception + { + Connection connection = cf.createConnection(); + Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); + + javax.jms.Queue queue = ActiveMQJMSClient.createQueue("test"); + + MessageProducer producer = session.createProducer(queue); + + final int numMessages = 100; + + for (int i = 0; i < numMessages; i++) + { + TextMessage mess = session.createTextMessage("msg" + i); + producer.send(mess); + } + + producer.close(); + + MessageConsumer messageConsumer = session.createConsumer(queue); + connection.start(); + + for (int i = 0; i < numMessages - 1; i++) + { + Message m = messageConsumer.receive(5000); + Assert.assertNotNull(m); + } + + session.close(); + + // ensure the queue is still there + Queue q = (Queue) server.getPostOffice().getBinding(new SimpleString("jms.queue.test")).getBindable(); + Assert.assertEquals(1, q.getMessageCount()); + Assert.assertEquals(numMessages, q.getMessagesAdded()); + + session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); + messageConsumer = session.createConsumer(queue); + Message m = messageConsumer.receive(5000); + Assert.assertNotNull(m); + + connection.close(); + + // ensure the queue was removed + Assert.assertNull(server.getPostOffice().getBinding(new SimpleString("jms.queue.test"))); + } +} diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/tests/integration/client/HangConsumerTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/tests/integration/client/HangConsumerTest.java index b4a79774de..e953e83c5e 100644 --- a/tests/integration-tests/src/test/java/org/apache/activemq/tests/integration/client/HangConsumerTest.java +++ b/tests/integration-tests/src/test/java/org/apache/activemq/tests/integration/client/HangConsumerTest.java @@ -242,6 +242,7 @@ public class HangConsumerTest extends ServiceTestBase final PageSubscription pageSubscription, final boolean durable, final boolean temporary, + final boolean autoCreated, final ScheduledExecutorService scheduledExecutor, final PostOffice postOffice, final StorageManager storageManager, @@ -255,6 +256,7 @@ public class HangConsumerTest extends ServiceTestBase pageSubscription, durable, temporary, + autoCreated, scheduledExecutor, postOffice, storageManager, @@ -294,7 +296,8 @@ public class HangConsumerTest extends ServiceTestBase final Filter filter, final PageSubscription pageSubscription, final boolean durable, - final boolean temporary) + final boolean temporary, + final boolean autoCreated) { queue = new MyQueueWithBlocking(persistenceID, address, @@ -303,6 +306,7 @@ public class HangConsumerTest extends ServiceTestBase pageSubscription, durable, temporary, + autoCreated, scheduledExecutor, postOffice, storageManager, @@ -401,7 +405,7 @@ public class HangConsumerTest extends ServiceTestBase // Forcing a situation where the server would unexpectedly create a duplicated queue. The server should still start normally - LocalQueueBinding newBinding = new LocalQueueBinding(QUEUE, new QueueImpl(queueID, QUEUE, QUEUE, null, true, false, null, null, null, null, null), server.getNodeID()); + LocalQueueBinding newBinding = new LocalQueueBinding(QUEUE, new QueueImpl(queueID, QUEUE, QUEUE, null, true, false, false, null, null, null, null, null), server.getNodeID()); server.getStorageManager().addQueueBinding(txID, newBinding); server.getStorageManager().commitBindings(txID); diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/tests/integration/client/InterruptedLargeMessageTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/tests/integration/client/InterruptedLargeMessageTest.java index 9db1ff1867..0fb5ab2586 100644 --- a/tests/integration-tests/src/test/java/org/apache/activemq/tests/integration/client/InterruptedLargeMessageTest.java +++ b/tests/integration-tests/src/test/java/org/apache/activemq/tests/integration/client/InterruptedLargeMessageTest.java @@ -506,6 +506,7 @@ public class InterruptedLargeMessageTest extends LargeMessageTestBase PageSubscription pageSubscription, boolean durable, boolean temporary, + boolean autoCreated, ScheduledExecutorService scheduledExecutor, PostOffice postOffice, StorageManager storageManager, @@ -519,6 +520,7 @@ public class InterruptedLargeMessageTest extends LargeMessageTestBase pageSubscription, durable, temporary, + autoCreated, scheduledExecutor, postOffice, storageManager, @@ -570,7 +572,8 @@ public class InterruptedLargeMessageTest extends LargeMessageTestBase Filter filter, PageSubscription pageSubscription, boolean durable, - boolean temporary) + boolean temporary, + boolean autoCreated) { return new NoPostACKQueue(persistenceID, @@ -580,6 +583,7 @@ public class InterruptedLargeMessageTest extends LargeMessageTestBase pageSubscription, durable, temporary, + autoCreated, scheduledExecutor, postOffice, storageManager, diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/tests/integration/client/PagingOrderTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/tests/integration/client/PagingOrderTest.java index a77ee945ba..fd2591f179 100644 --- a/tests/integration-tests/src/test/java/org/apache/activemq/tests/integration/client/PagingOrderTest.java +++ b/tests/integration-tests/src/test/java/org/apache/activemq/tests/integration/client/PagingOrderTest.java @@ -749,7 +749,9 @@ public class PagingOrderTest extends ServiceTestBase "PAGE", -1, 10, - "KILL"); + "KILL", + true, + true); ActiveMQJMSConnectionFactory cf = (ActiveMQJMSConnectionFactory) ActiveMQJMSClient.createConnectionFactoryWithoutHA(JMSFactoryType.CF, new TransportConfiguration(INVM_CONNECTOR_FACTORY)); @@ -824,7 +826,9 @@ public class PagingOrderTest extends ServiceTestBase "PAGE", -1, 10, - "KILL"); + "KILL", + true, + true); jmsServer.createQueue(true, "Q1", null, true, "/queue/Q1"); diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/tests/integration/jms/client/TopicCleanupTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/tests/integration/jms/client/TopicCleanupTest.java index bd264ec6d9..55cd607e9f 100644 --- a/tests/integration-tests/src/test/java/org/apache/activemq/tests/integration/jms/client/TopicCleanupTest.java +++ b/tests/integration-tests/src/test/java/org/apache/activemq/tests/integration/jms/client/TopicCleanupTest.java @@ -82,7 +82,7 @@ public class TopicCleanupTest extends JMSTestBase { long txid = storage.generateID(); - final Queue queue = new QueueImpl(storage.generateID(), SimpleString.toSimpleString("jms.topic.topic"), SimpleString.toSimpleString("jms.topic.topic"), FilterImpl.createFilter(ActiveMQServerImpl.GENERIC_IGNORED_FILTER), true, false, server.getScheduledPool(), server.getPostOffice(), + final Queue queue = new QueueImpl(storage.generateID(), SimpleString.toSimpleString("jms.topic.topic"), SimpleString.toSimpleString("jms.topic.topic"), FilterImpl.createFilter(ActiveMQServerImpl.GENERIC_IGNORED_FILTER), true, false, false, server.getScheduledPool(), server.getPostOffice(), storage, server.getAddressSettingsRepository(), server.getExecutorFactory().getExecutor()); LocalQueueBinding binding = new LocalQueueBinding(queue.getAddress(), queue, server.getNodeID()); diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/tests/integration/jms/jms2client/NonExistentQueueTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/tests/integration/jms/jms2client/NonExistentQueueTest.java index 1f3db36f20..e632d65020 100644 --- a/tests/integration-tests/src/test/java/org/apache/activemq/tests/integration/jms/jms2client/NonExistentQueueTest.java +++ b/tests/integration-tests/src/test/java/org/apache/activemq/tests/integration/jms/jms2client/NonExistentQueueTest.java @@ -62,7 +62,7 @@ public class NonExistentQueueTest extends JMSTestBase @Test public void sendToNonExistantDestination() throws Exception { - Destination destination = ActiveMQJMSClient.createQueue("DoesNotExist"); + Destination destination = ActiveMQJMSClient.createTopic("DoesNotExist"); TransportConfiguration transportConfiguration = new TransportConfiguration(InVMConnectorFactory.class.getName()); ConnectionFactory localConnectionFactory = ActiveMQJMSClient.createConnectionFactoryWithoutHA(JMSFactoryType.CF, transportConfiguration); diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/tests/integration/management/ActiveMQServerControlTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/tests/integration/management/ActiveMQServerControlTest.java index a795e35242..3016d055fd 100644 --- a/tests/integration-tests/src/test/java/org/apache/activemq/tests/integration/management/ActiveMQServerControlTest.java +++ b/tests/integration-tests/src/test/java/org/apache/activemq/tests/integration/management/ActiveMQServerControlTest.java @@ -507,6 +507,8 @@ public class ActiveMQServerControlTest extends ManagementTestBase long slowConsumerThreshold = 5; long slowConsumerCheckPeriod = 10; String slowConsumerPolicy = SlowConsumerPolicy.KILL.toString(); + boolean autoCreateJmsQueues = false; + boolean autoDeleteJmsQueues = false; serverControl.addAddressSettings(addressMatch, DLA, @@ -525,7 +527,9 @@ public class ActiveMQServerControlTest extends ManagementTestBase addressFullMessagePolicy, slowConsumerThreshold, slowConsumerCheckPeriod, - slowConsumerPolicy); + slowConsumerPolicy, + autoCreateJmsQueues, + autoDeleteJmsQueues); boolean ex = false; @@ -548,7 +552,9 @@ public class ActiveMQServerControlTest extends ManagementTestBase addressFullMessagePolicy, slowConsumerThreshold, slowConsumerCheckPeriod, - slowConsumerPolicy); + slowConsumerPolicy, + autoCreateJmsQueues, + autoDeleteJmsQueues); } catch (Exception expected) { @@ -578,6 +584,8 @@ public class ActiveMQServerControlTest extends ManagementTestBase assertEquals(slowConsumerThreshold, info.getSlowConsumerThreshold()); assertEquals(slowConsumerCheckPeriod, info.getSlowConsumerCheckPeriod()); assertEquals(slowConsumerPolicy, info.getSlowConsumerPolicy()); + assertEquals(autoCreateJmsQueues, info.isAutoCreateJmsQueues()); + assertEquals(autoDeleteJmsQueues, info.isAutoDeleteJmsQueues()); serverControl.addAddressSettings(addressMatch, DLA, @@ -596,7 +604,9 @@ public class ActiveMQServerControlTest extends ManagementTestBase addressFullMessagePolicy, slowConsumerThreshold, slowConsumerCheckPeriod, - slowConsumerPolicy); + slowConsumerPolicy, + autoCreateJmsQueues, + autoDeleteJmsQueues); jsonString = serverControl.getAddressSettingsAsJSON(exactAddress); @@ -618,6 +628,8 @@ public class ActiveMQServerControlTest extends ManagementTestBase assertEquals(slowConsumerThreshold, info.getSlowConsumerThreshold()); assertEquals(slowConsumerCheckPeriod, info.getSlowConsumerCheckPeriod()); assertEquals(slowConsumerPolicy, info.getSlowConsumerPolicy()); + assertEquals(autoCreateJmsQueues, info.isAutoCreateJmsQueues()); + assertEquals(autoDeleteJmsQueues, info.isAutoDeleteJmsQueues()); ex = false; @@ -640,7 +652,9 @@ public class ActiveMQServerControlTest extends ManagementTestBase addressFullMessagePolicy, slowConsumerThreshold, slowConsumerCheckPeriod, - slowConsumerPolicy); + slowConsumerPolicy, + autoCreateJmsQueues, + autoDeleteJmsQueues); } catch (Exception e) { diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/tests/integration/management/ActiveMQServerControlUsingCoreTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/tests/integration/management/ActiveMQServerControlUsingCoreTest.java index cd54124a82..019689fc8d 100644 --- a/tests/integration-tests/src/test/java/org/apache/activemq/tests/integration/management/ActiveMQServerControlUsingCoreTest.java +++ b/tests/integration-tests/src/test/java/org/apache/activemq/tests/integration/management/ActiveMQServerControlUsingCoreTest.java @@ -582,7 +582,9 @@ public class ActiveMQServerControlUsingCoreTest extends ActiveMQServerControlTes @Parameter(desc = "the policy to use when the address is full", name = "addressFullMessagePolicy") String addressFullMessagePolicy, @Parameter(desc = "when a consumer falls below this threshold in terms of messages consumed per second it will be considered 'slow'", name = "slowConsumerThreshold") long slowConsumerThreshold, @Parameter(desc = "how often (in seconds) to check for slow consumers", name = "slowConsumerCheckPeriod") long slowConsumerCheckPeriod, - @Parameter(desc = "the policy to use when a slow consumer is detected", name = "slowConsumerPolicy") String slowConsumerPolicy) throws Exception + @Parameter(desc = "the policy to use when a slow consumer is detected", name = "slowConsumerPolicy") String slowConsumerPolicy, + @Parameter(desc = "allow queues to be created automatically", name = "autoCreateJmsQueues") boolean autoCreateJmsQueues, + @Parameter(desc = "allow auto-created queues to be deleted automatically", name = "autoDeleteJmsQueues") boolean autoDeleteJmsQueues) throws Exception { proxy.invokeOperation("addAddressSettings", addressMatch, @@ -602,7 +604,9 @@ public class ActiveMQServerControlUsingCoreTest extends ActiveMQServerControlTes addressFullMessagePolicy, slowConsumerThreshold, slowConsumerCheckPeriod, - slowConsumerPolicy); + slowConsumerPolicy, + autoCreateJmsQueues, + autoDeleteJmsQueues); } public void removeAddressSettings(String addressMatch) throws Exception diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/tests/integration/persistence/ExportFormatTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/tests/integration/persistence/ExportFormatTest.java index b905e46c10..fbd9e746c6 100644 --- a/tests/integration-tests/src/test/java/org/apache/activemq/tests/integration/persistence/ExportFormatTest.java +++ b/tests/integration-tests/src/test/java/org/apache/activemq/tests/integration/persistence/ExportFormatTest.java @@ -16,6 +16,7 @@ */ package org.apache.activemq.tests.integration.persistence; +import org.junit.Ignore; import org.junit.Test; import java.io.StringReader; @@ -47,37 +48,40 @@ public class ExportFormatTest extends ServiceTestBase // Case the format was changed, and the change was agreed, use _testCreateFormat to recreate this field String bindingsFile = "#File,JournalFileImpl: (activemq-bindings-1.bindings id = 1, recordID = 1)\n" + - "operation@AddRecord,id@2,userRecordType@24,length@8,isUpdate@false,compactCount@0,data@AAAAAH____8=\n" + - "operation@AddRecord,id@2,userRecordType@21,length@17,isUpdate@false,compactCount@0,data@AAAABEEAMQAAAAAEQQAxAAA=\n" + - "operation@AddRecord,id@20,userRecordType@24,length@8,isUpdate@false,compactCount@0,data@AAAAAAAAABQ=\n" + - "#File,JournalFileImpl: (activemq-bindings-2.bindings id = 2, recordID = 2)"; + "operation@AddRecord,id@2,userRecordType@24,length@8,isUpdate@false,compactCount@0,data@AAAAAH____8=\n" + + "operation@AddRecordTX,txID@2,id@3,userRecordType@21,length@18,isUpdate@false,compactCount@0,data@AAAABEEAMQAAAAAEQQAxAAAA\n" + + "operation@Commit,txID@2,numberOfRecords@1\n" + + "operation@AddRecord,id@20,userRecordType@24,length@8,isUpdate@false,compactCount@0,data@AAAAAAAAABQ=\n" + + "#File,JournalFileImpl: (activemq-bindings-2.bindings id = 2, recordID = 2)"; // Case the format was changed, and the change was agreed, use _testCreateFormat to recreate this field String journalFile = "#File,JournalFileImpl: (activemq-data-1.amq id = 1, recordID = 1)\n" + - "operation@AddRecordTX,txID@0,id@4,userRecordType@31,length@65,isUpdate@false,compactCount@0,data@AAAAEQAAAE4AAAAAAAAABAEAAAAEQQAxAAAA_wAAAAAAAAAAAAABLLxYP40EAQAAAAEAAAAGawBlAHkABgAAAAA=\n" + - "operation@UpdateTX,txID@0,id@4,userRecordType@32,length@8,isUpdate@true,compactCount@0,data@AAAAAAAAAAI=\n" + - "operation@AddRecordTX,txID@0,id@5,userRecordType@31,length@65,isUpdate@false,compactCount@0,data@AAAAEQAAAE4AAAAAAAAABQEAAAAEQQAxAAAA_wAAAAAAAAAAAAABLLxYP5EEAQAAAAEAAAAGawBlAHkABgAAAAE=\n" + - "operation@UpdateTX,txID@0,id@5,userRecordType@32,length@8,isUpdate@true,compactCount@0,data@AAAAAAAAAAI=\n" + - "operation@AddRecordTX,txID@0,id@6,userRecordType@31,length@65,isUpdate@false,compactCount@0,data@AAAAEQAAAE4AAAAAAAAABgEAAAAEQQAxAAAA_wAAAAAAAAAAAAABLLxYP5EEAQAAAAEAAAAGawBlAHkABgAAAAI=\n" + - "operation@UpdateTX,txID@0,id@6,userRecordType@32,length@8,isUpdate@true,compactCount@0,data@AAAAAAAAAAI=\n" + - "operation@AddRecordTX,txID@0,id@7,userRecordType@31,length@65,isUpdate@false,compactCount@0,data@AAAAEQAAAE4AAAAAAAAABwEAAAAEQQAxAAAA_wAAAAAAAAAAAAABLLxYP5EEAQAAAAEAAAAGawBlAHkABgAAAAM=\n" + - "operation@UpdateTX,txID@0,id@7,userRecordType@32,length@8,isUpdate@true,compactCount@0,data@AAAAAAAAAAI=\n" + - "operation@AddRecordTX,txID@0,id@8,userRecordType@31,length@65,isUpdate@false,compactCount@0,data@AAAAEQAAAE4AAAAAAAAACAEAAAAEQQAxAAAA_wAAAAAAAAAAAAABLLxYP5EEAQAAAAEAAAAGawBlAHkABgAAAAQ=\n" + - "operation@UpdateTX,txID@0,id@8,userRecordType@32,length@8,isUpdate@true,compactCount@0,data@AAAAAAAAAAI=\n" + - "operation@Commit,txID@0,numberOfRecords@10\n" + - "operation@AddRecord,id@12,userRecordType@31,length@65,isUpdate@false,compactCount@0,data@AAAAEQAAAE4AAAAAAAAADAEAAAAEQQAxAAAA_wAAAAAAAAAAAAABLLxYP6gEAQAAAAEAAAAGawBlAHkABgAAAAU=\n" + - "operation@Update,id@12,userRecordType@32,length@8,isUpdate@true,compactCount@0,data@AAAAAAAAAAI=\n" + - "operation@AddRecord,id@13,userRecordType@31,length@65,isUpdate@false,compactCount@0,data@AAAAEQAAAE4AAAAAAAAADQEAAAAEQQAxAAAA_wAAAAAAAAAAAAABLLxYP6oEAQAAAAEAAAAGawBlAHkABgAAAAY=\n" + - "operation@Update,id@13,userRecordType@32,length@8,isUpdate@true,compactCount@0,data@AAAAAAAAAAI=\n" + - "operation@AddRecord,id@14,userRecordType@31,length@65,isUpdate@false,compactCount@0,data@AAAAEQAAAE4AAAAAAAAADgEAAAAEQQAxAAAA_wAAAAAAAAAAAAABLLxYP6sEAQAAAAEAAAAGawBlAHkABgAAAAc=\n" + - "operation@Update,id@14,userRecordType@32,length@8,isUpdate@true,compactCount@0,data@AAAAAAAAAAI=\n" + - "operation@AddRecord,id@15,userRecordType@31,length@65,isUpdate@false,compactCount@0,data@AAAAEQAAAE4AAAAAAAAADwEAAAAEQQAxAAAA_wAAAAAAAAAAAAABLLxYP60EAQAAAAEAAAAGawBlAHkABgAAAAg=\n" + - "operation@Update,id@15,userRecordType@32,length@8,isUpdate@true,compactCount@0,data@AAAAAAAAAAI=\n" + - "operation@AddRecord,id@16,userRecordType@31,length@65,isUpdate@false,compactCount@0,data@AAAAEQAAAE4AAAAAAAAAEAEAAAAEQQAxAAAA_wAAAAAAAAAAAAABLLxYP64EAQAAAAEAAAAGawBlAHkABgAAAAk=\n" + - "operation@Update,id@16,userRecordType@32,length@8,isUpdate@true,compactCount@0,data@AAAAAAAAAAI=\n" + - "#File,JournalFileImpl: (activemq-data-2.amq id = 2, recordID = 2)"; + "operation@AddRecordTX,txID@0,id@5,userRecordType@31,length@65,isUpdate@false,compactCount@0,data@AAAAEQAAAE4AAAAAAAAABQEAAAAEQQAxAAAA_wAAAAAAAAAAAAABSuT2CfYEAQAAAAEAAAAGawBlAHkABgAAAAA=\n" + + "operation@UpdateTX,txID@0,id@5,userRecordType@32,length@8,isUpdate@true,compactCount@0,data@AAAAAAAAAAM=\n" + + "operation@AddRecordTX,txID@0,id@6,userRecordType@31,length@65,isUpdate@false,compactCount@0,data@AAAAEQAAAE4AAAAAAAAABgEAAAAEQQAxAAAA_wAAAAAAAAAAAAABSuT2CfoEAQAAAAEAAAAGawBlAHkABgAAAAE=\n" + + "operation@UpdateTX,txID@0,id@6,userRecordType@32,length@8,isUpdate@true,compactCount@0,data@AAAAAAAAAAM=\n" + + "operation@AddRecordTX,txID@0,id@7,userRecordType@31,length@65,isUpdate@false,compactCount@0,data@AAAAEQAAAE4AAAAAAAAABwEAAAAEQQAxAAAA_wAAAAAAAAAAAAABSuT2CfoEAQAAAAEAAAAGawBlAHkABgAAAAI=\n" + + "operation@UpdateTX,txID@0,id@7,userRecordType@32,length@8,isUpdate@true,compactCount@0,data@AAAAAAAAAAM=\n" + + "operation@AddRecordTX,txID@0,id@8,userRecordType@31,length@65,isUpdate@false,compactCount@0,data@AAAAEQAAAE4AAAAAAAAACAEAAAAEQQAxAAAA_wAAAAAAAAAAAAABSuT2CfoEAQAAAAEAAAAGawBlAHkABgAAAAM=\n" + + "operation@UpdateTX,txID@0,id@8,userRecordType@32,length@8,isUpdate@true,compactCount@0,data@AAAAAAAAAAM=\n" + + "operation@AddRecordTX,txID@0,id@9,userRecordType@31,length@65,isUpdate@false,compactCount@0,data@AAAAEQAAAE4AAAAAAAAACQEAAAAEQQAxAAAA_wAAAAAAAAAAAAABSuT2CfoEAQAAAAEAAAAGawBlAHkABgAAAAQ=\n" + + "operation@UpdateTX,txID@0,id@9,userRecordType@32,length@8,isUpdate@true,compactCount@0,data@AAAAAAAAAAM=\n" + + "operation@Commit,txID@0,numberOfRecords@10\n" + + "operation@AddRecord,id@13,userRecordType@31,length@65,isUpdate@false,compactCount@0,data@AAAAEQAAAE4AAAAAAAAADQEAAAAEQQAxAAAA_wAAAAAAAAAAAAABSuT2Cg0EAQAAAAEAAAAGawBlAHkABgAAAAU=\n" + + "operation@Update,id@13,userRecordType@32,length@8,isUpdate@true,compactCount@0,data@AAAAAAAAAAM=\n" + + "operation@AddRecord,id@14,userRecordType@31,length@65,isUpdate@false,compactCount@0,data@AAAAEQAAAE4AAAAAAAAADgEAAAAEQQAxAAAA_wAAAAAAAAAAAAABSuT2Cg8EAQAAAAEAAAAGawBlAHkABgAAAAY=\n" + + "operation@Update,id@14,userRecordType@32,length@8,isUpdate@true,compactCount@0,data@AAAAAAAAAAM=\n" + + "operation@AddRecord,id@15,userRecordType@31,length@65,isUpdate@false,compactCount@0,data@AAAAEQAAAE4AAAAAAAAADwEAAAAEQQAxAAAA_wAAAAAAAAAAAAABSuT2ChMEAQAAAAEAAAAGawBlAHkABgAAAAc=\n" + + "operation@Update,id@15,userRecordType@32,length@8,isUpdate@true,compactCount@0,data@AAAAAAAAAAM=\n" + + "operation@AddRecord,id@16,userRecordType@31,length@65,isUpdate@false,compactCount@0,data@AAAAEQAAAE4AAAAAAAAAEAEAAAAEQQAxAAAA_wAAAAAAAAAAAAABSuT2ChcEAQAAAAEAAAAGawBlAHkABgAAAAg=\n" + + "operation@Update,id@16,userRecordType@32,length@8,isUpdate@true,compactCount@0,data@AAAAAAAAAAM=\n" + + "operation@AddRecord,id@17,userRecordType@31,length@65,isUpdate@false,compactCount@0,data@AAAAEQAAAE4AAAAAAAAAEQEAAAAEQQAxAAAA_wAAAAAAAAAAAAABSuT2ChoEAQAAAAEAAAAGawBlAHkABgAAAAk=\n" + + "operation@Update,id@17,userRecordType@32,length@8,isUpdate@true,compactCount@0,data@AAAAAAAAAAM=\n" + + "#File,JournalFileImpl: (activemq-data-2.amq id = 2, recordID = 2)"; - public void _testCreateFormat() throws Exception + @Test + @Ignore // use this to recreate the format above. Notice we can't change the record format between releases + public void testCreateFormat() throws Exception { ActiveMQServer server = createServer(true); server.start(); @@ -86,7 +90,7 @@ public class ExportFormatTest extends ServiceTestBase ClientSessionFactory factory = createSessionFactory(locator); ClientSession session = factory.createSession(false, false, false); - session.createQueue("A1", "A1"); + session.createQueue("A1", "A1", true); ClientProducer producer = session.createProducer("A1"); for (int i = 0; i < 5; i++) diff --git a/tests/jms-tests/src/test/java/org/apache/activemq/jms/tests/SessionTest.java b/tests/jms-tests/src/test/java/org/apache/activemq/jms/tests/SessionTest.java index c062fe8339..5c686cbbdf 100644 --- a/tests/jms-tests/src/test/java/org/apache/activemq/jms/tests/SessionTest.java +++ b/tests/jms-tests/src/test/java/org/apache/activemq/jms/tests/SessionTest.java @@ -34,6 +34,7 @@ import javax.jms.XAConnection; import javax.jms.XASession; import org.apache.activemq.api.jms.JMSFactoryType; +import org.apache.activemq.core.settings.impl.AddressSettings; import org.apache.activemq.jms.tests.util.ProxyAssertSupport; import org.junit.Test; @@ -113,6 +114,10 @@ public class SessionTest extends ActiveMQServerTestCase @Test public void testCreateNonExistentQueue() throws Exception { + AddressSettings addressSettings = new AddressSettings(); + addressSettings.setAutoCreateJmsQueues(false); + getJmsServer().getAddressSettingsRepository().addMatch("#", addressSettings); + Connection conn = getConnectionFactory().createConnection(); Session sess = conn.createSession(false, Session.AUTO_ACKNOWLEDGE); try @@ -147,6 +152,10 @@ public class SessionTest extends ActiveMQServerTestCase @Test public void testCreateQueueWhileTopicWithSameNameExists() throws Exception { + AddressSettings addressSettings = new AddressSettings(); + addressSettings.setAutoCreateJmsQueues(false); + getJmsServer().getAddressSettingsRepository().addMatch("#", addressSettings); + Connection conn = getConnectionFactory().createConnection(); Session sess = conn.createSession(false, Session.AUTO_ACKNOWLEDGE); try diff --git a/tests/timing-tests/src/test/java/org/apache/activemq/tests/timing/core/server/impl/QueueImplTest.java b/tests/timing-tests/src/test/java/org/apache/activemq/tests/timing/core/server/impl/QueueImplTest.java index c2fb1ac8d0..e8bbf9513d 100644 --- a/tests/timing-tests/src/test/java/org/apache/activemq/tests/timing/core/server/impl/QueueImplTest.java +++ b/tests/timing-tests/src/test/java/org/apache/activemq/tests/timing/core/server/impl/QueueImplTest.java @@ -81,6 +81,7 @@ public class QueueImplTest extends UnitTestCase null, false, true, + false, scheduledExecutor, null, null, @@ -158,6 +159,7 @@ public class QueueImplTest extends UnitTestCase null, false, true, + false, scheduledExecutor, null, null, @@ -273,6 +275,7 @@ public class QueueImplTest extends UnitTestCase null, false, true, + false, scheduledExecutor, null, null, diff --git a/tests/unit-tests/src/test/java/org/apache/activemq/tests/unit/core/postoffice/impl/FakeQueue.java b/tests/unit-tests/src/test/java/org/apache/activemq/tests/unit/core/postoffice/impl/FakeQueue.java index b88ee6ea6e..1be23eaa93 100644 --- a/tests/unit-tests/src/test/java/org/apache/activemq/tests/unit/core/postoffice/impl/FakeQueue.java +++ b/tests/unit-tests/src/test/java/org/apache/activemq/tests/unit/core/postoffice/impl/FakeQueue.java @@ -25,7 +25,6 @@ import org.apache.activemq.api.core.SimpleString; import org.apache.activemq.core.filter.Filter; import org.apache.activemq.core.paging.cursor.PageSubscription; import org.apache.activemq.core.server.Consumer; -import org.apache.activemq.core.server.ActiveMQServer; import org.apache.activemq.core.server.MessageReference; import org.apache.activemq.core.server.Queue; import org.apache.activemq.core.server.RoutingContext; @@ -60,7 +59,7 @@ public class FakeQueue implements Queue } @Override - public void setConsumersRefCount(ActiveMQServer server) + public void setConsumersRefCount(ReferenceCounter referenceCounter) { } @@ -427,6 +426,12 @@ public class FakeQueue implements Queue return false; } + @Override + public boolean isAutoCreated() + { + return false; + } + @Override public LinkedListIterator iterator() { diff --git a/tests/unit-tests/src/test/java/org/apache/activemq/tests/unit/core/server/impl/QueueImplTest.java b/tests/unit-tests/src/test/java/org/apache/activemq/tests/unit/core/server/impl/QueueImplTest.java index f2e776d73f..a71e97f66e 100644 --- a/tests/unit-tests/src/test/java/org/apache/activemq/tests/unit/core/server/impl/QueueImplTest.java +++ b/tests/unit-tests/src/test/java/org/apache/activemq/tests/unit/core/server/impl/QueueImplTest.java @@ -94,17 +94,7 @@ public class QueueImplTest extends UnitTestCase { final SimpleString name = new SimpleString("oobblle"); - QueueImpl queue = new QueueImpl(1, - QueueImplTest.address1, - name, - null, - false, - true, - scheduledExecutor, - null, - null, - null, - executor); + QueueImpl queue = getNamedQueue(name); Assert.assertEquals(name, queue.getName()); } @@ -112,31 +102,11 @@ public class QueueImplTest extends UnitTestCase @Test public void testDurable() { - QueueImpl queue = new QueueImpl(1, - QueueImplTest.address1, - QueueImplTest.queue1, - null, - false, - false, - scheduledExecutor, - null, - null, - null, - executor); + QueueImpl queue = getNonDurableQueue(); Assert.assertFalse(queue.isDurable()); - queue = new QueueImpl(1, - QueueImplTest.address1, - QueueImplTest.queue1, - null, - true, - false, - scheduledExecutor, - null, - null, - null, - executor); + queue = getDurableQueue(); Assert.assertTrue(queue.isDurable()); } @@ -150,17 +120,7 @@ public class QueueImplTest extends UnitTestCase Consumer cons3 = new FakeConsumer(); - QueueImpl queue = new QueueImpl(1, - QueueImplTest.address1, - QueueImplTest.queue1, - null, - false, - true, - scheduledExecutor, - null, - null, - null, - executor); + QueueImpl queue = getTemporaryQueue(); Assert.assertEquals(0, queue.getConsumerCount()); @@ -202,17 +162,7 @@ public class QueueImplTest extends UnitTestCase @Test public void testGetFilter() { - QueueImpl queue = new QueueImpl(1, - QueueImplTest.address1, - QueueImplTest.queue1, - null, - false, - true, - scheduledExecutor, - null, - null, - null, - executor); + QueueImpl queue = getTemporaryQueue(); Assert.assertNull(queue.getFilter()); @@ -229,17 +179,7 @@ public class QueueImplTest extends UnitTestCase } }; - queue = new QueueImpl(1, - QueueImplTest.address1, - QueueImplTest.queue1, - filter, - false, - true, - scheduledExecutor, - null, - null, - null, - executor); + queue = getFilteredQueue(filter); Assert.assertEquals(filter, queue.getFilter()); @@ -248,17 +188,7 @@ public class QueueImplTest extends UnitTestCase @Test public void testSimpleadd() { - QueueImpl queue = new QueueImpl(1, - QueueImplTest.address1, - QueueImplTest.queue1, - null, - false, - true, - scheduledExecutor, - null, - null, - null, - executor); + QueueImpl queue = getTemporaryQueue(); final int numMessages = 10; @@ -278,17 +208,7 @@ public class QueueImplTest extends UnitTestCase @Test public void testRate() throws InterruptedException { - QueueImpl queue = new QueueImpl(1, - QueueImplTest.address1, - QueueImplTest.queue1, - null, - false, - true, - scheduledExecutor, - null, - null, - null, - executor); + QueueImpl queue = getTemporaryQueue(); final int numMessages = 10; @@ -309,17 +229,7 @@ public class QueueImplTest extends UnitTestCase @Test public void testSimpleNonDirectDelivery() throws Exception { - QueueImpl queue = new QueueImpl(1, - QueueImplTest.address1, - QueueImplTest.queue1, - null, - false, - true, - scheduledExecutor, - null, - null, - null, - executor); + QueueImpl queue = getTemporaryQueue(); final int numMessages = 10; @@ -358,17 +268,7 @@ public class QueueImplTest extends UnitTestCase @Test public void testBusyConsumer() throws Exception { - QueueImpl queue = new QueueImpl(1, - QueueImplTest.address1, - QueueImplTest.queue1, - null, - false, - true, - scheduledExecutor, - null, - null, - null, - executor); + QueueImpl queue = getTemporaryQueue(); FakeConsumer consumer = new FakeConsumer(); @@ -413,17 +313,7 @@ public class QueueImplTest extends UnitTestCase @Test public void testBusyConsumerThenAddMoreMessages() throws Exception { - QueueImpl queue = new QueueImpl(1, - QueueImplTest.address1, - QueueImplTest.queue1, - null, - false, - true, - scheduledExecutor, - null, - null, - null, - executor); + QueueImpl queue = getTemporaryQueue(); FakeConsumer consumer = new FakeConsumer(); @@ -491,17 +381,7 @@ public class QueueImplTest extends UnitTestCase @Test public void testaddHeadadd() throws Exception { - QueueImpl queue = new QueueImpl(1, - QueueImplTest.address1, - QueueImplTest.queue1, - null, - false, - true, - scheduledExecutor, - null, - null, - null, - executor); + QueueImpl queue = getTemporaryQueue(); final int numMessages = 10; @@ -556,17 +436,7 @@ public class QueueImplTest extends UnitTestCase @Test public void testChangeConsumersAndDeliver() throws Exception { - QueueImpl queue = new QueueImpl(1, - QueueImplTest.address1, - QueueImplTest.queue1, - null, - false, - true, - scheduledExecutor, - new FakePostOffice(), - null, - null, - executor); + QueueImpl queue = getTemporaryQueue(); final int numMessages = 10; @@ -730,17 +600,7 @@ public class QueueImplTest extends UnitTestCase @Test public void testRoundRobinWithQueueing() throws Exception { - QueueImpl queue = new QueueImpl(1, - QueueImplTest.address1, - QueueImplTest.queue1, - null, - false, - true, - scheduledExecutor, - null, - null, - null, - executor); + QueueImpl queue = getTemporaryQueue(); final int numMessages = 10; @@ -793,17 +653,7 @@ public class QueueImplTest extends UnitTestCase @Test public void testWithPriorities() throws Exception { - QueueImpl queue = new QueueImpl(1, - QueueImplTest.address1, - QueueImplTest.queue1, - null, - false, - true, - scheduledExecutor, - null, - null, - null, - executor); + QueueImpl queue = getTemporaryQueue(); final int numMessages = 10; @@ -856,17 +706,7 @@ public class QueueImplTest extends UnitTestCase @Test public void testConsumerWithFilterAddAndRemove() { - QueueImpl queue = new QueueImpl(1, - QueueImplTest.address1, - QueueImplTest.queue1, - null, - false, - true, - scheduledExecutor, - null, - null, - null, - executor); + QueueImpl queue = getTemporaryQueue(); Filter filter = new FakeFilter("fruit", "orange"); @@ -876,17 +716,7 @@ public class QueueImplTest extends UnitTestCase @Test public void testIterator() { - QueueImpl queue = new QueueImpl(1, - QueueImplTest.address1, - QueueImplTest.queue1, - null, - false, - true, - scheduledExecutor, - null, - null, - null, - executor); + QueueImpl queue = getTemporaryQueue(); final int numMessages = 20; @@ -925,17 +755,7 @@ public class QueueImplTest extends UnitTestCase public void testConsumeWithFiltersAddAndRemoveConsumer() throws Exception { - QueueImpl queue = new QueueImpl(1, - QueueImplTest.address1, - QueueImplTest.queue1, - null, - false, - true, - scheduledExecutor, - new FakePostOffice(), - null, - null, - executor); + QueueImpl queue = getTemporaryQueue(); Filter filter = new FakeFilter("fruit", "orange"); @@ -1009,17 +829,7 @@ public class QueueImplTest extends UnitTestCase @Test public void testBusyConsumerWithFilterFirstCallBusy() throws Exception { - QueueImpl queue = new QueueImpl(1, - QueueImplTest.address1, - QueueImplTest.queue1, - null, - false, - true, - scheduledExecutor, - null, - null, - null, - executor); + QueueImpl queue = getTemporaryQueue(); FakeConsumer consumer = new FakeConsumer(FilterImpl.createFilter("color = 'green'")); @@ -1061,17 +871,7 @@ public class QueueImplTest extends UnitTestCase @Test public void testBusyConsumerWithFilterThenAddMoreMessages() throws Exception { - QueueImpl queue = new QueueImpl(1, - QueueImplTest.address1, - QueueImplTest.queue1, - null, - false, - true, - scheduledExecutor, - null, - null, - null, - executor); + QueueImpl queue = getTemporaryQueue(); FakeConsumer consumer = new FakeConsumer(FilterImpl.createFilter("color = 'green'")); @@ -1146,17 +946,7 @@ public class QueueImplTest extends UnitTestCase @Test public void testConsumerWithFilterThenAddMoreMessages() throws Exception { - QueueImpl queue = new QueueImpl(1, - QueueImplTest.address1, - QueueImplTest.queue1, - null, - false, - true, - scheduledExecutor, - null, - null, - null, - executor); + QueueImpl queue = getTemporaryQueue(); final int numMessages = 10; List refs = new ArrayList(); @@ -1220,17 +1010,7 @@ public class QueueImplTest extends UnitTestCase private void testConsumerWithFilters(final boolean direct) throws Exception { - QueueImpl queue = new QueueImpl(1, - QueueImplTest.address1, - QueueImplTest.queue1, - null, - false, - true, - scheduledExecutor, - new FakePostOffice(), - null, - null, - executor); + QueueImpl queue = getTemporaryQueue(); Filter filter = new FakeFilter("fruit", "orange"); @@ -1323,17 +1103,7 @@ public class QueueImplTest extends UnitTestCase public void testMessageOrder() throws Exception { FakeConsumer consumer = new FakeConsumer(); - QueueImpl queue = new QueueImpl(1, - QueueImplTest.address1, - QueueImplTest.queue1, - null, - false, - true, - scheduledExecutor, - null, - null, - null, - executor); + QueueImpl queue = getTemporaryQueue(); MessageReference messageReference = generateReference(queue, 1); MessageReference messageReference2 = generateReference(queue, 2); MessageReference messageReference3 = generateReference(queue, 3); @@ -1354,17 +1124,7 @@ public class QueueImplTest extends UnitTestCase @Test public void testMessagesAdded() throws Exception { - QueueImpl queue = new QueueImpl(1, - QueueImplTest.address1, - QueueImplTest.queue1, - null, - false, - true, - scheduledExecutor, - null, - null, - null, - executor); + QueueImpl queue = getTemporaryQueue(); MessageReference messageReference = generateReference(queue, 1); MessageReference messageReference2 = generateReference(queue, 2); MessageReference messageReference3 = generateReference(queue, 3); @@ -1377,17 +1137,7 @@ public class QueueImplTest extends UnitTestCase @Test public void testGetReference() throws Exception { - QueueImpl queue = new QueueImpl(1, - QueueImplTest.address1, - QueueImplTest.queue1, - null, - false, - true, - scheduledExecutor, - null, - null, - null, - executor); + QueueImpl queue = getTemporaryQueue(); MessageReference messageReference = generateReference(queue, 1); MessageReference messageReference2 = generateReference(queue, 2); MessageReference messageReference3 = generateReference(queue, 3); @@ -1401,17 +1151,7 @@ public class QueueImplTest extends UnitTestCase @Test public void testGetNonExistentReference() throws Exception { - QueueImpl queue = new QueueImpl(1, - QueueImplTest.address1, - QueueImplTest.queue1, - null, - false, - true, - scheduledExecutor, - null, - null, - null, - executor); + QueueImpl queue = getTemporaryQueue(); MessageReference messageReference = generateReference(queue, 1); MessageReference messageReference2 = generateReference(queue, 2); MessageReference messageReference3 = generateReference(queue, 3); @@ -1430,17 +1170,7 @@ public class QueueImplTest extends UnitTestCase @Test public void testPauseAndResumeWithAsync() throws Exception { - QueueImpl queue = new QueueImpl(1, - QueueImplTest.address1, - QueueImplTest.queue1, - null, - false, - true, - scheduledExecutor, - null, - null, - null, - executor); + QueueImpl queue = getTemporaryQueue(); // pauses the queue queue.pause(); @@ -1498,17 +1228,7 @@ public class QueueImplTest extends UnitTestCase @Test public void testPauseAndResumeWithDirect() throws Exception { - QueueImpl queue = new QueueImpl(1, - QueueImplTest.address1, - QueueImplTest.queue1, - null, - false, - true, - scheduledExecutor, - null, - null, - null, - executor); + QueueImpl queue = getTemporaryQueue(); // Now add a consumer FakeConsumer consumer = new FakeConsumer(); @@ -1553,17 +1273,7 @@ public class QueueImplTest extends UnitTestCase @Test public void testResetMessagesAdded() throws Exception { - QueueImpl queue = new QueueImpl(1, - QueueImplTest.address1, - QueueImplTest.queue1, - null, - false, - true, - scheduledExecutor, - null, - null, - null, - executor); + QueueImpl queue = getTemporaryQueue(); MessageReference messageReference = generateReference(queue, 1); MessageReference messageReference2 = generateReference(queue, 2); queue.addTail(messageReference); @@ -1668,4 +1378,45 @@ public class QueueImplTest extends UnitTestCase server.stop(); } } + + private QueueImpl getNonDurableQueue() + { + return getQueue(QueueImplTest.queue1, false, false, null); + } + + private QueueImpl getDurableQueue() + { + return getQueue(QueueImplTest.queue1, true, false, null); + } + + private QueueImpl getNamedQueue(SimpleString name) + { + return getQueue(name, false, true, null); + } + + private QueueImpl getFilteredQueue(Filter filter) + { + return getQueue(QueueImplTest.queue1, false, true, filter); + } + + private QueueImpl getTemporaryQueue() + { + return getQueue(QueueImplTest.queue1, false, true, null); + } + + private QueueImpl getQueue(SimpleString name, boolean durable, boolean temporary, Filter filter) + { + return new QueueImpl(1, + QueueImplTest.address1, + name, + filter, + durable, + temporary, + false, + scheduledExecutor, + new FakePostOffice(), + null, + null, + executor); + } } diff --git a/tests/unit-tests/src/test/java/org/apache/activemq/tests/unit/core/server/impl/fakes/FakeQueueFactory.java b/tests/unit-tests/src/test/java/org/apache/activemq/tests/unit/core/server/impl/fakes/FakeQueueFactory.java index 3bc337d3a3..1b4da2cdda 100644 --- a/tests/unit-tests/src/test/java/org/apache/activemq/tests/unit/core/server/impl/fakes/FakeQueueFactory.java +++ b/tests/unit-tests/src/test/java/org/apache/activemq/tests/unit/core/server/impl/fakes/FakeQueueFactory.java @@ -49,7 +49,8 @@ public class FakeQueueFactory implements QueueFactory final Filter filter, final PageSubscription subscription, final boolean durable, - final boolean temporary) + final boolean temporary, + final boolean autoCreated) { return new QueueImpl(persistenceID, address, @@ -58,6 +59,7 @@ public class FakeQueueFactory implements QueueFactory subscription, durable, temporary, + autoCreated, scheduledExecutor, postOffice, null,