From ca66028b2a596a4ab5a61ec57633fed3e7b85b22 Mon Sep 17 00:00:00 2001 From: Timothy Bish Date: Mon, 23 Jan 2023 12:38:16 -0500 Subject: [PATCH] ARTEMIS-4132 AMQP Receiver default to ANYCAST when creating an address When an AMQP client subscribes to a new address (non-existing) with a receiver link, the address is created with routing type ANYCAST regardles of the default address creation configuration of the broker, and ignores even the broker wide default of MULTICAST. --- .../proton/ProtonServerSenderContext.java | 12 +- ...AutoCreateWithDefaultRoutingTypesTest.java | 205 ++++++++++++++++++ .../BrokerDefinedAnycastConsumerTest.java | 1 + .../ClientDefinedAnycastConsumerTest.java | 42 +++- 4 files changed, 256 insertions(+), 4 deletions(-) create mode 100644 tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/AutoCreateWithDefaultRoutingTypesTest.java diff --git a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/proton/ProtonServerSenderContext.java b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/proton/ProtonServerSenderContext.java index 98d62c46cb..40af705c38 100644 --- a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/proton/ProtonServerSenderContext.java +++ b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/proton/ProtonServerSenderContext.java @@ -25,6 +25,8 @@ import java.util.concurrent.atomic.AtomicInteger; import io.netty.buffer.ByteBuf; import io.netty.buffer.PooledByteBufAllocator; + +import org.apache.activemq.artemis.api.config.ActiveMQDefaultConfiguration; import org.apache.activemq.artemis.api.core.ActiveMQException; import org.apache.activemq.artemis.api.core.ActiveMQExceptionType; import org.apache.activemq.artemis.api.core.ActiveMQQueueMaxConsumerLimitReached; @@ -960,7 +962,6 @@ public class ProtonServerSenderContext extends ProtonInitializable implements Pr SimpleString tempQueueName; String selector; - private final RoutingType defaultRoutingType = RoutingType.ANYCAST; private RoutingType routingTypeToUse = RoutingType.ANYCAST; private boolean isVolatile = false; @@ -1110,8 +1111,15 @@ public class ProtonServerSenderContext extends ProtonInitializable implements Pr } else { // if not we look up the address AddressQueryResult addressQueryResult = null; + + // Set this to the broker configured default for the address prior to the lookup so that + // an auto create will actually use the configured defaults. The actual query result will + // contain the true answer on what routing type the address actually has though. + final RoutingType routingType = sessionSPI.getDefaultRoutingType(addressToUse); + routingTypeToUse = routingType == null ? ActiveMQDefaultConfiguration.getDefaultRoutingType() : routingType; + try { - addressQueryResult = sessionSPI.addressQuery(addressToUse, defaultRoutingType, true); + addressQueryResult = sessionSPI.addressQuery(addressToUse, routingTypeToUse, true); } catch (ActiveMQSecurityException e) { throw ActiveMQAMQPProtocolMessageBundle.BUNDLE.securityErrorCreatingConsumer(e.getMessage()); } catch (ActiveMQAMQPException e) { diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/AutoCreateWithDefaultRoutingTypesTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/AutoCreateWithDefaultRoutingTypesTest.java new file mode 100644 index 0000000000..d62293233e --- /dev/null +++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/AutoCreateWithDefaultRoutingTypesTest.java @@ -0,0 +1,205 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.activemq.artemis.tests.integration.amqp; + +import static org.apache.qpid.jms.provider.amqp.message.AmqpDestinationHelper.QUEUE_CAPABILITY; +import static org.apache.qpid.jms.provider.amqp.message.AmqpDestinationHelper.TOPIC_CAPABILITY; + +import java.lang.invoke.MethodHandles; +import java.util.Arrays; +import java.util.Collection; +import java.util.Map; +import java.util.Set; + +import org.apache.activemq.artemis.api.core.RoutingType; +import org.apache.activemq.artemis.api.core.SimpleString; +import org.apache.activemq.artemis.core.config.Configuration; +import org.apache.activemq.artemis.core.server.ActiveMQServer; +import org.apache.activemq.artemis.core.server.AddressQueryResult; +import org.apache.activemq.artemis.core.server.JournalType; +import org.apache.activemq.artemis.core.settings.impl.AddressSettings; +import org.apache.activemq.transport.amqp.client.AmqpClient; +import org.apache.activemq.transport.amqp.client.AmqpConnection; +import org.apache.activemq.transport.amqp.client.AmqpReceiver; +import org.apache.activemq.transport.amqp.client.AmqpSender; +import org.apache.activemq.transport.amqp.client.AmqpSession; +import org.apache.qpid.proton.amqp.messaging.Source; +import org.apache.qpid.proton.amqp.messaging.Target; +import org.junit.Test; +import org.junit.runner.RunWith; +import org.junit.runners.Parameterized; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +@RunWith(Parameterized.class) +public class AutoCreateWithDefaultRoutingTypesTest extends JMSClientTestSupport { + + private static final Logger logger = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass()); + + @Parameterized.Parameters(name = "routingType={0}") + public static Collection parameters() { + return Arrays.asList(new Object[][] { + {RoutingType.ANYCAST}, {RoutingType.MULTICAST} + }); + } + + @Parameterized.Parameter(0) + public RoutingType routingType; + + @Override + protected String getConfiguredProtocols() { + return "AMQP"; + } + + @Override + protected void createAddressAndQueues(ActiveMQServer server) throws Exception { + // Don't create anything by default since we are testing auto create + } + + @Override + protected void configureAddressPolicy(ActiveMQServer server) { + Configuration serverConfig = server.getConfiguration(); + serverConfig.setJournalType(JournalType.NIO); + Map map = serverConfig.getAddressSettings(); + if (map.size() == 0) { + AddressSettings as = new AddressSettings(); + map.put("#", as); + } + Map.Entry entry = map.entrySet().iterator().next(); + AddressSettings settings = entry.getValue(); + settings.setAutoCreateQueues(true); + settings.setDefaultAddressRoutingType(routingType); + settings.setDefaultQueueRoutingType(routingType); + logger.info("server config, isauto? {}", entry.getValue().isAutoCreateQueues()); + logger.info("server config, default queue routing type? {}", entry.getValue().getDefaultQueueRoutingType()); + logger.info("server config, default address routing type? {}", entry.getValue().getDefaultAddressRoutingType()); + } + + @Test(timeout = 30_000) + public void testCreateSender() throws Exception { + final String addressName = "sender-address"; + + AmqpClient client = createAmqpClient(); + AmqpConnection connection = addConnection(client.connect()); + AmqpSession session = connection.createSession(); + + AmqpSender sender = session.createSender(addressName); + + AddressQueryResult address = getProxyToAddress(addressName); + + assertNotNull(address); + assertEquals(Set.of(routingType), address.getRoutingTypes()); + + sender.close(); + connection.close(); + } + + @Test(timeout = 30_000) + public void testCreateReceiver() throws Exception { + final String addressName = "receiver-address"; + + AmqpClient client = createAmqpClient(); + AmqpConnection connection = addConnection(client.connect()); + AmqpSession session = connection.createSession(); + + AmqpReceiver receiver = session.createReceiver(addressName); + + AddressQueryResult address = getProxyToAddress(addressName); + + assertNotNull(address); + assertEquals(Set.of(routingType), address.getRoutingTypes()); + + receiver.close(); + connection.close(); + } + + @Test(timeout = 30_000) + public void testCreateSenderThatRequestsMultiCast() throws Exception { + dotestCreateSenderThatRequestsSpecificRoutingType(RoutingType.MULTICAST); + } + + @Test(timeout = 30_000) + public void testCreateSenderThatRequestsAnyCast() throws Exception { + dotestCreateSenderThatRequestsSpecificRoutingType(RoutingType.ANYCAST); + } + + private void dotestCreateSenderThatRequestsSpecificRoutingType(RoutingType routingType) throws Exception { + final String addressName = "sender-defined-address"; + + AmqpClient client = createAmqpClient(); + AmqpConnection connection = addConnection(client.connect()); + AmqpSession session = connection.createSession(); + + Target target = new Target(); + target.setAddress(addressName); + if (routingType == RoutingType.ANYCAST) { + target.setCapabilities(QUEUE_CAPABILITY); + } else { + target.setCapabilities(TOPIC_CAPABILITY); + } + + AmqpSender sender = session.createSender(target); + + AddressQueryResult address = getProxyToAddress(addressName); + + assertNotNull(address); + assertEquals(Set.of(routingType), address.getRoutingTypes()); + + sender.close(); + connection.close(); + } + + @Test(timeout = 30_000) + public void testCreateReceiverThatRequestsMultiCast() throws Exception { + dotestCreateReceiverThatRequestsSpecificRoutingType(RoutingType.MULTICAST); + } + + @Test(timeout = 30_000) + public void testCreateReceiverThatRequestsAnyCast() throws Exception { + dotestCreateReceiverThatRequestsSpecificRoutingType(RoutingType.ANYCAST); + } + + private void dotestCreateReceiverThatRequestsSpecificRoutingType(RoutingType routingType) throws Exception { + final String addressName = "receiver-defined-address"; + + AmqpClient client = createAmqpClient(); + AmqpConnection connection = addConnection(client.connect()); + AmqpSession session = connection.createSession(); + + Source source = new Source(); + source.setAddress(addressName); + if (routingType == RoutingType.ANYCAST) { + source.setCapabilities(QUEUE_CAPABILITY); + } else { + source.setCapabilities(TOPIC_CAPABILITY); + } + + AmqpReceiver receiver = session.createReceiver(source); + + AddressQueryResult address = getProxyToAddress(addressName); + + assertNotNull(address); + assertEquals(Set.of(routingType), address.getRoutingTypes()); + + receiver.close(); + connection.close(); + } + + public AddressQueryResult getProxyToAddress(String addressName) throws Exception { + return server.addressQuery(SimpleString.toSimpleString(addressName)); + } +} diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/BrokerDefinedAnycastConsumerTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/BrokerDefinedAnycastConsumerTest.java index 00b48a19f4..927ec3ec5d 100644 --- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/BrokerDefinedAnycastConsumerTest.java +++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/BrokerDefinedAnycastConsumerTest.java @@ -197,6 +197,7 @@ public class BrokerDefinedAnycastConsumerTest extends AmqpClientTestSupport { server.getAddressSettingsRepository().clear(); AddressSettings settings = new AddressSettings(); settings.setAutoCreateAddresses(true); + settings.setDefaultAddressRoutingType(RoutingType.ANYCAST); server.getAddressSettingsRepository().addMatch(address.toString(), settings); AmqpClient client = createAmqpClient(); AmqpConnection connection = addConnection(client.connect()); diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/ClientDefinedAnycastConsumerTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/ClientDefinedAnycastConsumerTest.java index 3e504d7895..66c4b106b8 100644 --- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/ClientDefinedAnycastConsumerTest.java +++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/ClientDefinedAnycastConsumerTest.java @@ -16,15 +16,23 @@ */ package org.apache.activemq.artemis.tests.integration.amqp; +import static org.apache.qpid.jms.provider.amqp.message.AmqpDestinationHelper.QUEUE_CAPABILITY; +import static org.apache.qpid.jms.provider.amqp.message.AmqpDestinationHelper.TOPIC_CAPABILITY; + import java.util.concurrent.TimeUnit; +import org.apache.activemq.artemis.api.core.RoutingType; import org.apache.activemq.artemis.api.core.SimpleString; +import org.apache.activemq.artemis.core.postoffice.Bindings; +import org.apache.activemq.artemis.core.postoffice.impl.LocalQueueBinding; +import org.apache.activemq.artemis.core.server.Queue; import org.apache.activemq.artemis.core.server.impl.QueueImpl; import org.apache.activemq.transport.amqp.client.AmqpClient; import org.apache.activemq.transport.amqp.client.AmqpConnection; import org.apache.activemq.transport.amqp.client.AmqpMessage; import org.apache.activemq.transport.amqp.client.AmqpReceiver; import org.apache.activemq.transport.amqp.client.AmqpSession; +import org.apache.qpid.proton.amqp.messaging.Source; import org.junit.Test; public class ClientDefinedAnycastConsumerTest extends AmqpClientTestSupport { @@ -33,12 +41,15 @@ public class ClientDefinedAnycastConsumerTest extends AmqpClientTestSupport { @Test(timeout = 60000) public void testConsumeFromSingleQueueOnAddressSameName() throws Exception { - AmqpClient client = createAmqpClient(); AmqpConnection connection = addConnection(client.connect()); AmqpSession session = connection.createSession(); - AmqpReceiver receiver = session.createReceiver(address.toString()); + Source source = new Source(); + source.setAddress(address.toString()); + source.setCapabilities(QUEUE_CAPABILITY); + + AmqpReceiver receiver = session.createReceiver(source); sendMessages(address.toString(), 1); receiver.flow(1); AmqpMessage amqpMessage = receiver.receive(5, TimeUnit.SECONDS); @@ -48,4 +59,31 @@ public class ClientDefinedAnycastConsumerTest extends AmqpClientTestSupport { receiver.close(); connection.close(); } + + @Test(timeout = 60000) + public void testConsumeFromSingleQueueOnAddressSameNameNegativeValidation() throws Exception { + AmqpClient client = createAmqpClient(); + AmqpConnection connection = addConnection(client.connect()); + AmqpSession session = connection.createSession(); + + Source source = new Source(); + source.setAddress(address.toString()); + source.setCapabilities(TOPIC_CAPABILITY); + + AmqpReceiver receiver = session.createReceiver(source); + sendMessages(address.toString(), 1); + receiver.flow(1); + AmqpMessage amqpMessage = receiver.receive(5, TimeUnit.SECONDS); + assertNotNull(amqpMessage); + Bindings bindings = server.getPostOffice().getBindingsForAddress(address); + assertEquals(1, bindings.getBindings().size()); + bindings.getBindings().forEach((binding) -> { + final Queue localQueue = ((LocalQueueBinding) binding).getQueue(); + assertEquals(1, localQueue.getConsumerCount()); + assertEquals(RoutingType.MULTICAST, localQueue.getRoutingType()); + }); + + receiver.close(); + connection.close(); + } }