From d51c89471eb07ea21feffb6afb5edc58abbcfc6b Mon Sep 17 00:00:00 2001 From: Clebert Suconic Date: Wed, 28 Oct 2020 12:05:32 -0400 Subject: [PATCH] ARTEMIS-2966 Anycast queues with distinct names would cause issues on sending messages --- .../amqp/broker/AMQPSessionCallback.java | 7 +- .../amqp/AmqpAnyCastDistinctQueueTest.java | 75 +++++++++++++++++++ .../amqp/AmqpFullyQualifiedNameTest.java | 8 +- .../amqp/connect/QpidDispatchPeerTest.java | 30 +++++--- 4 files changed, 108 insertions(+), 12 deletions(-) create mode 100644 tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/AmqpAnyCastDistinctQueueTest.java diff --git a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/broker/AMQPSessionCallback.java b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/broker/AMQPSessionCallback.java index cc5616a358..9d165d7dcf 100644 --- a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/broker/AMQPSessionCallback.java +++ b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/broker/AMQPSessionCallback.java @@ -36,6 +36,7 @@ import org.apache.activemq.artemis.core.paging.PagingManager; import org.apache.activemq.artemis.core.paging.PagingStore; import org.apache.activemq.artemis.core.persistence.OperationContext; import org.apache.activemq.artemis.core.persistence.StorageManager; +import org.apache.activemq.artemis.core.postoffice.Bindings; import org.apache.activemq.artemis.core.security.CheckType; import org.apache.activemq.artemis.core.security.SecurityAuth; import org.apache.activemq.artemis.core.server.AddressQueryResult; @@ -346,7 +347,11 @@ public class AMQPSessionCallback implements SessionCallback { } } else if (routingType == RoutingType.ANYCAST) { if (manager.getServer().locateQueue(unPrefixedAddress) == null) { - if (addressSettings.isAutoCreateQueues()) { + Bindings bindings = manager.getServer().getPostOffice().lookupBindingsForAddress(address); + if (bindings != null) { + // this means the address has another queue with a different name, which is fine, we just ignore it on this case + result = true; + } else if (addressSettings.isAutoCreateQueues()) { try { serverSession.createQueue(new QueueConfiguration(address).setRoutingType(routingType).setAutoCreated(true)); } catch (ActiveMQQueueExistsException e) { diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/AmqpAnyCastDistinctQueueTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/AmqpAnyCastDistinctQueueTest.java new file mode 100644 index 0000000000..dd08bd1f42 --- /dev/null +++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/AmqpAnyCastDistinctQueueTest.java @@ -0,0 +1,75 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.activemq.artemis.tests.integration.amqp; + +import javax.jms.Connection; +import javax.jms.ConnectionFactory; +import javax.jms.MessageConsumer; +import javax.jms.MessageProducer; +import javax.jms.Queue; +import javax.jms.Session; +import javax.jms.TextMessage; + +import org.apache.activemq.artemis.api.core.QueueConfiguration; +import org.apache.activemq.artemis.api.core.RoutingType; +import org.apache.activemq.artemis.core.server.ActiveMQServer; +import org.apache.activemq.artemis.core.server.impl.AddressInfo; +import org.apache.activemq.artemis.tests.util.CFUtil; + +import org.junit.Assert; +import org.junit.Test; + +/** + * Make sure auto create will not create a queue when the queue and its address have a distinct name in anycast. + */ +public class AmqpAnyCastDistinctQueueTest extends AmqpClientTestSupport { + + @Override + protected void addAdditionalAcceptors(ActiveMQServer server) throws Exception { + } + + @Test(timeout = 60000) + public void testDistinctQueueAddressAnyCast() throws Exception { + String ADDRESS_NAME = "DISTINCT_ADDRESS_testDistinctAddressAnyCast"; + String QUEUE_NAME = "DISTINCT_QUEUE_testDistinctQUEUE_AnyCast"; + server.addAddressInfo(new AddressInfo(ADDRESS_NAME).setAutoCreated(false).addRoutingType(RoutingType.ANYCAST)); + server.createQueue(new QueueConfiguration(QUEUE_NAME).setAddress(ADDRESS_NAME).setDurable(true).setRoutingType(RoutingType.ANYCAST)); + + final int NUMBER_OF_MESSAGES = 100; + + ConnectionFactory jmsCF = CFUtil.createConnectionFactory("AMQP", "tcp://localhost:" + AMQP_PORT); + try (Connection connection = jmsCF.createConnection()) { + Session session = connection.createSession(); + Queue queueSending = session.createQueue(ADDRESS_NAME); + MessageProducer producer = session.createProducer(queueSending); + for (int i = 0; i < NUMBER_OF_MESSAGES; i++) { + producer.send(session.createTextMessage("hello " + i)); + } + + Queue queueReceiving = session.createQueue(QUEUE_NAME); + connection.start(); + MessageConsumer consumer = session.createConsumer(queueReceiving); + for (int i = 0; i < NUMBER_OF_MESSAGES; i++) { + TextMessage message = (TextMessage)consumer.receive(5000); + Assert.assertNotNull(message); + Assert.assertEquals("hello " + i, message.getText()); + } + Assert.assertNull(consumer.receiveNoWait()); + } + + } +} \ No newline at end of file diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/AmqpFullyQualifiedNameTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/AmqpFullyQualifiedNameTest.java index ba1a3aa9cd..3dea97517f 100644 --- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/AmqpFullyQualifiedNameTest.java +++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/AmqpFullyQualifiedNameTest.java @@ -36,6 +36,7 @@ import org.apache.activemq.artemis.core.postoffice.Bindings; import org.apache.activemq.artemis.core.postoffice.impl.LocalQueueBinding; import org.apache.activemq.artemis.core.server.ActiveMQServer; import org.apache.activemq.artemis.core.server.QueueQueryResult; +import org.apache.activemq.artemis.core.server.impl.AddressInfo; import org.apache.activemq.artemis.core.settings.impl.AddressSettings; import org.apache.activemq.artemis.tests.util.Wait; import org.apache.activemq.artemis.utils.CompositeAddress; @@ -278,7 +279,12 @@ public class AmqpFullyQualifiedNameTest extends JMSClientTestSupport { @Test public void testQueue() throws Exception { - server.getAddressSettingsRepository().addMatch("#", new AddressSettings().setAutoCreateQueues(true).setAutoCreateAddresses(true)); + server.getAddressSettingsRepository().addMatch("#", new AddressSettings().setAutoCreateQueues(false).setAutoCreateAddresses(false)); + + server.addAddressInfo(new AddressInfo(anycastAddress).addRoutingType(RoutingType.ANYCAST).setAutoCreated(false)); + server.createQueue(new QueueConfiguration(anycastQ1).setAddress(anycastAddress).setRoutingType(RoutingType.ANYCAST).setDurable(true)); + server.createQueue(new QueueConfiguration(anycastQ2).setAddress(anycastAddress).setRoutingType(RoutingType.ANYCAST).setDurable(true)); + server.createQueue(new QueueConfiguration(anycastQ3).setAddress(anycastAddress).setRoutingType(RoutingType.ANYCAST).setDurable(true)); Connection connection = createConnection(); try { diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/connect/QpidDispatchPeerTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/connect/QpidDispatchPeerTest.java index 100e1c957b..507b3d3591 100644 --- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/connect/QpidDispatchPeerTest.java +++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/connect/QpidDispatchPeerTest.java @@ -81,17 +81,22 @@ public class QpidDispatchPeerTest extends AmqpClientTestSupport { qpidProcess.kill(); } + @Test(timeout = 60_000) + public void testWithMatchingDifferentNamesOnQueue() throws Exception { + internalMultipleQueues(true, true); + } + @Test(timeout = 60_000) public void testWithMatching() throws Exception { - internalMultipleQueues(true); + internalMultipleQueues(true, false); } @Test(timeout = 60_000) public void testwithQueueName() throws Exception { - internalMultipleQueues(false); + internalMultipleQueues(false, true); } - private void internalMultipleQueues(boolean useMatching) throws Exception { + private void internalMultipleQueues(boolean useMatching, boolean distinctNaming) throws Exception { final int numberOfMessages = 100; final int numberOfQueues = 10; AMQPBrokerConnectConfiguration amqpConnection = new AMQPBrokerConnectConfiguration("test", "tcp://localhost:24621").setRetryInterval(10).setReconnectAttempts(-1); @@ -99,14 +104,14 @@ public class QpidDispatchPeerTest extends AmqpClientTestSupport { amqpConnection.addElement(new AMQPBrokerConnectionElement().setMatchAddress("queue.#").setType(AMQPBrokerConnectionAddressType.PEER)); } else { for (int i = 0; i < numberOfQueues; i++) { - amqpConnection.addElement(new AMQPBrokerConnectionElement().setQueueName("queue.test" + i).setType(AMQPBrokerConnectionAddressType.PEER)); + amqpConnection.addElement(new AMQPBrokerConnectionElement().setQueueName(createQueueName(i, distinctNaming)).setType(AMQPBrokerConnectionAddressType.PEER)); } } server.getConfiguration().addAMQPConnection(amqpConnection); server.start(); for (int i = 0; i < numberOfQueues; i++) { server.addAddressInfo(new AddressInfo("queue.test" + i).addRoutingType(RoutingType.ANYCAST).setAutoCreated(false).setTemporary(false)); - server.createQueue(new QueueConfiguration("queue.test" + i).setAddress("queue.test" + i).setRoutingType(RoutingType.ANYCAST)); + server.createQueue(new QueueConfiguration(createQueueName(i, distinctNaming)).setAddress("queue.test" + i).setRoutingType(RoutingType.ANYCAST)); } for (int dest = 0; dest < numberOfQueues; dest++) { @@ -120,7 +125,7 @@ public class QpidDispatchPeerTest extends AmqpClientTestSupport { MessageProducer producer = session.createProducer(queue); producer.setDeliveryMode(DeliveryMode.NON_PERSISTENT); - org.apache.activemq.artemis.core.server.Queue testQueueOnServer = server.locateQueue("queue.test" + dest); + org.apache.activemq.artemis.core.server.Queue testQueueOnServer = server.locateQueue(createQueueName(dest, distinctNaming)); for (int i = 0; i < numberOfMessages; i++) { producer.send(session.createTextMessage("hello " + i)); @@ -130,8 +135,6 @@ public class QpidDispatchPeerTest extends AmqpClientTestSupport { connection.close(); } - System.out.println("*******************************************************************************************************************************"); - System.out.println("Creating consumer"); for (int dest = 0; dest < numberOfQueues; dest++) { ConnectionFactory factoryConsumer = CFUtil.createConnectionFactory("AMQP", "tcp://localhost:24622"); @@ -151,7 +154,6 @@ public class QpidDispatchPeerTest extends AmqpClientTestSupport { System.out.println("*******************************************************************************************************************************"); } Assert.assertNotNull(received); - System.out.println("message " + received.getText()); Assert.assertEquals("hello " + i, received.getText()); } Assert.assertNull(consumer.receiveNoWait()); @@ -162,12 +164,20 @@ public class QpidDispatchPeerTest extends AmqpClientTestSupport { } } - org.apache.activemq.artemis.core.server.Queue testQueueOnServer = server.locateQueue("queue.test" + dest); + org.apache.activemq.artemis.core.server.Queue testQueueOnServer = server.locateQueue(createQueueName(dest, distinctNaming)); Wait.assertEquals(0, testQueueOnServer::getMessageCount); } } + private String createQueueName(int i, boolean useDistinctName) { + if (useDistinctName) { + return "distinct.test" + i; + } else { + return "queue.test" + i; + } + } + private Connection createConnectionDumbRetry(ConnectionFactory factoryProducer, Connection connection) throws InterruptedException { for (int i = 0; i < 100; i++) {