diff --git a/artemis-commons/src/main/java/org/apache/activemq/artemis/api/core/AutoCreateResult.java b/artemis-commons/src/main/java/org/apache/activemq/artemis/api/core/AutoCreateResult.java new file mode 100644 index 0000000000..a988259fc0 --- /dev/null +++ b/artemis-commons/src/main/java/org/apache/activemq/artemis/api/core/AutoCreateResult.java @@ -0,0 +1,21 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.activemq.artemis.api.core; + +public enum AutoCreateResult { + EXISTED, CREATED, UPDATED, NOT_FOUND; +} diff --git a/artemis-commons/src/main/java/org/apache/activemq/artemis/api/core/QueueConfiguration.java b/artemis-commons/src/main/java/org/apache/activemq/artemis/api/core/QueueConfiguration.java index 7bb1d7804c..42b4f45b2d 100644 --- a/artemis-commons/src/main/java/org/apache/activemq/artemis/api/core/QueueConfiguration.java +++ b/artemis-commons/src/main/java/org/apache/activemq/artemis/api/core/QueueConfiguration.java @@ -76,6 +76,7 @@ public class QueueConfiguration implements Serializable { public static final String INTERNAL = "internal"; public static final String TRANSIENT = "transient"; public static final String AUTO_CREATED = "auto-created"; + public static final String FQQN = "fqqn"; private Long id; // internal use private SimpleString name; @@ -108,10 +109,46 @@ public class QueueConfiguration implements Serializable { private Boolean internal; private Boolean _transient; private Boolean autoCreated; + private Boolean fqqn; public QueueConfiguration() { } + public QueueConfiguration(QueueConfiguration o) { + id = o.id; + name = o.name; + address = o.address; + routingType = o.routingType; + filterString = o.filterString; + durable = o.durable; + user = o.user; + maxConsumers = o.maxConsumers; + exclusive = o.exclusive; + groupRebalance = o.groupRebalance; + groupRebalancePauseDispatch = o.groupRebalancePauseDispatch; + groupBuckets = o.groupBuckets; + groupFirstKey = o.groupFirstKey; + lastValue = o.lastValue; + lastValueKey = o.lastValueKey; + nonDestructive = o.nonDestructive; + purgeOnNoConsumers = o.purgeOnNoConsumers; + enabled = o.enabled; + consumersBeforeDispatch = o.consumersBeforeDispatch; + delayBeforeDispatch = o.delayBeforeDispatch; + consumerPriority = o.consumerPriority; + autoDelete = o.autoDelete; + autoDeleteDelay = o.autoDeleteDelay; + autoDeleteMessageCount = o.autoDeleteMessageCount; + ringSize = o.ringSize; + configurationManaged = o.configurationManaged; + temporary = o.temporary; + autoCreateAddress = o.autoCreateAddress; + internal = o.internal; + _transient = o._transient; + autoCreated = o.autoCreated; + fqqn = o.fqqn; + } + /** * Instantiate this object and invoke {@link #setName(SimpleString)} * @@ -261,7 +298,7 @@ public class QueueConfiguration implements Serializable { } /** - * Set the name. If the fully-qualified queue name is used then it will be parsed and the corresponding values for + * Set the address. If the fully-qualified queue name is used then it will be parsed and the corresponding values for * {@code address} and {@code name} will be set automatically. For example if "myAddress::myQueue" is passed then the * resulting value for {@code address} will be "myAddress" and the value for {@code name} will be "myQueue". * @@ -272,6 +309,7 @@ public class QueueConfiguration implements Serializable { if (CompositeAddress.isFullyQualified(address)) { this.name = CompositeAddress.extractQueueName(address); this.address = CompositeAddress.extractAddressName(address); + this.fqqn = Boolean.TRUE; } else { this.address = address; } @@ -301,6 +339,7 @@ public class QueueConfiguration implements Serializable { if (CompositeAddress.isFullyQualified(name)) { this.name = CompositeAddress.extractQueueName(name); this.address = CompositeAddress.extractAddressName(name); + this.fqqn = Boolean.TRUE; } else { this.name = name; } @@ -607,6 +646,16 @@ public class QueueConfiguration implements Serializable { return this; } + /** + * Based on if the name or address uses FQQN when set + * + * defaults to {@code false} + * @return + */ + public Boolean isFqqn() { + return fqqn == null ? Boolean.FALSE : fqqn; + } + /** * This method returns a JSON-formatted {@code String} representation of this {@code QueueConfiguration}. It is a * simple collection of key/value pairs. The keys used are referenced in {@link #set(String, String)}. @@ -709,6 +758,9 @@ public class QueueConfiguration implements Serializable { if (isAutoCreated() != null) { builder.add(AUTO_CREATED, isAutoCreated()); } + if (isFqqn() != null) { + builder.add(FQQN, isFqqn()); + } return builder.build().toString(); } @@ -807,6 +859,8 @@ public class QueueConfiguration implements Serializable { return false; if (!Objects.equals(autoCreated, that.autoCreated)) return false; + if (!Objects.equals(fqqn, that.fqqn)) + return false; return true; } @@ -844,6 +898,7 @@ public class QueueConfiguration implements Serializable { result = 31 * result + Objects.hashCode(internal); result = 31 * result + Objects.hashCode(_transient); result = 31 * result + Objects.hashCode(autoCreated); + result = 31 * result + Objects.hashCode(fqqn); return result; } @@ -880,6 +935,7 @@ public class QueueConfiguration implements Serializable { + ", autoCreateAddress=" + autoCreateAddress + ", internal=" + internal + ", transient=" + _transient - + ", autoCreated=" + autoCreated + ']'; + + ", autoCreated=" + autoCreated + + ", fqqn=" + fqqn + ']'; } } diff --git a/artemis-commons/src/test/java/org/apache/activemq/artemis/api/core/QueueConfigurationTest.java b/artemis-commons/src/test/java/org/apache/activemq/artemis/api/core/QueueConfigurationTest.java index 35dbe36c6b..7119565444 100644 --- a/artemis-commons/src/test/java/org/apache/activemq/artemis/api/core/QueueConfigurationTest.java +++ b/artemis-commons/src/test/java/org/apache/activemq/artemis/api/core/QueueConfigurationTest.java @@ -16,6 +16,8 @@ */ package org.apache.activemq.artemis.api.core; +import org.apache.activemq.artemis.utils.CompositeAddress; +import org.apache.activemq.artemis.utils.RandomUtil; import org.junit.Assert; import org.junit.Test; @@ -39,4 +41,34 @@ public class QueueConfigurationTest { queueConfiguration.set(QueueConfiguration.GROUP_REBALANCE_PAUSE_DISPATCH, Boolean.toString(false)); Assert.assertEquals(false, queueConfiguration.isGroupRebalancePauseDispatch()); } + + @Test + public void testFqqn() { + final SimpleString ADDRESS = RandomUtil.randomSimpleString(); + final SimpleString QUEUE = RandomUtil.randomSimpleString(); + QueueConfiguration queueConfiguration = new QueueConfiguration(CompositeAddress.toFullyQualified(ADDRESS, QUEUE)); + Assert.assertEquals(ADDRESS, queueConfiguration.getAddress()); + Assert.assertEquals(QUEUE, queueConfiguration.getName()); + Assert.assertTrue(queueConfiguration.isFqqn()); + } + + @Test + public void testFqqnNegative() { + final SimpleString ADDRESS = RandomUtil.randomSimpleString(); + final SimpleString QUEUE = RandomUtil.randomSimpleString(); + QueueConfiguration queueConfiguration = new QueueConfiguration(QUEUE).setAddress(ADDRESS); + Assert.assertEquals(ADDRESS, queueConfiguration.getAddress()); + Assert.assertEquals(QUEUE, queueConfiguration.getName()); + Assert.assertFalse(queueConfiguration.isFqqn()); + } + + @Test + public void testFqqnViaAddress() { + final SimpleString ADDRESS = RandomUtil.randomSimpleString(); + final SimpleString QUEUE = RandomUtil.randomSimpleString(); + QueueConfiguration queueConfiguration = new QueueConfiguration(RandomUtil.randomSimpleString()).setAddress(CompositeAddress.toFullyQualified(ADDRESS, QUEUE)); + Assert.assertEquals(ADDRESS, queueConfiguration.getAddress()); + Assert.assertEquals(QUEUE, queueConfiguration.getName()); + Assert.assertTrue(queueConfiguration.isFqqn()); + } } diff --git a/artemis-core-client/src/main/java/org/apache/activemq/artemis/api/core/client/ClientSession.java b/artemis-core-client/src/main/java/org/apache/activemq/artemis/api/core/client/ClientSession.java index 1e748ebae9..9b2a2a5a39 100644 --- a/artemis-core-client/src/main/java/org/apache/activemq/artemis/api/core/client/ClientSession.java +++ b/artemis-core-client/src/main/java/org/apache/activemq/artemis/api/core/client/ClientSession.java @@ -94,6 +94,10 @@ public interface ClientSession extends XAResource, AutoCloseable { Integer getDefaultConsumersBeforeDispatch(); Long getDefaultDelayBeforeDispatch(); + + boolean isSupportsMulticast(); + + boolean isSupportsAnycast(); } /** diff --git a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/impl/AddressQueryImpl.java b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/impl/AddressQueryImpl.java index 8c543de812..cb9438ee54 100644 --- a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/impl/AddressQueryImpl.java +++ b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/impl/AddressQueryImpl.java @@ -48,6 +48,10 @@ public class AddressQueryImpl implements ClientSession.AddressQuery { private final Long defaultDelayBeforeDispatch; + private final boolean supportsMulticast; + + private final boolean supportsAnycast; + public AddressQueryImpl(final boolean exists, final List queueNames, final boolean autoCreateQueues, @@ -59,7 +63,9 @@ public class AddressQueryImpl implements ClientSession.AddressQuery { final SimpleString defaultLastValueKey, final Boolean defaultNonDestructive, final Integer defaultConsumersBeforeDispatch, - final Long defaultDelayBeforeDispatch) { + final Long defaultDelayBeforeDispatch, + final boolean supportsMulticast, + final boolean supportsAnycast) { this.exists = exists; this.queueNames = new ArrayList<>(queueNames); this.autoCreateQueues = autoCreateQueues; @@ -72,6 +78,8 @@ public class AddressQueryImpl implements ClientSession.AddressQuery { this.defaultNonDestructive = defaultNonDestructive; this.defaultConsumersBeforeDispatch = defaultConsumersBeforeDispatch; this.defaultDelayBeforeDispatch = defaultDelayBeforeDispatch; + this.supportsMulticast = supportsMulticast; + this.supportsAnycast = supportsAnycast; } @Override @@ -133,4 +141,14 @@ public class AddressQueryImpl implements ClientSession.AddressQuery { public Long getDefaultDelayBeforeDispatch() { return defaultDelayBeforeDispatch; } + + @Override + public boolean isSupportsMulticast() { + return supportsMulticast; + } + + @Override + public boolean isSupportsAnycast() { + return supportsAnycast; + } } diff --git a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/ActiveMQSessionContext.java b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/ActiveMQSessionContext.java index b864dc1013..38233a7f27 100644 --- a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/ActiveMQSessionContext.java +++ b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/ActiveMQSessionContext.java @@ -87,6 +87,7 @@ import org.apache.activemq.artemis.core.protocol.core.impl.wireformat.SessionBin import org.apache.activemq.artemis.core.protocol.core.impl.wireformat.SessionBindingQueryResponseMessage_V2; import org.apache.activemq.artemis.core.protocol.core.impl.wireformat.SessionBindingQueryResponseMessage_V3; import org.apache.activemq.artemis.core.protocol.core.impl.wireformat.SessionBindingQueryResponseMessage_V4; +import org.apache.activemq.artemis.core.protocol.core.impl.wireformat.SessionBindingQueryResponseMessage_V5; import org.apache.activemq.artemis.core.protocol.core.impl.wireformat.SessionCloseMessage; import org.apache.activemq.artemis.core.protocol.core.impl.wireformat.SessionCommitMessage; import org.apache.activemq.artemis.core.protocol.core.impl.wireformat.SessionCommitMessage_V2; @@ -429,22 +430,26 @@ public class ActiveMQSessionContext extends SessionContext { @Override public ClientSession.AddressQuery addressQuery(final SimpleString address) throws ActiveMQException { - if (sessionChannel.supports(PacketImpl.SESS_BINDINGQUERY_RESP_V4, getServerVersion())) { + if (sessionChannel.supports(PacketImpl.SESS_BINDINGQUERY_RESP_V5, getServerVersion())) { + Packet packet = sessionChannel.sendBlocking(new SessionBindingQueryMessage(address), PacketImpl.SESS_BINDINGQUERY_RESP_V5); + SessionBindingQueryResponseMessage_V5 response = (SessionBindingQueryResponseMessage_V5) packet; + return new AddressQueryImpl(response.isExists(), response.getQueueNames(), response.isAutoCreateQueues(), response.isAutoCreateAddresses(), response.isDefaultPurgeOnNoConsumers(), response.getDefaultMaxConsumers(), response.isDefaultExclusive(), response.isDefaultLastValue(), response.getDefaultLastValueKey(), response.isDefaultNonDestructive(), response.getDefaultConsumersBeforeDispatch(), response.getDefaultDelayBeforeDispatch(), response.isSupportsMulticast(), response.isSupportsAnycast()); + } else if (sessionChannel.supports(PacketImpl.SESS_BINDINGQUERY_RESP_V4, getServerVersion())) { Packet packet = sessionChannel.sendBlocking(new SessionBindingQueryMessage(address), PacketImpl.SESS_BINDINGQUERY_RESP_V4); SessionBindingQueryResponseMessage_V4 response = (SessionBindingQueryResponseMessage_V4) packet; - return new AddressQueryImpl(response.isExists(), response.getQueueNames(), response.isAutoCreateQueues(), response.isAutoCreateAddresses(), response.isDefaultPurgeOnNoConsumers(), response.getDefaultMaxConsumers(), response.isDefaultExclusive(), response.isDefaultLastValue(), response.getDefaultLastValueKey(), response.isDefaultNonDestructive(), response.getDefaultConsumersBeforeDispatch(), response.getDefaultDelayBeforeDispatch()); + return new AddressQueryImpl(response.isExists(), response.getQueueNames(), response.isAutoCreateQueues(), response.isAutoCreateAddresses(), response.isDefaultPurgeOnNoConsumers(), response.getDefaultMaxConsumers(), response.isDefaultExclusive(), response.isDefaultLastValue(), response.getDefaultLastValueKey(), response.isDefaultNonDestructive(), response.getDefaultConsumersBeforeDispatch(), response.getDefaultDelayBeforeDispatch(), true, true); } else if (sessionChannel.supports(PacketImpl.SESS_BINDINGQUERY_RESP_V3, getServerVersion())) { Packet packet = sessionChannel.sendBlocking(new SessionBindingQueryMessage(address), PacketImpl.SESS_BINDINGQUERY_RESP_V3); SessionBindingQueryResponseMessage_V3 response = (SessionBindingQueryResponseMessage_V3) packet; - return new AddressQueryImpl(response.isExists(), response.getQueueNames(), response.isAutoCreateQueues(), response.isAutoCreateAddresses(), ActiveMQDefaultConfiguration.getDefaultPurgeOnNoConsumers(), ActiveMQDefaultConfiguration.getDefaultMaxQueueConsumers(), null, null, null, null, null, null); + return new AddressQueryImpl(response.isExists(), response.getQueueNames(), response.isAutoCreateQueues(), response.isAutoCreateAddresses(), ActiveMQDefaultConfiguration.getDefaultPurgeOnNoConsumers(), ActiveMQDefaultConfiguration.getDefaultMaxQueueConsumers(), null, null, null, null, null, null, true, true); } else if (sessionChannel.supports(PacketImpl.SESS_BINDINGQUERY_RESP_V2, getServerVersion())) { Packet packet = sessionChannel.sendBlocking(new SessionBindingQueryMessage(address), PacketImpl.SESS_BINDINGQUERY_RESP_V2); SessionBindingQueryResponseMessage_V2 response = (SessionBindingQueryResponseMessage_V2) packet; - return new AddressQueryImpl(response.isExists(), response.getQueueNames(), response.isAutoCreateQueues(), false, ActiveMQDefaultConfiguration.getDefaultPurgeOnNoConsumers(), ActiveMQDefaultConfiguration.getDefaultMaxQueueConsumers(), null, null, null, null, null, null); + return new AddressQueryImpl(response.isExists(), response.getQueueNames(), response.isAutoCreateQueues(), false, ActiveMQDefaultConfiguration.getDefaultPurgeOnNoConsumers(), ActiveMQDefaultConfiguration.getDefaultMaxQueueConsumers(), null, null, null, null, null, null, true, true); } else { Packet packet = sessionChannel.sendBlocking(new SessionBindingQueryMessage(address), PacketImpl.SESS_BINDINGQUERY_RESP); SessionBindingQueryResponseMessage response = (SessionBindingQueryResponseMessage) packet; - return new AddressQueryImpl(response.isExists(), response.getQueueNames(), false, false, ActiveMQDefaultConfiguration.getDefaultPurgeOnNoConsumers(), ActiveMQDefaultConfiguration.getDefaultMaxQueueConsumers(), null, null, null, null, null, null); + return new AddressQueryImpl(response.isExists(), response.getQueueNames(), false, false, ActiveMQDefaultConfiguration.getDefaultPurgeOnNoConsumers(), ActiveMQDefaultConfiguration.getDefaultMaxQueueConsumers(), null, null, null, null, null, null, true, true); } } diff --git a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/ChannelImpl.java b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/ChannelImpl.java index d7fa20df9b..c91075b8e1 100644 --- a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/ChannelImpl.java +++ b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/ChannelImpl.java @@ -193,6 +193,8 @@ public final class ChannelImpl implements Channel { case PacketImpl.CREATESESSION_V2: case PacketImpl.DISCONNECT_V3: return version >= PacketImpl.ARTEMIS_2_18_0_VERSION; + case PacketImpl.SESS_BINDINGQUERY_RESP_V5: + return version >= PacketImpl.ARTEMIS_2_29_0_VERSION; default: return true; } diff --git a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/PacketDecoder.java b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/PacketDecoder.java index 6b71951cd3..e2d2f22d46 100644 --- a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/PacketDecoder.java +++ b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/PacketDecoder.java @@ -59,6 +59,7 @@ import org.apache.activemq.artemis.core.protocol.core.impl.wireformat.SessionBin import org.apache.activemq.artemis.core.protocol.core.impl.wireformat.SessionBindingQueryResponseMessage_V2; import org.apache.activemq.artemis.core.protocol.core.impl.wireformat.SessionBindingQueryResponseMessage_V3; import org.apache.activemq.artemis.core.protocol.core.impl.wireformat.SessionBindingQueryResponseMessage_V4; +import org.apache.activemq.artemis.core.protocol.core.impl.wireformat.SessionBindingQueryResponseMessage_V5; import org.apache.activemq.artemis.core.protocol.core.impl.wireformat.SessionCloseMessage; import org.apache.activemq.artemis.core.protocol.core.impl.wireformat.SessionCommitMessage; import org.apache.activemq.artemis.core.protocol.core.impl.wireformat.SessionCommitMessage_V2; @@ -132,6 +133,7 @@ import static org.apache.activemq.artemis.core.protocol.core.impl.PacketImpl.SES import static org.apache.activemq.artemis.core.protocol.core.impl.PacketImpl.SESS_BINDINGQUERY_RESP_V2; import static org.apache.activemq.artemis.core.protocol.core.impl.PacketImpl.SESS_BINDINGQUERY_RESP_V3; import static org.apache.activemq.artemis.core.protocol.core.impl.PacketImpl.SESS_BINDINGQUERY_RESP_V4; +import static org.apache.activemq.artemis.core.protocol.core.impl.PacketImpl.SESS_BINDINGQUERY_RESP_V5; import static org.apache.activemq.artemis.core.protocol.core.impl.PacketImpl.SESS_CLOSE; import static org.apache.activemq.artemis.core.protocol.core.impl.PacketImpl.SESS_COMMIT; import static org.apache.activemq.artemis.core.protocol.core.impl.PacketImpl.SESS_CONSUMER_CLOSE; @@ -325,6 +327,10 @@ public abstract class PacketDecoder implements Serializable { packet = new SessionBindingQueryResponseMessage_V4(); break; } + case SESS_BINDINGQUERY_RESP_V5: { + packet = new SessionBindingQueryResponseMessage_V5(); + break; + } case SESS_XA_START: { packet = new SessionXAStartMessage(); break; diff --git a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/PacketImpl.java b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/PacketImpl.java index 00020faecc..9d5b6e8a0d 100644 --- a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/PacketImpl.java +++ b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/PacketImpl.java @@ -50,6 +50,9 @@ public class PacketImpl implements Packet { // 2.28.0 public static final int ARTEMIS_2_28_0_VERSION = 134; + // 2.29.0 + public static final int ARTEMIS_2_29_0_VERSION = 135; + public static final SimpleString OLD_QUEUE_PREFIX = new SimpleString("jms.queue."); public static final SimpleString OLD_TEMP_QUEUE_PREFIX = new SimpleString("jms.tempqueue."); public static final SimpleString OLD_TOPIC_PREFIX = new SimpleString("jms.topic."); @@ -300,6 +303,8 @@ public class PacketImpl implements Packet { public static final byte REMOVE_PRODUCER = -21; + public static final byte SESS_BINDINGQUERY_RESP_V5 = -22; + public PacketImpl(final byte type) { this.type = type; } diff --git a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/wireformat/SessionBindingQueryResponseMessage_V4.java b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/wireformat/SessionBindingQueryResponseMessage_V4.java index ec819b79f4..8c6a4a2262 100644 --- a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/wireformat/SessionBindingQueryResponseMessage_V4.java +++ b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/wireformat/SessionBindingQueryResponseMessage_V4.java @@ -24,21 +24,21 @@ import org.apache.activemq.artemis.utils.BufferHelper; public class SessionBindingQueryResponseMessage_V4 extends SessionBindingQueryResponseMessage_V3 { - private boolean defaultPurgeOnNoConsumers; + protected boolean defaultPurgeOnNoConsumers; - private int defaultMaxConsumers; + protected int defaultMaxConsumers; - private Boolean defaultExclusive; + protected Boolean defaultExclusive; - private Boolean defaultLastValue; + protected Boolean defaultLastValue; - private SimpleString defaultLastValueKey; + protected SimpleString defaultLastValueKey; - private Boolean defaultNonDestructive; + protected Boolean defaultNonDestructive; - private Integer defaultConsumersBeforeDispatch; + protected Integer defaultConsumersBeforeDispatch; - private Long defaultDelayBeforeDispatch; + protected Long defaultDelayBeforeDispatch; public SessionBindingQueryResponseMessage_V4(final boolean exists, final List queueNames, @@ -83,6 +83,10 @@ public class SessionBindingQueryResponseMessage_V4 extends SessionBindingQueryRe super(SESS_BINDINGQUERY_RESP_V4); } + public SessionBindingQueryResponseMessage_V4(byte v) { + super(v); + } + public boolean isDefaultPurgeOnNoConsumers() { return defaultPurgeOnNoConsumers; } diff --git a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/wireformat/SessionBindingQueryResponseMessage_V5.java b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/wireformat/SessionBindingQueryResponseMessage_V5.java new file mode 100644 index 0000000000..41af7930be --- /dev/null +++ b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/wireformat/SessionBindingQueryResponseMessage_V5.java @@ -0,0 +1,140 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.activemq.artemis.core.protocol.core.impl.wireformat; + +import java.util.List; + +import org.apache.activemq.artemis.api.core.ActiveMQBuffer; +import org.apache.activemq.artemis.api.core.SimpleString; + +public class SessionBindingQueryResponseMessage_V5 extends SessionBindingQueryResponseMessage_V4 { + + protected boolean supportsMulticast; + + protected boolean supportsAnycast; + + public SessionBindingQueryResponseMessage_V5(final boolean exists, + final List queueNames, + final boolean autoCreateQueues, + final boolean autoCreateAddresses, + final boolean defaultPurgeOnNoConsumers, + final int defaultMaxConsumers, + final Boolean defaultExclusive, + final Boolean defaultLastValue, + final SimpleString defaultLastValueKey, + final Boolean defaultNonDestructive, + final Integer defaultConsumersBeforeDispatch, + final Long defaultDelayBeforeDispatch, + final boolean supportsMulticast, + final boolean supportsAnycast) { + super(SESS_BINDINGQUERY_RESP_V5); + + this.exists = exists; + + this.queueNames = queueNames; + + this.autoCreateQueues = autoCreateQueues; + + this.autoCreateAddresses = autoCreateAddresses; + + this.defaultPurgeOnNoConsumers = defaultPurgeOnNoConsumers; + + this.defaultMaxConsumers = defaultMaxConsumers; + + this.defaultExclusive = defaultExclusive; + + this.defaultLastValue = defaultLastValue; + + this.defaultLastValueKey = defaultLastValueKey; + + this.defaultNonDestructive = defaultNonDestructive; + + this.defaultConsumersBeforeDispatch = defaultConsumersBeforeDispatch; + + this.defaultDelayBeforeDispatch = defaultDelayBeforeDispatch; + + this.supportsMulticast = supportsMulticast; + + this.supportsAnycast = supportsAnycast; + } + + public SessionBindingQueryResponseMessage_V5() { + super(SESS_BINDINGQUERY_RESP_V5); + } + + public Boolean isSupportsMulticast() { + return supportsMulticast; + } + + public Boolean isSupportsAnycast() { + return supportsAnycast; + } + + @Override + public void encodeRest(final ActiveMQBuffer buffer) { + super.encodeRest(buffer); + buffer.writeBoolean(supportsMulticast); + buffer.writeBoolean(supportsAnycast); + } + + @Override + public void decodeRest(final ActiveMQBuffer buffer) { + super.decodeRest(buffer); + if (buffer.readableBytes() > 0) { + supportsMulticast = buffer.readBoolean(); + supportsAnycast = buffer.readBoolean(); + } + } + + @Override + public int hashCode() { + final int prime = 31; + int result = super.hashCode(); + result = prime * result + (supportsMulticast ? 1231 : 1237); + result = prime * result + (supportsAnycast ? 1231 : 1237); + return result; + } + + @Override + protected String getPacketString() { + StringBuffer buff = new StringBuffer(super.getPacketString()); + buff.append(", supportsMulticast=" + supportsMulticast); + buff.append(", supportsAnycast=" + supportsAnycast); + return buff.toString(); + } + + @Override + public boolean equals(Object obj) { + if (this == obj) { + return true; + } + if (!super.equals(obj)) { + return false; + } + if (!(obj instanceof SessionBindingQueryResponseMessage_V5)) { + return false; + } + SessionBindingQueryResponseMessage_V5 other = (SessionBindingQueryResponseMessage_V5) obj; + if (supportsMulticast != other.supportsMulticast) { + return false; + } + if (supportsAnycast != other.supportsAnycast) { + return false; + } + return true; + } +} diff --git a/artemis-core-client/src/main/resources/activemq-version.properties b/artemis-core-client/src/main/resources/activemq-version.properties index 7295b14231..8debcd3c3e 100644 --- a/artemis-core-client/src/main/resources/activemq-version.properties +++ b/artemis-core-client/src/main/resources/activemq-version.properties @@ -20,4 +20,4 @@ activemq.version.minorVersion=${activemq.version.minorVersion} activemq.version.microVersion=${activemq.version.microVersion} activemq.version.incrementingVersion=${activemq.version.incrementingVersion} activemq.version.versionTag=${activemq.version.versionTag} -activemq.version.compatibleVersionList=121,122,123,124,125,126,127,128,129,130,131,132,133,134 +activemq.version.compatibleVersionList=121,122,123,124,125,126,127,128,129,130,131,132,133,134,135 diff --git a/artemis-jms-client/src/main/java/org/apache/activemq/artemis/jms/client/ActiveMQSession.java b/artemis-jms-client/src/main/java/org/apache/activemq/artemis/jms/client/ActiveMQSession.java index 74c2ccddce..2a726fa1fb 100644 --- a/artemis-jms-client/src/main/java/org/apache/activemq/artemis/jms/client/ActiveMQSession.java +++ b/artemis-jms-client/src/main/java/org/apache/activemq/artemis/jms/client/ActiveMQSession.java @@ -404,6 +404,10 @@ public class ActiveMQSession implements QueueSession, TopicSession { throw new InvalidDestinationException("Destination " + address + " does not exist, autoCreateAddresses=" + addressQuery.isAutoCreateAddresses()); } } + } else { + if ((!addressQuery.isSupportsMulticast() && !destination.isQueue()) || (!addressQuery.isSupportsAnycast() && destination.isQueue())) { + throw new InvalidDestinationException("Destination " + address + " exists, but does not support " + (destination.isQueue() ? RoutingType.ANYCAST.name() : RoutingType.MULTICAST.name()) + " routing"); + } } // Second we create the queue, the address would have existed or successfully created. diff --git a/artemis-junit/artemis-junit-4/src/test/java/org/apache/activemq/artemis/junit/EmbeddedActiveMQResourceCustomConfigurationTest.java b/artemis-junit/artemis-junit-4/src/test/java/org/apache/activemq/artemis/junit/EmbeddedActiveMQResourceCustomConfigurationTest.java index 0ee10be34e..56999545d2 100644 --- a/artemis-junit/artemis-junit-4/src/test/java/org/apache/activemq/artemis/junit/EmbeddedActiveMQResourceCustomConfigurationTest.java +++ b/artemis-junit/artemis-junit-4/src/test/java/org/apache/activemq/artemis/junit/EmbeddedActiveMQResourceCustomConfigurationTest.java @@ -45,6 +45,9 @@ public class EmbeddedActiveMQResourceCustomConfigurationTest { @Rule public RuleChain rulechain = RuleChain.outerRule(server); + public EmbeddedActiveMQResourceCustomConfigurationTest() throws Exception { + } + @After public void tear() { server.stop(); diff --git a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/broker/AMQPSessionCallback.java b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/broker/AMQPSessionCallback.java index c710f1bc32..f30c09b3dd 100644 --- a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/broker/AMQPSessionCallback.java +++ b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/broker/AMQPSessionCallback.java @@ -23,6 +23,7 @@ import org.apache.activemq.artemis.api.config.ActiveMQDefaultConfiguration; import org.apache.activemq.artemis.api.core.ActiveMQException; import org.apache.activemq.artemis.api.core.ActiveMQQueueExistsException; import org.apache.activemq.artemis.api.core.ActiveMQSecurityException; +import org.apache.activemq.artemis.api.core.AutoCreateResult; import org.apache.activemq.artemis.api.core.Message; import org.apache.activemq.artemis.api.core.QueueConfiguration; import org.apache.activemq.artemis.api.core.RoutingType; @@ -329,7 +330,8 @@ public class AMQPSessionCallback implements SessionCallback { public boolean checkAddressAndAutocreateIfPossible(SimpleString address, RoutingType routingType) throws Exception { - return serverSession.checkAutoCreate(address, routingType); + AutoCreateResult autoCreateResult = serverSession.checkAutoCreate(new QueueConfiguration(address).setRoutingType(routingType)); + return autoCreateResult != AutoCreateResult.NOT_FOUND; } public AddressQueryResult addressQuery(SimpleString addressName, @@ -469,7 +471,7 @@ public class AMQPSessionCallback implements SessionCallback { //here check queue-autocreation if (!checkAddressAndAutocreateIfPossible(address, routingType)) { - ActiveMQException e = ActiveMQAMQPProtocolMessageBundle.BUNDLE.addressDoesntExist(); + ActiveMQException e = ActiveMQAMQPProtocolMessageBundle.BUNDLE.addressDoesntExist(address.toString()); if (transaction != null) { transaction.markAsRollbackOnly(e); } @@ -767,7 +769,6 @@ public class AMQPSessionCallback implements SessionCallback { return this.transactionHandler; } - class AddressQueryCache { SimpleString address; T result; diff --git a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/logger/ActiveMQAMQPProtocolMessageBundle.java b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/logger/ActiveMQAMQPProtocolMessageBundle.java index 91c352eafa..de5571370c 100644 --- a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/logger/ActiveMQAMQPProtocolMessageBundle.java +++ b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/logger/ActiveMQAMQPProtocolMessageBundle.java @@ -39,8 +39,8 @@ public interface ActiveMQAMQPProtocolMessageBundle { @Message(id = 119001, value = "error creating temporary queue, {}") ActiveMQAMQPInternalErrorException errorCreatingTemporaryQueue(String message); - @Message(id = 119002, value = "target address does not exist") - ActiveMQAMQPNotFoundException addressDoesntExist(); + @Message(id = 119002, value = "target address {} does not exist") + ActiveMQAMQPNotFoundException addressDoesntExist(String address); @Message(id = 119003, value = "error finding temporary queue, {}") ActiveMQAMQPNotFoundException errorFindingTemporaryQueue(String message); diff --git a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/proton/ProtonServerReceiverContext.java b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/proton/ProtonServerReceiverContext.java index 33e87bd5ed..935450cc18 100644 --- a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/proton/ProtonServerReceiverContext.java +++ b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/proton/ProtonServerReceiverContext.java @@ -119,7 +119,7 @@ public class ProtonServerReceiverContext extends ProtonAbstractReceiver { defRoutingType = getRoutingType(target.getCapabilities(), address); try { if (!sessionSPI.checkAddressAndAutocreateIfPossible(address, defRoutingType)) { - throw ActiveMQAMQPProtocolMessageBundle.BUNDLE.addressDoesntExist(); + throw ActiveMQAMQPProtocolMessageBundle.BUNDLE.addressDoesntExist(address.toString()); } } catch (ActiveMQAMQPNotFoundException e) { throw e; @@ -157,6 +157,30 @@ public class ProtonServerReceiverContext extends ProtonAbstractReceiver { } private RoutingType getRoutingType(Symbol[] symbols, SimpleString address) { + RoutingType explicitRoutingType = getExplicitRoutingType(symbols); + if (explicitRoutingType != null) { + return explicitRoutingType; + } else { + final AddressInfo addressInfo = sessionSPI.getAddress(address); + /* + * If we're dealing with an *existing* address that has just one routing-type simply use that. + * This allows "bare" AMQP clients (which have no built-in routing semantics) to send messages + * wherever they want in this case because the routing ambiguity is eliminated. + */ + if (addressInfo != null && addressInfo.getRoutingTypes().size() == 1) { + return addressInfo.getRoutingType(); + } else { + return getDefaultRoutingType(address); + } + } + } + + private RoutingType getDefaultRoutingType(SimpleString address) { + RoutingType defaultRoutingType = sessionSPI.getDefaultRoutingType(address); + return defaultRoutingType == null ? ActiveMQDefaultConfiguration.getDefaultRoutingType() : defaultRoutingType; + } + + private RoutingType getExplicitRoutingType(Symbol[] symbols) { if (symbols != null) { for (Symbol symbol : symbols) { if (AmqpSupport.TEMP_TOPIC_CAPABILITY.equals(symbol) || AmqpSupport.TOPIC_CAPABILITY.equals(symbol)) { @@ -166,15 +190,7 @@ public class ProtonServerReceiverContext extends ProtonAbstractReceiver { } } } - final AddressInfo addressInfo = sessionSPI.getAddress(address); - if (addressInfo != null && !addressInfo.getRoutingTypes().isEmpty()) { - if (addressInfo.getRoutingTypes().size() == 1 && addressInfo.getRoutingType() == RoutingType.MULTICAST) { - return RoutingType.MULTICAST; - } - } - RoutingType defaultRoutingType = sessionSPI.getDefaultRoutingType(address); - defaultRoutingType = defaultRoutingType == null ? ActiveMQDefaultConfiguration.getDefaultRoutingType() : defaultRoutingType; - return defaultRoutingType; + return null; } @Override diff --git a/artemis-protocols/artemis-hqclient-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/hornetq/client/HornetQClientSessionContext.java b/artemis-protocols/artemis-hqclient-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/hornetq/client/HornetQClientSessionContext.java index 9b0b77f2cb..a0389be865 100644 --- a/artemis-protocols/artemis-hqclient-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/hornetq/client/HornetQClientSessionContext.java +++ b/artemis-protocols/artemis-hqclient-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/hornetq/client/HornetQClientSessionContext.java @@ -71,7 +71,7 @@ public class HornetQClientSessionContext extends ActiveMQSessionContext { public ClientSession.AddressQuery addressQuery(final SimpleString address) throws ActiveMQException { SessionBindingQueryResponseMessage response = (SessionBindingQueryResponseMessage) getSessionChannel().sendBlocking(new SessionBindingQueryMessage(address), PacketImpl.SESS_BINDINGQUERY_RESP); - return new AddressQueryImpl(response.isExists(), response.getQueueNames(), false, false, ActiveMQDefaultConfiguration.getDefaultPurgeOnNoConsumers(), ActiveMQDefaultConfiguration.getDefaultMaxQueueConsumers(), ActiveMQDefaultConfiguration.getDefaultExclusive(), ActiveMQDefaultConfiguration.getDefaultLastValue(), ActiveMQDefaultConfiguration.getDefaultLastValueKey(), ActiveMQDefaultConfiguration.getDefaultNonDestructive(), ActiveMQDefaultConfiguration.getDefaultConsumersBeforeDispatch(), ActiveMQDefaultConfiguration.getDefaultDelayBeforeDispatch()); + return new AddressQueryImpl(response.isExists(), response.getQueueNames(), false, false, ActiveMQDefaultConfiguration.getDefaultPurgeOnNoConsumers(), ActiveMQDefaultConfiguration.getDefaultMaxQueueConsumers(), ActiveMQDefaultConfiguration.getDefaultExclusive(), ActiveMQDefaultConfiguration.getDefaultLastValue(), ActiveMQDefaultConfiguration.getDefaultLastValueKey(), ActiveMQDefaultConfiguration.getDefaultNonDestructive(), ActiveMQDefaultConfiguration.getDefaultConsumersBeforeDispatch(), ActiveMQDefaultConfiguration.getDefaultDelayBeforeDispatch(), true, true); } @Override diff --git a/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/OpenWireConnection.java b/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/OpenWireConnection.java index 4631373e05..4e1934b659 100644 --- a/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/OpenWireConnection.java +++ b/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/OpenWireConnection.java @@ -16,7 +16,15 @@ */ package org.apache.activemq.artemis.core.protocol.openwire; +import javax.jms.IllegalStateException; +import javax.jms.InvalidClientIDException; +import javax.jms.InvalidDestinationException; +import javax.jms.JMSSecurityException; +import javax.transaction.xa.XAException; +import javax.transaction.xa.XAResource; +import javax.transaction.xa.Xid; import java.io.IOException; +import java.lang.invoke.MethodHandles; import java.util.ArrayList; import java.util.List; import java.util.ListIterator; @@ -29,23 +37,14 @@ import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicLongFieldUpdater; -import javax.jms.IllegalStateException; -import javax.jms.InvalidClientIDException; -import javax.jms.InvalidDestinationException; -import javax.jms.JMSSecurityException; -import javax.transaction.xa.XAException; -import javax.transaction.xa.XAResource; -import javax.transaction.xa.Xid; - import org.apache.activemq.advisory.AdvisorySupport; -import org.apache.activemq.artemis.api.core.ActiveMQAddressExistsException; import org.apache.activemq.artemis.api.core.ActiveMQBuffer; import org.apache.activemq.artemis.api.core.ActiveMQException; import org.apache.activemq.artemis.api.core.ActiveMQExceptionType; import org.apache.activemq.artemis.api.core.ActiveMQNonExistentQueueException; -import org.apache.activemq.artemis.api.core.ActiveMQQueueExistsException; import org.apache.activemq.artemis.api.core.ActiveMQRemoteDisconnectException; import org.apache.activemq.artemis.api.core.ActiveMQSecurityException; +import org.apache.activemq.artemis.api.core.AutoCreateResult; import org.apache.activemq.artemis.api.core.QueueConfiguration; import org.apache.activemq.artemis.api.core.RoutingType; import org.apache.activemq.artemis.api.core.SimpleString; @@ -64,19 +63,14 @@ import org.apache.activemq.artemis.core.protocol.openwire.util.OpenWireUtil; import org.apache.activemq.artemis.core.remoting.FailureListener; import org.apache.activemq.artemis.core.remoting.impl.netty.TransportConstants; import org.apache.activemq.artemis.core.security.SecurityAuth; -import org.apache.activemq.artemis.core.server.ActiveMQMessageBundle; import org.apache.activemq.artemis.core.server.ActiveMQServer; import org.apache.activemq.artemis.core.server.ActiveMQServerLogger; -import org.apache.activemq.artemis.core.server.AddressQueryResult; -import org.apache.activemq.artemis.core.server.QueueQueryResult; import org.apache.activemq.artemis.core.server.MessageReference; import org.apache.activemq.artemis.core.server.ServerConsumer; import org.apache.activemq.artemis.core.server.ServerSession; import org.apache.activemq.artemis.core.server.SlowConsumerDetectionListener; import org.apache.activemq.artemis.core.server.TempQueueObserver; -import org.apache.activemq.artemis.core.server.impl.AddressInfo; import org.apache.activemq.artemis.core.server.impl.RefsOperation; -import org.apache.activemq.artemis.core.settings.impl.AddressSettings; import org.apache.activemq.artemis.core.transaction.ResourceManager; import org.apache.activemq.artemis.core.transaction.Transaction; import org.apache.activemq.artemis.core.transaction.TransactionOperationAbstract; @@ -136,7 +130,6 @@ import org.apache.activemq.transport.TransmitCallback; import org.apache.activemq.util.ByteSequence; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import java.lang.invoke.MethodHandles; /** * Represents an activemq connection. @@ -906,29 +899,18 @@ public class OpenWireConnection extends AbstractRemotingConnection implements Se } SimpleString qName = SimpleString.toSimpleString(dest.getPhysicalName()); - if (server.locateQueue(qName) == null) { - AddressSettings addressSettings = server.getAddressSettingsRepository().getMatch(dest.getPhysicalName()); - if (dest.isQueue() && (addressSettings.isAutoCreateQueues() || dest.isTemporary())) { - try { - internalSession.createQueue(new QueueConfiguration(qName).setRoutingType(RoutingType.ANYCAST).setDurable(!dest.isTemporary()).setTemporary(dest.isTemporary()).setAutoCreated(!dest.isTemporary())); - created = true; - } catch (ActiveMQQueueExistsException exists) { - // The queue may have been created by another thread in the mean time. Catch and do nothing. - } - } else if (dest.isTopic() && (addressSettings.isAutoCreateAddresses() || dest.isTemporary())) { - try { - AddressInfo addressInfo = new AddressInfo(qName, RoutingType.MULTICAST); - if (AdvisorySupport.isAdvisoryTopic(dest) && protocolManager.isSuppressInternalManagementObjects()) { - addressInfo.setInternal(true); - } - if (internalSession.getAddress(addressInfo.getName()) == null) { - internalSession.createAddress(addressInfo, !dest.isTemporary()); - created = true; - } - } catch (ActiveMQAddressExistsException exists) { - // The address may have been created by another thread in the mean time. Catch and do nothing. - } + + AutoCreateResult autoCreateResult = internalSession.checkAutoCreate(new QueueConfiguration(qName) + .setRoutingType(dest.isQueue() ? RoutingType.ANYCAST : RoutingType.MULTICAST) + .setDurable(!dest.isTemporary()) + .setTemporary(dest.isTemporary())); + if (autoCreateResult == AutoCreateResult.CREATED) { + created = true; + if (AdvisorySupport.isAdvisoryTopic(dest) && protocolManager.isSuppressInternalManagementObjects()) { + internalSession.getAddress(qName).setInternal(true); } + } else if (autoCreateResult == AutoCreateResult.NOT_FOUND) { + throw new InvalidDestinationException(dest.getDestinationTypeAsString() + " " + dest.getPhysicalName() + " does not exist."); } if (dest.isTemporary()) { @@ -1172,23 +1154,6 @@ public class OpenWireConnection extends AbstractRemotingConnection implements Se } } - /** - * Checks to see if this destination exists. If it does not throw an invalid destination exception. - * - * @param destination - */ - private void validateDestination(ActiveMQDestination destination) throws Exception { - if (destination.isQueue()) { - SimpleString physicalName = new SimpleString(destination.getPhysicalName()); - QueueQueryResult queue = server.queueQuery(physicalName); - AddressQueryResult address = server.addressQuery(physicalName); - - if (!address.isExists() && !queue.isAutoCreateQueues()) { - throw ActiveMQMessageBundle.BUNDLE.noSuchQueue(physicalName); - } - } - } - private void propagateLastSequenceId(SessionState sessionState, long lastDeliveredSequenceId) { for (ConsumerState consumerState : sessionState.getConsumerStates()) { consumerState.getInfo().setLastDeliveredSequenceId(lastDeliveredSequenceId); @@ -1270,11 +1235,7 @@ public class OpenWireConnection extends AbstractRemotingConnection implements Se ActiveMQDestination destination = info.getDestination(); if (destination != null && !AdvisorySupport.isAdvisoryTopic(destination)) { - if (destination.isQueue()) { - OpenWireConnection.this.validateDestination(destination); - } - DestinationInfo destInfo = new DestinationInfo(getContext().getConnectionId(), DestinationInfo.ADD_OPERATION_TYPE, destination); - OpenWireConnection.this.addDestination(destInfo); + OpenWireConnection.this.addDestination(new DestinationInfo(getContext().getConnectionId(), DestinationInfo.ADD_OPERATION_TYPE, destination)); } ss.addProducer(info); diff --git a/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/amq/AMQSession.java b/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/amq/AMQSession.java index 966db041e1..9acfd5dd7a 100644 --- a/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/amq/AMQSession.java +++ b/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/amq/AMQSession.java @@ -16,25 +16,23 @@ */ package org.apache.activemq.artemis.core.protocol.openwire.amq; -import static org.apache.activemq.artemis.core.protocol.openwire.util.OpenWireUtil.OPENWIRE_WILDCARD; - +import javax.jms.InvalidDestinationException; +import javax.jms.ResourceAllocationException; import java.io.IOException; +import java.lang.invoke.MethodHandles; import java.util.List; import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicInteger; -import javax.jms.InvalidDestinationException; -import javax.jms.ResourceAllocationException; - import org.apache.activemq.advisory.AdvisorySupport; -import org.apache.activemq.artemis.api.core.ActiveMQQueueExistsException; +import org.apache.activemq.artemis.api.core.AutoCreateResult; import org.apache.activemq.artemis.api.core.QueueConfiguration; import org.apache.activemq.artemis.api.core.RoutingType; import org.apache.activemq.artemis.api.core.SimpleString; import org.apache.activemq.artemis.core.io.IOCallback; -import org.apache.activemq.artemis.core.persistence.CoreMessageObjectPools; import org.apache.activemq.artemis.core.paging.PagingStore; +import org.apache.activemq.artemis.core.persistence.CoreMessageObjectPools; import org.apache.activemq.artemis.core.protocol.openwire.OpenWireConnection; import org.apache.activemq.artemis.core.protocol.openwire.OpenWireMessageConverter; import org.apache.activemq.artemis.core.protocol.openwire.OpenWireProtocolManager; @@ -42,7 +40,6 @@ import org.apache.activemq.artemis.core.protocol.openwire.util.OpenWireUtil; import org.apache.activemq.artemis.core.server.ActiveMQMessageBundle; import org.apache.activemq.artemis.core.server.ActiveMQServer; import org.apache.activemq.artemis.core.server.MessageReference; -import org.apache.activemq.artemis.core.server.QueueQueryResult; import org.apache.activemq.artemis.core.server.ServerConsumer; import org.apache.activemq.artemis.core.server.ServerSession; import org.apache.activemq.artemis.core.server.SlowConsumerDetectionListener; @@ -66,7 +63,8 @@ import org.apache.activemq.command.SessionInfo; import org.apache.activemq.openwire.OpenWireFormat; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import java.lang.invoke.MethodHandles; + +import static org.apache.activemq.artemis.core.protocol.openwire.util.OpenWireUtil.OPENWIRE_WILDCARD; public class AMQSession implements SessionCallback { private final Logger logger = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass()); @@ -230,39 +228,26 @@ public class AMQSession implements SessionCallback { boolean hasQueue = true; if (!connection.containsKnownDestination(queueName)) { - QueueQueryResult queueQuery = server.queueQuery(queueName); - - try { - if (!queueQuery.isExists()) { - if (queueQuery.isAutoCreateQueues()) { - SimpleString queueNameToUse = queueName; - SimpleString addressToUse = queueName; - RoutingType routingTypeToUse = RoutingType.ANYCAST; - if (CompositeAddress.isFullyQualified(queueName.toString())) { - addressToUse = CompositeAddress.extractAddressName(queueName); - queueNameToUse = CompositeAddress.extractQueueName(queueName); - AddressInfo addressInfo = server.getAddressInfo(addressToUse); - if (addressInfo != null) { - routingTypeToUse = addressInfo.getRoutingType(); - } else { - AddressSettings as = server.getAddressSettingsRepository().getMatch(addressToUse.toString()); - routingTypeToUse = as.getDefaultAddressRoutingType(); - } - } - coreSession.createQueue(new QueueConfiguration(queueNameToUse).setAddress(addressToUse).setRoutingType(routingTypeToUse).setTemporary(isTemporary).setAutoCreated(true).setFilterString(filter)); - connection.addKnownDestination(queueName); - } else { - if (server.getAddressInfo(queueName) == null) { - //Address does not exist and will not get autocreated - throw ActiveMQMessageBundle.BUNDLE.noSuchQueue(queueName); - } - hasQueue = false; - } + RoutingType routingTypeToUse = RoutingType.ANYCAST; + if (CompositeAddress.isFullyQualified(queueName.toString())) { + SimpleString addressToUse = CompositeAddress.extractAddressName(queueName); + AddressInfo addressInfo = server.getAddressInfo(addressToUse); + if (addressInfo != null) { + routingTypeToUse = addressInfo.getRoutingType(); + } else { + AddressSettings as = server.getAddressSettingsRepository().getMatch(addressToUse.toString()); + routingTypeToUse = as.getDefaultAddressRoutingType(); } - } catch (ActiveMQQueueExistsException e) { - // In case another thread created the queue before us but after we did the binding query - hasQueue = true; } + AutoCreateResult autoCreateResult = coreSession.checkAutoCreate(new QueueConfiguration(queueName) + .setAddress(queueName) + .setRoutingType(routingTypeToUse) + .setTemporary(isTemporary) + .setFilterString(filter)); + if (autoCreateResult == AutoCreateResult.NOT_FOUND) { + throw ActiveMQMessageBundle.BUNDLE.noSuchQueue(queueName); + } + connection.addKnownDestination(queueName); } return hasQueue; } diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/postoffice/impl/PostOfficeImpl.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/postoffice/impl/PostOfficeImpl.java index 6bf72f9948..2b397434e2 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/postoffice/impl/PostOfficeImpl.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/postoffice/impl/PostOfficeImpl.java @@ -41,6 +41,7 @@ import org.apache.activemq.artemis.api.core.ActiveMQDuplicateIdException; import org.apache.activemq.artemis.api.core.ActiveMQException; import org.apache.activemq.artemis.api.core.ActiveMQNonExistentQueueException; import org.apache.activemq.artemis.api.core.ActiveMQShutdownException; +import org.apache.activemq.artemis.api.core.AutoCreateResult; import org.apache.activemq.artemis.api.core.Message; import org.apache.activemq.artemis.api.core.Pair; import org.apache.activemq.artemis.api.core.QueueConfiguration; @@ -1252,14 +1253,15 @@ public class PostOfficeImpl implements PostOffice, NotificationListener, Binding private AddressInfo checkAddress(RoutingContext context, SimpleString address) throws Exception { AddressInfo addressInfo = addressManager.getAddressInfo(address); if (addressInfo == null && context.getServerSession() != null) { - if (context.getServerSession().checkAutoCreate(address, context.getRoutingType())) { - addressInfo = addressManager.getAddressInfo(address); - } else { + AutoCreateResult autoCreateResult = context.getServerSession().checkAutoCreate(new QueueConfiguration(address).setRoutingType(context.getRoutingType())); + if (autoCreateResult == AutoCreateResult.NOT_FOUND) { ActiveMQException ex = ActiveMQMessageBundle.BUNDLE.addressDoesNotExist(address); if (context.getTransaction() != null) { context.getTransaction().markAsRollbackOnly(ex); } throw ex; + } else { + addressInfo = addressManager.getAddressInfo(address); } } return addressInfo; @@ -1268,7 +1270,8 @@ public class PostOfficeImpl implements PostOffice, NotificationListener, Binding Bindings simpleRoute(SimpleString address, RoutingContext context, Message message, AddressInfo addressInfo) throws Exception { Bindings bindings = addressManager.getBindingsForRoutingAddress(address); if (bindings == null && context.getServerSession() != null) { - if (!context.getServerSession().checkAutoCreate(address, context.getRoutingType())) { + AutoCreateResult autoCreateResult = context.getServerSession().checkAutoCreate(new QueueConfiguration(address).setRoutingType(context.getRoutingType())); + if (autoCreateResult == AutoCreateResult.NOT_FOUND) { ActiveMQException e = ActiveMQMessageBundle.BUNDLE.addressDoesNotExist(address); Transaction tx = context.getTransaction(); if (tx != null) { diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/protocol/core/ServerSessionPacketHandler.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/protocol/core/ServerSessionPacketHandler.java index 03ac2a8d5a..c570359e74 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/protocol/core/ServerSessionPacketHandler.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/protocol/core/ServerSessionPacketHandler.java @@ -58,6 +58,7 @@ import org.apache.activemq.artemis.core.protocol.core.impl.wireformat.SessionBin import org.apache.activemq.artemis.core.protocol.core.impl.wireformat.SessionBindingQueryResponseMessage_V2; import org.apache.activemq.artemis.core.protocol.core.impl.wireformat.SessionBindingQueryResponseMessage_V3; import org.apache.activemq.artemis.core.protocol.core.impl.wireformat.SessionBindingQueryResponseMessage_V4; +import org.apache.activemq.artemis.core.protocol.core.impl.wireformat.SessionBindingQueryResponseMessage_V5; import org.apache.activemq.artemis.core.protocol.core.impl.wireformat.SessionConsumerCloseMessage; import org.apache.activemq.artemis.core.protocol.core.impl.wireformat.SessionConsumerFlowCreditMessage; import org.apache.activemq.artemis.core.protocol.core.impl.wireformat.SessionCreateConsumerMessage; @@ -470,7 +471,9 @@ public class ServerSessionPacketHandler implements ChannelHandler { } } - if (channel.supports(PacketImpl.SESS_BINDINGQUERY_RESP_V4)) { + if (channel.supports(PacketImpl.SESS_BINDINGQUERY_RESP_V5)) { + response = new SessionBindingQueryResponseMessage_V5(result.isExists(), result.getQueueNames(), result.isAutoCreateQueues(), result.isAutoCreateAddresses(), result.isDefaultPurgeOnNoConsumers(), result.getDefaultMaxConsumers(), result.isDefaultExclusive(), result.isDefaultLastValue(), result.getDefaultLastValueKey(), result.isDefaultNonDestructive(), result.getDefaultConsumersBeforeDispatch(), result.getDefaultDelayBeforeDispatch(), result.getAddressInfo() == null ? false : result.getAddressInfo().getRoutingTypes().contains(RoutingType.MULTICAST), result.getAddressInfo() == null ? false : result.getAddressInfo().getRoutingTypes().contains(RoutingType.ANYCAST)); + } else if (channel.supports(PacketImpl.SESS_BINDINGQUERY_RESP_V4)) { response = new SessionBindingQueryResponseMessage_V4(result.isExists(), result.getQueueNames(), result.isAutoCreateQueues(), result.isAutoCreateAddresses(), result.isDefaultPurgeOnNoConsumers(), result.getDefaultMaxConsumers(), result.isDefaultExclusive(), result.isDefaultLastValue(), result.getDefaultLastValueKey(), result.isDefaultNonDestructive(), result.getDefaultConsumersBeforeDispatch(), result.getDefaultDelayBeforeDispatch()); } else if (channel.supports(PacketImpl.SESS_BINDINGQUERY_RESP_V3)) { response = new SessionBindingQueryResponseMessage_V3(result.isExists(), result.getQueueNames(), result.isAutoCreateQueues(), result.isAutoCreateAddresses()); diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/ServerSession.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/ServerSession.java index e1f88157b3..d02851ee8f 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/ServerSession.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/ServerSession.java @@ -16,6 +16,7 @@ */ package org.apache.activemq.artemis.core.server; +import org.apache.activemq.artemis.api.core.AutoCreateResult; import org.apache.activemq.artemis.json.JsonArrayBuilder; import javax.transaction.xa.Xid; import java.util.Collection; @@ -111,7 +112,7 @@ public interface ServerSession extends SecurityAuth { void addCloseable(Closeable closeable); - boolean checkAutoCreate(SimpleString address, RoutingType routingType) throws Exception; + AutoCreateResult checkAutoCreate(QueueConfiguration queueConfiguration) throws Exception; ServerConsumer createConsumer(long consumerID, SimpleString queueName, diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ServerSessionImpl.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ServerSessionImpl.java index 99bcee9746..77ca360486 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ServerSessionImpl.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ServerSessionImpl.java @@ -16,15 +16,10 @@ */ package org.apache.activemq.artemis.core.server.impl; -import org.apache.activemq.artemis.core.management.impl.view.ProducerField; -import org.apache.activemq.artemis.api.core.ActiveMQAddressExistsException; -import org.apache.activemq.artemis.api.core.ActiveMQQueueExistsException; -import org.apache.activemq.artemis.core.postoffice.Bindings; -import org.apache.activemq.artemis.json.JsonArrayBuilder; -import org.apache.activemq.artemis.json.JsonObjectBuilder; -import java.security.cert.X509Certificate; import javax.transaction.xa.XAException; import javax.transaction.xa.Xid; +import java.lang.invoke.MethodHandles; +import java.security.cert.X509Certificate; import java.util.ArrayList; import java.util.Collection; import java.util.Collections; @@ -39,10 +34,13 @@ import java.util.concurrent.Executor; import org.apache.activemq.artemis.Closeable; import org.apache.activemq.artemis.api.config.ActiveMQDefaultConfiguration; +import org.apache.activemq.artemis.api.core.ActiveMQAddressExistsException; import org.apache.activemq.artemis.api.core.ActiveMQException; import org.apache.activemq.artemis.api.core.ActiveMQIOErrorException; import org.apache.activemq.artemis.api.core.ActiveMQIllegalStateException; import org.apache.activemq.artemis.api.core.ActiveMQNonExistentQueueException; +import org.apache.activemq.artemis.api.core.ActiveMQQueueExistsException; +import org.apache.activemq.artemis.api.core.AutoCreateResult; import org.apache.activemq.artemis.api.core.Message; import org.apache.activemq.artemis.api.core.Pair; import org.apache.activemq.artemis.api.core.QueueConfiguration; @@ -54,6 +52,7 @@ import org.apache.activemq.artemis.core.exception.ActiveMQXAException; import org.apache.activemq.artemis.core.filter.Filter; import org.apache.activemq.artemis.core.filter.impl.FilterImpl; import org.apache.activemq.artemis.core.io.IOCallback; +import org.apache.activemq.artemis.core.management.impl.view.ProducerField; import org.apache.activemq.artemis.core.paging.PagingManager; import org.apache.activemq.artemis.core.paging.PagingStore; import org.apache.activemq.artemis.core.persistence.OperationContext; @@ -61,6 +60,7 @@ import org.apache.activemq.artemis.core.persistence.StorageManager; import org.apache.activemq.artemis.core.persistence.impl.journal.LargeServerMessageImpl; import org.apache.activemq.artemis.core.postoffice.Binding; import org.apache.activemq.artemis.core.postoffice.BindingType; +import org.apache.activemq.artemis.core.postoffice.Bindings; import org.apache.activemq.artemis.core.postoffice.PostOffice; import org.apache.activemq.artemis.core.postoffice.QueueBinding; import org.apache.activemq.artemis.core.postoffice.RoutingStatus; @@ -94,6 +94,8 @@ import org.apache.activemq.artemis.core.transaction.Transaction.State; import org.apache.activemq.artemis.core.transaction.TransactionOperationAbstract; import org.apache.activemq.artemis.core.transaction.TransactionPropertyIndexes; import org.apache.activemq.artemis.core.transaction.impl.TransactionImpl; +import org.apache.activemq.artemis.json.JsonArrayBuilder; +import org.apache.activemq.artemis.json.JsonObjectBuilder; import org.apache.activemq.artemis.json.JsonValue; import org.apache.activemq.artemis.logs.AuditLogger; import org.apache.activemq.artemis.spi.core.protocol.RemotingConnection; @@ -105,7 +107,6 @@ import org.apache.activemq.artemis.utils.PrefixUtil; import org.apache.activemq.artemis.utils.collections.TypedProperties; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import java.lang.invoke.MethodHandles; /** * Server side Session implementation @@ -1746,49 +1747,83 @@ public class ServerSessionImpl implements ServerSession, FailureListener { return tx; } - @Override - public boolean checkAutoCreate(SimpleString address, RoutingType routingType) throws Exception { - boolean result; - SimpleString unPrefixedAddress = removePrefix(address); + public AutoCreateResult checkAutoCreate(final QueueConfiguration queueConfig) throws Exception { + AutoCreateResult result; + SimpleString unPrefixedAddress = removePrefix(queueConfig.getAddress()); + SimpleString unPrefixedQueue = removePrefix(queueConfig.getName()); AddressSettings addressSettings = server.getAddressSettingsRepository().getMatch(unPrefixedAddress.toString()); - if (routingType == RoutingType.MULTICAST) { - if (server.getAddressInfo(unPrefixedAddress) == null) { - if (addressSettings.isAutoCreateAddresses()) { - try { - createAddress(address, routingType, true); - } catch (ActiveMQAddressExistsException e) { - // The address may have been created by another thread in the mean time. Catch and do nothing. - } - result = true; - } else { - result = false; + /* + * This is only here to maintain backwards compatibility with the previous implementation. + * + * TODO: figure out how to get rid of this + */ + if (queueConfig.getRoutingType() == null) { + return AutoCreateResult.EXISTED; + } + + // No matter what routing-type is used the address must exist already or be automatically created. + AddressInfo addressInfo = server.getAddressInfo(unPrefixedAddress); + if (addressInfo == null) { + // The address doesn't exist. + if (addressSettings.isAutoCreateAddresses() || queueConfig.isTemporary()) { + // Try to create the address if possible. + try { + createAddress(queueConfig.getAddress(), queueConfig.getRoutingType(), true).setTemporary(queueConfig.isTemporary()); + } catch (ActiveMQAddressExistsException e) { + // The address may have been created by another thread in the mean time. Catch and do nothing. } + result = AutoCreateResult.CREATED; } else { - result = true; - } - } else if (routingType == RoutingType.ANYCAST) { - if (server.locateQueue(unPrefixedAddress) == null) { - Bindings bindings = server.getPostOffice().lookupBindingsForAddress(address); - if (bindings != null) { - // this means the address has another queue with a different name, which is fine, we just ignore it on this case - result = true; - } else if (addressSettings.isAutoCreateQueues()) { - try { - createQueue(new QueueConfiguration(address).setRoutingType(routingType).setAutoCreated(true)); - } catch (ActiveMQQueueExistsException e) { - // The queue may have been created by another thread in the mean time. Catch and do nothing. - } - result = true; - } else { - result = false; - } - } else { - result = true; + // If the address doesn't exist and can't be autocreated then return that result immediately. + return AutoCreateResult.NOT_FOUND; } } else { - result = true; + // The address exists. + if (addressInfo.getRoutingTypes().contains(queueConfig.getRoutingType())) { + // The existing address supports the requested routing-type. + result = AutoCreateResult.EXISTED; + } else { + // The existing address doesn't support the requested routing-type. + if (addressSettings.isAutoCreateAddresses() || queueConfig.isTemporary()) { + // Try to update the address with the new routing-type if possible. + try { + createAddress(addressInfo.addRoutingType(queueConfig.getRoutingType()), true); + } catch (ActiveMQAddressExistsException e) { + // The address may have been created by another thread in the mean-time. Catch and do nothing. + } + result = AutoCreateResult.UPDATED; + } else { + // If the address exists but doesn't support the requested routing-type and can't be updated with the new routing-type then return that result immediately. + return AutoCreateResult.NOT_FOUND; + } + } + } + + if (queueConfig.getRoutingType() == RoutingType.ANYCAST || queueConfig.isFqqn()) { + if (server.locateQueue(unPrefixedQueue) == null) { + // The queue doesn't exist. + Bindings bindings = server.getPostOffice().lookupBindingsForAddress(unPrefixedAddress); + if (bindings != null && !queueConfig.isFqqn()) { + // The address has another queue with a different name, which is fine. Just ignore it. + result = AutoCreateResult.EXISTED; + } else if (addressSettings.isAutoCreateQueues() || queueConfig.isTemporary()) { + // Try to create the queue. + try { + createQueue(new QueueConfiguration(queueConfig).setAutoCreated(true)); + } catch (ActiveMQQueueExistsException e) { + // The queue may have been created by another thread in the mean-time. Catch and do nothing. + } + result = AutoCreateResult.CREATED; + } else { + // The queue doesn't exist, and we can't auto-create it so return that result immediately. + return AutoCreateResult.NOT_FOUND; + } + } else { + // The queue exists. + result = AutoCreateResult.EXISTED; + } } return result; diff --git a/pom.xml b/pom.xml index ccc9151f27..5f5415e3ec 100644 --- a/pom.xml +++ b/pom.xml @@ -182,7 +182,7 @@ 1 0 0 - 134,133,132,131,130,129,128,127,126,125,124,123,122 + 135,134,133,132,131,130,129,128,127,126,125,124,123,122 ${project.version} ${project.version}(${activemq.version.incrementingVersion}) diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/AmqpClientTestSupport.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/AmqpClientTestSupport.java index 0de24670fc..7eb4c5221a 100644 --- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/AmqpClientTestSupport.java +++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/AmqpClientTestSupport.java @@ -166,8 +166,8 @@ public class AmqpClientTestSupport extends AmqpTestSupport { Set acceptors = server.getConfiguration().getAcceptorConfigurations(); for (TransportConfiguration tc : acceptors) { if (tc.getName().equals(NETTY_ACCEPTOR)) { - tc.getExtraParams().put("anycastPrefix", "anycast://"); - tc.getExtraParams().put("multicastPrefix", "multicast://"); + tc.getExtraParams().put("anycastPrefix", ANYCAST_PREFIX); + tc.getExtraParams().put("multicastPrefix", MULTICAST_PREFIX); } } } diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/AmqpDurableReceiverReconnectWithMulticastPrefixTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/AmqpDurableReceiverReconnectWithMulticastPrefixTest.java index 2f0355850c..ce1007af1c 100644 --- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/AmqpDurableReceiverReconnectWithMulticastPrefixTest.java +++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/AmqpDurableReceiverReconnectWithMulticastPrefixTest.java @@ -72,8 +72,8 @@ public class AmqpDurableReceiverReconnectWithMulticastPrefixTest extends JMSClie @Override protected void configureAMQPAcceptorParameters(Map params) { - params.put("anycastPrefix", "anycast://"); - params.put("multicastPrefix", "multicast://"); + params.put("anycastPrefix", ANYCAST_PREFIX); + params.put("multicastPrefix", MULTICAST_PREFIX); } @Override @@ -96,7 +96,7 @@ public class AmqpDurableReceiverReconnectWithMulticastPrefixTest extends JMSClie @Test(timeout = 60000) public void testReattachToDurableNodeAndTryAndReceiveNewlySentMessage() throws Exception { final String addressName = "test-address"; - final String prefixedName = "multicast://" + addressName; + final String prefixedName = MULTICAST_PREFIX + addressName; AmqpClient client = createAmqpClient(); AmqpConnection connection = addConnection(client.createConnection()); @@ -143,7 +143,7 @@ public class AmqpDurableReceiverReconnectWithMulticastPrefixTest extends JMSClie @Test(timeout = 60000) public void testReattachToDurableNodeAndTryAndReceivePreviouslySentMessage() throws Exception { final String addressName = "test-address"; - final String prefixedName = "multicast://" + addressName; + final String prefixedName = MULTICAST_PREFIX + addressName; AmqpClient client = createAmqpClient(); AmqpConnection connection = addConnection(client.createConnection()); diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/AmqpMessageRoutingTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/AmqpMessageRoutingTest.java index 89f3ebce79..a052dddd09 100644 --- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/AmqpMessageRoutingTest.java +++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/AmqpMessageRoutingTest.java @@ -62,7 +62,7 @@ public class AmqpMessageRoutingTest extends JMSClientTestSupport { serverControl.createQueue(new QueueConfiguration(queueB).setAddress(addressA).setRoutingType(RoutingType.ANYCAST).toJSON()); serverControl.createQueue(new QueueConfiguration(queueC).setAddress(addressA).setRoutingType(RoutingType.MULTICAST).toJSON()); - sendMessages("anycast://" + addressA, 1); + sendMessages(ANYCAST_PREFIX + addressA, 1); Wait.assertEquals(1, () -> (server.locateQueue(SimpleString.toSimpleString(queueA)).getMessageCount() + server.locateQueue(SimpleString.toSimpleString(queueB)).getMessageCount())); Wait.assertEquals(0, server.locateQueue(SimpleString.toSimpleString(queueC))::getMessageCount); @@ -100,7 +100,7 @@ public class AmqpMessageRoutingTest extends JMSClientTestSupport { serverControl.createQueue(new QueueConfiguration(queueB).setAddress(addressA).setRoutingType(RoutingType.MULTICAST).toJSON()); serverControl.createQueue(new QueueConfiguration(queueC).setAddress(addressA).setRoutingType(RoutingType.MULTICAST).toJSON()); - sendMessages("multicast://" + addressA, 1); + sendMessages(MULTICAST_PREFIX + addressA, 1); Wait.assertEquals(0, server.locateQueue(SimpleString.toSimpleString(queueA))::getMessageCount); Wait.assertEquals(2, () -> (server.locateQueue(SimpleString.toSimpleString(queueC)).getMessageCount() + server.locateQueue(SimpleString.toSimpleString(queueB)).getMessageCount())); diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/AmqpReceiverTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/AmqpReceiverTest.java index fe9e9e0d4e..111eeac9ad 100644 --- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/AmqpReceiverTest.java +++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/AmqpReceiverTest.java @@ -302,7 +302,7 @@ public class AmqpReceiverTest extends AmqpClientTestSupport { assertNotNull(expectedException); assertTrue(expectedException.getMessage().contains("amqp:not-found")); - assertTrue(expectedException.getMessage().contains("target address does not exist")); + assertTrue(expectedException.getMessage().contains("target address AnAddressThatDoesNotExist does not exist")); } finally { connection.close(); } diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/AmqpSendReceiveTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/AmqpSendReceiveTest.java index e4e2648657..bac1a46bbb 100644 --- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/AmqpSendReceiveTest.java +++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/AmqpSendReceiveTest.java @@ -185,6 +185,7 @@ public class AmqpSendReceiveTest extends AmqpClientTestSupport { @Test(timeout = 60000) public void testMessageDurableTrue() throws Exception { + assertNotNull(server.locateQueue(getQueueName())); sendMessages(getQueueName(), 1, true); AmqpClient client = createAmqpClient(); @@ -1115,7 +1116,7 @@ public class AmqpSendReceiveTest extends AmqpClientTestSupport { assertNotNull(expectedException); assertTrue(expectedException.getMessage().contains("amqp:not-found")); - assertTrue(expectedException.getMessage().contains("target address does not exist")); + assertTrue(expectedException.getMessage().contains("target address AnAddressThatDoesNotExist does not exist")); connection.close(); } diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/AmqpTestSupport.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/AmqpTestSupport.java index d7605620e8..c06a2ab0ea 100644 --- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/AmqpTestSupport.java +++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/AmqpTestSupport.java @@ -44,6 +44,9 @@ public class AmqpTestSupport extends ActiveMQTestBase { protected static final String BROKER_NAME = "localhost"; protected static final String NETTY_ACCEPTOR = "netty-acceptor"; + protected static final String MULTICAST_PREFIX = "multicast://"; + protected static final String ANYCAST_PREFIX = "anycast://"; + protected String noprivUser = "noprivs"; protected String noprivPass = "noprivs"; diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/AutoCreateWithDefaultRoutingTypesTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/AutoCreateWithDefaultRoutingTypesTest.java index bc66518022..d8394cd381 100644 --- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/AutoCreateWithDefaultRoutingTypesTest.java +++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/AutoCreateWithDefaultRoutingTypesTest.java @@ -74,8 +74,8 @@ public class AutoCreateWithDefaultRoutingTypesTest extends JMSClientTestSupport @Override protected void configureAMQPAcceptorParameters(Map params) { - params.put("anycastPrefix", "anycast://"); - params.put("multicastPrefix", "multicast://"); + params.put("anycastPrefix", ANYCAST_PREFIX); + params.put("multicastPrefix", MULTICAST_PREFIX); } @Override @@ -272,9 +272,9 @@ public class AutoCreateWithDefaultRoutingTypesTest extends JMSClientTestSupport final String prefixedName; if (routingType == RoutingType.ANYCAST) { - prefixedName = "anycast://" + addressName; + prefixedName = ANYCAST_PREFIX + addressName; } else { - prefixedName = "multicast://" + addressName; + prefixedName = MULTICAST_PREFIX + addressName; } AmqpSender sender = session.createSender(prefixedName); @@ -320,9 +320,9 @@ public class AutoCreateWithDefaultRoutingTypesTest extends JMSClientTestSupport final String prefixedName; if (routingType == RoutingType.ANYCAST) { - prefixedName = "anycast://" + addressName; + prefixedName = ANYCAST_PREFIX + addressName; } else { - prefixedName = "multicast://" + addressName; + prefixedName = MULTICAST_PREFIX + addressName; } final AmqpReceiver receiver = session.createReceiver(prefixedName); diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/BrokerDefinedMulticastConsumerTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/BrokerDefinedMulticastConsumerTest.java index 53004477f6..6317673c27 100644 --- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/BrokerDefinedMulticastConsumerTest.java +++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/BrokerDefinedMulticastConsumerTest.java @@ -40,6 +40,16 @@ public class BrokerDefinedMulticastConsumerTest extends AmqpClientTestSupport { SimpleString queue1 = new SimpleString("queue1"); SimpleString queue2 = new SimpleString("queue2"); + @Override + protected boolean isAutoCreateQueues() { + return false; + } + + @Override + protected boolean isAutoCreateAddresses() { + return false; + } + @Test(timeout = 60000) public void testConsumeFromSingleQueueOnAddressSameName() throws Exception { server.addAddressInfo(new AddressInfo(address, RoutingType.MULTICAST)); @@ -64,6 +74,7 @@ public class BrokerDefinedMulticastConsumerTest extends AmqpClientTestSupport { @Test(timeout = 60000) public void testConsumeWhenOnlyAnycast() throws Exception { server.addAddressInfo(new AddressInfo(address, RoutingType.ANYCAST)); + server.createQueue(new QueueConfiguration(address).setAddress(address).setRoutingType(RoutingType.ANYCAST)); sendMessages(address.toString(), 1); diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/QueueAutoCreationTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/QueueAutoCreationTest.java index 8b4bbcf4d8..802df9c095 100644 --- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/QueueAutoCreationTest.java +++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/QueueAutoCreationTest.java @@ -124,7 +124,7 @@ public class QueueAutoCreationTest extends JMSClientTestSupport { Connection connection = factory.createConnection(); SimpleString addressName = UUIDGenerator.getInstance().generateSimpleStringUUID(); logger.debug("Address is {}", addressName); - clientSession.createAddress(addressName, RoutingType.ANYCAST, false); + clientSession.createAddress(addressName, RoutingType.MULTICAST, false); Topic topic = new ActiveMQTopic(addressName.toString()); Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); MessageProducer producer = session.createProducer(topic); diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/client/AutoCreateJmsDestinationTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/client/AutoCreateJmsDestinationTest.java index de7fcb19cd..50c092e960 100644 --- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/client/AutoCreateJmsDestinationTest.java +++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/client/AutoCreateJmsDestinationTest.java @@ -389,7 +389,7 @@ public class AutoCreateJmsDestinationTest extends JMSTestBase { Connection connection = factory.createConnection(); SimpleString addressName = UUIDGenerator.getInstance().generateSimpleStringUUID(); logger.debug("Address is {}", addressName); - clientSession.createAddress(addressName, RoutingType.ANYCAST, false); + clientSession.createAddress(addressName, RoutingType.MULTICAST, false); Topic topic = new ActiveMQTopic(addressName.toString()); Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); MessageProducer producer = session.createProducer(topic); diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/jms/multiprotocol/JMSMismatchedRoutingTypeTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/jms/multiprotocol/JMSMismatchedRoutingTypeTest.java new file mode 100644 index 0000000000..f962e973b6 --- /dev/null +++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/jms/multiprotocol/JMSMismatchedRoutingTypeTest.java @@ -0,0 +1,138 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.activemq.artemis.tests.integration.jms.multiprotocol; + +import javax.jms.Connection; +import javax.jms.InvalidDestinationException; +import javax.jms.JMSException; +import javax.jms.MessageProducer; +import javax.jms.Queue; +import javax.jms.Session; +import javax.jms.Topic; + +import org.apache.activemq.artemis.api.core.QueueConfiguration; +import org.apache.activemq.artemis.api.core.RoutingType; +import org.apache.activemq.artemis.api.core.SimpleString; +import org.apache.activemq.artemis.core.server.ActiveMQServer; +import org.apache.activemq.artemis.core.server.impl.AddressInfo; +import org.apache.activemq.artemis.jms.client.ActiveMQConnection; +import org.apache.activemq.artemis.tests.util.RandomUtil; +import org.junit.Test; + +public class JMSMismatchedRoutingTypeTest extends MultiprotocolJMSClientTestSupport { + + protected final String ANYCAST_ADDRESS = RandomUtil.randomString(); + protected final String MULTICAST_ADDRESS = RandomUtil.randomString(); + + @Override + protected boolean isAutoCreateAddresses() { + return false; + } + + @Override + protected boolean isAutoCreateQueues() { + return false; + } + + @Override + protected void createAddressAndQueues(ActiveMQServer server) throws Exception { + server.addAddressInfo(new AddressInfo(SimpleString.toSimpleString(ANYCAST_ADDRESS), RoutingType.ANYCAST)); + server.createQueue(new QueueConfiguration(RandomUtil.randomString()).setAddress(ANYCAST_ADDRESS).setRoutingType(RoutingType.ANYCAST)); + + server.addAddressInfo(new AddressInfo(SimpleString.toSimpleString(MULTICAST_ADDRESS), RoutingType.MULTICAST)); + server.createQueue(new QueueConfiguration(RandomUtil.randomString()).setAddress(MULTICAST_ADDRESS).setRoutingType(RoutingType.MULTICAST)); + } + + @Test + public void testSendingMulticastToAnycastAMQP() throws Exception { + internalTestSendingMulticastToAnycast(AMQPConnection); + } + + @Test + public void testSendingMulticastToAnycastCore() throws Exception { + internalTestSendingMulticastToAnycast(CoreConnection); + } + + @Test + public void testSendingMulticastToAnycastOpenWire() throws Exception { + internalTestSendingMulticastToAnycast(OpenWireConnection); + } + + private void internalTestSendingMulticastToAnycast(ConnectionSupplier connectionSupplier) throws Exception { + Connection connection = null; + try { + connection = connectionSupplier.createConnection(); + Session s = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); + Topic topic = s.createTopic(ANYCAST_ADDRESS); + MessageProducer producer = s.createProducer(topic); + producer.send(s.createMessage()); + fail("Sending message here should fail!"); + } catch (InvalidDestinationException e) { + // expected + } finally { + if (connection != null) { + connection.close(); + } + } + } + + @Test + public void testSendingAnycastToMulticastAMQP() throws Exception { + internalTestSendingAnycastToMulticast(AMQPConnection); + } + + @Test + public void testSendingAnycastToMulticastCore() throws Exception { + internalTestSendingAnycastToMulticast(CoreConnection); + } + + @Test + public void testSendingAnycastToMulticastOpenWire() throws Exception { + internalTestSendingAnycastToMulticast(OpenWireConnection); + } + + private void internalTestSendingAnycastToMulticast(ConnectionSupplier connectionSupplier) throws Exception { + Connection connection = null; + try { + connection = connectionSupplier.createConnection(); + Session s = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); + Queue q; + try { + // when using core this will fail because the client actually checks for the queue on the broker (which won't be there) + q = s.createQueue(MULTICAST_ADDRESS); + } catch (JMSException e) { + if (connection instanceof ActiveMQConnection) { + // expected + return; + } else { + throw e; + } + } + try { + MessageProducer producer = s.createProducer(q); + producer.send(s.createMessage()); + fail("Sending message here should fail!"); + } catch (InvalidDestinationException e) { + // expected + } + } finally { + if (connection != null) { + connection.close(); + } + } + } +} \ No newline at end of file diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/security/SecurityTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/security/SecurityTest.java index 30a9287ea9..2507971d7e 100644 --- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/security/SecurityTest.java +++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/security/SecurityTest.java @@ -378,6 +378,12 @@ public class SecurityTest extends ActiveMQTestBase { try (ActiveMQConnection connection = (ActiveMQConnection) factory.createConnection()) { Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); + //Add the corresponding addresses to the server + SimpleString address = SimpleString.toSimpleString("test.queue"); + server.addAddressInfo(new AddressInfo(address, RoutingType.ANYCAST)); + SimpleString address2 = SimpleString.toSimpleString("test.topic"); + server.addAddressInfo(new AddressInfo(address2, RoutingType.MULTICAST)); + //Test queue creation permission try { session.createConsumer(session.createQueue("test.queue")); @@ -394,14 +400,9 @@ public class SecurityTest extends ActiveMQTestBase { assertTrue(e.getMessage().contains("User: test-user does not have permission='CREATE_NON_DURABLE_QUEUE'")); } - //Add a test queue and topic to the server - SimpleString address = SimpleString.toSimpleString("test.queue"); - server.addAddressInfo(new AddressInfo(address, RoutingType.ANYCAST)); + //Add a test queue to the server server.createQueue(new QueueConfiguration(address).setRoutingType(RoutingType.ANYCAST)); - SimpleString address2 = SimpleString.toSimpleString("test.topic"); - server.addAddressInfo(new AddressInfo(address2, RoutingType.MULTICAST)); - //Test queue produce permission try { MessageProducer producer = session.createProducer(session.createQueue("test.queue")); @@ -441,7 +442,7 @@ public class SecurityTest extends ActiveMQTestBase { session.createTemporaryQueue(); Assert.fail("should throw exception here"); } catch (Exception e) { - assertTrue(e.getMessage().contains("User: test-user does not have permission='CREATE_NON_DURABLE_QUEUE'")); + assertTrue(e.getMessage().contains("User: test-user does not have permission='CREATE_ADDRESS'")); } //Test temp topic diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/server/RetroactiveAddressTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/server/RetroactiveAddressTest.java index a493104a34..d667b3fd30 100644 --- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/server/RetroactiveAddressTest.java +++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/server/RetroactiveAddressTest.java @@ -299,7 +299,7 @@ public class RetroactiveAddressTest extends ActiveMQTestBase { final int COUNT = 10; final SimpleString divertQueue = ResourceNames.getRetroactiveResourceQueueName(internalNamingPrefix, delimiter, addressName, RoutingType.MULTICAST); server.getAddressSettingsRepository().addMatch(addressName.toString(), new AddressSettings().setRetroactiveMessageCount(COUNT)); - server.addAddressInfo(new AddressInfo(addressName)); + server.addAddressInfo(new AddressInfo(addressName).addRoutingType(RoutingType.MULTICAST)); ConnectionFactory cf = new ActiveMQConnectionFactory("vm://0"); Connection c = cf.createConnection(); diff --git a/tests/integration-tests/src/test/resources/reload-divert-address-source1.xml b/tests/integration-tests/src/test/resources/reload-divert-address-source1.xml index a055523c80..b76280f152 100644 --- a/tests/integration-tests/src/test/resources/reload-divert-address-source1.xml +++ b/tests/integration-tests/src/test/resources/reload-divert-address-source1.xml @@ -48,9 +48,15 @@ under the License. -
-
-
+
+ +
+
+ +
+
+ +
diff --git a/tests/integration-tests/src/test/resources/reload-divert-address-source2.xml b/tests/integration-tests/src/test/resources/reload-divert-address-source2.xml index fbb08005a0..2d8ae6dd75 100644 --- a/tests/integration-tests/src/test/resources/reload-divert-address-source2.xml +++ b/tests/integration-tests/src/test/resources/reload-divert-address-source2.xml @@ -48,9 +48,15 @@ under the License. -
-
-
+
+ +
+
+ +
+
+ +
diff --git a/tests/integration-tests/src/test/resources/reload-divert-address-target1.xml b/tests/integration-tests/src/test/resources/reload-divert-address-target1.xml index 220fee04eb..e474294e55 100644 --- a/tests/integration-tests/src/test/resources/reload-divert-address-target1.xml +++ b/tests/integration-tests/src/test/resources/reload-divert-address-target1.xml @@ -48,9 +48,15 @@ under the License. -
-
-
+
+ +
+
+ +
+
+ +
diff --git a/tests/integration-tests/src/test/resources/reload-divert-address-target2.xml b/tests/integration-tests/src/test/resources/reload-divert-address-target2.xml index 1b3bf2c858..5bcbe0f057 100644 --- a/tests/integration-tests/src/test/resources/reload-divert-address-target2.xml +++ b/tests/integration-tests/src/test/resources/reload-divert-address-target2.xml @@ -48,9 +48,15 @@ under the License. -
-
-
+
+ +
+
+ +
+
+ +
diff --git a/tests/integration-tests/src/test/resources/reload-divert-exclusive.xml b/tests/integration-tests/src/test/resources/reload-divert-exclusive.xml index c8a0054be2..6b7167871e 100644 --- a/tests/integration-tests/src/test/resources/reload-divert-exclusive.xml +++ b/tests/integration-tests/src/test/resources/reload-divert-exclusive.xml @@ -48,8 +48,12 @@ under the License. -
-
+
+ +
+
+ +
diff --git a/tests/integration-tests/src/test/resources/reload-divert-filter-none.xml b/tests/integration-tests/src/test/resources/reload-divert-filter-none.xml index c8a0054be2..6b7167871e 100644 --- a/tests/integration-tests/src/test/resources/reload-divert-filter-none.xml +++ b/tests/integration-tests/src/test/resources/reload-divert-filter-none.xml @@ -48,8 +48,12 @@ under the License. -
-
+
+ +
+
+ +
diff --git a/tests/integration-tests/src/test/resources/reload-divert-filter-x-eq-x.xml b/tests/integration-tests/src/test/resources/reload-divert-filter-x-eq-x.xml index e748824490..b2d3a7b504 100644 --- a/tests/integration-tests/src/test/resources/reload-divert-filter-x-eq-x.xml +++ b/tests/integration-tests/src/test/resources/reload-divert-filter-x-eq-x.xml @@ -48,8 +48,12 @@ under the License. -
-
+
+ +
+
+ +
diff --git a/tests/integration-tests/src/test/resources/reload-divert-filter-x-eq-y.xml b/tests/integration-tests/src/test/resources/reload-divert-filter-x-eq-y.xml index 78519664f2..4659258160 100644 --- a/tests/integration-tests/src/test/resources/reload-divert-filter-x-eq-y.xml +++ b/tests/integration-tests/src/test/resources/reload-divert-filter-x-eq-y.xml @@ -48,8 +48,12 @@ under the License. -
-
+
+ +
+
+ +
diff --git a/tests/integration-tests/src/test/resources/reload-divert-non-exclusive.xml b/tests/integration-tests/src/test/resources/reload-divert-non-exclusive.xml index df5c5a9836..b26b6d4f4f 100644 --- a/tests/integration-tests/src/test/resources/reload-divert-non-exclusive.xml +++ b/tests/integration-tests/src/test/resources/reload-divert-non-exclusive.xml @@ -48,8 +48,12 @@ under the License. -
-
+
+ +
+
+ +