From 74fa4ca7580f1fce3f2727dab5edce41cc68af92 Mon Sep 17 00:00:00 2001 From: Justin Bertram Date: Fri, 17 Mar 2023 16:44:05 -0500 Subject: [PATCH] ARTEMIS-4212 fix sending msgs to address w/mismatching routing types When sending, for example, to a predefined anycast address and queue from a multicast (JMS topic) producer, the routed count on the address is incremented, but the message count on the matching queue is not. No indication is given at the client end that the messages failed to get routed - the messages are just silently dropped. Fixing this problem requires a slight semantic change. The broker is now more strict in what it allows specifically with regards to auto-creation. If, for example, a JMS application attempts to send a message to a topic and the corresponding multicast address doesn't exist already or the broker cannot automatically create it or update it then sending the message will fail. Also, part of this commit moves a chunk of auto-create logic into ServerSession and adds an enum for auto-create results. Aside from helping fix this specific issue this can serve as a foundation for de-duplicating the auto-create logic spread across many of the protocol implementations. --- .../artemis/api/core/AutoCreateResult.java | 21 +++ .../artemis/api/core/QueueConfiguration.java | 60 +++++++- .../api/core/QueueConfigurationTest.java | 32 ++++ .../api/core/client/ClientSession.java | 4 + .../core/client/impl/AddressQueryImpl.java | 20 ++- .../core/impl/ActiveMQSessionContext.java | 15 +- .../core/protocol/core/impl/ChannelImpl.java | 2 + .../protocol/core/impl/PacketDecoder.java | 6 + .../core/protocol/core/impl/PacketImpl.java | 5 + ...SessionBindingQueryResponseMessage_V4.java | 20 ++- ...SessionBindingQueryResponseMessage_V5.java | 140 ++++++++++++++++++ .../resources/activemq-version.properties | 2 +- .../artemis/jms/client/ActiveMQSession.java | 4 + ...tiveMQResourceCustomConfigurationTest.java | 3 + .../amqp/broker/AMQPSessionCallback.java | 7 +- .../ActiveMQAMQPProtocolMessageBundle.java | 4 +- .../proton/ProtonServerReceiverContext.java | 36 +++-- .../client/HornetQClientSessionContext.java | 2 +- .../protocol/openwire/OpenWireConnection.java | 81 +++------- .../protocol/openwire/amq/AMQSession.java | 65 ++++---- .../core/postoffice/impl/PostOfficeImpl.java | 11 +- .../core/ServerSessionPacketHandler.java | 5 +- .../artemis/core/server/ServerSession.java | 3 +- .../core/server/impl/ServerSessionImpl.java | 123 +++++++++------ pom.xml | 2 +- .../amqp/AmqpClientTestSupport.java | 4 +- ...eiverReconnectWithMulticastPrefixTest.java | 8 +- .../amqp/AmqpMessageRoutingTest.java | 4 +- .../integration/amqp/AmqpReceiverTest.java | 2 +- .../integration/amqp/AmqpSendReceiveTest.java | 3 +- .../integration/amqp/AmqpTestSupport.java | 3 + ...AutoCreateWithDefaultRoutingTypesTest.java | 12 +- .../BrokerDefinedMulticastConsumerTest.java | 11 ++ .../amqp/QueueAutoCreationTest.java | 2 +- .../client/AutoCreateJmsDestinationTest.java | 2 +- .../JMSMismatchedRoutingTypeTest.java | 138 +++++++++++++++++ .../integration/security/SecurityTest.java | 15 +- .../server/RetroactiveAddressTest.java | 2 +- .../reload-divert-address-source1.xml | 12 +- .../reload-divert-address-source2.xml | 12 +- .../reload-divert-address-target1.xml | 12 +- .../reload-divert-address-target2.xml | 12 +- .../resources/reload-divert-exclusive.xml | 8 +- .../resources/reload-divert-filter-none.xml | 8 +- .../resources/reload-divert-filter-x-eq-x.xml | 8 +- .../resources/reload-divert-filter-x-eq-y.xml | 8 +- .../resources/reload-divert-non-exclusive.xml | 8 +- 47 files changed, 735 insertions(+), 232 deletions(-) create mode 100644 artemis-commons/src/main/java/org/apache/activemq/artemis/api/core/AutoCreateResult.java create mode 100644 artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/wireformat/SessionBindingQueryResponseMessage_V5.java create mode 100644 tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/jms/multiprotocol/JMSMismatchedRoutingTypeTest.java diff --git a/artemis-commons/src/main/java/org/apache/activemq/artemis/api/core/AutoCreateResult.java b/artemis-commons/src/main/java/org/apache/activemq/artemis/api/core/AutoCreateResult.java new file mode 100644 index 0000000000..a988259fc0 --- /dev/null +++ b/artemis-commons/src/main/java/org/apache/activemq/artemis/api/core/AutoCreateResult.java @@ -0,0 +1,21 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.activemq.artemis.api.core; + +public enum AutoCreateResult { + EXISTED, CREATED, UPDATED, NOT_FOUND; +} diff --git a/artemis-commons/src/main/java/org/apache/activemq/artemis/api/core/QueueConfiguration.java b/artemis-commons/src/main/java/org/apache/activemq/artemis/api/core/QueueConfiguration.java index 7bb1d7804c..42b4f45b2d 100644 --- a/artemis-commons/src/main/java/org/apache/activemq/artemis/api/core/QueueConfiguration.java +++ b/artemis-commons/src/main/java/org/apache/activemq/artemis/api/core/QueueConfiguration.java @@ -76,6 +76,7 @@ public class QueueConfiguration implements Serializable { public static final String INTERNAL = "internal"; public static final String TRANSIENT = "transient"; public static final String AUTO_CREATED = "auto-created"; + public static final String FQQN = "fqqn"; private Long id; // internal use private SimpleString name; @@ -108,10 +109,46 @@ public class QueueConfiguration implements Serializable { private Boolean internal; private Boolean _transient; private Boolean autoCreated; + private Boolean fqqn; public QueueConfiguration() { } + public QueueConfiguration(QueueConfiguration o) { + id = o.id; + name = o.name; + address = o.address; + routingType = o.routingType; + filterString = o.filterString; + durable = o.durable; + user = o.user; + maxConsumers = o.maxConsumers; + exclusive = o.exclusive; + groupRebalance = o.groupRebalance; + groupRebalancePauseDispatch = o.groupRebalancePauseDispatch; + groupBuckets = o.groupBuckets; + groupFirstKey = o.groupFirstKey; + lastValue = o.lastValue; + lastValueKey = o.lastValueKey; + nonDestructive = o.nonDestructive; + purgeOnNoConsumers = o.purgeOnNoConsumers; + enabled = o.enabled; + consumersBeforeDispatch = o.consumersBeforeDispatch; + delayBeforeDispatch = o.delayBeforeDispatch; + consumerPriority = o.consumerPriority; + autoDelete = o.autoDelete; + autoDeleteDelay = o.autoDeleteDelay; + autoDeleteMessageCount = o.autoDeleteMessageCount; + ringSize = o.ringSize; + configurationManaged = o.configurationManaged; + temporary = o.temporary; + autoCreateAddress = o.autoCreateAddress; + internal = o.internal; + _transient = o._transient; + autoCreated = o.autoCreated; + fqqn = o.fqqn; + } + /** * Instantiate this object and invoke {@link #setName(SimpleString)} * @@ -261,7 +298,7 @@ public class QueueConfiguration implements Serializable { } /** - * Set the name. If the fully-qualified queue name is used then it will be parsed and the corresponding values for + * Set the address. If the fully-qualified queue name is used then it will be parsed and the corresponding values for * {@code address} and {@code name} will be set automatically. For example if "myAddress::myQueue" is passed then the * resulting value for {@code address} will be "myAddress" and the value for {@code name} will be "myQueue". * @@ -272,6 +309,7 @@ public class QueueConfiguration implements Serializable { if (CompositeAddress.isFullyQualified(address)) { this.name = CompositeAddress.extractQueueName(address); this.address = CompositeAddress.extractAddressName(address); + this.fqqn = Boolean.TRUE; } else { this.address = address; } @@ -301,6 +339,7 @@ public class QueueConfiguration implements Serializable { if (CompositeAddress.isFullyQualified(name)) { this.name = CompositeAddress.extractQueueName(name); this.address = CompositeAddress.extractAddressName(name); + this.fqqn = Boolean.TRUE; } else { this.name = name; } @@ -607,6 +646,16 @@ public class QueueConfiguration implements Serializable { return this; } + /** + * Based on if the name or address uses FQQN when set + * + * defaults to {@code false} + * @return + */ + public Boolean isFqqn() { + return fqqn == null ? Boolean.FALSE : fqqn; + } + /** * This method returns a JSON-formatted {@code String} representation of this {@code QueueConfiguration}. It is a * simple collection of key/value pairs. The keys used are referenced in {@link #set(String, String)}. @@ -709,6 +758,9 @@ public class QueueConfiguration implements Serializable { if (isAutoCreated() != null) { builder.add(AUTO_CREATED, isAutoCreated()); } + if (isFqqn() != null) { + builder.add(FQQN, isFqqn()); + } return builder.build().toString(); } @@ -807,6 +859,8 @@ public class QueueConfiguration implements Serializable { return false; if (!Objects.equals(autoCreated, that.autoCreated)) return false; + if (!Objects.equals(fqqn, that.fqqn)) + return false; return true; } @@ -844,6 +898,7 @@ public class QueueConfiguration implements Serializable { result = 31 * result + Objects.hashCode(internal); result = 31 * result + Objects.hashCode(_transient); result = 31 * result + Objects.hashCode(autoCreated); + result = 31 * result + Objects.hashCode(fqqn); return result; } @@ -880,6 +935,7 @@ public class QueueConfiguration implements Serializable { + ", autoCreateAddress=" + autoCreateAddress + ", internal=" + internal + ", transient=" + _transient - + ", autoCreated=" + autoCreated + ']'; + + ", autoCreated=" + autoCreated + + ", fqqn=" + fqqn + ']'; } } diff --git a/artemis-commons/src/test/java/org/apache/activemq/artemis/api/core/QueueConfigurationTest.java b/artemis-commons/src/test/java/org/apache/activemq/artemis/api/core/QueueConfigurationTest.java index 35dbe36c6b..7119565444 100644 --- a/artemis-commons/src/test/java/org/apache/activemq/artemis/api/core/QueueConfigurationTest.java +++ b/artemis-commons/src/test/java/org/apache/activemq/artemis/api/core/QueueConfigurationTest.java @@ -16,6 +16,8 @@ */ package org.apache.activemq.artemis.api.core; +import org.apache.activemq.artemis.utils.CompositeAddress; +import org.apache.activemq.artemis.utils.RandomUtil; import org.junit.Assert; import org.junit.Test; @@ -39,4 +41,34 @@ public class QueueConfigurationTest { queueConfiguration.set(QueueConfiguration.GROUP_REBALANCE_PAUSE_DISPATCH, Boolean.toString(false)); Assert.assertEquals(false, queueConfiguration.isGroupRebalancePauseDispatch()); } + + @Test + public void testFqqn() { + final SimpleString ADDRESS = RandomUtil.randomSimpleString(); + final SimpleString QUEUE = RandomUtil.randomSimpleString(); + QueueConfiguration queueConfiguration = new QueueConfiguration(CompositeAddress.toFullyQualified(ADDRESS, QUEUE)); + Assert.assertEquals(ADDRESS, queueConfiguration.getAddress()); + Assert.assertEquals(QUEUE, queueConfiguration.getName()); + Assert.assertTrue(queueConfiguration.isFqqn()); + } + + @Test + public void testFqqnNegative() { + final SimpleString ADDRESS = RandomUtil.randomSimpleString(); + final SimpleString QUEUE = RandomUtil.randomSimpleString(); + QueueConfiguration queueConfiguration = new QueueConfiguration(QUEUE).setAddress(ADDRESS); + Assert.assertEquals(ADDRESS, queueConfiguration.getAddress()); + Assert.assertEquals(QUEUE, queueConfiguration.getName()); + Assert.assertFalse(queueConfiguration.isFqqn()); + } + + @Test + public void testFqqnViaAddress() { + final SimpleString ADDRESS = RandomUtil.randomSimpleString(); + final SimpleString QUEUE = RandomUtil.randomSimpleString(); + QueueConfiguration queueConfiguration = new QueueConfiguration(RandomUtil.randomSimpleString()).setAddress(CompositeAddress.toFullyQualified(ADDRESS, QUEUE)); + Assert.assertEquals(ADDRESS, queueConfiguration.getAddress()); + Assert.assertEquals(QUEUE, queueConfiguration.getName()); + Assert.assertTrue(queueConfiguration.isFqqn()); + } } diff --git a/artemis-core-client/src/main/java/org/apache/activemq/artemis/api/core/client/ClientSession.java b/artemis-core-client/src/main/java/org/apache/activemq/artemis/api/core/client/ClientSession.java index 1e748ebae9..9b2a2a5a39 100644 --- a/artemis-core-client/src/main/java/org/apache/activemq/artemis/api/core/client/ClientSession.java +++ b/artemis-core-client/src/main/java/org/apache/activemq/artemis/api/core/client/ClientSession.java @@ -94,6 +94,10 @@ public interface ClientSession extends XAResource, AutoCloseable { Integer getDefaultConsumersBeforeDispatch(); Long getDefaultDelayBeforeDispatch(); + + boolean isSupportsMulticast(); + + boolean isSupportsAnycast(); } /** diff --git a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/impl/AddressQueryImpl.java b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/impl/AddressQueryImpl.java index 8c543de812..cb9438ee54 100644 --- a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/impl/AddressQueryImpl.java +++ b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/impl/AddressQueryImpl.java @@ -48,6 +48,10 @@ public class AddressQueryImpl implements ClientSession.AddressQuery { private final Long defaultDelayBeforeDispatch; + private final boolean supportsMulticast; + + private final boolean supportsAnycast; + public AddressQueryImpl(final boolean exists, final List queueNames, final boolean autoCreateQueues, @@ -59,7 +63,9 @@ public class AddressQueryImpl implements ClientSession.AddressQuery { final SimpleString defaultLastValueKey, final Boolean defaultNonDestructive, final Integer defaultConsumersBeforeDispatch, - final Long defaultDelayBeforeDispatch) { + final Long defaultDelayBeforeDispatch, + final boolean supportsMulticast, + final boolean supportsAnycast) { this.exists = exists; this.queueNames = new ArrayList<>(queueNames); this.autoCreateQueues = autoCreateQueues; @@ -72,6 +78,8 @@ public class AddressQueryImpl implements ClientSession.AddressQuery { this.defaultNonDestructive = defaultNonDestructive; this.defaultConsumersBeforeDispatch = defaultConsumersBeforeDispatch; this.defaultDelayBeforeDispatch = defaultDelayBeforeDispatch; + this.supportsMulticast = supportsMulticast; + this.supportsAnycast = supportsAnycast; } @Override @@ -133,4 +141,14 @@ public class AddressQueryImpl implements ClientSession.AddressQuery { public Long getDefaultDelayBeforeDispatch() { return defaultDelayBeforeDispatch; } + + @Override + public boolean isSupportsMulticast() { + return supportsMulticast; + } + + @Override + public boolean isSupportsAnycast() { + return supportsAnycast; + } } diff --git a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/ActiveMQSessionContext.java b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/ActiveMQSessionContext.java index b864dc1013..38233a7f27 100644 --- a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/ActiveMQSessionContext.java +++ b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/ActiveMQSessionContext.java @@ -87,6 +87,7 @@ import org.apache.activemq.artemis.core.protocol.core.impl.wireformat.SessionBin import org.apache.activemq.artemis.core.protocol.core.impl.wireformat.SessionBindingQueryResponseMessage_V2; import org.apache.activemq.artemis.core.protocol.core.impl.wireformat.SessionBindingQueryResponseMessage_V3; import org.apache.activemq.artemis.core.protocol.core.impl.wireformat.SessionBindingQueryResponseMessage_V4; +import org.apache.activemq.artemis.core.protocol.core.impl.wireformat.SessionBindingQueryResponseMessage_V5; import org.apache.activemq.artemis.core.protocol.core.impl.wireformat.SessionCloseMessage; import org.apache.activemq.artemis.core.protocol.core.impl.wireformat.SessionCommitMessage; import org.apache.activemq.artemis.core.protocol.core.impl.wireformat.SessionCommitMessage_V2; @@ -429,22 +430,26 @@ public class ActiveMQSessionContext extends SessionContext { @Override public ClientSession.AddressQuery addressQuery(final SimpleString address) throws ActiveMQException { - if (sessionChannel.supports(PacketImpl.SESS_BINDINGQUERY_RESP_V4, getServerVersion())) { + if (sessionChannel.supports(PacketImpl.SESS_BINDINGQUERY_RESP_V5, getServerVersion())) { + Packet packet = sessionChannel.sendBlocking(new SessionBindingQueryMessage(address), PacketImpl.SESS_BINDINGQUERY_RESP_V5); + SessionBindingQueryResponseMessage_V5 response = (SessionBindingQueryResponseMessage_V5) packet; + return new AddressQueryImpl(response.isExists(), response.getQueueNames(), response.isAutoCreateQueues(), response.isAutoCreateAddresses(), response.isDefaultPurgeOnNoConsumers(), response.getDefaultMaxConsumers(), response.isDefaultExclusive(), response.isDefaultLastValue(), response.getDefaultLastValueKey(), response.isDefaultNonDestructive(), response.getDefaultConsumersBeforeDispatch(), response.getDefaultDelayBeforeDispatch(), response.isSupportsMulticast(), response.isSupportsAnycast()); + } else if (sessionChannel.supports(PacketImpl.SESS_BINDINGQUERY_RESP_V4, getServerVersion())) { Packet packet = sessionChannel.sendBlocking(new SessionBindingQueryMessage(address), PacketImpl.SESS_BINDINGQUERY_RESP_V4); SessionBindingQueryResponseMessage_V4 response = (SessionBindingQueryResponseMessage_V4) packet; - return new AddressQueryImpl(response.isExists(), response.getQueueNames(), response.isAutoCreateQueues(), response.isAutoCreateAddresses(), response.isDefaultPurgeOnNoConsumers(), response.getDefaultMaxConsumers(), response.isDefaultExclusive(), response.isDefaultLastValue(), response.getDefaultLastValueKey(), response.isDefaultNonDestructive(), response.getDefaultConsumersBeforeDispatch(), response.getDefaultDelayBeforeDispatch()); + return new AddressQueryImpl(response.isExists(), response.getQueueNames(), response.isAutoCreateQueues(), response.isAutoCreateAddresses(), response.isDefaultPurgeOnNoConsumers(), response.getDefaultMaxConsumers(), response.isDefaultExclusive(), response.isDefaultLastValue(), response.getDefaultLastValueKey(), response.isDefaultNonDestructive(), response.getDefaultConsumersBeforeDispatch(), response.getDefaultDelayBeforeDispatch(), true, true); } else if (sessionChannel.supports(PacketImpl.SESS_BINDINGQUERY_RESP_V3, getServerVersion())) { Packet packet = sessionChannel.sendBlocking(new SessionBindingQueryMessage(address), PacketImpl.SESS_BINDINGQUERY_RESP_V3); SessionBindingQueryResponseMessage_V3 response = (SessionBindingQueryResponseMessage_V3) packet; - return new AddressQueryImpl(response.isExists(), response.getQueueNames(), response.isAutoCreateQueues(), response.isAutoCreateAddresses(), ActiveMQDefaultConfiguration.getDefaultPurgeOnNoConsumers(), ActiveMQDefaultConfiguration.getDefaultMaxQueueConsumers(), null, null, null, null, null, null); + return new AddressQueryImpl(response.isExists(), response.getQueueNames(), response.isAutoCreateQueues(), response.isAutoCreateAddresses(), ActiveMQDefaultConfiguration.getDefaultPurgeOnNoConsumers(), ActiveMQDefaultConfiguration.getDefaultMaxQueueConsumers(), null, null, null, null, null, null, true, true); } else if (sessionChannel.supports(PacketImpl.SESS_BINDINGQUERY_RESP_V2, getServerVersion())) { Packet packet = sessionChannel.sendBlocking(new SessionBindingQueryMessage(address), PacketImpl.SESS_BINDINGQUERY_RESP_V2); SessionBindingQueryResponseMessage_V2 response = (SessionBindingQueryResponseMessage_V2) packet; - return new AddressQueryImpl(response.isExists(), response.getQueueNames(), response.isAutoCreateQueues(), false, ActiveMQDefaultConfiguration.getDefaultPurgeOnNoConsumers(), ActiveMQDefaultConfiguration.getDefaultMaxQueueConsumers(), null, null, null, null, null, null); + return new AddressQueryImpl(response.isExists(), response.getQueueNames(), response.isAutoCreateQueues(), false, ActiveMQDefaultConfiguration.getDefaultPurgeOnNoConsumers(), ActiveMQDefaultConfiguration.getDefaultMaxQueueConsumers(), null, null, null, null, null, null, true, true); } else { Packet packet = sessionChannel.sendBlocking(new SessionBindingQueryMessage(address), PacketImpl.SESS_BINDINGQUERY_RESP); SessionBindingQueryResponseMessage response = (SessionBindingQueryResponseMessage) packet; - return new AddressQueryImpl(response.isExists(), response.getQueueNames(), false, false, ActiveMQDefaultConfiguration.getDefaultPurgeOnNoConsumers(), ActiveMQDefaultConfiguration.getDefaultMaxQueueConsumers(), null, null, null, null, null, null); + return new AddressQueryImpl(response.isExists(), response.getQueueNames(), false, false, ActiveMQDefaultConfiguration.getDefaultPurgeOnNoConsumers(), ActiveMQDefaultConfiguration.getDefaultMaxQueueConsumers(), null, null, null, null, null, null, true, true); } } diff --git a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/ChannelImpl.java b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/ChannelImpl.java index d7fa20df9b..c91075b8e1 100644 --- a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/ChannelImpl.java +++ b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/ChannelImpl.java @@ -193,6 +193,8 @@ public final class ChannelImpl implements Channel { case PacketImpl.CREATESESSION_V2: case PacketImpl.DISCONNECT_V3: return version >= PacketImpl.ARTEMIS_2_18_0_VERSION; + case PacketImpl.SESS_BINDINGQUERY_RESP_V5: + return version >= PacketImpl.ARTEMIS_2_29_0_VERSION; default: return true; } diff --git a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/PacketDecoder.java b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/PacketDecoder.java index 6b71951cd3..e2d2f22d46 100644 --- a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/PacketDecoder.java +++ b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/PacketDecoder.java @@ -59,6 +59,7 @@ import org.apache.activemq.artemis.core.protocol.core.impl.wireformat.SessionBin import org.apache.activemq.artemis.core.protocol.core.impl.wireformat.SessionBindingQueryResponseMessage_V2; import org.apache.activemq.artemis.core.protocol.core.impl.wireformat.SessionBindingQueryResponseMessage_V3; import org.apache.activemq.artemis.core.protocol.core.impl.wireformat.SessionBindingQueryResponseMessage_V4; +import org.apache.activemq.artemis.core.protocol.core.impl.wireformat.SessionBindingQueryResponseMessage_V5; import org.apache.activemq.artemis.core.protocol.core.impl.wireformat.SessionCloseMessage; import org.apache.activemq.artemis.core.protocol.core.impl.wireformat.SessionCommitMessage; import org.apache.activemq.artemis.core.protocol.core.impl.wireformat.SessionCommitMessage_V2; @@ -132,6 +133,7 @@ import static org.apache.activemq.artemis.core.protocol.core.impl.PacketImpl.SES import static org.apache.activemq.artemis.core.protocol.core.impl.PacketImpl.SESS_BINDINGQUERY_RESP_V2; import static org.apache.activemq.artemis.core.protocol.core.impl.PacketImpl.SESS_BINDINGQUERY_RESP_V3; import static org.apache.activemq.artemis.core.protocol.core.impl.PacketImpl.SESS_BINDINGQUERY_RESP_V4; +import static org.apache.activemq.artemis.core.protocol.core.impl.PacketImpl.SESS_BINDINGQUERY_RESP_V5; import static org.apache.activemq.artemis.core.protocol.core.impl.PacketImpl.SESS_CLOSE; import static org.apache.activemq.artemis.core.protocol.core.impl.PacketImpl.SESS_COMMIT; import static org.apache.activemq.artemis.core.protocol.core.impl.PacketImpl.SESS_CONSUMER_CLOSE; @@ -325,6 +327,10 @@ public abstract class PacketDecoder implements Serializable { packet = new SessionBindingQueryResponseMessage_V4(); break; } + case SESS_BINDINGQUERY_RESP_V5: { + packet = new SessionBindingQueryResponseMessage_V5(); + break; + } case SESS_XA_START: { packet = new SessionXAStartMessage(); break; diff --git a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/PacketImpl.java b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/PacketImpl.java index 00020faecc..9d5b6e8a0d 100644 --- a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/PacketImpl.java +++ b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/PacketImpl.java @@ -50,6 +50,9 @@ public class PacketImpl implements Packet { // 2.28.0 public static final int ARTEMIS_2_28_0_VERSION = 134; + // 2.29.0 + public static final int ARTEMIS_2_29_0_VERSION = 135; + public static final SimpleString OLD_QUEUE_PREFIX = new SimpleString("jms.queue."); public static final SimpleString OLD_TEMP_QUEUE_PREFIX = new SimpleString("jms.tempqueue."); public static final SimpleString OLD_TOPIC_PREFIX = new SimpleString("jms.topic."); @@ -300,6 +303,8 @@ public class PacketImpl implements Packet { public static final byte REMOVE_PRODUCER = -21; + public static final byte SESS_BINDINGQUERY_RESP_V5 = -22; + public PacketImpl(final byte type) { this.type = type; } diff --git a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/wireformat/SessionBindingQueryResponseMessage_V4.java b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/wireformat/SessionBindingQueryResponseMessage_V4.java index ec819b79f4..8c6a4a2262 100644 --- a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/wireformat/SessionBindingQueryResponseMessage_V4.java +++ b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/wireformat/SessionBindingQueryResponseMessage_V4.java @@ -24,21 +24,21 @@ import org.apache.activemq.artemis.utils.BufferHelper; public class SessionBindingQueryResponseMessage_V4 extends SessionBindingQueryResponseMessage_V3 { - private boolean defaultPurgeOnNoConsumers; + protected boolean defaultPurgeOnNoConsumers; - private int defaultMaxConsumers; + protected int defaultMaxConsumers; - private Boolean defaultExclusive; + protected Boolean defaultExclusive; - private Boolean defaultLastValue; + protected Boolean defaultLastValue; - private SimpleString defaultLastValueKey; + protected SimpleString defaultLastValueKey; - private Boolean defaultNonDestructive; + protected Boolean defaultNonDestructive; - private Integer defaultConsumersBeforeDispatch; + protected Integer defaultConsumersBeforeDispatch; - private Long defaultDelayBeforeDispatch; + protected Long defaultDelayBeforeDispatch; public SessionBindingQueryResponseMessage_V4(final boolean exists, final List queueNames, @@ -83,6 +83,10 @@ public class SessionBindingQueryResponseMessage_V4 extends SessionBindingQueryRe super(SESS_BINDINGQUERY_RESP_V4); } + public SessionBindingQueryResponseMessage_V4(byte v) { + super(v); + } + public boolean isDefaultPurgeOnNoConsumers() { return defaultPurgeOnNoConsumers; } diff --git a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/wireformat/SessionBindingQueryResponseMessage_V5.java b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/wireformat/SessionBindingQueryResponseMessage_V5.java new file mode 100644 index 0000000000..41af7930be --- /dev/null +++ b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/wireformat/SessionBindingQueryResponseMessage_V5.java @@ -0,0 +1,140 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.activemq.artemis.core.protocol.core.impl.wireformat; + +import java.util.List; + +import org.apache.activemq.artemis.api.core.ActiveMQBuffer; +import org.apache.activemq.artemis.api.core.SimpleString; + +public class SessionBindingQueryResponseMessage_V5 extends SessionBindingQueryResponseMessage_V4 { + + protected boolean supportsMulticast; + + protected boolean supportsAnycast; + + public SessionBindingQueryResponseMessage_V5(final boolean exists, + final List queueNames, + final boolean autoCreateQueues, + final boolean autoCreateAddresses, + final boolean defaultPurgeOnNoConsumers, + final int defaultMaxConsumers, + final Boolean defaultExclusive, + final Boolean defaultLastValue, + final SimpleString defaultLastValueKey, + final Boolean defaultNonDestructive, + final Integer defaultConsumersBeforeDispatch, + final Long defaultDelayBeforeDispatch, + final boolean supportsMulticast, + final boolean supportsAnycast) { + super(SESS_BINDINGQUERY_RESP_V5); + + this.exists = exists; + + this.queueNames = queueNames; + + this.autoCreateQueues = autoCreateQueues; + + this.autoCreateAddresses = autoCreateAddresses; + + this.defaultPurgeOnNoConsumers = defaultPurgeOnNoConsumers; + + this.defaultMaxConsumers = defaultMaxConsumers; + + this.defaultExclusive = defaultExclusive; + + this.defaultLastValue = defaultLastValue; + + this.defaultLastValueKey = defaultLastValueKey; + + this.defaultNonDestructive = defaultNonDestructive; + + this.defaultConsumersBeforeDispatch = defaultConsumersBeforeDispatch; + + this.defaultDelayBeforeDispatch = defaultDelayBeforeDispatch; + + this.supportsMulticast = supportsMulticast; + + this.supportsAnycast = supportsAnycast; + } + + public SessionBindingQueryResponseMessage_V5() { + super(SESS_BINDINGQUERY_RESP_V5); + } + + public Boolean isSupportsMulticast() { + return supportsMulticast; + } + + public Boolean isSupportsAnycast() { + return supportsAnycast; + } + + @Override + public void encodeRest(final ActiveMQBuffer buffer) { + super.encodeRest(buffer); + buffer.writeBoolean(supportsMulticast); + buffer.writeBoolean(supportsAnycast); + } + + @Override + public void decodeRest(final ActiveMQBuffer buffer) { + super.decodeRest(buffer); + if (buffer.readableBytes() > 0) { + supportsMulticast = buffer.readBoolean(); + supportsAnycast = buffer.readBoolean(); + } + } + + @Override + public int hashCode() { + final int prime = 31; + int result = super.hashCode(); + result = prime * result + (supportsMulticast ? 1231 : 1237); + result = prime * result + (supportsAnycast ? 1231 : 1237); + return result; + } + + @Override + protected String getPacketString() { + StringBuffer buff = new StringBuffer(super.getPacketString()); + buff.append(", supportsMulticast=" + supportsMulticast); + buff.append(", supportsAnycast=" + supportsAnycast); + return buff.toString(); + } + + @Override + public boolean equals(Object obj) { + if (this == obj) { + return true; + } + if (!super.equals(obj)) { + return false; + } + if (!(obj instanceof SessionBindingQueryResponseMessage_V5)) { + return false; + } + SessionBindingQueryResponseMessage_V5 other = (SessionBindingQueryResponseMessage_V5) obj; + if (supportsMulticast != other.supportsMulticast) { + return false; + } + if (supportsAnycast != other.supportsAnycast) { + return false; + } + return true; + } +} diff --git a/artemis-core-client/src/main/resources/activemq-version.properties b/artemis-core-client/src/main/resources/activemq-version.properties index 7295b14231..8debcd3c3e 100644 --- a/artemis-core-client/src/main/resources/activemq-version.properties +++ b/artemis-core-client/src/main/resources/activemq-version.properties @@ -20,4 +20,4 @@ activemq.version.minorVersion=${activemq.version.minorVersion} activemq.version.microVersion=${activemq.version.microVersion} activemq.version.incrementingVersion=${activemq.version.incrementingVersion} activemq.version.versionTag=${activemq.version.versionTag} -activemq.version.compatibleVersionList=121,122,123,124,125,126,127,128,129,130,131,132,133,134 +activemq.version.compatibleVersionList=121,122,123,124,125,126,127,128,129,130,131,132,133,134,135 diff --git a/artemis-jms-client/src/main/java/org/apache/activemq/artemis/jms/client/ActiveMQSession.java b/artemis-jms-client/src/main/java/org/apache/activemq/artemis/jms/client/ActiveMQSession.java index 74c2ccddce..2a726fa1fb 100644 --- a/artemis-jms-client/src/main/java/org/apache/activemq/artemis/jms/client/ActiveMQSession.java +++ b/artemis-jms-client/src/main/java/org/apache/activemq/artemis/jms/client/ActiveMQSession.java @@ -404,6 +404,10 @@ public class ActiveMQSession implements QueueSession, TopicSession { throw new InvalidDestinationException("Destination " + address + " does not exist, autoCreateAddresses=" + addressQuery.isAutoCreateAddresses()); } } + } else { + if ((!addressQuery.isSupportsMulticast() && !destination.isQueue()) || (!addressQuery.isSupportsAnycast() && destination.isQueue())) { + throw new InvalidDestinationException("Destination " + address + " exists, but does not support " + (destination.isQueue() ? RoutingType.ANYCAST.name() : RoutingType.MULTICAST.name()) + " routing"); + } } // Second we create the queue, the address would have existed or successfully created. diff --git a/artemis-junit/artemis-junit-4/src/test/java/org/apache/activemq/artemis/junit/EmbeddedActiveMQResourceCustomConfigurationTest.java b/artemis-junit/artemis-junit-4/src/test/java/org/apache/activemq/artemis/junit/EmbeddedActiveMQResourceCustomConfigurationTest.java index 0ee10be34e..56999545d2 100644 --- a/artemis-junit/artemis-junit-4/src/test/java/org/apache/activemq/artemis/junit/EmbeddedActiveMQResourceCustomConfigurationTest.java +++ b/artemis-junit/artemis-junit-4/src/test/java/org/apache/activemq/artemis/junit/EmbeddedActiveMQResourceCustomConfigurationTest.java @@ -45,6 +45,9 @@ public class EmbeddedActiveMQResourceCustomConfigurationTest { @Rule public RuleChain rulechain = RuleChain.outerRule(server); + public EmbeddedActiveMQResourceCustomConfigurationTest() throws Exception { + } + @After public void tear() { server.stop(); diff --git a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/broker/AMQPSessionCallback.java b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/broker/AMQPSessionCallback.java index c710f1bc32..f30c09b3dd 100644 --- a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/broker/AMQPSessionCallback.java +++ b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/broker/AMQPSessionCallback.java @@ -23,6 +23,7 @@ import org.apache.activemq.artemis.api.config.ActiveMQDefaultConfiguration; import org.apache.activemq.artemis.api.core.ActiveMQException; import org.apache.activemq.artemis.api.core.ActiveMQQueueExistsException; import org.apache.activemq.artemis.api.core.ActiveMQSecurityException; +import org.apache.activemq.artemis.api.core.AutoCreateResult; import org.apache.activemq.artemis.api.core.Message; import org.apache.activemq.artemis.api.core.QueueConfiguration; import org.apache.activemq.artemis.api.core.RoutingType; @@ -329,7 +330,8 @@ public class AMQPSessionCallback implements SessionCallback { public boolean checkAddressAndAutocreateIfPossible(SimpleString address, RoutingType routingType) throws Exception { - return serverSession.checkAutoCreate(address, routingType); + AutoCreateResult autoCreateResult = serverSession.checkAutoCreate(new QueueConfiguration(address).setRoutingType(routingType)); + return autoCreateResult != AutoCreateResult.NOT_FOUND; } public AddressQueryResult addressQuery(SimpleString addressName, @@ -469,7 +471,7 @@ public class AMQPSessionCallback implements SessionCallback { //here check queue-autocreation if (!checkAddressAndAutocreateIfPossible(address, routingType)) { - ActiveMQException e = ActiveMQAMQPProtocolMessageBundle.BUNDLE.addressDoesntExist(); + ActiveMQException e = ActiveMQAMQPProtocolMessageBundle.BUNDLE.addressDoesntExist(address.toString()); if (transaction != null) { transaction.markAsRollbackOnly(e); } @@ -767,7 +769,6 @@ public class AMQPSessionCallback implements SessionCallback { return this.transactionHandler; } - class AddressQueryCache { SimpleString address; T result; diff --git a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/logger/ActiveMQAMQPProtocolMessageBundle.java b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/logger/ActiveMQAMQPProtocolMessageBundle.java index 91c352eafa..de5571370c 100644 --- a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/logger/ActiveMQAMQPProtocolMessageBundle.java +++ b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/logger/ActiveMQAMQPProtocolMessageBundle.java @@ -39,8 +39,8 @@ public interface ActiveMQAMQPProtocolMessageBundle { @Message(id = 119001, value = "error creating temporary queue, {}") ActiveMQAMQPInternalErrorException errorCreatingTemporaryQueue(String message); - @Message(id = 119002, value = "target address does not exist") - ActiveMQAMQPNotFoundException addressDoesntExist(); + @Message(id = 119002, value = "target address {} does not exist") + ActiveMQAMQPNotFoundException addressDoesntExist(String address); @Message(id = 119003, value = "error finding temporary queue, {}") ActiveMQAMQPNotFoundException errorFindingTemporaryQueue(String message); diff --git a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/proton/ProtonServerReceiverContext.java b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/proton/ProtonServerReceiverContext.java index 33e87bd5ed..935450cc18 100644 --- a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/proton/ProtonServerReceiverContext.java +++ b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/proton/ProtonServerReceiverContext.java @@ -119,7 +119,7 @@ public class ProtonServerReceiverContext extends ProtonAbstractReceiver { defRoutingType = getRoutingType(target.getCapabilities(), address); try { if (!sessionSPI.checkAddressAndAutocreateIfPossible(address, defRoutingType)) { - throw ActiveMQAMQPProtocolMessageBundle.BUNDLE.addressDoesntExist(); + throw ActiveMQAMQPProtocolMessageBundle.BUNDLE.addressDoesntExist(address.toString()); } } catch (ActiveMQAMQPNotFoundException e) { throw e; @@ -157,6 +157,30 @@ public class ProtonServerReceiverContext extends ProtonAbstractReceiver { } private RoutingType getRoutingType(Symbol[] symbols, SimpleString address) { + RoutingType explicitRoutingType = getExplicitRoutingType(symbols); + if (explicitRoutingType != null) { + return explicitRoutingType; + } else { + final AddressInfo addressInfo = sessionSPI.getAddress(address); + /* + * If we're dealing with an *existing* address that has just one routing-type simply use that. + * This allows "bare" AMQP clients (which have no built-in routing semantics) to send messages + * wherever they want in this case because the routing ambiguity is eliminated. + */ + if (addressInfo != null && addressInfo.getRoutingTypes().size() == 1) { + return addressInfo.getRoutingType(); + } else { + return getDefaultRoutingType(address); + } + } + } + + private RoutingType getDefaultRoutingType(SimpleString address) { + RoutingType defaultRoutingType = sessionSPI.getDefaultRoutingType(address); + return defaultRoutingType == null ? ActiveMQDefaultConfiguration.getDefaultRoutingType() : defaultRoutingType; + } + + private RoutingType getExplicitRoutingType(Symbol[] symbols) { if (symbols != null) { for (Symbol symbol : symbols) { if (AmqpSupport.TEMP_TOPIC_CAPABILITY.equals(symbol) || AmqpSupport.TOPIC_CAPABILITY.equals(symbol)) { @@ -166,15 +190,7 @@ public class ProtonServerReceiverContext extends ProtonAbstractReceiver { } } } - final AddressInfo addressInfo = sessionSPI.getAddress(address); - if (addressInfo != null && !addressInfo.getRoutingTypes().isEmpty()) { - if (addressInfo.getRoutingTypes().size() == 1 && addressInfo.getRoutingType() == RoutingType.MULTICAST) { - return RoutingType.MULTICAST; - } - } - RoutingType defaultRoutingType = sessionSPI.getDefaultRoutingType(address); - defaultRoutingType = defaultRoutingType == null ? ActiveMQDefaultConfiguration.getDefaultRoutingType() : defaultRoutingType; - return defaultRoutingType; + return null; } @Override diff --git a/artemis-protocols/artemis-hqclient-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/hornetq/client/HornetQClientSessionContext.java b/artemis-protocols/artemis-hqclient-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/hornetq/client/HornetQClientSessionContext.java index 9b0b77f2cb..a0389be865 100644 --- a/artemis-protocols/artemis-hqclient-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/hornetq/client/HornetQClientSessionContext.java +++ b/artemis-protocols/artemis-hqclient-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/hornetq/client/HornetQClientSessionContext.java @@ -71,7 +71,7 @@ public class HornetQClientSessionContext extends ActiveMQSessionContext { public ClientSession.AddressQuery addressQuery(final SimpleString address) throws ActiveMQException { SessionBindingQueryResponseMessage response = (SessionBindingQueryResponseMessage) getSessionChannel().sendBlocking(new SessionBindingQueryMessage(address), PacketImpl.SESS_BINDINGQUERY_RESP); - return new AddressQueryImpl(response.isExists(), response.getQueueNames(), false, false, ActiveMQDefaultConfiguration.getDefaultPurgeOnNoConsumers(), ActiveMQDefaultConfiguration.getDefaultMaxQueueConsumers(), ActiveMQDefaultConfiguration.getDefaultExclusive(), ActiveMQDefaultConfiguration.getDefaultLastValue(), ActiveMQDefaultConfiguration.getDefaultLastValueKey(), ActiveMQDefaultConfiguration.getDefaultNonDestructive(), ActiveMQDefaultConfiguration.getDefaultConsumersBeforeDispatch(), ActiveMQDefaultConfiguration.getDefaultDelayBeforeDispatch()); + return new AddressQueryImpl(response.isExists(), response.getQueueNames(), false, false, ActiveMQDefaultConfiguration.getDefaultPurgeOnNoConsumers(), ActiveMQDefaultConfiguration.getDefaultMaxQueueConsumers(), ActiveMQDefaultConfiguration.getDefaultExclusive(), ActiveMQDefaultConfiguration.getDefaultLastValue(), ActiveMQDefaultConfiguration.getDefaultLastValueKey(), ActiveMQDefaultConfiguration.getDefaultNonDestructive(), ActiveMQDefaultConfiguration.getDefaultConsumersBeforeDispatch(), ActiveMQDefaultConfiguration.getDefaultDelayBeforeDispatch(), true, true); } @Override diff --git a/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/OpenWireConnection.java b/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/OpenWireConnection.java index 4631373e05..4e1934b659 100644 --- a/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/OpenWireConnection.java +++ b/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/OpenWireConnection.java @@ -16,7 +16,15 @@ */ package org.apache.activemq.artemis.core.protocol.openwire; +import javax.jms.IllegalStateException; +import javax.jms.InvalidClientIDException; +import javax.jms.InvalidDestinationException; +import javax.jms.JMSSecurityException; +import javax.transaction.xa.XAException; +import javax.transaction.xa.XAResource; +import javax.transaction.xa.Xid; import java.io.IOException; +import java.lang.invoke.MethodHandles; import java.util.ArrayList; import java.util.List; import java.util.ListIterator; @@ -29,23 +37,14 @@ import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicLongFieldUpdater; -import javax.jms.IllegalStateException; -import javax.jms.InvalidClientIDException; -import javax.jms.InvalidDestinationException; -import javax.jms.JMSSecurityException; -import javax.transaction.xa.XAException; -import javax.transaction.xa.XAResource; -import javax.transaction.xa.Xid; - import org.apache.activemq.advisory.AdvisorySupport; -import org.apache.activemq.artemis.api.core.ActiveMQAddressExistsException; import org.apache.activemq.artemis.api.core.ActiveMQBuffer; import org.apache.activemq.artemis.api.core.ActiveMQException; import org.apache.activemq.artemis.api.core.ActiveMQExceptionType; import org.apache.activemq.artemis.api.core.ActiveMQNonExistentQueueException; -import org.apache.activemq.artemis.api.core.ActiveMQQueueExistsException; import org.apache.activemq.artemis.api.core.ActiveMQRemoteDisconnectException; import org.apache.activemq.artemis.api.core.ActiveMQSecurityException; +import org.apache.activemq.artemis.api.core.AutoCreateResult; import org.apache.activemq.artemis.api.core.QueueConfiguration; import org.apache.activemq.artemis.api.core.RoutingType; import org.apache.activemq.artemis.api.core.SimpleString; @@ -64,19 +63,14 @@ import org.apache.activemq.artemis.core.protocol.openwire.util.OpenWireUtil; import org.apache.activemq.artemis.core.remoting.FailureListener; import org.apache.activemq.artemis.core.remoting.impl.netty.TransportConstants; import org.apache.activemq.artemis.core.security.SecurityAuth; -import org.apache.activemq.artemis.core.server.ActiveMQMessageBundle; import org.apache.activemq.artemis.core.server.ActiveMQServer; import org.apache.activemq.artemis.core.server.ActiveMQServerLogger; -import org.apache.activemq.artemis.core.server.AddressQueryResult; -import org.apache.activemq.artemis.core.server.QueueQueryResult; import org.apache.activemq.artemis.core.server.MessageReference; import org.apache.activemq.artemis.core.server.ServerConsumer; import org.apache.activemq.artemis.core.server.ServerSession; import org.apache.activemq.artemis.core.server.SlowConsumerDetectionListener; import org.apache.activemq.artemis.core.server.TempQueueObserver; -import org.apache.activemq.artemis.core.server.impl.AddressInfo; import org.apache.activemq.artemis.core.server.impl.RefsOperation; -import org.apache.activemq.artemis.core.settings.impl.AddressSettings; import org.apache.activemq.artemis.core.transaction.ResourceManager; import org.apache.activemq.artemis.core.transaction.Transaction; import org.apache.activemq.artemis.core.transaction.TransactionOperationAbstract; @@ -136,7 +130,6 @@ import org.apache.activemq.transport.TransmitCallback; import org.apache.activemq.util.ByteSequence; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import java.lang.invoke.MethodHandles; /** * Represents an activemq connection. @@ -906,29 +899,18 @@ public class OpenWireConnection extends AbstractRemotingConnection implements Se } SimpleString qName = SimpleString.toSimpleString(dest.getPhysicalName()); - if (server.locateQueue(qName) == null) { - AddressSettings addressSettings = server.getAddressSettingsRepository().getMatch(dest.getPhysicalName()); - if (dest.isQueue() && (addressSettings.isAutoCreateQueues() || dest.isTemporary())) { - try { - internalSession.createQueue(new QueueConfiguration(qName).setRoutingType(RoutingType.ANYCAST).setDurable(!dest.isTemporary()).setTemporary(dest.isTemporary()).setAutoCreated(!dest.isTemporary())); - created = true; - } catch (ActiveMQQueueExistsException exists) { - // The queue may have been created by another thread in the mean time. Catch and do nothing. - } - } else if (dest.isTopic() && (addressSettings.isAutoCreateAddresses() || dest.isTemporary())) { - try { - AddressInfo addressInfo = new AddressInfo(qName, RoutingType.MULTICAST); - if (AdvisorySupport.isAdvisoryTopic(dest) && protocolManager.isSuppressInternalManagementObjects()) { - addressInfo.setInternal(true); - } - if (internalSession.getAddress(addressInfo.getName()) == null) { - internalSession.createAddress(addressInfo, !dest.isTemporary()); - created = true; - } - } catch (ActiveMQAddressExistsException exists) { - // The address may have been created by another thread in the mean time. Catch and do nothing. - } + + AutoCreateResult autoCreateResult = internalSession.checkAutoCreate(new QueueConfiguration(qName) + .setRoutingType(dest.isQueue() ? RoutingType.ANYCAST : RoutingType.MULTICAST) + .setDurable(!dest.isTemporary()) + .setTemporary(dest.isTemporary())); + if (autoCreateResult == AutoCreateResult.CREATED) { + created = true; + if (AdvisorySupport.isAdvisoryTopic(dest) && protocolManager.isSuppressInternalManagementObjects()) { + internalSession.getAddress(qName).setInternal(true); } + } else if (autoCreateResult == AutoCreateResult.NOT_FOUND) { + throw new InvalidDestinationException(dest.getDestinationTypeAsString() + " " + dest.getPhysicalName() + " does not exist."); } if (dest.isTemporary()) { @@ -1172,23 +1154,6 @@ public class OpenWireConnection extends AbstractRemotingConnection implements Se } } - /** - * Checks to see if this destination exists. If it does not throw an invalid destination exception. - * - * @param destination - */ - private void validateDestination(ActiveMQDestination destination) throws Exception { - if (destination.isQueue()) { - SimpleString physicalName = new SimpleString(destination.getPhysicalName()); - QueueQueryResult queue = server.queueQuery(physicalName); - AddressQueryResult address = server.addressQuery(physicalName); - - if (!address.isExists() && !queue.isAutoCreateQueues()) { - throw ActiveMQMessageBundle.BUNDLE.noSuchQueue(physicalName); - } - } - } - private void propagateLastSequenceId(SessionState sessionState, long lastDeliveredSequenceId) { for (ConsumerState consumerState : sessionState.getConsumerStates()) { consumerState.getInfo().setLastDeliveredSequenceId(lastDeliveredSequenceId); @@ -1270,11 +1235,7 @@ public class OpenWireConnection extends AbstractRemotingConnection implements Se ActiveMQDestination destination = info.getDestination(); if (destination != null && !AdvisorySupport.isAdvisoryTopic(destination)) { - if (destination.isQueue()) { - OpenWireConnection.this.validateDestination(destination); - } - DestinationInfo destInfo = new DestinationInfo(getContext().getConnectionId(), DestinationInfo.ADD_OPERATION_TYPE, destination); - OpenWireConnection.this.addDestination(destInfo); + OpenWireConnection.this.addDestination(new DestinationInfo(getContext().getConnectionId(), DestinationInfo.ADD_OPERATION_TYPE, destination)); } ss.addProducer(info); diff --git a/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/amq/AMQSession.java b/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/amq/AMQSession.java index 966db041e1..9acfd5dd7a 100644 --- a/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/amq/AMQSession.java +++ b/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/amq/AMQSession.java @@ -16,25 +16,23 @@ */ package org.apache.activemq.artemis.core.protocol.openwire.amq; -import static org.apache.activemq.artemis.core.protocol.openwire.util.OpenWireUtil.OPENWIRE_WILDCARD; - +import javax.jms.InvalidDestinationException; +import javax.jms.ResourceAllocationException; import java.io.IOException; +import java.lang.invoke.MethodHandles; import java.util.List; import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicInteger; -import javax.jms.InvalidDestinationException; -import javax.jms.ResourceAllocationException; - import org.apache.activemq.advisory.AdvisorySupport; -import org.apache.activemq.artemis.api.core.ActiveMQQueueExistsException; +import org.apache.activemq.artemis.api.core.AutoCreateResult; import org.apache.activemq.artemis.api.core.QueueConfiguration; import org.apache.activemq.artemis.api.core.RoutingType; import org.apache.activemq.artemis.api.core.SimpleString; import org.apache.activemq.artemis.core.io.IOCallback; -import org.apache.activemq.artemis.core.persistence.CoreMessageObjectPools; import org.apache.activemq.artemis.core.paging.PagingStore; +import org.apache.activemq.artemis.core.persistence.CoreMessageObjectPools; import org.apache.activemq.artemis.core.protocol.openwire.OpenWireConnection; import org.apache.activemq.artemis.core.protocol.openwire.OpenWireMessageConverter; import org.apache.activemq.artemis.core.protocol.openwire.OpenWireProtocolManager; @@ -42,7 +40,6 @@ import org.apache.activemq.artemis.core.protocol.openwire.util.OpenWireUtil; import org.apache.activemq.artemis.core.server.ActiveMQMessageBundle; import org.apache.activemq.artemis.core.server.ActiveMQServer; import org.apache.activemq.artemis.core.server.MessageReference; -import org.apache.activemq.artemis.core.server.QueueQueryResult; import org.apache.activemq.artemis.core.server.ServerConsumer; import org.apache.activemq.artemis.core.server.ServerSession; import org.apache.activemq.artemis.core.server.SlowConsumerDetectionListener; @@ -66,7 +63,8 @@ import org.apache.activemq.command.SessionInfo; import org.apache.activemq.openwire.OpenWireFormat; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import java.lang.invoke.MethodHandles; + +import static org.apache.activemq.artemis.core.protocol.openwire.util.OpenWireUtil.OPENWIRE_WILDCARD; public class AMQSession implements SessionCallback { private final Logger logger = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass()); @@ -230,39 +228,26 @@ public class AMQSession implements SessionCallback { boolean hasQueue = true; if (!connection.containsKnownDestination(queueName)) { - QueueQueryResult queueQuery = server.queueQuery(queueName); - - try { - if (!queueQuery.isExists()) { - if (queueQuery.isAutoCreateQueues()) { - SimpleString queueNameToUse = queueName; - SimpleString addressToUse = queueName; - RoutingType routingTypeToUse = RoutingType.ANYCAST; - if (CompositeAddress.isFullyQualified(queueName.toString())) { - addressToUse = CompositeAddress.extractAddressName(queueName); - queueNameToUse = CompositeAddress.extractQueueName(queueName); - AddressInfo addressInfo = server.getAddressInfo(addressToUse); - if (addressInfo != null) { - routingTypeToUse = addressInfo.getRoutingType(); - } else { - AddressSettings as = server.getAddressSettingsRepository().getMatch(addressToUse.toString()); - routingTypeToUse = as.getDefaultAddressRoutingType(); - } - } - coreSession.createQueue(new QueueConfiguration(queueNameToUse).setAddress(addressToUse).setRoutingType(routingTypeToUse).setTemporary(isTemporary).setAutoCreated(true).setFilterString(filter)); - connection.addKnownDestination(queueName); - } else { - if (server.getAddressInfo(queueName) == null) { - //Address does not exist and will not get autocreated - throw ActiveMQMessageBundle.BUNDLE.noSuchQueue(queueName); - } - hasQueue = false; - } + RoutingType routingTypeToUse = RoutingType.ANYCAST; + if (CompositeAddress.isFullyQualified(queueName.toString())) { + SimpleString addressToUse = CompositeAddress.extractAddressName(queueName); + AddressInfo addressInfo = server.getAddressInfo(addressToUse); + if (addressInfo != null) { + routingTypeToUse = addressInfo.getRoutingType(); + } else { + AddressSettings as = server.getAddressSettingsRepository().getMatch(addressToUse.toString()); + routingTypeToUse = as.getDefaultAddressRoutingType(); } - } catch (ActiveMQQueueExistsException e) { - // In case another thread created the queue before us but after we did the binding query - hasQueue = true; } + AutoCreateResult autoCreateResult = coreSession.checkAutoCreate(new QueueConfiguration(queueName) + .setAddress(queueName) + .setRoutingType(routingTypeToUse) + .setTemporary(isTemporary) + .setFilterString(filter)); + if (autoCreateResult == AutoCreateResult.NOT_FOUND) { + throw ActiveMQMessageBundle.BUNDLE.noSuchQueue(queueName); + } + connection.addKnownDestination(queueName); } return hasQueue; } diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/postoffice/impl/PostOfficeImpl.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/postoffice/impl/PostOfficeImpl.java index 6bf72f9948..2b397434e2 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/postoffice/impl/PostOfficeImpl.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/postoffice/impl/PostOfficeImpl.java @@ -41,6 +41,7 @@ import org.apache.activemq.artemis.api.core.ActiveMQDuplicateIdException; import org.apache.activemq.artemis.api.core.ActiveMQException; import org.apache.activemq.artemis.api.core.ActiveMQNonExistentQueueException; import org.apache.activemq.artemis.api.core.ActiveMQShutdownException; +import org.apache.activemq.artemis.api.core.AutoCreateResult; import org.apache.activemq.artemis.api.core.Message; import org.apache.activemq.artemis.api.core.Pair; import org.apache.activemq.artemis.api.core.QueueConfiguration; @@ -1252,14 +1253,15 @@ public class PostOfficeImpl implements PostOffice, NotificationListener, Binding private AddressInfo checkAddress(RoutingContext context, SimpleString address) throws Exception { AddressInfo addressInfo = addressManager.getAddressInfo(address); if (addressInfo == null && context.getServerSession() != null) { - if (context.getServerSession().checkAutoCreate(address, context.getRoutingType())) { - addressInfo = addressManager.getAddressInfo(address); - } else { + AutoCreateResult autoCreateResult = context.getServerSession().checkAutoCreate(new QueueConfiguration(address).setRoutingType(context.getRoutingType())); + if (autoCreateResult == AutoCreateResult.NOT_FOUND) { ActiveMQException ex = ActiveMQMessageBundle.BUNDLE.addressDoesNotExist(address); if (context.getTransaction() != null) { context.getTransaction().markAsRollbackOnly(ex); } throw ex; + } else { + addressInfo = addressManager.getAddressInfo(address); } } return addressInfo; @@ -1268,7 +1270,8 @@ public class PostOfficeImpl implements PostOffice, NotificationListener, Binding Bindings simpleRoute(SimpleString address, RoutingContext context, Message message, AddressInfo addressInfo) throws Exception { Bindings bindings = addressManager.getBindingsForRoutingAddress(address); if (bindings == null && context.getServerSession() != null) { - if (!context.getServerSession().checkAutoCreate(address, context.getRoutingType())) { + AutoCreateResult autoCreateResult = context.getServerSession().checkAutoCreate(new QueueConfiguration(address).setRoutingType(context.getRoutingType())); + if (autoCreateResult == AutoCreateResult.NOT_FOUND) { ActiveMQException e = ActiveMQMessageBundle.BUNDLE.addressDoesNotExist(address); Transaction tx = context.getTransaction(); if (tx != null) { diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/protocol/core/ServerSessionPacketHandler.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/protocol/core/ServerSessionPacketHandler.java index 03ac2a8d5a..c570359e74 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/protocol/core/ServerSessionPacketHandler.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/protocol/core/ServerSessionPacketHandler.java @@ -58,6 +58,7 @@ import org.apache.activemq.artemis.core.protocol.core.impl.wireformat.SessionBin import org.apache.activemq.artemis.core.protocol.core.impl.wireformat.SessionBindingQueryResponseMessage_V2; import org.apache.activemq.artemis.core.protocol.core.impl.wireformat.SessionBindingQueryResponseMessage_V3; import org.apache.activemq.artemis.core.protocol.core.impl.wireformat.SessionBindingQueryResponseMessage_V4; +import org.apache.activemq.artemis.core.protocol.core.impl.wireformat.SessionBindingQueryResponseMessage_V5; import org.apache.activemq.artemis.core.protocol.core.impl.wireformat.SessionConsumerCloseMessage; import org.apache.activemq.artemis.core.protocol.core.impl.wireformat.SessionConsumerFlowCreditMessage; import org.apache.activemq.artemis.core.protocol.core.impl.wireformat.SessionCreateConsumerMessage; @@ -470,7 +471,9 @@ public class ServerSessionPacketHandler implements ChannelHandler { } } - if (channel.supports(PacketImpl.SESS_BINDINGQUERY_RESP_V4)) { + if (channel.supports(PacketImpl.SESS_BINDINGQUERY_RESP_V5)) { + response = new SessionBindingQueryResponseMessage_V5(result.isExists(), result.getQueueNames(), result.isAutoCreateQueues(), result.isAutoCreateAddresses(), result.isDefaultPurgeOnNoConsumers(), result.getDefaultMaxConsumers(), result.isDefaultExclusive(), result.isDefaultLastValue(), result.getDefaultLastValueKey(), result.isDefaultNonDestructive(), result.getDefaultConsumersBeforeDispatch(), result.getDefaultDelayBeforeDispatch(), result.getAddressInfo() == null ? false : result.getAddressInfo().getRoutingTypes().contains(RoutingType.MULTICAST), result.getAddressInfo() == null ? false : result.getAddressInfo().getRoutingTypes().contains(RoutingType.ANYCAST)); + } else if (channel.supports(PacketImpl.SESS_BINDINGQUERY_RESP_V4)) { response = new SessionBindingQueryResponseMessage_V4(result.isExists(), result.getQueueNames(), result.isAutoCreateQueues(), result.isAutoCreateAddresses(), result.isDefaultPurgeOnNoConsumers(), result.getDefaultMaxConsumers(), result.isDefaultExclusive(), result.isDefaultLastValue(), result.getDefaultLastValueKey(), result.isDefaultNonDestructive(), result.getDefaultConsumersBeforeDispatch(), result.getDefaultDelayBeforeDispatch()); } else if (channel.supports(PacketImpl.SESS_BINDINGQUERY_RESP_V3)) { response = new SessionBindingQueryResponseMessage_V3(result.isExists(), result.getQueueNames(), result.isAutoCreateQueues(), result.isAutoCreateAddresses()); diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/ServerSession.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/ServerSession.java index e1f88157b3..d02851ee8f 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/ServerSession.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/ServerSession.java @@ -16,6 +16,7 @@ */ package org.apache.activemq.artemis.core.server; +import org.apache.activemq.artemis.api.core.AutoCreateResult; import org.apache.activemq.artemis.json.JsonArrayBuilder; import javax.transaction.xa.Xid; import java.util.Collection; @@ -111,7 +112,7 @@ public interface ServerSession extends SecurityAuth { void addCloseable(Closeable closeable); - boolean checkAutoCreate(SimpleString address, RoutingType routingType) throws Exception; + AutoCreateResult checkAutoCreate(QueueConfiguration queueConfiguration) throws Exception; ServerConsumer createConsumer(long consumerID, SimpleString queueName, diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ServerSessionImpl.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ServerSessionImpl.java index 99bcee9746..77ca360486 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ServerSessionImpl.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ServerSessionImpl.java @@ -16,15 +16,10 @@ */ package org.apache.activemq.artemis.core.server.impl; -import org.apache.activemq.artemis.core.management.impl.view.ProducerField; -import org.apache.activemq.artemis.api.core.ActiveMQAddressExistsException; -import org.apache.activemq.artemis.api.core.ActiveMQQueueExistsException; -import org.apache.activemq.artemis.core.postoffice.Bindings; -import org.apache.activemq.artemis.json.JsonArrayBuilder; -import org.apache.activemq.artemis.json.JsonObjectBuilder; -import java.security.cert.X509Certificate; import javax.transaction.xa.XAException; import javax.transaction.xa.Xid; +import java.lang.invoke.MethodHandles; +import java.security.cert.X509Certificate; import java.util.ArrayList; import java.util.Collection; import java.util.Collections; @@ -39,10 +34,13 @@ import java.util.concurrent.Executor; import org.apache.activemq.artemis.Closeable; import org.apache.activemq.artemis.api.config.ActiveMQDefaultConfiguration; +import org.apache.activemq.artemis.api.core.ActiveMQAddressExistsException; import org.apache.activemq.artemis.api.core.ActiveMQException; import org.apache.activemq.artemis.api.core.ActiveMQIOErrorException; import org.apache.activemq.artemis.api.core.ActiveMQIllegalStateException; import org.apache.activemq.artemis.api.core.ActiveMQNonExistentQueueException; +import org.apache.activemq.artemis.api.core.ActiveMQQueueExistsException; +import org.apache.activemq.artemis.api.core.AutoCreateResult; import org.apache.activemq.artemis.api.core.Message; import org.apache.activemq.artemis.api.core.Pair; import org.apache.activemq.artemis.api.core.QueueConfiguration; @@ -54,6 +52,7 @@ import org.apache.activemq.artemis.core.exception.ActiveMQXAException; import org.apache.activemq.artemis.core.filter.Filter; import org.apache.activemq.artemis.core.filter.impl.FilterImpl; import org.apache.activemq.artemis.core.io.IOCallback; +import org.apache.activemq.artemis.core.management.impl.view.ProducerField; import org.apache.activemq.artemis.core.paging.PagingManager; import org.apache.activemq.artemis.core.paging.PagingStore; import org.apache.activemq.artemis.core.persistence.OperationContext; @@ -61,6 +60,7 @@ import org.apache.activemq.artemis.core.persistence.StorageManager; import org.apache.activemq.artemis.core.persistence.impl.journal.LargeServerMessageImpl; import org.apache.activemq.artemis.core.postoffice.Binding; import org.apache.activemq.artemis.core.postoffice.BindingType; +import org.apache.activemq.artemis.core.postoffice.Bindings; import org.apache.activemq.artemis.core.postoffice.PostOffice; import org.apache.activemq.artemis.core.postoffice.QueueBinding; import org.apache.activemq.artemis.core.postoffice.RoutingStatus; @@ -94,6 +94,8 @@ import org.apache.activemq.artemis.core.transaction.Transaction.State; import org.apache.activemq.artemis.core.transaction.TransactionOperationAbstract; import org.apache.activemq.artemis.core.transaction.TransactionPropertyIndexes; import org.apache.activemq.artemis.core.transaction.impl.TransactionImpl; +import org.apache.activemq.artemis.json.JsonArrayBuilder; +import org.apache.activemq.artemis.json.JsonObjectBuilder; import org.apache.activemq.artemis.json.JsonValue; import org.apache.activemq.artemis.logs.AuditLogger; import org.apache.activemq.artemis.spi.core.protocol.RemotingConnection; @@ -105,7 +107,6 @@ import org.apache.activemq.artemis.utils.PrefixUtil; import org.apache.activemq.artemis.utils.collections.TypedProperties; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import java.lang.invoke.MethodHandles; /** * Server side Session implementation @@ -1746,49 +1747,83 @@ public class ServerSessionImpl implements ServerSession, FailureListener { return tx; } - @Override - public boolean checkAutoCreate(SimpleString address, RoutingType routingType) throws Exception { - boolean result; - SimpleString unPrefixedAddress = removePrefix(address); + public AutoCreateResult checkAutoCreate(final QueueConfiguration queueConfig) throws Exception { + AutoCreateResult result; + SimpleString unPrefixedAddress = removePrefix(queueConfig.getAddress()); + SimpleString unPrefixedQueue = removePrefix(queueConfig.getName()); AddressSettings addressSettings = server.getAddressSettingsRepository().getMatch(unPrefixedAddress.toString()); - if (routingType == RoutingType.MULTICAST) { - if (server.getAddressInfo(unPrefixedAddress) == null) { - if (addressSettings.isAutoCreateAddresses()) { - try { - createAddress(address, routingType, true); - } catch (ActiveMQAddressExistsException e) { - // The address may have been created by another thread in the mean time. Catch and do nothing. - } - result = true; - } else { - result = false; + /* + * This is only here to maintain backwards compatibility with the previous implementation. + * + * TODO: figure out how to get rid of this + */ + if (queueConfig.getRoutingType() == null) { + return AutoCreateResult.EXISTED; + } + + // No matter what routing-type is used the address must exist already or be automatically created. + AddressInfo addressInfo = server.getAddressInfo(unPrefixedAddress); + if (addressInfo == null) { + // The address doesn't exist. + if (addressSettings.isAutoCreateAddresses() || queueConfig.isTemporary()) { + // Try to create the address if possible. + try { + createAddress(queueConfig.getAddress(), queueConfig.getRoutingType(), true).setTemporary(queueConfig.isTemporary()); + } catch (ActiveMQAddressExistsException e) { + // The address may have been created by another thread in the mean time. Catch and do nothing. } + result = AutoCreateResult.CREATED; } else { - result = true; - } - } else if (routingType == RoutingType.ANYCAST) { - if (server.locateQueue(unPrefixedAddress) == null) { - Bindings bindings = server.getPostOffice().lookupBindingsForAddress(address); - if (bindings != null) { - // this means the address has another queue with a different name, which is fine, we just ignore it on this case - result = true; - } else if (addressSettings.isAutoCreateQueues()) { - try { - createQueue(new QueueConfiguration(address).setRoutingType(routingType).setAutoCreated(true)); - } catch (ActiveMQQueueExistsException e) { - // The queue may have been created by another thread in the mean time. Catch and do nothing. - } - result = true; - } else { - result = false; - } - } else { - result = true; + // If the address doesn't exist and can't be autocreated then return that result immediately. + return AutoCreateResult.NOT_FOUND; } } else { - result = true; + // The address exists. + if (addressInfo.getRoutingTypes().contains(queueConfig.getRoutingType())) { + // The existing address supports the requested routing-type. + result = AutoCreateResult.EXISTED; + } else { + // The existing address doesn't support the requested routing-type. + if (addressSettings.isAutoCreateAddresses() || queueConfig.isTemporary()) { + // Try to update the address with the new routing-type if possible. + try { + createAddress(addressInfo.addRoutingType(queueConfig.getRoutingType()), true); + } catch (ActiveMQAddressExistsException e) { + // The address may have been created by another thread in the mean-time. Catch and do nothing. + } + result = AutoCreateResult.UPDATED; + } else { + // If the address exists but doesn't support the requested routing-type and can't be updated with the new routing-type then return that result immediately. + return AutoCreateResult.NOT_FOUND; + } + } + } + + if (queueConfig.getRoutingType() == RoutingType.ANYCAST || queueConfig.isFqqn()) { + if (server.locateQueue(unPrefixedQueue) == null) { + // The queue doesn't exist. + Bindings bindings = server.getPostOffice().lookupBindingsForAddress(unPrefixedAddress); + if (bindings != null && !queueConfig.isFqqn()) { + // The address has another queue with a different name, which is fine. Just ignore it. + result = AutoCreateResult.EXISTED; + } else if (addressSettings.isAutoCreateQueues() || queueConfig.isTemporary()) { + // Try to create the queue. + try { + createQueue(new QueueConfiguration(queueConfig).setAutoCreated(true)); + } catch (ActiveMQQueueExistsException e) { + // The queue may have been created by another thread in the mean-time. Catch and do nothing. + } + result = AutoCreateResult.CREATED; + } else { + // The queue doesn't exist, and we can't auto-create it so return that result immediately. + return AutoCreateResult.NOT_FOUND; + } + } else { + // The queue exists. + result = AutoCreateResult.EXISTED; + } } return result; diff --git a/pom.xml b/pom.xml index ccc9151f27..5f5415e3ec 100644 --- a/pom.xml +++ b/pom.xml @@ -182,7 +182,7 @@ 1 0 0 - 134,133,132,131,130,129,128,127,126,125,124,123,122 + 135,134,133,132,131,130,129,128,127,126,125,124,123,122 ${project.version} ${project.version}(${activemq.version.incrementingVersion}) diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/AmqpClientTestSupport.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/AmqpClientTestSupport.java index 0de24670fc..7eb4c5221a 100644 --- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/AmqpClientTestSupport.java +++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/AmqpClientTestSupport.java @@ -166,8 +166,8 @@ public class AmqpClientTestSupport extends AmqpTestSupport { Set acceptors = server.getConfiguration().getAcceptorConfigurations(); for (TransportConfiguration tc : acceptors) { if (tc.getName().equals(NETTY_ACCEPTOR)) { - tc.getExtraParams().put("anycastPrefix", "anycast://"); - tc.getExtraParams().put("multicastPrefix", "multicast://"); + tc.getExtraParams().put("anycastPrefix", ANYCAST_PREFIX); + tc.getExtraParams().put("multicastPrefix", MULTICAST_PREFIX); } } } diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/AmqpDurableReceiverReconnectWithMulticastPrefixTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/AmqpDurableReceiverReconnectWithMulticastPrefixTest.java index 2f0355850c..ce1007af1c 100644 --- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/AmqpDurableReceiverReconnectWithMulticastPrefixTest.java +++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/AmqpDurableReceiverReconnectWithMulticastPrefixTest.java @@ -72,8 +72,8 @@ public class AmqpDurableReceiverReconnectWithMulticastPrefixTest extends JMSClie @Override protected void configureAMQPAcceptorParameters(Map params) { - params.put("anycastPrefix", "anycast://"); - params.put("multicastPrefix", "multicast://"); + params.put("anycastPrefix", ANYCAST_PREFIX); + params.put("multicastPrefix", MULTICAST_PREFIX); } @Override @@ -96,7 +96,7 @@ public class AmqpDurableReceiverReconnectWithMulticastPrefixTest extends JMSClie @Test(timeout = 60000) public void testReattachToDurableNodeAndTryAndReceiveNewlySentMessage() throws Exception { final String addressName = "test-address"; - final String prefixedName = "multicast://" + addressName; + final String prefixedName = MULTICAST_PREFIX + addressName; AmqpClient client = createAmqpClient(); AmqpConnection connection = addConnection(client.createConnection()); @@ -143,7 +143,7 @@ public class AmqpDurableReceiverReconnectWithMulticastPrefixTest extends JMSClie @Test(timeout = 60000) public void testReattachToDurableNodeAndTryAndReceivePreviouslySentMessage() throws Exception { final String addressName = "test-address"; - final String prefixedName = "multicast://" + addressName; + final String prefixedName = MULTICAST_PREFIX + addressName; AmqpClient client = createAmqpClient(); AmqpConnection connection = addConnection(client.createConnection()); diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/AmqpMessageRoutingTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/AmqpMessageRoutingTest.java index 89f3ebce79..a052dddd09 100644 --- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/AmqpMessageRoutingTest.java +++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/AmqpMessageRoutingTest.java @@ -62,7 +62,7 @@ public class AmqpMessageRoutingTest extends JMSClientTestSupport { serverControl.createQueue(new QueueConfiguration(queueB).setAddress(addressA).setRoutingType(RoutingType.ANYCAST).toJSON()); serverControl.createQueue(new QueueConfiguration(queueC).setAddress(addressA).setRoutingType(RoutingType.MULTICAST).toJSON()); - sendMessages("anycast://" + addressA, 1); + sendMessages(ANYCAST_PREFIX + addressA, 1); Wait.assertEquals(1, () -> (server.locateQueue(SimpleString.toSimpleString(queueA)).getMessageCount() + server.locateQueue(SimpleString.toSimpleString(queueB)).getMessageCount())); Wait.assertEquals(0, server.locateQueue(SimpleString.toSimpleString(queueC))::getMessageCount); @@ -100,7 +100,7 @@ public class AmqpMessageRoutingTest extends JMSClientTestSupport { serverControl.createQueue(new QueueConfiguration(queueB).setAddress(addressA).setRoutingType(RoutingType.MULTICAST).toJSON()); serverControl.createQueue(new QueueConfiguration(queueC).setAddress(addressA).setRoutingType(RoutingType.MULTICAST).toJSON()); - sendMessages("multicast://" + addressA, 1); + sendMessages(MULTICAST_PREFIX + addressA, 1); Wait.assertEquals(0, server.locateQueue(SimpleString.toSimpleString(queueA))::getMessageCount); Wait.assertEquals(2, () -> (server.locateQueue(SimpleString.toSimpleString(queueC)).getMessageCount() + server.locateQueue(SimpleString.toSimpleString(queueB)).getMessageCount())); diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/AmqpReceiverTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/AmqpReceiverTest.java index fe9e9e0d4e..111eeac9ad 100644 --- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/AmqpReceiverTest.java +++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/AmqpReceiverTest.java @@ -302,7 +302,7 @@ public class AmqpReceiverTest extends AmqpClientTestSupport { assertNotNull(expectedException); assertTrue(expectedException.getMessage().contains("amqp:not-found")); - assertTrue(expectedException.getMessage().contains("target address does not exist")); + assertTrue(expectedException.getMessage().contains("target address AnAddressThatDoesNotExist does not exist")); } finally { connection.close(); } diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/AmqpSendReceiveTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/AmqpSendReceiveTest.java index e4e2648657..bac1a46bbb 100644 --- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/AmqpSendReceiveTest.java +++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/AmqpSendReceiveTest.java @@ -185,6 +185,7 @@ public class AmqpSendReceiveTest extends AmqpClientTestSupport { @Test(timeout = 60000) public void testMessageDurableTrue() throws Exception { + assertNotNull(server.locateQueue(getQueueName())); sendMessages(getQueueName(), 1, true); AmqpClient client = createAmqpClient(); @@ -1115,7 +1116,7 @@ public class AmqpSendReceiveTest extends AmqpClientTestSupport { assertNotNull(expectedException); assertTrue(expectedException.getMessage().contains("amqp:not-found")); - assertTrue(expectedException.getMessage().contains("target address does not exist")); + assertTrue(expectedException.getMessage().contains("target address AnAddressThatDoesNotExist does not exist")); connection.close(); } diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/AmqpTestSupport.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/AmqpTestSupport.java index d7605620e8..c06a2ab0ea 100644 --- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/AmqpTestSupport.java +++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/AmqpTestSupport.java @@ -44,6 +44,9 @@ public class AmqpTestSupport extends ActiveMQTestBase { protected static final String BROKER_NAME = "localhost"; protected static final String NETTY_ACCEPTOR = "netty-acceptor"; + protected static final String MULTICAST_PREFIX = "multicast://"; + protected static final String ANYCAST_PREFIX = "anycast://"; + protected String noprivUser = "noprivs"; protected String noprivPass = "noprivs"; diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/AutoCreateWithDefaultRoutingTypesTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/AutoCreateWithDefaultRoutingTypesTest.java index bc66518022..d8394cd381 100644 --- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/AutoCreateWithDefaultRoutingTypesTest.java +++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/AutoCreateWithDefaultRoutingTypesTest.java @@ -74,8 +74,8 @@ public class AutoCreateWithDefaultRoutingTypesTest extends JMSClientTestSupport @Override protected void configureAMQPAcceptorParameters(Map params) { - params.put("anycastPrefix", "anycast://"); - params.put("multicastPrefix", "multicast://"); + params.put("anycastPrefix", ANYCAST_PREFIX); + params.put("multicastPrefix", MULTICAST_PREFIX); } @Override @@ -272,9 +272,9 @@ public class AutoCreateWithDefaultRoutingTypesTest extends JMSClientTestSupport final String prefixedName; if (routingType == RoutingType.ANYCAST) { - prefixedName = "anycast://" + addressName; + prefixedName = ANYCAST_PREFIX + addressName; } else { - prefixedName = "multicast://" + addressName; + prefixedName = MULTICAST_PREFIX + addressName; } AmqpSender sender = session.createSender(prefixedName); @@ -320,9 +320,9 @@ public class AutoCreateWithDefaultRoutingTypesTest extends JMSClientTestSupport final String prefixedName; if (routingType == RoutingType.ANYCAST) { - prefixedName = "anycast://" + addressName; + prefixedName = ANYCAST_PREFIX + addressName; } else { - prefixedName = "multicast://" + addressName; + prefixedName = MULTICAST_PREFIX + addressName; } final AmqpReceiver receiver = session.createReceiver(prefixedName); diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/BrokerDefinedMulticastConsumerTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/BrokerDefinedMulticastConsumerTest.java index 53004477f6..6317673c27 100644 --- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/BrokerDefinedMulticastConsumerTest.java +++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/BrokerDefinedMulticastConsumerTest.java @@ -40,6 +40,16 @@ public class BrokerDefinedMulticastConsumerTest extends AmqpClientTestSupport { SimpleString queue1 = new SimpleString("queue1"); SimpleString queue2 = new SimpleString("queue2"); + @Override + protected boolean isAutoCreateQueues() { + return false; + } + + @Override + protected boolean isAutoCreateAddresses() { + return false; + } + @Test(timeout = 60000) public void testConsumeFromSingleQueueOnAddressSameName() throws Exception { server.addAddressInfo(new AddressInfo(address, RoutingType.MULTICAST)); @@ -64,6 +74,7 @@ public class BrokerDefinedMulticastConsumerTest extends AmqpClientTestSupport { @Test(timeout = 60000) public void testConsumeWhenOnlyAnycast() throws Exception { server.addAddressInfo(new AddressInfo(address, RoutingType.ANYCAST)); + server.createQueue(new QueueConfiguration(address).setAddress(address).setRoutingType(RoutingType.ANYCAST)); sendMessages(address.toString(), 1); diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/QueueAutoCreationTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/QueueAutoCreationTest.java index 8b4bbcf4d8..802df9c095 100644 --- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/QueueAutoCreationTest.java +++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/QueueAutoCreationTest.java @@ -124,7 +124,7 @@ public class QueueAutoCreationTest extends JMSClientTestSupport { Connection connection = factory.createConnection(); SimpleString addressName = UUIDGenerator.getInstance().generateSimpleStringUUID(); logger.debug("Address is {}", addressName); - clientSession.createAddress(addressName, RoutingType.ANYCAST, false); + clientSession.createAddress(addressName, RoutingType.MULTICAST, false); Topic topic = new ActiveMQTopic(addressName.toString()); Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); MessageProducer producer = session.createProducer(topic); diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/client/AutoCreateJmsDestinationTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/client/AutoCreateJmsDestinationTest.java index de7fcb19cd..50c092e960 100644 --- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/client/AutoCreateJmsDestinationTest.java +++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/client/AutoCreateJmsDestinationTest.java @@ -389,7 +389,7 @@ public class AutoCreateJmsDestinationTest extends JMSTestBase { Connection connection = factory.createConnection(); SimpleString addressName = UUIDGenerator.getInstance().generateSimpleStringUUID(); logger.debug("Address is {}", addressName); - clientSession.createAddress(addressName, RoutingType.ANYCAST, false); + clientSession.createAddress(addressName, RoutingType.MULTICAST, false); Topic topic = new ActiveMQTopic(addressName.toString()); Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); MessageProducer producer = session.createProducer(topic); diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/jms/multiprotocol/JMSMismatchedRoutingTypeTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/jms/multiprotocol/JMSMismatchedRoutingTypeTest.java new file mode 100644 index 0000000000..f962e973b6 --- /dev/null +++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/jms/multiprotocol/JMSMismatchedRoutingTypeTest.java @@ -0,0 +1,138 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.activemq.artemis.tests.integration.jms.multiprotocol; + +import javax.jms.Connection; +import javax.jms.InvalidDestinationException; +import javax.jms.JMSException; +import javax.jms.MessageProducer; +import javax.jms.Queue; +import javax.jms.Session; +import javax.jms.Topic; + +import org.apache.activemq.artemis.api.core.QueueConfiguration; +import org.apache.activemq.artemis.api.core.RoutingType; +import org.apache.activemq.artemis.api.core.SimpleString; +import org.apache.activemq.artemis.core.server.ActiveMQServer; +import org.apache.activemq.artemis.core.server.impl.AddressInfo; +import org.apache.activemq.artemis.jms.client.ActiveMQConnection; +import org.apache.activemq.artemis.tests.util.RandomUtil; +import org.junit.Test; + +public class JMSMismatchedRoutingTypeTest extends MultiprotocolJMSClientTestSupport { + + protected final String ANYCAST_ADDRESS = RandomUtil.randomString(); + protected final String MULTICAST_ADDRESS = RandomUtil.randomString(); + + @Override + protected boolean isAutoCreateAddresses() { + return false; + } + + @Override + protected boolean isAutoCreateQueues() { + return false; + } + + @Override + protected void createAddressAndQueues(ActiveMQServer server) throws Exception { + server.addAddressInfo(new AddressInfo(SimpleString.toSimpleString(ANYCAST_ADDRESS), RoutingType.ANYCAST)); + server.createQueue(new QueueConfiguration(RandomUtil.randomString()).setAddress(ANYCAST_ADDRESS).setRoutingType(RoutingType.ANYCAST)); + + server.addAddressInfo(new AddressInfo(SimpleString.toSimpleString(MULTICAST_ADDRESS), RoutingType.MULTICAST)); + server.createQueue(new QueueConfiguration(RandomUtil.randomString()).setAddress(MULTICAST_ADDRESS).setRoutingType(RoutingType.MULTICAST)); + } + + @Test + public void testSendingMulticastToAnycastAMQP() throws Exception { + internalTestSendingMulticastToAnycast(AMQPConnection); + } + + @Test + public void testSendingMulticastToAnycastCore() throws Exception { + internalTestSendingMulticastToAnycast(CoreConnection); + } + + @Test + public void testSendingMulticastToAnycastOpenWire() throws Exception { + internalTestSendingMulticastToAnycast(OpenWireConnection); + } + + private void internalTestSendingMulticastToAnycast(ConnectionSupplier connectionSupplier) throws Exception { + Connection connection = null; + try { + connection = connectionSupplier.createConnection(); + Session s = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); + Topic topic = s.createTopic(ANYCAST_ADDRESS); + MessageProducer producer = s.createProducer(topic); + producer.send(s.createMessage()); + fail("Sending message here should fail!"); + } catch (InvalidDestinationException e) { + // expected + } finally { + if (connection != null) { + connection.close(); + } + } + } + + @Test + public void testSendingAnycastToMulticastAMQP() throws Exception { + internalTestSendingAnycastToMulticast(AMQPConnection); + } + + @Test + public void testSendingAnycastToMulticastCore() throws Exception { + internalTestSendingAnycastToMulticast(CoreConnection); + } + + @Test + public void testSendingAnycastToMulticastOpenWire() throws Exception { + internalTestSendingAnycastToMulticast(OpenWireConnection); + } + + private void internalTestSendingAnycastToMulticast(ConnectionSupplier connectionSupplier) throws Exception { + Connection connection = null; + try { + connection = connectionSupplier.createConnection(); + Session s = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); + Queue q; + try { + // when using core this will fail because the client actually checks for the queue on the broker (which won't be there) + q = s.createQueue(MULTICAST_ADDRESS); + } catch (JMSException e) { + if (connection instanceof ActiveMQConnection) { + // expected + return; + } else { + throw e; + } + } + try { + MessageProducer producer = s.createProducer(q); + producer.send(s.createMessage()); + fail("Sending message here should fail!"); + } catch (InvalidDestinationException e) { + // expected + } + } finally { + if (connection != null) { + connection.close(); + } + } + } +} \ No newline at end of file diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/security/SecurityTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/security/SecurityTest.java index 30a9287ea9..2507971d7e 100644 --- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/security/SecurityTest.java +++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/security/SecurityTest.java @@ -378,6 +378,12 @@ public class SecurityTest extends ActiveMQTestBase { try (ActiveMQConnection connection = (ActiveMQConnection) factory.createConnection()) { Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); + //Add the corresponding addresses to the server + SimpleString address = SimpleString.toSimpleString("test.queue"); + server.addAddressInfo(new AddressInfo(address, RoutingType.ANYCAST)); + SimpleString address2 = SimpleString.toSimpleString("test.topic"); + server.addAddressInfo(new AddressInfo(address2, RoutingType.MULTICAST)); + //Test queue creation permission try { session.createConsumer(session.createQueue("test.queue")); @@ -394,14 +400,9 @@ public class SecurityTest extends ActiveMQTestBase { assertTrue(e.getMessage().contains("User: test-user does not have permission='CREATE_NON_DURABLE_QUEUE'")); } - //Add a test queue and topic to the server - SimpleString address = SimpleString.toSimpleString("test.queue"); - server.addAddressInfo(new AddressInfo(address, RoutingType.ANYCAST)); + //Add a test queue to the server server.createQueue(new QueueConfiguration(address).setRoutingType(RoutingType.ANYCAST)); - SimpleString address2 = SimpleString.toSimpleString("test.topic"); - server.addAddressInfo(new AddressInfo(address2, RoutingType.MULTICAST)); - //Test queue produce permission try { MessageProducer producer = session.createProducer(session.createQueue("test.queue")); @@ -441,7 +442,7 @@ public class SecurityTest extends ActiveMQTestBase { session.createTemporaryQueue(); Assert.fail("should throw exception here"); } catch (Exception e) { - assertTrue(e.getMessage().contains("User: test-user does not have permission='CREATE_NON_DURABLE_QUEUE'")); + assertTrue(e.getMessage().contains("User: test-user does not have permission='CREATE_ADDRESS'")); } //Test temp topic diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/server/RetroactiveAddressTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/server/RetroactiveAddressTest.java index a493104a34..d667b3fd30 100644 --- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/server/RetroactiveAddressTest.java +++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/server/RetroactiveAddressTest.java @@ -299,7 +299,7 @@ public class RetroactiveAddressTest extends ActiveMQTestBase { final int COUNT = 10; final SimpleString divertQueue = ResourceNames.getRetroactiveResourceQueueName(internalNamingPrefix, delimiter, addressName, RoutingType.MULTICAST); server.getAddressSettingsRepository().addMatch(addressName.toString(), new AddressSettings().setRetroactiveMessageCount(COUNT)); - server.addAddressInfo(new AddressInfo(addressName)); + server.addAddressInfo(new AddressInfo(addressName).addRoutingType(RoutingType.MULTICAST)); ConnectionFactory cf = new ActiveMQConnectionFactory("vm://0"); Connection c = cf.createConnection(); diff --git a/tests/integration-tests/src/test/resources/reload-divert-address-source1.xml b/tests/integration-tests/src/test/resources/reload-divert-address-source1.xml index a055523c80..b76280f152 100644 --- a/tests/integration-tests/src/test/resources/reload-divert-address-source1.xml +++ b/tests/integration-tests/src/test/resources/reload-divert-address-source1.xml @@ -48,9 +48,15 @@ under the License. -
-
-
+
+ +
+
+ +
+
+ +
diff --git a/tests/integration-tests/src/test/resources/reload-divert-address-source2.xml b/tests/integration-tests/src/test/resources/reload-divert-address-source2.xml index fbb08005a0..2d8ae6dd75 100644 --- a/tests/integration-tests/src/test/resources/reload-divert-address-source2.xml +++ b/tests/integration-tests/src/test/resources/reload-divert-address-source2.xml @@ -48,9 +48,15 @@ under the License. -
-
-
+
+ +
+
+ +
+
+ +
diff --git a/tests/integration-tests/src/test/resources/reload-divert-address-target1.xml b/tests/integration-tests/src/test/resources/reload-divert-address-target1.xml index 220fee04eb..e474294e55 100644 --- a/tests/integration-tests/src/test/resources/reload-divert-address-target1.xml +++ b/tests/integration-tests/src/test/resources/reload-divert-address-target1.xml @@ -48,9 +48,15 @@ under the License. -
-
-
+
+ +
+
+ +
+
+ +
diff --git a/tests/integration-tests/src/test/resources/reload-divert-address-target2.xml b/tests/integration-tests/src/test/resources/reload-divert-address-target2.xml index 1b3bf2c858..5bcbe0f057 100644 --- a/tests/integration-tests/src/test/resources/reload-divert-address-target2.xml +++ b/tests/integration-tests/src/test/resources/reload-divert-address-target2.xml @@ -48,9 +48,15 @@ under the License. -
-
-
+
+ +
+
+ +
+
+ +
diff --git a/tests/integration-tests/src/test/resources/reload-divert-exclusive.xml b/tests/integration-tests/src/test/resources/reload-divert-exclusive.xml index c8a0054be2..6b7167871e 100644 --- a/tests/integration-tests/src/test/resources/reload-divert-exclusive.xml +++ b/tests/integration-tests/src/test/resources/reload-divert-exclusive.xml @@ -48,8 +48,12 @@ under the License. -
-
+
+ +
+
+ +
diff --git a/tests/integration-tests/src/test/resources/reload-divert-filter-none.xml b/tests/integration-tests/src/test/resources/reload-divert-filter-none.xml index c8a0054be2..6b7167871e 100644 --- a/tests/integration-tests/src/test/resources/reload-divert-filter-none.xml +++ b/tests/integration-tests/src/test/resources/reload-divert-filter-none.xml @@ -48,8 +48,12 @@ under the License. -
-
+
+ +
+
+ +
diff --git a/tests/integration-tests/src/test/resources/reload-divert-filter-x-eq-x.xml b/tests/integration-tests/src/test/resources/reload-divert-filter-x-eq-x.xml index e748824490..b2d3a7b504 100644 --- a/tests/integration-tests/src/test/resources/reload-divert-filter-x-eq-x.xml +++ b/tests/integration-tests/src/test/resources/reload-divert-filter-x-eq-x.xml @@ -48,8 +48,12 @@ under the License. -
-
+
+ +
+
+ +
diff --git a/tests/integration-tests/src/test/resources/reload-divert-filter-x-eq-y.xml b/tests/integration-tests/src/test/resources/reload-divert-filter-x-eq-y.xml index 78519664f2..4659258160 100644 --- a/tests/integration-tests/src/test/resources/reload-divert-filter-x-eq-y.xml +++ b/tests/integration-tests/src/test/resources/reload-divert-filter-x-eq-y.xml @@ -48,8 +48,12 @@ under the License. -
-
+
+ +
+
+ +
diff --git a/tests/integration-tests/src/test/resources/reload-divert-non-exclusive.xml b/tests/integration-tests/src/test/resources/reload-divert-non-exclusive.xml index df5c5a9836..b26b6d4f4f 100644 --- a/tests/integration-tests/src/test/resources/reload-divert-non-exclusive.xml +++ b/tests/integration-tests/src/test/resources/reload-divert-non-exclusive.xml @@ -48,8 +48,12 @@ under the License. -
-
+
+ +
+
+ +