diff --git a/artemis-core-client/src/main/java/org/apache/activemq/artemis/api/core/client/ClientSession.java b/artemis-core-client/src/main/java/org/apache/activemq/artemis/api/core/client/ClientSession.java index 40db6cf719..dd1c45cb56 100644 --- a/artemis-core-client/src/main/java/org/apache/activemq/artemis/api/core/client/ClientSession.java +++ b/artemis-core-client/src/main/java/org/apache/activemq/artemis/api/core/client/ClientSession.java @@ -64,6 +64,12 @@ public interface ClientSession extends XAResource, AutoCloseable { * queue, false else. */ boolean isAutoCreateJmsQueues(); + + /** + * Returns true if auto-creation for this address is enabled and if the address queried is for a JMS + * topic, false else. + */ + boolean isAutoCreateJmsTopics(); } /** diff --git a/artemis-core-client/src/main/java/org/apache/activemq/artemis/api/core/management/ActiveMQServerControl.java b/artemis-core-client/src/main/java/org/apache/activemq/artemis/api/core/management/ActiveMQServerControl.java index 7784992472..677bfb6c04 100644 --- a/artemis-core-client/src/main/java/org/apache/activemq/artemis/api/core/management/ActiveMQServerControl.java +++ b/artemis-core-client/src/main/java/org/apache/activemq/artemis/api/core/management/ActiveMQServerControl.java @@ -656,7 +656,9 @@ public interface ActiveMQServerControl { @Parameter(desc = "how often (in seconds) to check for slow consumers", name = "slowConsumerCheckPeriod") long slowConsumerCheckPeriod, @Parameter(desc = "the policy to use when a slow consumer is detected", name = "slowConsumerPolicy") String slowConsumerPolicy, @Parameter(desc = "allow queues to be created automatically", name = "autoCreateJmsQueues") boolean autoCreateJmsQueues, - @Parameter(desc = "allow auto-created queues to be deleted automatically", name = "autoDeleteJmsQueues") boolean autoDeleteJmsQueues) throws Exception; + @Parameter(desc = "allow auto-created queues to be deleted automatically", name = "autoDeleteJmsQueues") boolean autoDeleteJmsQueues, + @Parameter(desc = "allow topics to be created automatically", name = "autoCreateJmsTopics") boolean autoCreateJmsTopics, + @Parameter(desc = "allow auto-created topics to be deleted automatically", name = "autoDeleteJmsTopics") boolean autoDeleteJmsTopics) throws Exception; void removeAddressSettings(String addressMatch) throws Exception; diff --git a/artemis-core-client/src/main/java/org/apache/activemq/artemis/api/core/management/AddressSettingsInfo.java b/artemis-core-client/src/main/java/org/apache/activemq/artemis/api/core/management/AddressSettingsInfo.java index 63b80cbfda..d4f5232ffb 100644 --- a/artemis-core-client/src/main/java/org/apache/activemq/artemis/api/core/management/AddressSettingsInfo.java +++ b/artemis-core-client/src/main/java/org/apache/activemq/artemis/api/core/management/AddressSettingsInfo.java @@ -57,11 +57,15 @@ public final class AddressSettingsInfo { private final boolean autoDeleteJmsQueues; + private final boolean autoCreateJmsTopics; + + private final boolean autoDeleteJmsTopics; + // Static -------------------------------------------------------- public static AddressSettingsInfo from(final String jsonString) throws Exception { JSONObject object = new JSONObject(jsonString); - return new AddressSettingsInfo(object.getString("addressFullMessagePolicy"), object.getLong("maxSizeBytes"), object.getInt("pageSizeBytes"), object.getInt("pageCacheMaxSize"), object.getInt("maxDeliveryAttempts"), object.getLong("redeliveryDelay"), object.getDouble("redeliveryMultiplier"), object.getLong("maxRedeliveryDelay"), object.getString("DLA"), object.getString("expiryAddress"), object.getBoolean("lastValueQueue"), object.getLong("redistributionDelay"), object.getBoolean("sendToDLAOnNoRoute"), object.getLong("slowConsumerThreshold"), object.getLong("slowConsumerCheckPeriod"), object.getString("slowConsumerPolicy"), object.getBoolean("autoCreateJmsQueues"), object.getBoolean("autoDeleteJmsQueues")); + return new AddressSettingsInfo(object.getString("addressFullMessagePolicy"), object.getLong("maxSizeBytes"), object.getInt("pageSizeBytes"), object.getInt("pageCacheMaxSize"), object.getInt("maxDeliveryAttempts"), object.getLong("redeliveryDelay"), object.getDouble("redeliveryMultiplier"), object.getLong("maxRedeliveryDelay"), object.getString("DLA"), object.getString("expiryAddress"), object.getBoolean("lastValueQueue"), object.getLong("redistributionDelay"), object.getBoolean("sendToDLAOnNoRoute"), object.getLong("slowConsumerThreshold"), object.getLong("slowConsumerCheckPeriod"), object.getString("slowConsumerPolicy"), object.getBoolean("autoCreateJmsQueues"), object.getBoolean("autoDeleteJmsQueues"), object.getBoolean("autoCreateJmsTopics"), object.getBoolean("autoDeleteJmsTopics")); } // Constructors -------------------------------------------------- @@ -83,7 +87,9 @@ public final class AddressSettingsInfo { long slowConsumerCheckPeriod, String slowConsumerPolicy, boolean autoCreateJmsQueues, - boolean autoDeleteJmsQueues) { + boolean autoDeleteJmsQueues, + boolean autoCreateJmsTopics, + boolean autoDeleteJmsTopics) { this.addressFullMessagePolicy = addressFullMessagePolicy; this.maxSizeBytes = maxSizeBytes; this.pageSizeBytes = pageSizeBytes; @@ -102,6 +108,8 @@ public final class AddressSettingsInfo { this.slowConsumerPolicy = slowConsumerPolicy; this.autoCreateJmsQueues = autoCreateJmsQueues; this.autoDeleteJmsQueues = autoDeleteJmsQueues; + this.autoCreateJmsTopics = autoCreateJmsTopics; + this.autoDeleteJmsTopics = autoDeleteJmsTopics; } // Public -------------------------------------------------------- @@ -181,5 +189,13 @@ public final class AddressSettingsInfo { public boolean isAutoDeleteJmsQueues() { return autoDeleteJmsQueues; } + + public boolean isAutoCreateJmsTopics() { + return autoCreateJmsTopics; + } + + public boolean isAutoDeleteJmsTopics() { + return autoDeleteJmsTopics; + } } diff --git a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/impl/AddressQueryImpl.java b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/impl/AddressQueryImpl.java index 3a1f62bc6d..9d38dbb5c6 100644 --- a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/impl/AddressQueryImpl.java +++ b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/impl/AddressQueryImpl.java @@ -30,12 +30,16 @@ public class AddressQueryImpl implements ClientSession.AddressQuery { private final boolean autoCreateJmsQueues; + private final boolean autoCreateJmsTopics; + public AddressQueryImpl(final boolean exists, final List queueNames, - final boolean autoCreateJmsQueues) { + final boolean autoCreateJmsQueues, + final boolean autoCreateJmsTopics) { this.exists = exists; this.queueNames = new ArrayList<>(queueNames); this.autoCreateJmsQueues = autoCreateJmsQueues; + this.autoCreateJmsTopics = autoCreateJmsTopics; } @Override @@ -52,4 +56,9 @@ public class AddressQueryImpl implements ClientSession.AddressQuery { public boolean isAutoCreateJmsQueues() { return autoCreateJmsQueues; } + + @Override + public boolean isAutoCreateJmsTopics() { + return autoCreateJmsTopics; + } } diff --git a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/ActiveMQSessionContext.java b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/ActiveMQSessionContext.java index 794df524bc..9f0edceca5 100644 --- a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/ActiveMQSessionContext.java +++ b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/ActiveMQSessionContext.java @@ -49,18 +49,18 @@ import org.apache.activemq.artemis.core.protocol.core.ChannelHandler; import org.apache.activemq.artemis.core.protocol.core.CommandConfirmationHandler; import org.apache.activemq.artemis.core.protocol.core.CoreRemotingConnection; import org.apache.activemq.artemis.core.protocol.core.Packet; +import org.apache.activemq.artemis.core.protocol.core.impl.wireformat.ActiveMQExceptionMessage; import org.apache.activemq.artemis.core.protocol.core.impl.wireformat.CreateQueueMessage; import org.apache.activemq.artemis.core.protocol.core.impl.wireformat.CreateSessionMessage; import org.apache.activemq.artemis.core.protocol.core.impl.wireformat.CreateSharedQueueMessage; import org.apache.activemq.artemis.core.protocol.core.impl.wireformat.DisconnectConsumerMessage; -import org.apache.activemq.artemis.core.protocol.core.impl.wireformat.ActiveMQExceptionMessage; import org.apache.activemq.artemis.core.protocol.core.impl.wireformat.ReattachSessionMessage; import org.apache.activemq.artemis.core.protocol.core.impl.wireformat.ReattachSessionResponseMessage; import org.apache.activemq.artemis.core.protocol.core.impl.wireformat.RollbackMessage; import org.apache.activemq.artemis.core.protocol.core.impl.wireformat.SessionAcknowledgeMessage; import org.apache.activemq.artemis.core.protocol.core.impl.wireformat.SessionAddMetaDataMessageV2; import org.apache.activemq.artemis.core.protocol.core.impl.wireformat.SessionBindingQueryMessage; -import org.apache.activemq.artemis.core.protocol.core.impl.wireformat.SessionBindingQueryResponseMessage_V2; +import org.apache.activemq.artemis.core.protocol.core.impl.wireformat.SessionBindingQueryResponseMessage_V3; import org.apache.activemq.artemis.core.protocol.core.impl.wireformat.SessionCloseMessage; import org.apache.activemq.artemis.core.protocol.core.impl.wireformat.SessionConsumerCloseMessage; import org.apache.activemq.artemis.core.protocol.core.impl.wireformat.SessionConsumerFlowCreditMessage; @@ -283,9 +283,9 @@ public class ActiveMQSessionContext extends SessionContext { @Override public ClientSession.AddressQuery addressQuery(final SimpleString address) throws ActiveMQException { - SessionBindingQueryResponseMessage_V2 response = (SessionBindingQueryResponseMessage_V2) sessionChannel.sendBlocking(new SessionBindingQueryMessage(address), PacketImpl.SESS_BINDINGQUERY_RESP_V2); + SessionBindingQueryResponseMessage_V3 response = (SessionBindingQueryResponseMessage_V3) sessionChannel.sendBlocking(new SessionBindingQueryMessage(address), PacketImpl.SESS_BINDINGQUERY_RESP_V3); - return new AddressQueryImpl(response.isExists(), response.getQueueNames(), response.isAutoCreateJmsQueues()); + return new AddressQueryImpl(response.isExists(), response.getQueueNames(), response.isAutoCreateJmsQueues(), response.isAutoCreateJmsTopics()); } @Override diff --git a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/ChannelImpl.java b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/ChannelImpl.java index b4ac75df29..89fa4f1014 100644 --- a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/ChannelImpl.java +++ b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/ChannelImpl.java @@ -164,6 +164,8 @@ public final class ChannelImpl implements Channel { return version >= 126; case PacketImpl.SESS_BINDINGQUERY_RESP_V2: return version >= 126; + case PacketImpl.SESS_BINDINGQUERY_RESP_V3: + return version >= 127; default: return true; } diff --git a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/PacketDecoder.java b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/PacketDecoder.java index 804a26a925..e04f3d0015 100644 --- a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/PacketDecoder.java +++ b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/PacketDecoder.java @@ -40,6 +40,7 @@ import static org.apache.activemq.artemis.core.protocol.core.impl.PacketImpl.SES import static org.apache.activemq.artemis.core.protocol.core.impl.PacketImpl.SESS_BINDINGQUERY; import static org.apache.activemq.artemis.core.protocol.core.impl.PacketImpl.SESS_BINDINGQUERY_RESP; import static org.apache.activemq.artemis.core.protocol.core.impl.PacketImpl.SESS_BINDINGQUERY_RESP_V2; +import static org.apache.activemq.artemis.core.protocol.core.impl.PacketImpl.SESS_BINDINGQUERY_RESP_V3; import static org.apache.activemq.artemis.core.protocol.core.impl.PacketImpl.SESS_CLOSE; import static org.apache.activemq.artemis.core.protocol.core.impl.PacketImpl.SESS_COMMIT; import static org.apache.activemq.artemis.core.protocol.core.impl.PacketImpl.SESS_CONSUMER_CLOSE; @@ -110,6 +111,7 @@ import org.apache.activemq.artemis.core.protocol.core.impl.wireformat.SessionAdd import org.apache.activemq.artemis.core.protocol.core.impl.wireformat.SessionBindingQueryMessage; import org.apache.activemq.artemis.core.protocol.core.impl.wireformat.SessionBindingQueryResponseMessage; import org.apache.activemq.artemis.core.protocol.core.impl.wireformat.SessionBindingQueryResponseMessage_V2; +import org.apache.activemq.artemis.core.protocol.core.impl.wireformat.SessionBindingQueryResponseMessage_V3; import org.apache.activemq.artemis.core.protocol.core.impl.wireformat.SessionCloseMessage; import org.apache.activemq.artemis.core.protocol.core.impl.wireformat.SessionCommitMessage; import org.apache.activemq.artemis.core.protocol.core.impl.wireformat.SessionConsumerCloseMessage; @@ -257,6 +259,10 @@ public abstract class PacketDecoder implements Serializable { packet = new SessionBindingQueryResponseMessage_V2(); break; } + case SESS_BINDINGQUERY_RESP_V3: { + packet = new SessionBindingQueryResponseMessage_V3(); + break; + } case SESS_XA_START: { packet = new SessionXAStartMessage(); break; diff --git a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/PacketImpl.java b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/PacketImpl.java index 9aa8d3c69e..ac1edf7f2a 100644 --- a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/PacketImpl.java +++ b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/PacketImpl.java @@ -245,6 +245,8 @@ public class PacketImpl implements Packet { public static final byte REPLICATION_RESPONSE_V2 = -9; + public static final byte SESS_BINDINGQUERY_RESP_V3 = -10; + // Static -------------------------------------------------------- public PacketImpl(final byte type) { diff --git a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/wireformat/SessionBindingQueryResponseMessage.java b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/wireformat/SessionBindingQueryResponseMessage.java index 6197faf0c3..410dac1dca 100644 --- a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/wireformat/SessionBindingQueryResponseMessage.java +++ b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/wireformat/SessionBindingQueryResponseMessage.java @@ -44,8 +44,8 @@ public class SessionBindingQueryResponseMessage extends PacketImpl { super(SESS_BINDINGQUERY_RESP); } - public SessionBindingQueryResponseMessage(byte v2) { - super(v2); + public SessionBindingQueryResponseMessage(byte v) { + super(v); } @Override diff --git a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/wireformat/SessionBindingQueryResponseMessage_V2.java b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/wireformat/SessionBindingQueryResponseMessage_V2.java index 55da268585..0a487437e1 100644 --- a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/wireformat/SessionBindingQueryResponseMessage_V2.java +++ b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/wireformat/SessionBindingQueryResponseMessage_V2.java @@ -23,7 +23,7 @@ import org.apache.activemq.artemis.api.core.SimpleString; public class SessionBindingQueryResponseMessage_V2 extends SessionBindingQueryResponseMessage { - private boolean autoCreateJmsQueues; + protected boolean autoCreateJmsQueues; public SessionBindingQueryResponseMessage_V2(final boolean exists, final List queueNames, @@ -41,6 +41,10 @@ public class SessionBindingQueryResponseMessage_V2 extends SessionBindingQueryRe super(SESS_BINDINGQUERY_RESP_V2); } + public SessionBindingQueryResponseMessage_V2(byte v) { + super(v); + } + public boolean isAutoCreateJmsQueues() { return autoCreateJmsQueues; } diff --git a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/wireformat/SessionBindingQueryResponseMessage_V3.java b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/wireformat/SessionBindingQueryResponseMessage_V3.java new file mode 100644 index 0000000000..3ed489f716 --- /dev/null +++ b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/wireformat/SessionBindingQueryResponseMessage_V3.java @@ -0,0 +1,95 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.activemq.artemis.core.protocol.core.impl.wireformat; + +import java.util.List; + +import org.apache.activemq.artemis.api.core.ActiveMQBuffer; +import org.apache.activemq.artemis.api.core.SimpleString; + +public class SessionBindingQueryResponseMessage_V3 extends SessionBindingQueryResponseMessage_V2 { + + private boolean autoCreateJmsTopics; + + public SessionBindingQueryResponseMessage_V3(final boolean exists, + final List queueNames, + final boolean autoCreateJmsQueues, + final boolean autoCreateJmsTopics) { + super(SESS_BINDINGQUERY_RESP_V3); + + this.exists = exists; + + this.queueNames = queueNames; + + this.autoCreateJmsQueues = autoCreateJmsQueues; + + this.autoCreateJmsTopics = autoCreateJmsTopics; + } + + public SessionBindingQueryResponseMessage_V3() { + super(SESS_BINDINGQUERY_RESP_V3); + } + + public boolean isAutoCreateJmsTopics() { + return autoCreateJmsTopics; + } + + @Override + public void encodeRest(final ActiveMQBuffer buffer) { + super.encodeRest(buffer); + buffer.writeBoolean(autoCreateJmsTopics); + } + + @Override + public void decodeRest(final ActiveMQBuffer buffer) { + super.decodeRest(buffer); + autoCreateJmsTopics = buffer.readBoolean(); + } + + @Override + public int hashCode() { + final int prime = 31; + int result = super.hashCode(); + result = prime * result + (autoCreateJmsTopics ? 1231 : 1237); + return result; + } + + @Override + public String toString() { + StringBuffer buff = new StringBuffer(getParentString()); + buff.append(", exists=" + exists); + buff.append(", queueNames=" + queueNames); + buff.append(", autoCreateJmsQueues=" + autoCreateJmsQueues); + buff.append(", autoCreateJmsTopics=" + autoCreateJmsTopics); + buff.append("]"); + return buff.toString(); + } + + @Override + public boolean equals(Object obj) { + if (this == obj) + return true; + if (!super.equals(obj)) + return false; + if (!(obj instanceof SessionBindingQueryResponseMessage_V3)) + return false; + SessionBindingQueryResponseMessage_V3 other = (SessionBindingQueryResponseMessage_V3) obj; + if (autoCreateJmsTopics != other.autoCreateJmsTopics) + return false; + return true; + } +} diff --git a/artemis-core-client/src/main/resources/activemq-version.properties b/artemis-core-client/src/main/resources/activemq-version.properties index 4144bf050b..07685e1074 100644 --- a/artemis-core-client/src/main/resources/activemq-version.properties +++ b/artemis-core-client/src/main/resources/activemq-version.properties @@ -20,4 +20,4 @@ activemq.version.minorVersion=${activemq.version.minorVersion} activemq.version.microVersion=${activemq.version.microVersion} activemq.version.incrementingVersion=${activemq.version.incrementingVersion} activemq.version.versionTag=${activemq.version.versionTag} -activemq.version.compatibleVersionList=121,122,123,124,125,126 +activemq.version.compatibleVersionList=121,122,123,124,125,126,127 diff --git a/artemis-jms-client/src/main/java/org/apache/activemq/artemis/jms/client/ActiveMQMessageProducer.java b/artemis-jms-client/src/main/java/org/apache/activemq/artemis/jms/client/ActiveMQMessageProducer.java index 0f33c0499c..112b535c3f 100644 --- a/artemis-jms-client/src/main/java/org/apache/activemq/artemis/jms/client/ActiveMQMessageProducer.java +++ b/artemis-jms-client/src/main/java/org/apache/activemq/artemis/jms/client/ActiveMQMessageProducer.java @@ -405,7 +405,7 @@ public class ActiveMQMessageProducer implements MessageProducer, QueueSender, To // if it's autoCreateJMSQueue we will let the PostOffice.route to execute the creation at the server's side // as that's a more efficient path for such operation - if (!query.isExists() && !query.isAutoCreateJmsQueues()) { + if (!query.isExists() && ((address.toString().startsWith(ActiveMQDestination.JMS_QUEUE_ADDRESS_PREFIX) && !query.isAutoCreateJmsQueues()) || (address.toString().startsWith(ActiveMQDestination.JMS_TOPIC_ADDRESS_PREFIX) && !query.isAutoCreateJmsTopics()))) { throw new InvalidDestinationException("Destination " + address + " does not exist"); } else { diff --git a/artemis-jms-client/src/main/java/org/apache/activemq/artemis/jms/client/ActiveMQSession.java b/artemis-jms-client/src/main/java/org/apache/activemq/artemis/jms/client/ActiveMQSession.java index da2dd8000a..ee81cf19c8 100644 --- a/artemis-jms-client/src/main/java/org/apache/activemq/artemis/jms/client/ActiveMQSession.java +++ b/artemis-jms-client/src/main/java/org/apache/activemq/artemis/jms/client/ActiveMQSession.java @@ -299,7 +299,7 @@ public class ActiveMQSession implements QueueSession, TopicSession { if (jbd != null) { ClientSession.AddressQuery response = session.addressQuery(jbd.getSimpleAddress()); - if (!response.isExists() && !response.isAutoCreateJmsQueues()) { + if (!response.isExists() && ((jbd.getAddress().startsWith(ActiveMQDestination.JMS_QUEUE_ADDRESS_PREFIX) && !response.isAutoCreateJmsQueues()) || (jbd.getAddress().startsWith(ActiveMQDestination.JMS_TOPIC_ADDRESS_PREFIX) && !response.isAutoCreateJmsTopics()))) { throw new InvalidDestinationException("Destination " + jbd.getName() + " does not exist"); } @@ -659,7 +659,7 @@ public class ActiveMQSession implements QueueSession, TopicSession { else { AddressQuery response = session.addressQuery(dest.getSimpleAddress()); - if (!response.isExists()) { + if (!response.isExists() && !response.isAutoCreateJmsTopics()) { throw new InvalidDestinationException("Topic " + dest.getName() + " does not exist"); } @@ -1106,7 +1106,7 @@ public class ActiveMQSession implements QueueSession, TopicSession { AddressQuery query = session.addressQuery(topic.getSimpleAddress()); - if (!query.isExists()) { + if (!query.isExists() && !query.isAutoCreateJmsTopics()) { return null; } else { diff --git a/artemis-jms-server/src/main/java/org/apache/activemq/artemis/jms/server/JMSServerManager.java b/artemis-jms-server/src/main/java/org/apache/activemq/artemis/jms/server/JMSServerManager.java index 879e7cb166..69314b68a1 100644 --- a/artemis-jms-server/src/main/java/org/apache/activemq/artemis/jms/server/JMSServerManager.java +++ b/artemis-jms-server/src/main/java/org/apache/activemq/artemis/jms/server/JMSServerManager.java @@ -76,6 +76,17 @@ public interface JMSServerManager extends ActiveMQComponent { */ boolean createTopic(boolean storeConfig, String topicName, String... bindings) throws Exception; + /** + * + * @param storeConfig + * @param topicName + * @param autoCreated + * @param bindings + * @return + * @throws Exception + */ + boolean createTopic(boolean storeConfig, String topicName, boolean autoCreated, String... bindings) throws Exception; + /** * Remove the topic from the Binding Registry or BindingRegistry. * Calling this method does not destroy the destination. diff --git a/artemis-jms-server/src/main/java/org/apache/activemq/artemis/jms/server/impl/JMSServerManagerImpl.java b/artemis-jms-server/src/main/java/org/apache/activemq/artemis/jms/server/impl/JMSServerManagerImpl.java index 85084c1de3..336c9d2a58 100644 --- a/artemis-jms-server/src/main/java/org/apache/activemq/artemis/jms/server/impl/JMSServerManagerImpl.java +++ b/artemis-jms-server/src/main/java/org/apache/activemq/artemis/jms/server/impl/JMSServerManagerImpl.java @@ -21,6 +21,7 @@ import javax.transaction.xa.Xid; import java.net.InetAddress; import java.net.UnknownHostException; import java.util.ArrayList; +import java.util.Collection; import java.util.Collections; import java.util.Comparator; import java.util.HashMap; @@ -48,6 +49,8 @@ import org.apache.activemq.artemis.core.remoting.impl.netty.TransportConstants; import org.apache.activemq.artemis.core.security.Role; import org.apache.activemq.artemis.core.server.ActivateCallback; import org.apache.activemq.artemis.core.server.ActiveMQServer; +import org.apache.activemq.artemis.core.server.PostQueueCreationCallback; +import org.apache.activemq.artemis.core.server.PostQueueDeletionCallback; import org.apache.activemq.artemis.core.server.Queue; import org.apache.activemq.artemis.core.server.QueueCreator; import org.apache.activemq.artemis.core.server.QueueDeleter; @@ -371,11 +374,15 @@ public class JMSServerManagerImpl implements JMSServerManager, ActivateCallback return; } - server.setJMSQueueCreator(new JMSQueueCreator()); + server.setJMSQueueCreator(new JMSDestinationCreator()); server.setJMSQueueDeleter(new JMSQueueDeleter()); server.registerActivateCallback(this); + + server.registerPostQueueCreationCallback(new JMSPostQueueCreationCallback()); + + server.registerPostQueueDeletionCallback(new JMSPostQueueDeletionCallback()); /** * See this method's javadoc. *

@@ -523,10 +530,19 @@ public class JMSServerManagerImpl implements JMSServerManager, ActivateCallback return true; } + @Override public synchronized boolean createTopic(final boolean storeConfig, final String topicName, final String... bindings) throws Exception { + return createTopic(storeConfig, topicName, false, bindings); + } + + @Override + public synchronized boolean createTopic(final boolean storeConfig, + final String topicName, + final boolean autoCreated, + final String... bindings) throws Exception { if (active && topics.get(topicName) != null) { return false; } @@ -541,7 +557,7 @@ public class JMSServerManagerImpl implements JMSServerManager, ActivateCallback public void runException() throws Exception { checkBindings(bindings); - if (internalCreateTopic(topicName)) { + if (internalCreateTopic(topicName, autoCreated)) { ActiveMQDestination destination = topics.get(topicName); if (destination == null) { @@ -1082,6 +1098,8 @@ public class JMSServerManagerImpl implements JMSServerManager, ActivateCallback } } + + /** * Performs the internal creation without activating any storage. * The storage load will call this method @@ -1091,6 +1109,10 @@ public class JMSServerManagerImpl implements JMSServerManager, ActivateCallback * @throws Exception */ private synchronized boolean internalCreateTopic(final String topicName) throws Exception { + return internalCreateTopic(topicName, false); + } + + private synchronized boolean internalCreateTopic(final String topicName, final boolean autoCreated) throws Exception { if (topics.get(topicName) != null) { return false; @@ -1101,7 +1123,7 @@ public class JMSServerManagerImpl implements JMSServerManager, ActivateCallback // checks when routing messages to a topic that // does not exist - otherwise we would not be able to distinguish from a non existent topic and one with no // subscriptions - core has no notion of a topic - server.deployQueue(SimpleString.toSimpleString(activeMQTopic.getAddress()), SimpleString.toSimpleString(activeMQTopic.getAddress()), SimpleString.toSimpleString(JMSServerManagerImpl.REJECT_FILTER), true, false); + server.deployQueue(SimpleString.toSimpleString(activeMQTopic.getAddress()), SimpleString.toSimpleString(activeMQTopic.getAddress()), SimpleString.toSimpleString(JMSServerManagerImpl.REJECT_FILTER), true, false, autoCreated); topics.put(topicName, activeMQTopic); @@ -1619,13 +1641,19 @@ public class JMSServerManagerImpl implements JMSServerManager, ActivateCallback } } - class JMSQueueCreator implements QueueCreator { + /** + * This class is responsible for auto-creating the JMS (and underlying core) resources when a client sends a message + * to a non-existent JMS queue or topic + */ + class JMSDestinationCreator implements QueueCreator { @Override public boolean create(SimpleString address) throws Exception { AddressSettings settings = server.getAddressSettingsRepository().getMatch(address.toString()); if (address.toString().startsWith(ActiveMQDestination.JMS_QUEUE_ADDRESS_PREFIX) && settings.isAutoCreateJmsQueues()) { - JMSServerManagerImpl.this.internalCreateJMSQueue(false, address.toString().substring(ActiveMQDestination.JMS_QUEUE_ADDRESS_PREFIX.length()), null, true, true); - return true; + return internalCreateJMSQueue(false, address.toString().substring(ActiveMQDestination.JMS_QUEUE_ADDRESS_PREFIX.length()), null, true, true); + } + else if (address.toString().startsWith(ActiveMQDestination.JMS_TOPIC_ADDRESS_PREFIX) && settings.isAutoCreateJmsTopics()) { + return createTopic(false, address.toString().substring(ActiveMQDestination.JMS_TOPIC_ADDRESS_PREFIX.length()), true); } else { return false; @@ -1635,8 +1663,64 @@ public class JMSServerManagerImpl implements JMSServerManager, ActivateCallback class JMSQueueDeleter implements QueueDeleter { @Override - public boolean delete(SimpleString address) throws Exception { - return JMSServerManagerImpl.this.destroyQueue(address.toString().substring(ActiveMQDestination.JMS_QUEUE_ADDRESS_PREFIX.length()), false); + public boolean delete(SimpleString queueName) throws Exception { + Queue queue = server.locateQueue(queueName); + SimpleString address = queue.getAddress(); + AddressSettings settings = server.getAddressSettingsRepository().getMatch(address.toString()); + long consumerCount = queue.getConsumerCount(); + long messageCount = queue.getMessageCount(); + + if (address.toString().startsWith(ActiveMQDestination.JMS_QUEUE_ADDRESS_PREFIX) && settings.isAutoDeleteJmsQueues() && queue.getMessageCount() == 0) { + if (ActiveMQJMSServerLogger.LOGGER.isDebugEnabled()) { + ActiveMQJMSServerLogger.LOGGER.debug("deleting auto-created queue \"" + queueName + ".\" consumerCount = " + consumerCount + "; messageCount = " + messageCount + "; isAutoDeleteJmsQueues = " + settings.isAutoDeleteJmsQueues()); + } + + return destroyQueue(queueName.toString().substring(ActiveMQDestination.JMS_QUEUE_ADDRESS_PREFIX.length()), false); + } + else { + return false; + } + } + } + + /** + * When a core queue is created with a jms.topic prefix this class will create the associated JMS resources + * retroactively. This would happen if, for example, a client created a subscription a non-existent JMS topic and + * autoCreateJmsTopics = true. + */ + class JMSPostQueueCreationCallback implements PostQueueCreationCallback { + @Override + public void callback(SimpleString queueName) throws Exception { + Queue queue = server.locateQueue(queueName); + String address = queue.getAddress().toString(); + + AddressSettings settings = server.getAddressSettingsRepository().getMatch(address.toString()); + /* When a topic is created a dummy subscription is created which never receives any messages; when the queue + * for that dummy subscription is created we don't want to call createTopic again. Therefore we make sure the + * queue name doesn't start with the topic prefix. + */ + if (address.toString().startsWith(ActiveMQDestination.JMS_TOPIC_ADDRESS_PREFIX) && settings.isAutoCreateJmsTopics() && !queueName.toString().startsWith(ActiveMQDestination.JMS_TOPIC_ADDRESS_PREFIX)) { + createTopic(false, address.toString().substring(ActiveMQDestination.JMS_TOPIC_ADDRESS_PREFIX.length()), true); + } + } + } + + /** + * When a core queue representing a JMS topic subscription is deleted this class will check to see if that was the + * last subscription on the topic and if so and autoDeleteJmsTopics = true then it will delete the JMS resources + * for that topic. + */ + class JMSPostQueueDeletionCallback implements PostQueueDeletionCallback { + @Override + public void callback(SimpleString address, SimpleString queueName) throws Exception { + Queue queue = server.locateQueue(address); + Collection bindings = server.getPostOffice().getBindingsForAddress(address).getBindings(); + + AddressSettings settings = server.getAddressSettingsRepository().getMatch(address.toString()); + + if (address.toString().startsWith(ActiveMQDestination.JMS_TOPIC_ADDRESS_PREFIX) && settings.isAutoDeleteJmsTopics() && bindings.size() == 1 && queue != null && queue.isAutoCreated()) { + destroyTopic(address.toString().substring(ActiveMQDestination.JMS_TOPIC_ADDRESS_PREFIX.length())); + } } } } diff --git a/artemis-protocols/artemis-hqclient-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/hornetq/client/HornetQClientSessionContext.java b/artemis-protocols/artemis-hqclient-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/hornetq/client/HornetQClientSessionContext.java index 15d3ba1587..d5e8df44ad 100644 --- a/artemis-protocols/artemis-hqclient-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/hornetq/client/HornetQClientSessionContext.java +++ b/artemis-protocols/artemis-hqclient-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/hornetq/client/HornetQClientSessionContext.java @@ -74,7 +74,7 @@ public class HornetQClientSessionContext extends ActiveMQSessionContext { public ClientSession.AddressQuery addressQuery(final SimpleString address) throws ActiveMQException { SessionBindingQueryResponseMessage response = (SessionBindingQueryResponseMessage) getSessionChannel().sendBlocking(new SessionBindingQueryMessage(address), PacketImpl.SESS_BINDINGQUERY_RESP); - return new AddressQueryImpl(response.isExists(), response.getQueueNames(), false); + return new AddressQueryImpl(response.isExists(), response.getQueueNames(), false, false); } @Override diff --git a/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/OpenWireConnection.java b/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/OpenWireConnection.java index b1f892ab58..fad17afc05 100644 --- a/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/OpenWireConnection.java +++ b/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/OpenWireConnection.java @@ -801,11 +801,10 @@ public class OpenWireConnection extends AbstractRemotingConnection implements Se public void removeDestination(ActiveMQDestination dest) throws Exception { if (dest.isQueue()) { - SimpleString qName = new SimpleString("jms.queue." + dest.getPhysicalName()); - server.destroyQueue(qName); + server.destroyQueue(OpenWireUtil.toCoreAddress(dest)); } else { - Bindings bindings = server.getPostOffice().getBindingsForAddress(SimpleString.toSimpleString("jms.topic." + dest.getPhysicalName())); + Bindings bindings = server.getPostOffice().getBindingsForAddress(OpenWireUtil.toCoreAddress(dest)); Iterator iterator = bindings.getBindings().iterator(); while (iterator.hasNext()) { diff --git a/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/amq/AMQConsumer.java b/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/amq/AMQConsumer.java index e2deb802be..70dfde3064 100644 --- a/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/amq/AMQConsumer.java +++ b/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/amq/AMQConsumer.java @@ -81,7 +81,12 @@ public class AMQConsumer { SimpleString address; if (openwireDestination.isTopic()) { - address = new SimpleString("jms.topic." + physicalName); + if (openwireDestination.isTemporary()) { + address = new SimpleString("jms.temptopic." + physicalName); + } + else { + address = new SimpleString("jms.topic." + physicalName); + } SimpleString queueName = createTopicSubscription(info.isDurable(), info.getClientId(), physicalName, info.getSubscriptionName(), selector, address); @@ -90,7 +95,7 @@ public class AMQConsumer { } else { SimpleString queueName = OpenWireUtil.toCoreAddress(openwireDestination); - session.getCoreServer().getJMSQueueCreator().create(queueName); + session.getCoreServer().getJMSDestinationCreator().create(queueName); serverConsumer = session.getCoreSession().createConsumer(nativeId, queueName, selector, info.isBrowser(), false, -1); serverConsumer.setlowConsumerDetection(slowConsumerDetectionListener); AddressSettings addrSettings = session.getCoreServer().getAddressSettingsRepository().getMatch(queueName.toString()); diff --git a/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/amq/AMQSession.java b/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/amq/AMQSession.java index 218e0dfcc9..17d3e1876c 100644 --- a/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/amq/AMQSession.java +++ b/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/amq/AMQSession.java @@ -149,7 +149,7 @@ public class AMQSession implements SessionCallback { for (ActiveMQDestination openWireDest : dests) { if (openWireDest.isQueue()) { SimpleString queueName = OpenWireUtil.toCoreAddress(openWireDest); - getCoreServer().getJMSQueueCreator().create(queueName); + getCoreServer().getJMSDestinationCreator().create(queueName); } AMQConsumer consumer = new AMQConsumer(this, openWireDest, info, scheduledPool); diff --git a/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/util/OpenWireUtil.java b/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/util/OpenWireUtil.java index 1c64676a13..3e7410cee0 100644 --- a/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/util/OpenWireUtil.java +++ b/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/util/OpenWireUtil.java @@ -28,6 +28,11 @@ import org.apache.activemq.command.TransactionId; import org.apache.activemq.command.XATransactionId; import org.apache.activemq.util.ByteSequence; +import static org.apache.activemq.artemis.jms.client.ActiveMQDestination.JMS_QUEUE_ADDRESS_PREFIX; +import static org.apache.activemq.artemis.jms.client.ActiveMQDestination.JMS_TEMP_QUEUE_ADDRESS_PREFIX; +import static org.apache.activemq.artemis.jms.client.ActiveMQDestination.JMS_TEMP_TOPIC_ADDRESS_PREFIX; +import static org.apache.activemq.artemis.jms.client.ActiveMQDestination.JMS_TOPIC_ADDRESS_PREFIX; + public class OpenWireUtil { public static ActiveMQBuffer toActiveMQBuffer(ByteSequence bytes) { @@ -39,10 +44,20 @@ public class OpenWireUtil { public static SimpleString toCoreAddress(ActiveMQDestination dest) { if (dest.isQueue()) { - return new SimpleString("jms.queue." + dest.getPhysicalName()); + if (dest.isTemporary()) { + return new SimpleString(JMS_TEMP_QUEUE_ADDRESS_PREFIX + dest.getPhysicalName()); + } + else { + return new SimpleString(JMS_QUEUE_ADDRESS_PREFIX + dest.getPhysicalName()); + } } else { - return new SimpleString("jms.topic." + dest.getPhysicalName()); + if (dest.isTemporary()) { + return new SimpleString(JMS_TEMP_TOPIC_ADDRESS_PREFIX + dest.getPhysicalName()); + } + else { + return new SimpleString(JMS_TOPIC_ADDRESS_PREFIX + dest.getPhysicalName()); + } } } @@ -54,7 +69,7 @@ public class OpenWireUtil { */ public static ActiveMQDestination toAMQAddress(ServerMessage message, ActiveMQDestination actualDestination) { String address = message.getAddress().toString(); - String strippedAddress = address.replace("jms.queue.", "").replace("jms.topic.", ""); + String strippedAddress = address.replace(JMS_QUEUE_ADDRESS_PREFIX, "").replace(JMS_TEMP_QUEUE_ADDRESS_PREFIX, "").replace(JMS_TOPIC_ADDRESS_PREFIX, "").replace(JMS_TEMP_TOPIC_ADDRESS_PREFIX, ""); if (actualDestination.isQueue()) { return new ActiveMQQueue(strippedAddress); } diff --git a/artemis-protocols/artemis-stomp-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/stomp/StompConnection.java b/artemis-protocols/artemis-stomp-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/stomp/StompConnection.java index 568e5b1e3e..0812867afb 100644 --- a/artemis-protocols/artemis-stomp-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/stomp/StompConnection.java +++ b/artemis-protocols/artemis-stomp-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/stomp/StompConnection.java @@ -29,13 +29,13 @@ import org.apache.activemq.artemis.api.core.ActiveMQBuffers; import org.apache.activemq.artemis.api.core.ActiveMQException; import org.apache.activemq.artemis.api.core.SimpleString; import org.apache.activemq.artemis.api.core.client.ActiveMQClient; -import org.apache.activemq.artemis.api.core.management.ResourceNames; import org.apache.activemq.artemis.core.protocol.stomp.v10.StompFrameHandlerV10; import org.apache.activemq.artemis.core.protocol.stomp.v12.StompFrameHandlerV12; import org.apache.activemq.artemis.core.remoting.CloseListener; import org.apache.activemq.artemis.core.remoting.FailureListener; import org.apache.activemq.artemis.core.remoting.impl.netty.TransportConstants; import org.apache.activemq.artemis.core.server.ActiveMQServerLogger; +import org.apache.activemq.artemis.core.server.QueueCreator; import org.apache.activemq.artemis.core.server.ServerMessage; import org.apache.activemq.artemis.core.server.impl.ServerMessageImpl; import org.apache.activemq.artemis.spi.core.protocol.RemotingConnection; @@ -227,30 +227,23 @@ public final class StompConnection implements RemotingConnection { } public void checkDestination(String destination) throws ActiveMQStompException { - if (autoCreateQueueIfPossible(destination)) { - return; - } + autoCreateDestinationIfPossible(destination); if (!manager.destinationExists(destination)) { throw BUNDLE.destinationNotExist(destination).setHandler(frameHandler); } } - public boolean autoCreateQueueIfPossible(String queue) throws ActiveMQStompException { - boolean autoCreated = false; - - if (queue.startsWith(ResourceNames.JMS_QUEUE) && manager.getServer().getAddressSettingsRepository().getMatch(queue).isAutoCreateJmsQueues() && manager.getServer().locateQueue(new SimpleString(queue)) == null) { - SimpleString queueName = new SimpleString(queue); - try { - manager.getServer().createQueue(queueName, queueName, null, SimpleString.toSimpleString(this.getLogin()), true, false, true); + public void autoCreateDestinationIfPossible(String queue) throws ActiveMQStompException { + try { + QueueCreator queueCreator = manager.getServer().getJMSDestinationCreator(); + if (queueCreator != null) { + queueCreator.create(SimpleString.toSimpleString(queue)); } - catch (Exception e) { - throw new ActiveMQStompException(e.getMessage(), e).setHandler(frameHandler); - } - autoCreated = true; } - - return autoCreated; + catch (Exception e) { + throw new ActiveMQStompException(e.getMessage(), e).setHandler(frameHandler); + } } @Override @@ -618,7 +611,7 @@ public final class StompConnection implements RemotingConnection { String id, String durableSubscriptionName, boolean noLocal) throws ActiveMQStompException { - autoCreateQueueIfPossible(destination); + autoCreateDestinationIfPossible(destination); if (noLocal) { String noLocalFilter = CONNECTION_ID_PROP + " <> '" + getID().toString() + "'"; if (selector == null) { diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/deployers/impl/FileConfigurationParser.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/deployers/impl/FileConfigurationParser.java index 0a6391b3cd..b058aed978 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/deployers/impl/FileConfigurationParser.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/deployers/impl/FileConfigurationParser.java @@ -163,6 +163,10 @@ public final class FileConfigurationParser extends XMLConfigurationUtil { private static final String AUTO_DELETE_JMS_QUEUES = "auto-delete-jms-queues"; + private static final String AUTO_CREATE_JMS_TOPICS = "auto-create-jms-topics"; + + private static final String AUTO_DELETE_JMS_TOPICS = "auto-delete-jms-topics"; + private static final String MANAGEMENT_BROWSE_PAGE_SIZE = "management-browse-page-size"; private static final String MAX_CONNECTIONS_NODE_NAME = "max-connections"; @@ -796,6 +800,12 @@ public final class FileConfigurationParser extends XMLConfigurationUtil { else if (AUTO_DELETE_JMS_QUEUES.equalsIgnoreCase(name)) { addressSettings.setAutoDeleteJmsQueues(XMLUtil.parseBoolean(child)); } + else if (AUTO_CREATE_JMS_TOPICS.equalsIgnoreCase(name)) { + addressSettings.setAutoCreateJmsTopics(XMLUtil.parseBoolean(child)); + } + else if (AUTO_DELETE_JMS_TOPICS.equalsIgnoreCase(name)) { + addressSettings.setAutoDeleteJmsTopics(XMLUtil.parseBoolean(child)); + } else if (MANAGEMENT_BROWSE_PAGE_SIZE.equalsIgnoreCase(name)) { addressSettings.setManagementBrowsePageSize(XMLUtil.parseInt(child)); } diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/management/impl/ActiveMQServerControlImpl.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/management/impl/ActiveMQServerControlImpl.java index df84c48c0a..4c47e74dbe 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/management/impl/ActiveMQServerControlImpl.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/management/impl/ActiveMQServerControlImpl.java @@ -1519,6 +1519,8 @@ public class ActiveMQServerControlImpl extends AbstractControl implements Active settings.put("slowConsumerPolicy", policy); settings.put("autoCreateJmsQueues", addressSettings.isAutoCreateJmsQueues()); settings.put("autoDeleteJmsQueues", addressSettings.isAutoDeleteJmsQueues()); + settings.put("autoCreateJmsTopics", addressSettings.isAutoCreateJmsTopics()); + settings.put("autoDeleteJmsTopics", addressSettings.isAutoDeleteJmsTopics()); JSONObject jsonObject = new JSONObject(settings); return jsonObject.toString(); @@ -1544,7 +1546,9 @@ public class ActiveMQServerControlImpl extends AbstractControl implements Active final long slowConsumerCheckPeriod, final String slowConsumerPolicy, final boolean autoCreateJmsQueues, - final boolean autoDeleteJmsQueues) throws Exception { + final boolean autoDeleteJmsQueues, + final boolean autoCreateJmsTopics, + final boolean autoDeleteJmsTopics) throws Exception { checkStarted(); // JBPAPP-6334 requested this to be pageSizeBytes > maxSizeBytes @@ -1598,6 +1602,8 @@ public class ActiveMQServerControlImpl extends AbstractControl implements Active } addressSettings.setAutoCreateJmsQueues(autoCreateJmsQueues); addressSettings.setAutoDeleteJmsQueues(autoDeleteJmsQueues); + addressSettings.setAutoCreateJmsTopics(autoCreateJmsTopics); + addressSettings.setAutoDeleteJmsTopics(autoDeleteJmsTopics); server.getAddressSettingsRepository().addMatch(address, addressSettings); storageManager.storeAddressSetting(new PersistedAddressSetting(new SimpleString(address), addressSettings)); diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/protocol/core/ServerSessionPacketHandler.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/protocol/core/ServerSessionPacketHandler.java index 62c7d99878..f12fd0baab 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/protocol/core/ServerSessionPacketHandler.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/protocol/core/ServerSessionPacketHandler.java @@ -39,6 +39,7 @@ import org.apache.activemq.artemis.core.protocol.core.impl.wireformat.SessionAdd import org.apache.activemq.artemis.core.protocol.core.impl.wireformat.SessionBindingQueryMessage; import org.apache.activemq.artemis.core.protocol.core.impl.wireformat.SessionBindingQueryResponseMessage; import org.apache.activemq.artemis.core.protocol.core.impl.wireformat.SessionBindingQueryResponseMessage_V2; +import org.apache.activemq.artemis.core.protocol.core.impl.wireformat.SessionBindingQueryResponseMessage_V3; import org.apache.activemq.artemis.core.protocol.core.impl.wireformat.SessionConsumerCloseMessage; import org.apache.activemq.artemis.core.protocol.core.impl.wireformat.SessionConsumerFlowCreditMessage; import org.apache.activemq.artemis.core.protocol.core.impl.wireformat.SessionCreateConsumerMessage; @@ -263,7 +264,10 @@ public class ServerSessionPacketHandler implements ChannelHandler { requiresResponse = true; SessionBindingQueryMessage request = (SessionBindingQueryMessage) packet; BindingQueryResult result = session.executeBindingQuery(request.getAddress()); - if (channel.supports(PacketImpl.SESS_BINDINGQUERY_RESP_V2)) { + if (channel.supports(PacketImpl.SESS_BINDINGQUERY_RESP_V3)) { + response = new SessionBindingQueryResponseMessage_V3(result.isExists(), result.getQueueNames(), result.isAutoCreateJmsQueues(), result.isAutoCreateJmsTopics()); + } + else if (channel.supports(PacketImpl.SESS_BINDINGQUERY_RESP_V2)) { response = new SessionBindingQueryResponseMessage_V2(result.isExists(), result.getQueueNames(), result.isAutoCreateJmsQueues()); } else { diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/ActiveMQServer.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/ActiveMQServer.java index d5786e60a9..e416205686 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/ActiveMQServer.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/ActiveMQServer.java @@ -126,6 +126,36 @@ public interface ActiveMQServer extends ActiveMQComponent { */ void callActivationFailureListeners(Exception e); + /** + * @param callback {@link org.apache.activemq.artemis.core.server.PostQueueCreationCallback} + */ + void registerPostQueueCreationCallback(PostQueueCreationCallback callback); + + /** + * @param callback {@link org.apache.activemq.artemis.core.server.PostQueueCreationCallback} + */ + void unregisterPostQueueCreationCallback(PostQueueCreationCallback callback); + + /** + * @param queueName + */ + void callPostQueueCreationCallbacks(SimpleString queueName) throws Exception; + + /** + * @param callback {@link org.apache.activemq.artemis.core.server.PostQueueDeletionCallback} + */ + void registerPostQueueDeletionCallback(PostQueueDeletionCallback callback); + + /** + * @param callback {@link org.apache.activemq.artemis.core.server.PostQueueDeletionCallback} + */ + void unregisterPostQueueDeletionCallback(PostQueueDeletionCallback callback); + + /** + * @param queueName + */ + void callPostQueueDeletionCallbacks(SimpleString address, SimpleString queueName) throws Exception; + void checkQueueCreationLimit(String username) throws Exception; ServerSession createSession(String name, @@ -196,7 +226,7 @@ public interface ActiveMQServer extends ActiveMQComponent { /** * @see org.apache.activemq.artemis.core.server.ActiveMQServer#setJMSQueueCreator(QueueCreator) */ - QueueCreator getJMSQueueCreator(); + QueueCreator getJMSDestinationCreator(); /** * This is the queue deleter responsible for automatic JMS Queue deletions. diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/BindingQueryResult.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/BindingQueryResult.java index ba1c83661c..fce3a2ed79 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/BindingQueryResult.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/BindingQueryResult.java @@ -28,14 +28,19 @@ public class BindingQueryResult { private boolean autoCreateJmsQueues; + private boolean autoCreateJmsTopics; + public BindingQueryResult(final boolean exists, final List queueNames, - final boolean autoCreateJmsQueues) { + final boolean autoCreateJmsQueues, + final boolean autoCreateJmsTopics) { this.exists = exists; this.queueNames = queueNames; this.autoCreateJmsQueues = autoCreateJmsQueues; + + this.autoCreateJmsTopics = autoCreateJmsTopics; } public boolean isExists() { @@ -46,6 +51,10 @@ public class BindingQueryResult { return autoCreateJmsQueues; } + public boolean isAutoCreateJmsTopics() { + return autoCreateJmsTopics; + } + public List getQueueNames() { return queueNames; } diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/PostQueueCreationCallback.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/PostQueueCreationCallback.java new file mode 100644 index 0000000000..2303c7be7f --- /dev/null +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/PostQueueCreationCallback.java @@ -0,0 +1,27 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.activemq.artemis.core.server; + +import org.apache.activemq.artemis.api.core.SimpleString; + +/** + * When a "core" queue is created this callback will be invoked + */ +public interface PostQueueCreationCallback { + + void callback(SimpleString queueName) throws Exception; +} diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/PostQueueDeletionCallback.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/PostQueueDeletionCallback.java new file mode 100644 index 0000000000..dd4a95dc00 --- /dev/null +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/PostQueueDeletionCallback.java @@ -0,0 +1,27 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.activemq.artemis.core.server; + +import org.apache.activemq.artemis.api.core.SimpleString; + +/** + * When a "core" queue is deleted this callback will be invoked + */ +public interface PostQueueDeletionCallback { + + void callback(SimpleString address, SimpleString queueName) throws Exception; +} diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/QueueDeleter.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/QueueDeleter.java index 4bdb8a41f0..d0628481e2 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/QueueDeleter.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/QueueDeleter.java @@ -24,5 +24,5 @@ public interface QueueDeleter { /** * @return True if a queue was deleted. */ - boolean delete(SimpleString address) throws Exception; + boolean delete(SimpleString queueName) throws Exception; } diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ActiveMQServerImpl.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ActiveMQServerImpl.java index 50437a1e5b..a5886d54ae 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ActiveMQServerImpl.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ActiveMQServerImpl.java @@ -106,6 +106,8 @@ import org.apache.activemq.artemis.core.server.JournalType; import org.apache.activemq.artemis.core.server.LargeServerMessage; import org.apache.activemq.artemis.core.server.MemoryManager; import org.apache.activemq.artemis.core.server.NodeManager; +import org.apache.activemq.artemis.core.server.PostQueueCreationCallback; +import org.apache.activemq.artemis.core.server.PostQueueDeletionCallback; import org.apache.activemq.artemis.core.server.Queue; import org.apache.activemq.artemis.core.server.QueueCreator; import org.apache.activemq.artemis.core.server.QueueDeleter; @@ -261,6 +263,10 @@ public class ActiveMQServerImpl implements ActiveMQServer { private final Set activationFailureListeners = new ConcurrentHashSet<>(); + private final Set postQueueCreationCallbacks = new ConcurrentHashSet<>(); + + private final Set postQueueDeletionCallbacks = new ConcurrentHashSet<>(); + private volatile GroupingHandler groupingHandler; private NodeManager nodeManager; @@ -564,6 +570,7 @@ public class ActiveMQServerImpl implements ActiveMQServer { } boolean autoCreateJmsQueues = address.toString().startsWith(ResourceNames.JMS_QUEUE) && getAddressSettingsRepository().getMatch(address.toString()).isAutoCreateJmsQueues(); + boolean autoCreateJmsTopics = address.toString().startsWith(ResourceNames.JMS_TOPIC) && getAddressSettingsRepository().getMatch(address.toString()).isAutoCreateJmsTopics(); List names = new ArrayList<>(); @@ -571,7 +578,7 @@ public class ActiveMQServerImpl implements ActiveMQServer { ManagementService managementService = getManagementService(); if (managementService != null) { if (address.equals(managementService.getManagementAddress())) { - return new BindingQueryResult(true, names, autoCreateJmsQueues); + return new BindingQueryResult(true, names, autoCreateJmsQueues, autoCreateJmsTopics); } } @@ -583,7 +590,7 @@ public class ActiveMQServerImpl implements ActiveMQServer { } } - return new BindingQueryResult(!names.isEmpty(), names, autoCreateJmsQueues); + return new BindingQueryResult(!names.isEmpty(), names, autoCreateJmsQueues, autoCreateJmsTopics); } @Override @@ -655,7 +662,7 @@ public class ActiveMQServerImpl implements ActiveMQServer { } @Override - public QueueCreator getJMSQueueCreator() { + public QueueCreator getJMSDestinationCreator() { return jmsQueueCreator; } @@ -1496,6 +1503,8 @@ public class ActiveMQServerImpl implements ActiveMQServer { throw ActiveMQMessageBundle.BUNDLE.noSuchQueue(queueName); } + SimpleString address = binding.getAddress(); + Queue queue = (Queue) binding.getBindable(); // This check is only valid if checkConsumerCount == true @@ -1507,14 +1516,16 @@ public class ActiveMQServerImpl implements ActiveMQServer { if (queue.isDurable()) { // make sure the user has privileges to delete this queue - securityStore.check(binding.getAddress(), CheckType.DELETE_DURABLE_QUEUE, session); + securityStore.check(address, CheckType.DELETE_DURABLE_QUEUE, session); } else { - securityStore.check(binding.getAddress(), CheckType.DELETE_NON_DURABLE_QUEUE, session); + securityStore.check(address, CheckType.DELETE_NON_DURABLE_QUEUE, session); } } queue.deleteQueue(removeConsumers); + + callPostQueueDeletionCallbacks(address, queueName); } @Override @@ -1544,6 +1555,40 @@ public class ActiveMQServerImpl implements ActiveMQServer { } } + @Override + public void registerPostQueueCreationCallback(final PostQueueCreationCallback callback) { + postQueueCreationCallbacks.add(callback); + } + + @Override + public void unregisterPostQueueCreationCallback(final PostQueueCreationCallback callback) { + postQueueCreationCallbacks.remove(callback); + } + + @Override + public void callPostQueueCreationCallbacks(final SimpleString queueName) throws Exception { + for (PostQueueCreationCallback callback : postQueueCreationCallbacks) { + callback.callback(queueName); + } + } + + @Override + public void registerPostQueueDeletionCallback(final PostQueueDeletionCallback callback) { + postQueueDeletionCallbacks.add(callback); + } + + @Override + public void unregisterPostQueueDeletionCallback(final PostQueueDeletionCallback callback) { + postQueueDeletionCallbacks.remove(callback); + } + + @Override + public void callPostQueueDeletionCallbacks(final SimpleString address, final SimpleString queueName) throws Exception { + for (PostQueueDeletionCallback callback : postQueueDeletionCallbacks) { + callback.callback(address, queueName); + } + } + @Override public ExecutorFactory getExecutorFactory() { return executorFactory; @@ -2091,7 +2136,7 @@ public class ActiveMQServerImpl implements ActiveMQServer { queue.setConsumersRefCount(new TransientQueueManagerImpl(this, queueName)); } else if (autoCreated) { - queue.setConsumersRefCount(new AutoCreatedQueueManagerImpl(this, queueName)); + queue.setConsumersRefCount(new AutoCreatedQueueManagerImpl(this.getJMSQueueDeleter(), queueName)); } binding = new LocalQueueBinding(address, queue, nodeManager.getNodeId()); @@ -2127,6 +2172,8 @@ public class ActiveMQServerImpl implements ActiveMQServer { managementService.registerAddress(address); managementService.registerQueue(queue, address, storageManager); + callPostQueueCreationCallbacks(queueName); + return queue; } diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/AutoCreatedQueueManagerImpl.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/AutoCreatedQueueManagerImpl.java index c39d26961b..10f9f56bfa 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/AutoCreatedQueueManagerImpl.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/AutoCreatedQueueManagerImpl.java @@ -17,41 +17,23 @@ package org.apache.activemq.artemis.core.server.impl; import org.apache.activemq.artemis.api.core.SimpleString; -import org.apache.activemq.artemis.core.server.ActiveMQServer; import org.apache.activemq.artemis.core.server.ActiveMQServerLogger; import org.apache.activemq.artemis.core.server.AutoCreatedQueueManager; -import org.apache.activemq.artemis.core.server.Queue; +import org.apache.activemq.artemis.core.server.QueueDeleter; import org.apache.activemq.artemis.utils.ReferenceCounterUtil; -import org.jboss.logging.Logger; public class AutoCreatedQueueManagerImpl implements AutoCreatedQueueManager { - private static final Logger logger = Logger.getLogger(AutoCreatedQueueManagerImpl.class); - private final SimpleString queueName; - private final ActiveMQServer server; + private final QueueDeleter deleter; private final Runnable runnable = new Runnable() { @Override public void run() { try { - Queue queue = server.locateQueue(queueName); - long consumerCount = queue.getConsumerCount(); - long messageCount = queue.getMessageCount(); - boolean isAutoDeleteJmsQueues = server.getAddressSettingsRepository().getMatch(queueName.toString()).isAutoDeleteJmsQueues(); - - if (server.locateQueue(queueName).getMessageCount() == 0 && isAutoDeleteJmsQueues) { - if (logger.isDebugEnabled()) { - logger.debug("deleting auto-created queue \"" + queueName + ".\" consumerCount = " + consumerCount + "; messageCount = " + messageCount + "; isAutoDeleteJmsQueues = " + isAutoDeleteJmsQueues); - } - - if (server.getJMSQueueDeleter() != null) { - server.getJMSQueueDeleter().delete(queueName); - } - } - else if (logger.isDebugEnabled()) { - logger.debug("NOT deleting auto-created queue \"" + queueName + ".\" consumerCount = " + consumerCount + "; messageCount = " + messageCount + "; isAutoDeleteJmsQueues = " + isAutoDeleteJmsQueues); + if (deleter != null) { + deleter.delete(queueName); } } catch (Exception e) { @@ -62,9 +44,8 @@ public class AutoCreatedQueueManagerImpl implements AutoCreatedQueueManager { private final ReferenceCounterUtil referenceCounterUtil = new ReferenceCounterUtil(runnable); - public AutoCreatedQueueManagerImpl(ActiveMQServer server, SimpleString queueName) { - this.server = server; - + public AutoCreatedQueueManagerImpl(QueueDeleter deleter, SimpleString queueName) { + this.deleter = deleter; this.queueName = queueName; } diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/PostOfficeJournalLoader.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/PostOfficeJournalLoader.java index 353db3a4ce..2ced2d8742 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/PostOfficeJournalLoader.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/PostOfficeJournalLoader.java @@ -146,7 +146,7 @@ public class PostOfficeJournalLoader implements JournalLoader { Queue queue = queueFactory.createQueue(queueBindingInfo.getId(), queueBindingInfo.getAddress(), queueBindingInfo.getQueueName(), filter, subscription, queueBindingInfo.getUser(), true, false, queueBindingInfo.isAutoCreated()); if (queueBindingInfo.isAutoCreated()) { - queue.setConsumersRefCount(new AutoCreatedQueueManagerImpl(((PostOfficeImpl) postOffice).getServer(), queueBindingInfo.getQueueName())); + queue.setConsumersRefCount(new AutoCreatedQueueManagerImpl(((PostOfficeImpl) postOffice).getServer().getJMSQueueDeleter(), queueBindingInfo.getQueueName())); } Binding binding = new LocalQueueBinding(queueBindingInfo.getAddress(), queue, nodeManager.getNodeId()); diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ServerSessionImpl.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ServerSessionImpl.java index a4c1438b9b..16cf238cca 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ServerSessionImpl.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ServerSessionImpl.java @@ -487,8 +487,8 @@ public class ServerSessionImpl implements ServerSession, FailureListener { Queue queue; - // any non-temporary JMS queue created via this method should be marked as auto-created - if (!temporary && address.toString().startsWith(ResourceNames.JMS_QUEUE) && address.equals(name)) { + // any non-temporary JMS destination created via this method should be marked as auto-created + if (!temporary && ((address.toString().startsWith(ResourceNames.JMS_QUEUE) && address.equals(name)) || address.toString().startsWith(ResourceNames.JMS_TOPIC)) ) { queue = server.createQueue(address, name, filterString, SimpleString.toSimpleString(getUsername()), durable, temporary, true); } else { @@ -1453,7 +1453,7 @@ public class ServerSessionImpl implements ServerSession, FailureListener { } private void installJMSHooks() { - this.queueCreator = server.getJMSQueueCreator(); + this.queueCreator = server.getJMSDestinationCreator(); } private Map> cloneTargetAddresses() { diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/settings/impl/AddressSettings.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/settings/impl/AddressSettings.java index 4b53ec6fbc..642574b103 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/settings/impl/AddressSettings.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/settings/impl/AddressSettings.java @@ -56,6 +56,10 @@ public class AddressSettings implements Mergeable, Serializable public static final boolean DEFAULT_AUTO_DELETE_QUEUES = true; + public static final boolean DEFAULT_AUTO_CREATE_TOPICS = true; + + public static final boolean DEFAULT_AUTO_DELETE_TOPICS = true; + public static final long DEFAULT_REDISTRIBUTION_DELAY = -1; public static final long DEFAULT_EXPIRY_DELAY = -1; @@ -114,6 +118,10 @@ public class AddressSettings implements Mergeable, Serializable private Boolean autoDeleteJmsQueues = null; + private Boolean autoCreateJmsTopics = null; + + private Boolean autoDeleteJmsTopics = null; + private Integer managementBrowsePageSize = AddressSettings.MANAGEMENT_BROWSE_PAGE_SIZE; //from amq5 @@ -142,6 +150,8 @@ public class AddressSettings implements Mergeable, Serializable this.slowConsumerPolicy = other.slowConsumerPolicy; this.autoCreateJmsQueues = other.autoCreateJmsQueues; this.autoDeleteJmsQueues = other.autoDeleteJmsQueues; + this.autoCreateJmsTopics = other.autoCreateJmsTopics; + this.autoDeleteJmsTopics = other.autoDeleteJmsTopics; this.managementBrowsePageSize = other.managementBrowsePageSize; this.queuePrefetch = other.queuePrefetch; } @@ -167,6 +177,24 @@ public class AddressSettings implements Mergeable, Serializable return this; } + public boolean isAutoCreateJmsTopics() { + return autoCreateJmsTopics != null ? autoCreateJmsTopics : AddressSettings.DEFAULT_AUTO_CREATE_TOPICS; + } + + public AddressSettings setAutoCreateJmsTopics(final boolean autoCreateJmsTopics) { + this.autoCreateJmsTopics = autoCreateJmsTopics; + return this; + } + + public boolean isAutoDeleteJmsTopics() { + return autoDeleteJmsTopics != null ? autoDeleteJmsTopics : AddressSettings.DEFAULT_AUTO_DELETE_QUEUES; + } + + public AddressSettings setAutoDeleteJmsTopics(final boolean autoDeleteJmsTopics) { + this.autoDeleteJmsTopics = autoDeleteJmsTopics; + return this; + } + public boolean isLastValueQueue() { return lastValueQueue != null ? lastValueQueue : AddressSettings.DEFAULT_LAST_VALUE_QUEUE; } @@ -416,6 +444,12 @@ public class AddressSettings implements Mergeable, Serializable if (autoDeleteJmsQueues == null) { autoDeleteJmsQueues = merged.autoDeleteJmsQueues; } + if (autoCreateJmsTopics == null) { + autoCreateJmsTopics = merged.autoCreateJmsTopics; + } + if (autoDeleteJmsTopics == null) { + autoDeleteJmsTopics = merged.autoDeleteJmsTopics; + } if (managementBrowsePageSize == null) { managementBrowsePageSize = merged.managementBrowsePageSize; } @@ -482,6 +516,10 @@ public class AddressSettings implements Mergeable, Serializable autoDeleteJmsQueues = BufferHelper.readNullableBoolean(buffer); + autoCreateJmsTopics = BufferHelper.readNullableBoolean(buffer); + + autoDeleteJmsTopics = BufferHelper.readNullableBoolean(buffer); + managementBrowsePageSize = BufferHelper.readNullableInteger(buffer); } @@ -509,6 +547,8 @@ public class AddressSettings implements Mergeable, Serializable BufferHelper.sizeOfNullableSimpleString(slowConsumerPolicy != null ? slowConsumerPolicy.toString() : null) + BufferHelper.sizeOfNullableBoolean(autoCreateJmsQueues) + BufferHelper.sizeOfNullableBoolean(autoDeleteJmsQueues) + + BufferHelper.sizeOfNullableBoolean(autoCreateJmsTopics) + + BufferHelper.sizeOfNullableBoolean(autoDeleteJmsTopics) + BufferHelper.sizeOfNullableInteger(managementBrowsePageSize); } @@ -556,6 +596,10 @@ public class AddressSettings implements Mergeable, Serializable BufferHelper.writeNullableBoolean(buffer, autoDeleteJmsQueues); + BufferHelper.writeNullableBoolean(buffer, autoCreateJmsTopics); + + BufferHelper.writeNullableBoolean(buffer, autoDeleteJmsTopics); + BufferHelper.writeNullableInteger(buffer, managementBrowsePageSize); } @@ -587,6 +631,8 @@ public class AddressSettings implements Mergeable, Serializable result = prime * result + ((slowConsumerPolicy == null) ? 0 : slowConsumerPolicy.hashCode()); result = prime * result + ((autoCreateJmsQueues == null) ? 0 : autoCreateJmsQueues.hashCode()); result = prime * result + ((autoDeleteJmsQueues == null) ? 0 : autoDeleteJmsQueues.hashCode()); + result = prime * result + ((autoCreateJmsTopics == null) ? 0 : autoCreateJmsTopics.hashCode()); + result = prime * result + ((autoDeleteJmsTopics == null) ? 0 : autoDeleteJmsTopics.hashCode()); result = prime * result + ((managementBrowsePageSize == null) ? 0 : managementBrowsePageSize.hashCode()); result = prime * result + ((queuePrefetch == null) ? 0 : queuePrefetch.hashCode()); return result; @@ -730,8 +776,20 @@ public class AddressSettings implements Mergeable, Serializable } else if (!autoDeleteJmsQueues.equals(other.autoDeleteJmsQueues)) return false; - else if (!managementBrowsePageSize.equals(other.managementBrowsePageSize)) + + if (autoCreateJmsTopics == null) { + if (other.autoCreateJmsTopics != null) + return false; + } + else if (!autoCreateJmsTopics.equals(other.autoCreateJmsTopics)) return false; + if (autoDeleteJmsTopics == null) { + if (other.autoDeleteJmsTopics != null) + return false; + } + else if (!autoDeleteJmsTopics.equals(other.autoDeleteJmsTopics)) + return false; + if (managementBrowsePageSize == null) { if (other.managementBrowsePageSize != null) return false; @@ -793,6 +851,10 @@ public class AddressSettings implements Mergeable, Serializable autoCreateJmsQueues + ", autoDeleteJmsQueues=" + autoDeleteJmsQueues + + ", autoCreateJmsTopics=" + + autoCreateJmsTopics + + ", autoDeleteJmsTopics=" + + autoDeleteJmsTopics + ", managementBrowsePageSize=" + managementBrowsePageSize + "]"; diff --git a/artemis-server/src/main/resources/schema/artemis-configuration.xsd b/artemis-server/src/main/resources/schema/artemis-configuration.xsd index 38961f0a75..93d2a9e38c 100644 --- a/artemis-server/src/main/resources/schema/artemis-configuration.xsd +++ b/artemis-server/src/main/resources/schema/artemis-configuration.xsd @@ -2308,6 +2308,23 @@ + + + + whether or not to automatically create JMS topics when a producer sends or a consumer subscribes to + a topic + + + + + + + + whether or not to delete auto-created JMS topics when the last subscription is closed + + + + diff --git a/artemis-server/src/test/java/org/apache/activemq/artemis/core/config/impl/FileConfigurationTest.java b/artemis-server/src/test/java/org/apache/activemq/artemis/core/config/impl/FileConfigurationTest.java index c4831bc378..63962fba8d 100644 --- a/artemis-server/src/test/java/org/apache/activemq/artemis/core/config/impl/FileConfigurationTest.java +++ b/artemis-server/src/test/java/org/apache/activemq/artemis/core/config/impl/FileConfigurationTest.java @@ -288,6 +288,8 @@ public class FileConfigurationTest extends ConfigurationImplTest { assertEquals(SlowConsumerPolicy.NOTIFY, conf.getAddressesSettings().get("a1").getSlowConsumerPolicy()); assertEquals(true, conf.getAddressesSettings().get("a1").isAutoCreateJmsQueues()); assertEquals(true, conf.getAddressesSettings().get("a1").isAutoDeleteJmsQueues()); + assertEquals(true, conf.getAddressesSettings().get("a1").isAutoCreateJmsTopics()); + assertEquals(true, conf.getAddressesSettings().get("a1").isAutoDeleteJmsTopics()); assertEquals("a2.1", conf.getAddressesSettings().get("a2").getDeadLetterAddress().toString()); assertEquals("a2.2", conf.getAddressesSettings().get("a2").getExpiryAddress().toString()); @@ -301,6 +303,8 @@ public class FileConfigurationTest extends ConfigurationImplTest { assertEquals(SlowConsumerPolicy.KILL, conf.getAddressesSettings().get("a2").getSlowConsumerPolicy()); assertEquals(false, conf.getAddressesSettings().get("a2").isAutoCreateJmsQueues()); assertEquals(false, conf.getAddressesSettings().get("a2").isAutoDeleteJmsQueues()); + assertEquals(false, conf.getAddressesSettings().get("a2").isAutoCreateJmsTopics()); + assertEquals(false, conf.getAddressesSettings().get("a2").isAutoDeleteJmsTopics()); assertTrue(conf.getResourceLimitSettings().containsKey("myUser")); assertEquals(104, conf.getResourceLimitSettings().get("myUser").getMaxConnections()); diff --git a/artemis-server/src/test/java/org/apache/activemq/artemis/core/settings/AddressSettingsTest.java b/artemis-server/src/test/java/org/apache/activemq/artemis/core/settings/AddressSettingsTest.java index 4b84e75322..58f7c99baa 100644 --- a/artemis-server/src/test/java/org/apache/activemq/artemis/core/settings/AddressSettingsTest.java +++ b/artemis-server/src/test/java/org/apache/activemq/artemis/core/settings/AddressSettingsTest.java @@ -41,6 +41,8 @@ public class AddressSettingsTest extends ActiveMQTestBase { Assert.assertEquals(AddressSettings.DEFAULT_SLOW_CONSUMER_POLICY, addressSettings.getSlowConsumerPolicy()); Assert.assertEquals(AddressSettings.DEFAULT_AUTO_CREATE_QUEUES, addressSettings.isAutoCreateJmsQueues()); Assert.assertEquals(AddressSettings.DEFAULT_AUTO_DELETE_QUEUES, addressSettings.isAutoDeleteJmsQueues()); + Assert.assertEquals(AddressSettings.DEFAULT_AUTO_CREATE_TOPICS, addressSettings.isAutoCreateJmsTopics()); + Assert.assertEquals(AddressSettings.DEFAULT_AUTO_DELETE_TOPICS, addressSettings.isAutoDeleteJmsTopics()); } @Test diff --git a/artemis-server/src/test/resources/ConfigurationTest-full-config.xml b/artemis-server/src/test/resources/ConfigurationTest-full-config.xml index 0514788152..ec6d6e987d 100644 --- a/artemis-server/src/test/resources/ConfigurationTest-full-config.xml +++ b/artemis-server/src/test/resources/ConfigurationTest-full-config.xml @@ -248,6 +248,8 @@ NOTIFY true true + true + true a2.1 @@ -262,6 +264,8 @@ KILL false false + false + false diff --git a/docs/user-manual/en/queue-attributes.md b/docs/user-manual/en/queue-attributes.md index 9fa326b14a..22184aa989 100644 --- a/docs/user-manual/en/queue-attributes.md +++ b/docs/user-manual/en/queue-attributes.md @@ -90,6 +90,8 @@ entry that would be found in the `broker.xml` file. 5 true true + true + true @@ -177,7 +179,18 @@ create a JMS queue when a JMS message is sent to a queue whose name fits the address `match` (remember, a JMS queue is just a core queue which has the same address and queue name) or a JMS consumer tries to connect to a queue whose name fits the address `match`. Queues which are auto-created -are durable, non-temporary, and non-transient. +are durable, non-temporary, and non-transient. Default is `true`. -`auto-delete-jms-queues`. Whether or not to the broker should automatically +`auto-delete-jms-queues`. Whether or not the broker should automatically delete auto-created JMS queues when they have both 0 consumers and 0 messages. +Default is `true`. + +`auto-create-jms-topics`. Whether or not the broker should automatically +create a JMS topic when a JMS message is sent to a topic whose name fits +the address `match` (remember, a JMS topic is just a core address which has +one or more core queues mapped to it) or a JMS consumer tries to subscribe +to a topic whose name fits the address `match`. Default is `true`. + +`auto-delete-jms-topics`. Whether or not the broker should automatically +delete auto-created JMS topics once the last subscription on the topic has +been closed. Default is `true`. diff --git a/pom.xml b/pom.xml index d0327fa7b1..d8587e1876 100644 --- a/pom.xml +++ b/pom.xml @@ -89,7 +89,7 @@ 1 0 0 - 126,125,124,123,122 + 127,126,125,124,123,122 ${project.version} ${project.version}(${activemq.version.incrementingVersion}) @@ -236,7 +236,7 @@ provided - + commons-collections commons-collections-testframework diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/client/AutoCreateJmsQueueTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/client/AutoCreateJmsDestinationTest.java similarity index 69% rename from tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/client/AutoCreateJmsQueueTest.java rename to tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/client/AutoCreateJmsDestinationTest.java index 54f18dc12b..fcc05a3243 100644 --- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/client/AutoCreateJmsQueueTest.java +++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/client/AutoCreateJmsDestinationTest.java @@ -17,7 +17,6 @@ package org.apache.activemq.artemis.tests.integration.client; import javax.jms.Connection; -import javax.jms.InvalidDestinationException; import javax.jms.JMSSecurityException; import javax.jms.Message; import javax.jms.MessageConsumer; @@ -26,18 +25,21 @@ import javax.jms.Session; import javax.jms.TextMessage; import java.util.HashSet; import java.util.Set; +import java.util.UUID; import org.apache.activemq.artemis.api.core.SimpleString; import org.apache.activemq.artemis.api.jms.ActiveMQJMSClient; import org.apache.activemq.artemis.core.security.Role; import org.apache.activemq.artemis.core.server.Queue; +import org.apache.activemq.artemis.jms.client.ActiveMQTemporaryTopic; import org.apache.activemq.artemis.spi.core.security.ActiveMQJAASSecurityManager; +import org.apache.activemq.artemis.tests.integration.IntegrationTestLogger; import org.apache.activemq.artemis.tests.util.JMSTestBase; import org.junit.Assert; import org.junit.Before; import org.junit.Test; -public class AutoCreateJmsQueueTest extends JMSTestBase { +public class AutoCreateJmsDestinationTest extends JMSTestBase { @Test public void testAutoCreateOnSendToQueue() throws Exception { @@ -134,15 +136,12 @@ public class AutoCreateJmsQueueTest extends JMSTestBase { javax.jms.Topic topic = ActiveMQJMSClient.createTopic("test"); - try { - MessageProducer producer = session.createProducer(topic); - Assert.fail("Creating a producer here should throw an exception"); - } - catch (Exception e) { - Assert.assertTrue(e instanceof InvalidDestinationException); - } + MessageProducer producer = session.createProducer(topic); + producer.send(session.createTextMessage("msg")); connection.close(); + + assertNotNull(server.getManagementService().getResource("jms.topic.test")); } @Test @@ -166,22 +165,77 @@ public class AutoCreateJmsQueueTest extends JMSTestBase { } @Test - public void testAutoCreateOnConsumeFromTopic() throws Exception { - Connection connection = null; - connection = cf.createConnection(); + public void testAutoCreateOnSubscribeToTopic() throws Exception { + Connection connection = cf.createConnection(); + Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); + final String topicName = "test-" + UUID.randomUUID().toString(); + + javax.jms.Topic topic = ActiveMQJMSClient.createTopic(topicName); + + MessageConsumer consumer = session.createConsumer(topic); + MessageProducer producer = session.createProducer(topic); + producer.send(session.createTextMessage("msg")); + connection.start(); + assertNotNull(consumer.receive(500)); + + assertNotNull(server.getManagementService().getResource("jms.topic." + topicName)); + + connection.close(); + + assertNull(server.getManagementService().getResource("jms.topic." + topicName)); + } + + @Test + public void testAutoCreateOnDurableSubscribeToTopic() throws Exception { + Connection connection = cf.createConnection(); + connection.setClientID("myClientID"); Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); javax.jms.Topic topic = ActiveMQJMSClient.createTopic("test"); - try { - MessageConsumer messageConsumer = session.createConsumer(topic); - Assert.fail("Creating a consumer here should throw an exception"); - } - catch (Exception e) { - Assert.assertTrue(e instanceof InvalidDestinationException); - } + MessageConsumer consumer = session.createDurableConsumer(topic, "myDurableSub"); + MessageProducer producer = session.createProducer(topic); + producer.send(session.createTextMessage("msg")); + connection.start(); + assertNotNull(consumer.receive(500)); connection.close(); + + assertNotNull(server.getManagementService().getResource("jms.topic.test")); + + assertNotNull(server.locateQueue(SimpleString.toSimpleString("myClientID.myDurableSub"))); + } + + @Test + public void testTemporaryTopic() throws Exception { + Connection connection = cf.createConnection(); + Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); + +// javax.jms.Topic topic = ActiveMQJMSClient.createTopic("test"); + + ActiveMQTemporaryTopic topic = (ActiveMQTemporaryTopic) session.createTemporaryTopic(); + + MessageConsumer consumer = session.createConsumer(topic); + MessageProducer producer = session.createProducer(topic); + producer.send(session.createTextMessage("msg")); + connection.start(); + assertNotNull(consumer.receive(500)); + + SimpleString topicAddress = topic.getSimpleAddress(); + + consumer.close(); + + assertNotNull(server.locateQueue(topicAddress)); + + IntegrationTestLogger.LOGGER.info("Topic name: " + topicAddress); + + topic.delete(); + + connection.close(); + +// assertNotNull(server.getManagementService().getResource("jms.topic.test")); + + assertNull(server.locateQueue(topicAddress)); } @Before diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/client/AutoDeleteJmsQueueTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/client/AutoDeleteJmsDestinationTest.java similarity index 65% rename from tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/client/AutoDeleteJmsQueueTest.java rename to tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/client/AutoDeleteJmsDestinationTest.java index 81cd6ec3e4..44bb7643da 100644 --- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/client/AutoDeleteJmsQueueTest.java +++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/client/AutoDeleteJmsDestinationTest.java @@ -31,10 +31,10 @@ import org.apache.activemq.artemis.core.server.Queue; import org.junit.Assert; import org.junit.Test; -public class AutoDeleteJmsQueueTest extends JMSTestBase { +public class AutoDeleteJmsDestinationTest extends JMSTestBase { @Test - public void testAutoDelete() throws Exception { + public void testAutoDeleteQueue() throws Exception { Connection connection = cf.createConnection(); Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); @@ -124,4 +124,76 @@ public class AutoDeleteJmsQueueTest extends JMSTestBase { // ensure the queue was not removed Assert.assertNotNull(server.getPostOffice().getBinding(new SimpleString("jms.queue.test"))); } + + @Test + public void testAutoDeleteTopic() throws Exception { + Connection connection = cf.createConnection(); + Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); + + javax.jms.Topic topic = ActiveMQJMSClient.createTopic("test"); + + MessageConsumer messageConsumer = session.createConsumer(topic); + MessageProducer producer = session.createProducer(topic); + + final int numMessages = 100; + + for (int i = 0; i < numMessages; i++) { + TextMessage mess = session.createTextMessage("msg" + i); + producer.send(mess); + } + + producer.close(); + connection.start(); + + for (int i = 0; i < numMessages; i++) { + Message m = messageConsumer.receive(5000); + Assert.assertNotNull(m); + } + + connection.close(); + + // ensure the topic was removed + Assert.assertNull(server.locateQueue(new SimpleString("jms.topic.test"))); + + // make sure the JMX control was removed for the JMS topic + assertNull(server.getManagementService().getResource("jms.topic.test")); + } + + @Test + public void testAutoDeleteTopicDurableSubscriber() throws Exception { + Connection connection = cf.createConnection(); + connection.setClientID("myClientID"); + Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); + + javax.jms.Topic topic = ActiveMQJMSClient.createTopic("test"); + + MessageConsumer messageConsumer = session.createDurableConsumer(topic, "mySub"); + MessageProducer producer = session.createProducer(topic); + + final int numMessages = 100; + + for (int i = 0; i < numMessages; i++) { + TextMessage mess = session.createTextMessage("msg" + i); + producer.send(mess); + } + + producer.close(); + connection.start(); + + for (int i = 0; i < numMessages; i++) { + Message m = messageConsumer.receive(5000); + Assert.assertNotNull(m); + } + + messageConsumer.close(); + session.unsubscribe("mySub"); + + connection.close(); + + // ensure the topic was removed + Assert.assertNull(server.locateQueue(new SimpleString("jms.topic.test"))); + + // make sure the JMX control was removed for the JMS topic + assertNull(server.getManagementService().getResource("jms.topic.test")); + } } diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/client/PagingOrderTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/client/PagingOrderTest.java index 8c2d9cdb64..8f8780309b 100644 --- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/client/PagingOrderTest.java +++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/client/PagingOrderTest.java @@ -607,7 +607,7 @@ public class PagingOrderTest extends ActiveMQTestBase { jmsServer.createTopic(true, "tt", "/topic/TT"); - server.getActiveMQServerControl().addAddressSettings("jms.topic.TT", "DLQ", "DLQ", -1, false, 5, 1024 * 1024, 1024 * 10, 5, 5, 1, 1000, 0, false, "PAGE", -1, 10, "KILL", true, true); + server.getActiveMQServerControl().addAddressSettings("jms.topic.TT", "DLQ", "DLQ", -1, false, 5, 1024 * 1024, 1024 * 10, 5, 5, 1, 1000, 0, false, "PAGE", -1, 10, "KILL", true, true, true, true); ActiveMQJMSConnectionFactory cf = (ActiveMQJMSConnectionFactory) ActiveMQJMSClient.createConnectionFactoryWithoutHA(JMSFactoryType.CF, new TransportConfiguration(INVM_CONNECTOR_FACTORY)); @@ -663,7 +663,7 @@ public class PagingOrderTest extends ActiveMQTestBase { jmsServer.setRegistry(new JndiBindingRegistry(context)); jmsServer.start(); - server.getActiveMQServerControl().addAddressSettings("jms.queue.Q1", "DLQ", "DLQ", -1, false, 5, 100 * 1024, 10 * 1024, 5, 5, 1, 1000, 0, false, "PAGE", -1, 10, "KILL", true, true); + server.getActiveMQServerControl().addAddressSettings("jms.queue.Q1", "DLQ", "DLQ", -1, false, 5, 100 * 1024, 10 * 1024, 5, 5, 1, 1000, 0, false, "PAGE", -1, 10, "KILL", true, true, true, true); jmsServer.createQueue(true, "Q1", null, true, "/queue/Q1"); diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/jms/jms2client/NonExistentQueueTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/jms/jms2client/NonExistentQueueTest.java index 913ef9a1d1..1115305b50 100644 --- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/jms/jms2client/NonExistentQueueTest.java +++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/jms/jms2client/NonExistentQueueTest.java @@ -32,6 +32,7 @@ import java.util.Random; import org.apache.activemq.artemis.api.core.TransportConfiguration; import org.apache.activemq.artemis.api.jms.ActiveMQJMSClient; import org.apache.activemq.artemis.api.jms.JMSFactoryType; +import org.apache.activemq.artemis.core.settings.impl.AddressSettings; import org.apache.activemq.artemis.tests.util.JMSTestBase; import org.apache.activemq.artemis.core.remoting.impl.invm.InVMConnectorFactory; import org.junit.Assert; @@ -54,6 +55,7 @@ public class NonExistentQueueTest extends JMSTestBase { @Test public void sendToNonExistentDestination() throws Exception { + server.getAddressSettingsRepository().addMatch("#", new AddressSettings().setAutoCreateJmsTopics(false)); Destination destination = ActiveMQJMSClient.createTopic("DoesNotExist"); TransportConfiguration transportConfiguration = new TransportConfiguration(InVMConnectorFactory.class.getName()); ConnectionFactory localConnectionFactory = ActiveMQJMSClient.createConnectionFactoryWithoutHA(JMSFactoryType.CF, transportConfiguration); diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/management/ActiveMQServerControlTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/management/ActiveMQServerControlTest.java index 5595ddf657..aa4d685d5e 100644 --- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/management/ActiveMQServerControlTest.java +++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/management/ActiveMQServerControlTest.java @@ -468,12 +468,14 @@ public class ActiveMQServerControlTest extends ManagementTestBase { String slowConsumerPolicy = SlowConsumerPolicy.KILL.toString(); boolean autoCreateJmsQueues = false; boolean autoDeleteJmsQueues = false; + boolean autoCreateJmsTopics = false; + boolean autoDeleteJmsTopics = false; - serverControl.addAddressSettings(addressMatch, DLA, expiryAddress, expiryDelay, lastValueQueue, deliveryAttempts, maxSizeBytes, pageSizeBytes, pageMaxCacheSize, redeliveryDelay, redeliveryMultiplier, maxRedeliveryDelay, redistributionDelay, sendToDLAOnNoRoute, addressFullMessagePolicy, slowConsumerThreshold, slowConsumerCheckPeriod, slowConsumerPolicy, autoCreateJmsQueues, autoDeleteJmsQueues); + serverControl.addAddressSettings(addressMatch, DLA, expiryAddress, expiryDelay, lastValueQueue, deliveryAttempts, maxSizeBytes, pageSizeBytes, pageMaxCacheSize, redeliveryDelay, redeliveryMultiplier, maxRedeliveryDelay, redistributionDelay, sendToDLAOnNoRoute, addressFullMessagePolicy, slowConsumerThreshold, slowConsumerCheckPeriod, slowConsumerPolicy, autoCreateJmsQueues, autoDeleteJmsQueues, autoCreateJmsTopics, autoDeleteJmsTopics); boolean ex = false; try { - serverControl.addAddressSettings(addressMatch, DLA, expiryAddress, expiryDelay, lastValueQueue, deliveryAttempts, 100, 1000, pageMaxCacheSize, redeliveryDelay, redeliveryMultiplier, maxRedeliveryDelay, redistributionDelay, sendToDLAOnNoRoute, addressFullMessagePolicy, slowConsumerThreshold, slowConsumerCheckPeriod, slowConsumerPolicy, autoCreateJmsQueues, autoDeleteJmsQueues); + serverControl.addAddressSettings(addressMatch, DLA, expiryAddress, expiryDelay, lastValueQueue, deliveryAttempts, 100, 1000, pageMaxCacheSize, redeliveryDelay, redeliveryMultiplier, maxRedeliveryDelay, redistributionDelay, sendToDLAOnNoRoute, addressFullMessagePolicy, slowConsumerThreshold, slowConsumerCheckPeriod, slowConsumerPolicy, autoCreateJmsQueues, autoDeleteJmsQueues, autoCreateJmsTopics, autoDeleteJmsTopics); } catch (Exception expected) { ex = true; @@ -504,8 +506,10 @@ public class ActiveMQServerControlTest extends ManagementTestBase { assertEquals(slowConsumerPolicy, info.getSlowConsumerPolicy()); assertEquals(autoCreateJmsQueues, info.isAutoCreateJmsQueues()); assertEquals(autoDeleteJmsQueues, info.isAutoDeleteJmsQueues()); + assertEquals(autoCreateJmsTopics, info.isAutoCreateJmsTopics()); + assertEquals(autoDeleteJmsTopics, info.isAutoDeleteJmsTopics()); - serverControl.addAddressSettings(addressMatch, DLA, expiryAddress, expiryDelay, lastValueQueue, deliveryAttempts, -1, 1000, pageMaxCacheSize, redeliveryDelay, redeliveryMultiplier, maxRedeliveryDelay, redistributionDelay, sendToDLAOnNoRoute, addressFullMessagePolicy, slowConsumerThreshold, slowConsumerCheckPeriod, slowConsumerPolicy, autoCreateJmsQueues, autoDeleteJmsQueues); + serverControl.addAddressSettings(addressMatch, DLA, expiryAddress, expiryDelay, lastValueQueue, deliveryAttempts, -1, 1000, pageMaxCacheSize, redeliveryDelay, redeliveryMultiplier, maxRedeliveryDelay, redistributionDelay, sendToDLAOnNoRoute, addressFullMessagePolicy, slowConsumerThreshold, slowConsumerCheckPeriod, slowConsumerPolicy, autoCreateJmsQueues, autoDeleteJmsQueues, autoCreateJmsTopics, autoDeleteJmsTopics); jsonString = serverControl.getAddressSettingsAsJSON(exactAddress); info = AddressSettingsInfo.from(jsonString); @@ -528,10 +532,12 @@ public class ActiveMQServerControlTest extends ManagementTestBase { assertEquals(slowConsumerPolicy, info.getSlowConsumerPolicy()); assertEquals(autoCreateJmsQueues, info.isAutoCreateJmsQueues()); assertEquals(autoDeleteJmsQueues, info.isAutoDeleteJmsQueues()); + assertEquals(autoCreateJmsTopics, info.isAutoCreateJmsTopics()); + assertEquals(autoDeleteJmsTopics, info.isAutoDeleteJmsTopics()); ex = false; try { - serverControl.addAddressSettings(addressMatch, DLA, expiryAddress, expiryDelay, lastValueQueue, deliveryAttempts, -2, 1000, pageMaxCacheSize, redeliveryDelay, redeliveryMultiplier, maxRedeliveryDelay, redistributionDelay, sendToDLAOnNoRoute, addressFullMessagePolicy, slowConsumerThreshold, slowConsumerCheckPeriod, slowConsumerPolicy, autoCreateJmsQueues, autoDeleteJmsQueues); + serverControl.addAddressSettings(addressMatch, DLA, expiryAddress, expiryDelay, lastValueQueue, deliveryAttempts, -2, 1000, pageMaxCacheSize, redeliveryDelay, redeliveryMultiplier, maxRedeliveryDelay, redistributionDelay, sendToDLAOnNoRoute, addressFullMessagePolicy, slowConsumerThreshold, slowConsumerCheckPeriod, slowConsumerPolicy, autoCreateJmsQueues, autoDeleteJmsQueues, autoCreateJmsTopics, autoDeleteJmsTopics); } catch (Exception e) { ex = true; diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/management/ActiveMQServerControlUsingCoreTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/management/ActiveMQServerControlUsingCoreTest.java index 7519fef9e4..0eb77d4b95 100644 --- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/management/ActiveMQServerControlUsingCoreTest.java +++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/management/ActiveMQServerControlUsingCoreTest.java @@ -587,8 +587,10 @@ public class ActiveMQServerControlUsingCoreTest extends ActiveMQServerControlTes @Parameter(desc = "how often (in seconds) to check for slow consumers", name = "slowConsumerCheckPeriod") long slowConsumerCheckPeriod, @Parameter(desc = "the policy to use when a slow consumer is detected", name = "slowConsumerPolicy") String slowConsumerPolicy, @Parameter(desc = "allow queues to be created automatically", name = "autoCreateJmsQueues") boolean autoCreateJmsQueues, - @Parameter(desc = "allow auto-created queues to be deleted automatically", name = "autoDeleteJmsQueues") boolean autoDeleteJmsQueues) throws Exception { - proxy.invokeOperation("addAddressSettings", addressMatch, DLA, expiryAddress, expiryDelay, lastValueQueue, deliveryAttempts, maxSizeBytes, pageSizeBytes, pageMaxCacheSize, redeliveryDelay, redeliveryMultiplier, maxRedeliveryDelay, redistributionDelay, sendToDLAOnNoRoute, addressFullMessagePolicy, slowConsumerThreshold, slowConsumerCheckPeriod, slowConsumerPolicy, autoCreateJmsQueues, autoDeleteJmsQueues); + @Parameter(desc = "allow auto-created queues to be deleted automatically", name = "autoDeleteJmsQueues") boolean autoDeleteJmsQueues, + @Parameter(desc = "allow topics to be created automatically", name = "autoCreateJmsTopics") boolean autoCreateJmsTopics, + @Parameter(desc = "allow auto-created topics to be deleted automatically", name = "autoDeleteJmsTopics") boolean autoDeleteJmsTopics) throws Exception { + proxy.invokeOperation("addAddressSettings", addressMatch, DLA, expiryAddress, expiryDelay, lastValueQueue, deliveryAttempts, maxSizeBytes, pageSizeBytes, pageMaxCacheSize, redeliveryDelay, redeliveryMultiplier, maxRedeliveryDelay, redistributionDelay, sendToDLAOnNoRoute, addressFullMessagePolicy, slowConsumerThreshold, slowConsumerCheckPeriod, slowConsumerPolicy, autoCreateJmsQueues, autoDeleteJmsQueues, autoCreateJmsTopics, autoDeleteJmsTopics); } @Override diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/management/ManagementWithStompTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/management/ManagementWithStompTest.java index d8b099528c..97ef1716b0 100644 --- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/management/ManagementWithStompTest.java +++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/management/ManagementWithStompTest.java @@ -30,6 +30,8 @@ import org.apache.activemq.artemis.core.remoting.impl.netty.NettyAcceptorFactory import org.apache.activemq.artemis.core.remoting.impl.netty.TransportConstants; import org.apache.activemq.artemis.core.server.ActiveMQServer; import org.apache.activemq.artemis.core.server.ActiveMQServers; +import org.apache.activemq.artemis.jms.server.JMSServerManager; +import org.apache.activemq.artemis.jms.server.impl.JMSServerManagerImpl; import org.apache.activemq.artemis.utils.RandomUtil; import org.junit.Assert; import org.junit.Before; @@ -52,6 +54,8 @@ public class ManagementWithStompTest extends ManagementTestBase { protected ActiveMQServer server; + protected JMSServerManager jmsServer; + protected ClientSession session; private Socket stompSocket; @@ -169,7 +173,9 @@ public class ManagementWithStompTest extends ManagementTestBase { server = addServer(ActiveMQServers.newActiveMQServer(config, mbeanServer, false, "brianm", "wombats")); - server.start(); + jmsServer = new JMSServerManagerImpl(server); + + jmsServer.start(); locator = createInVMNonHALocator().setBlockOnNonDurableSend(true); ClientSessionFactory sf = createSessionFactory(locator); diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/openwire/amq/JmsTopicRequestReplyTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/openwire/amq/JmsTopicRequestReplyTest.java index 5228d03155..ad2e85b731 100644 --- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/openwire/amq/JmsTopicRequestReplyTest.java +++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/openwire/amq/JmsTopicRequestReplyTest.java @@ -57,11 +57,16 @@ public class JmsTopicRequestReplyTest extends BasicOpenWireTest implements Messa clientConnection = createConnection(); clientConnection.setClientID("ClientConnection:" + name.getMethodName()); + System.out.println("Creating session."); Session session = clientConnection.createSession(false, Session.AUTO_ACKNOWLEDGE); clientConnection.start(); Destination replyDestination = createTemporaryDestination(session); + System.out.println("Created temporary topic " + replyDestination); + + System.out.println("Creating consumer on: " + replyDestination); + MessageConsumer replyConsumer = session.createConsumer(replyDestination); // lets test the destination clientSideClientID = clientConnection.getClientID(); @@ -74,12 +79,15 @@ public class JmsTopicRequestReplyTest extends BasicOpenWireTest implements Messa System.out.println("Both the clientID and destination clientID match properly: " + clientSideClientID); /* build queues */ - MessageProducer requestProducer = session.createProducer(requestDestination); - MessageConsumer replyConsumer = session.createConsumer(replyDestination); /* build requestmessage */ TextMessage requestMessage = session.createTextMessage("Olivier"); requestMessage.setJMSReplyTo(replyDestination); + + System.out.println("Creating producer on " + requestDestination); + MessageProducer requestProducer = session.createProducer(requestDestination); + + System.out.println("Sending message to " + requestDestination); requestProducer.send(requestMessage); System.out.println("Sent request."); @@ -116,7 +124,7 @@ public class JmsTopicRequestReplyTest extends BasicOpenWireTest implements Messa try { TextMessage requestMessage = (TextMessage) message; - System.out.println("Received request."); + System.out.println("Received request from " + requestDestination); System.out.println(requestMessage.toString()); Destination replyDestination = requestMessage.getJMSReplyTo(); @@ -140,7 +148,7 @@ public class JmsTopicRequestReplyTest extends BasicOpenWireTest implements Messa replyProducer.send(replyDestination, replyMessage); } - System.out.println("Sent reply."); + System.out.println("Sent reply to " + replyDestination); System.out.println(replyMessage.toString()); } catch (JMSException e) { @@ -180,6 +188,7 @@ public class JmsTopicRequestReplyTest extends BasicOpenWireTest implements Messa requestDestination = createDestination(serverSession); /* build queues */ + System.out.println("Creating consumer on: " + requestDestination); final MessageConsumer requestConsumer = serverSession.createConsumer(requestDestination); if (useAsyncConsume) { requestConsumer.setMessageListener(this); @@ -232,6 +241,7 @@ public class JmsTopicRequestReplyTest extends BasicOpenWireTest implements Messa ((TemporaryTopic) dest).delete(); } else { + System.out.println("Deleting: " + dest); ((TemporaryQueue) dest).delete(); } } diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/stomp/StompTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/stomp/StompTest.java index 81b5511cec..41b5c35e70 100644 --- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/stomp/StompTest.java +++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/stomp/StompTest.java @@ -233,6 +233,43 @@ public class StompTest extends StompTestBase { assertNull(server.getActiveMQServer().getPostOffice().getBinding(new SimpleString(ResourceNames.JMS_QUEUE + nonExistentQueue))); } + @Test + public void testSendMessageToNonExistentTopic() throws Exception { + String nonExistentTopic = RandomUtil.randomString(); + String frame = "CONNECT\n" + "login: brianm\n" + "passcode: wombats\n\n" + Stomp.NULL; + sendFrame(frame); + + frame = receiveFrame(10000); + Assert.assertTrue(frame.startsWith("CONNECTED")); + + // first send a message to ensure that sending to a non-existent topic won't throw an error + frame = "SEND\n" + "destination:" + getTopicPrefix() + nonExistentTopic + "\n\n" + "Hello World" + Stomp.NULL; + sendFrame(frame); + receiveFrame(1000); + + // create a subscription on the topic and send/receive another message + MessageConsumer consumer = session.createConsumer(ActiveMQJMSClient.createTopic(nonExistentTopic)); + sendFrame(frame); + receiveFrame(1000); + TextMessage message = (TextMessage) consumer.receive(1000); + Assert.assertNotNull(message); + Assert.assertEquals("Hello World", message.getText()); + // Assert default priority 4 is used when priority header is not set + Assert.assertEquals("getJMSPriority", 4, message.getJMSPriority()); + + // Make sure that the timestamp is valid - should + // be very close to the current time. + long tnow = System.currentTimeMillis(); + long tmsg = message.getJMSTimestamp(); + Assert.assertTrue(Math.abs(tnow - tmsg) < 1500); + + assertNotNull(server.getActiveMQServer().getPostOffice().getBinding(new SimpleString(ResourceNames.JMS_TOPIC + nonExistentTopic))); + + // closing the consumer here should trigger auto-deletion of the topic + consumer.close(); + assertNull(server.getActiveMQServer().getPostOffice().getBinding(new SimpleString(ResourceNames.JMS_TOPIC + nonExistentTopic))); + } + /* * Some STOMP clients erroneously put a new line \n *after* the terminating NUL char at the end of the frame * This means next frame read might have a \n a the beginning. diff --git a/tests/jms-tests/src/test/java/org/apache/activemq/artemis/jms/tests/MessageProducerTest.java b/tests/jms-tests/src/test/java/org/apache/activemq/artemis/jms/tests/MessageProducerTest.java index cb7e493d15..19bb908999 100644 --- a/tests/jms-tests/src/test/java/org/apache/activemq/artemis/jms/tests/MessageProducerTest.java +++ b/tests/jms-tests/src/test/java/org/apache/activemq/artemis/jms/tests/MessageProducerTest.java @@ -32,6 +32,7 @@ import java.util.concurrent.CountDownLatch; import java.util.concurrent.TimeUnit; import org.apache.activemq.artemis.api.jms.ActiveMQJMSClient; +import org.apache.activemq.artemis.core.settings.impl.AddressSettings; import org.apache.activemq.artemis.jms.tests.message.SimpleJMSMessage; import org.apache.activemq.artemis.jms.tests.message.SimpleJMSTextMessage; import org.apache.activemq.artemis.jms.tests.util.ProxyAssertSupport; @@ -348,6 +349,7 @@ public class MessageProducerTest extends JMSTestCase { @Test public void testCreateProducerOnInexistentDestination() throws Exception { + getJmsServer().getAddressSettingsRepository().addMatch("#", new AddressSettings().setAutoCreateJmsTopics(false)); Connection pconn = createConnection(); try { Session ps = pconn.createSession(false, Session.AUTO_ACKNOWLEDGE); diff --git a/tests/jms-tests/src/test/java/org/apache/activemq/artemis/jms/tests/SessionTest.java b/tests/jms-tests/src/test/java/org/apache/activemq/artemis/jms/tests/SessionTest.java index e6a8909a17..5f2ad279bb 100644 --- a/tests/jms-tests/src/test/java/org/apache/activemq/artemis/jms/tests/SessionTest.java +++ b/tests/jms-tests/src/test/java/org/apache/activemq/artemis/jms/tests/SessionTest.java @@ -172,6 +172,7 @@ public class SessionTest extends ActiveMQServerTestCase { @Test public void testCreateNonExistentTopic() throws Exception { + getJmsServer().getAddressSettingsRepository().addMatch("#", new AddressSettings().setAutoCreateJmsTopics(false)); Connection conn = getConnectionFactory().createConnection(); Session sess = conn.createSession(false, Session.AUTO_ACKNOWLEDGE); try { @@ -201,6 +202,7 @@ public class SessionTest extends ActiveMQServerTestCase { @Test public void testCreateTopicWhileQueueWithSameNameExists() throws Exception { + getJmsServer().getAddressSettingsRepository().addMatch("#", new AddressSettings().setAutoCreateJmsTopics(false)); Connection conn = getConnectionFactory().createConnection(); Session sess = conn.createSession(false, Session.AUTO_ACKNOWLEDGE); try {