From 753dac47d8f407f94f672718bb416c6f5385b0de Mon Sep 17 00:00:00 2001 From: Clebert Suconic Date: Wed, 28 Oct 2020 17:34:28 -0400 Subject: [PATCH 1/3] ARTEMIS-2937 Cleanup on tests --- .../activemq/artemis/utils/collections/LinkedListImpl.java | 5 ++--- .../tests/integration/amqp/connect/AMQPReplicaTest.java | 1 - .../activemq/artemis/tests/unit/util/LinkedListTest.java | 7 +++++++ 3 files changed, 9 insertions(+), 4 deletions(-) diff --git a/artemis-commons/src/main/java/org/apache/activemq/artemis/utils/collections/LinkedListImpl.java b/artemis-commons/src/main/java/org/apache/activemq/artemis/utils/collections/LinkedListImpl.java index 1569508a19..f9a3940db5 100644 --- a/artemis-commons/src/main/java/org/apache/activemq/artemis/utils/collections/LinkedListImpl.java +++ b/artemis-commons/src/main/java/org/apache/activemq/artemis/utils/collections/LinkedListImpl.java @@ -131,9 +131,8 @@ public class LinkedListImpl implements LinkedList { return null; } - if (node.prev != null) { - removeAfter(node.prev); - } + // the node will always have a prev element + removeAfter(node.prev); return node.val(); } diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/connect/AMQPReplicaTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/connect/AMQPReplicaTest.java index 219eaf7fbe..eb5b202854 100644 --- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/connect/AMQPReplicaTest.java +++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/connect/AMQPReplicaTest.java @@ -668,7 +668,6 @@ public class AMQPReplicaTest extends AmqpClientTestSupport { for (int i = START_ID; i <= LAST_ID; i++) { Message message = consumer.receive(3000); Assert.assertNotNull(message); - System.out.println("port " + port + ",i::" + message.getIntProperty("i")); Assert.assertEquals(i, message.getIntProperty("i")); if (message instanceof TextMessage) { Assert.assertEquals(getText(largeMessage, i), ((TextMessage) message).getText()); diff --git a/tests/unit-tests/src/test/java/org/apache/activemq/artemis/tests/unit/util/LinkedListTest.java b/tests/unit-tests/src/test/java/org/apache/activemq/artemis/tests/unit/util/LinkedListTest.java index 7d9e27fd2c..cd22712870 100644 --- a/tests/unit-tests/src/test/java/org/apache/activemq/artemis/tests/unit/util/LinkedListTest.java +++ b/tests/unit-tests/src/test/java/org/apache/activemq/artemis/tests/unit/util/LinkedListTest.java @@ -202,6 +202,13 @@ public class LinkedListTest extends ActiveMQTestBase { if (deferSupplier) { Assert.assertEquals(0, objs.getSizeOfSuppliedIDs()); objs.setIDSupplier(source -> source.id); + } else { + // clear the ID supplier + objs.clearID(); + // and redo it + Assert.assertEquals(0, objs.getSizeOfSuppliedIDs()); + objs.setIDSupplier(source -> source.id); + Assert.assertEquals(1000, objs.size()); } Assert.assertEquals(1000, objs.getSizeOfSuppliedIDs()); From 9335cd83ed0b2bdc12d72d24e88eb67cf3ddf7d1 Mon Sep 17 00:00:00 2001 From: Clebert Suconic Date: Thu, 29 Oct 2020 08:48:16 -0400 Subject: [PATCH 2/3] ARTEMIS-2937 Changing BrokerConnection SSL example to also use SSL on the client --- .../features/broker-connection/amqp-sending-overssl/pom.xml | 2 +- .../artemis/jms/example/BrokerConnectionSenderSSL.java | 6 +++--- .../src/main/resources/activemq/server0/broker.xml | 3 ++- 3 files changed, 6 insertions(+), 5 deletions(-) diff --git a/examples/features/broker-connection/amqp-sending-overssl/pom.xml b/examples/features/broker-connection/amqp-sending-overssl/pom.xml index c1dc3b83ca..6b0d6448b0 100644 --- a/examples/features/broker-connection/amqp-sending-overssl/pom.xml +++ b/examples/features/broker-connection/amqp-sending-overssl/pom.xml @@ -102,7 +102,7 @@ under the License. true ${noServer} ${basedir}/target/server0 - tcp://localhost:5660 + tcp://localhost:5671 run diff --git a/examples/features/broker-connection/amqp-sending-overssl/src/main/java/org/apache/activemq/artemis/jms/example/BrokerConnectionSenderSSL.java b/examples/features/broker-connection/amqp-sending-overssl/src/main/java/org/apache/activemq/artemis/jms/example/BrokerConnectionSenderSSL.java index bc0ab978f0..f7145ae847 100644 --- a/examples/features/broker-connection/amqp-sending-overssl/src/main/java/org/apache/activemq/artemis/jms/example/BrokerConnectionSenderSSL.java +++ b/examples/features/broker-connection/amqp-sending-overssl/src/main/java/org/apache/activemq/artemis/jms/example/BrokerConnectionSenderSSL.java @@ -35,7 +35,7 @@ public class BrokerConnectionSenderSSL { public static void main(final String[] args) throws Exception { Connection connectionOnServer0 = null; - ConnectionFactory connectionFactoryServer0 = new JmsConnectionFactory("amqp://localhost:5660"); + ConnectionFactory connectionFactoryServer0 = new JmsConnectionFactory("amqps://localhost:5672?transport.trustStoreLocation=target/server0/etc/activemq.example.truststore&transport.trustStorePassword=activemqexample&transport.verifyHost=false"); // Step 1. Create a connection on server0, and send a few messages try { @@ -50,7 +50,7 @@ public class BrokerConnectionSenderSSL { sender.send(session.createTextMessage("Hello world overSSL n" + i)); } } finally { - if (connectionFactoryServer0 != null) { + if (connectionOnServer0 != null) { connectionOnServer0.close(); } } @@ -58,7 +58,7 @@ public class BrokerConnectionSenderSSL { // Step 2. create a connection on server1, and receive a few messages. // the sender on the broker conneciton will take care of the transfer. Connection connectionOnServer1 = null; - ConnectionFactory connectionFactoryServer1 = new JmsConnectionFactory("amqp://localhost:5771"); + ConnectionFactory connectionFactoryServer1 = new JmsConnectionFactory("amqps://localhost:5772?transport.trustStoreLocation=target/server1/etc/activemq.example.truststore&transport.trustStorePassword=activemqexample&transport.verifyHost=false"); try { connectionOnServer1 = connectionFactoryServer1.createConnection(); diff --git a/examples/features/broker-connection/amqp-sending-overssl/src/main/resources/activemq/server0/broker.xml b/examples/features/broker-connection/amqp-sending-overssl/src/main/resources/activemq/server0/broker.xml index 8aea83cf33..3ecd7dcc88 100644 --- a/examples/features/broker-connection/amqp-sending-overssl/src/main/resources/activemq/server0/broker.xml +++ b/examples/features/broker-connection/amqp-sending-overssl/src/main/resources/activemq/server0/broker.xml @@ -31,7 +31,8 @@ under the License. - tcp://0.0.0.0:5660?tcpSendBufferSize=1048576;tcpReceiveBufferSize=1048576;amqpMinLargeMessageSize=102400;protocols=CORE,AMQP,STOMP,HORNETQ,MQTT,OPENWIRE;useEpoll=true;amqpCredits=1000;amqpLowCredits=300;amqpDuplicateDetection=true + tcp://0.0.0.0:5671?tcpSendBufferSize=1048576;tcpReceiveBufferSize=1048576;amqpMinLargeMessageSize=102400;protocols=CORE,AMQP,STOMP,HORNETQ,MQTT,OPENWIRE;useEpoll=true;amqpCredits=1000;amqpLowCredits=300;amqpDuplicateDetection=true + tcp://localhost:5672?sslEnabled=true;keyStorePath=activemq.example.keystore;keyStorePassword=activemqexample;protocols=CORE,AMQP,STOMP,HORNETQ,MQTT,OPENWIRE From d51c89471eb07ea21feffb6afb5edc58abbcfc6b Mon Sep 17 00:00:00 2001 From: Clebert Suconic Date: Wed, 28 Oct 2020 12:05:32 -0400 Subject: [PATCH 3/3] ARTEMIS-2966 Anycast queues with distinct names would cause issues on sending messages --- .../amqp/broker/AMQPSessionCallback.java | 7 +- .../amqp/AmqpAnyCastDistinctQueueTest.java | 75 +++++++++++++++++++ .../amqp/AmqpFullyQualifiedNameTest.java | 8 +- .../amqp/connect/QpidDispatchPeerTest.java | 30 +++++--- 4 files changed, 108 insertions(+), 12 deletions(-) create mode 100644 tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/AmqpAnyCastDistinctQueueTest.java diff --git a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/broker/AMQPSessionCallback.java b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/broker/AMQPSessionCallback.java index cc5616a358..9d165d7dcf 100644 --- a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/broker/AMQPSessionCallback.java +++ b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/broker/AMQPSessionCallback.java @@ -36,6 +36,7 @@ import org.apache.activemq.artemis.core.paging.PagingManager; import org.apache.activemq.artemis.core.paging.PagingStore; import org.apache.activemq.artemis.core.persistence.OperationContext; import org.apache.activemq.artemis.core.persistence.StorageManager; +import org.apache.activemq.artemis.core.postoffice.Bindings; import org.apache.activemq.artemis.core.security.CheckType; import org.apache.activemq.artemis.core.security.SecurityAuth; import org.apache.activemq.artemis.core.server.AddressQueryResult; @@ -346,7 +347,11 @@ public class AMQPSessionCallback implements SessionCallback { } } else if (routingType == RoutingType.ANYCAST) { if (manager.getServer().locateQueue(unPrefixedAddress) == null) { - if (addressSettings.isAutoCreateQueues()) { + Bindings bindings = manager.getServer().getPostOffice().lookupBindingsForAddress(address); + if (bindings != null) { + // this means the address has another queue with a different name, which is fine, we just ignore it on this case + result = true; + } else if (addressSettings.isAutoCreateQueues()) { try { serverSession.createQueue(new QueueConfiguration(address).setRoutingType(routingType).setAutoCreated(true)); } catch (ActiveMQQueueExistsException e) { diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/AmqpAnyCastDistinctQueueTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/AmqpAnyCastDistinctQueueTest.java new file mode 100644 index 0000000000..dd08bd1f42 --- /dev/null +++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/AmqpAnyCastDistinctQueueTest.java @@ -0,0 +1,75 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.activemq.artemis.tests.integration.amqp; + +import javax.jms.Connection; +import javax.jms.ConnectionFactory; +import javax.jms.MessageConsumer; +import javax.jms.MessageProducer; +import javax.jms.Queue; +import javax.jms.Session; +import javax.jms.TextMessage; + +import org.apache.activemq.artemis.api.core.QueueConfiguration; +import org.apache.activemq.artemis.api.core.RoutingType; +import org.apache.activemq.artemis.core.server.ActiveMQServer; +import org.apache.activemq.artemis.core.server.impl.AddressInfo; +import org.apache.activemq.artemis.tests.util.CFUtil; + +import org.junit.Assert; +import org.junit.Test; + +/** + * Make sure auto create will not create a queue when the queue and its address have a distinct name in anycast. + */ +public class AmqpAnyCastDistinctQueueTest extends AmqpClientTestSupport { + + @Override + protected void addAdditionalAcceptors(ActiveMQServer server) throws Exception { + } + + @Test(timeout = 60000) + public void testDistinctQueueAddressAnyCast() throws Exception { + String ADDRESS_NAME = "DISTINCT_ADDRESS_testDistinctAddressAnyCast"; + String QUEUE_NAME = "DISTINCT_QUEUE_testDistinctQUEUE_AnyCast"; + server.addAddressInfo(new AddressInfo(ADDRESS_NAME).setAutoCreated(false).addRoutingType(RoutingType.ANYCAST)); + server.createQueue(new QueueConfiguration(QUEUE_NAME).setAddress(ADDRESS_NAME).setDurable(true).setRoutingType(RoutingType.ANYCAST)); + + final int NUMBER_OF_MESSAGES = 100; + + ConnectionFactory jmsCF = CFUtil.createConnectionFactory("AMQP", "tcp://localhost:" + AMQP_PORT); + try (Connection connection = jmsCF.createConnection()) { + Session session = connection.createSession(); + Queue queueSending = session.createQueue(ADDRESS_NAME); + MessageProducer producer = session.createProducer(queueSending); + for (int i = 0; i < NUMBER_OF_MESSAGES; i++) { + producer.send(session.createTextMessage("hello " + i)); + } + + Queue queueReceiving = session.createQueue(QUEUE_NAME); + connection.start(); + MessageConsumer consumer = session.createConsumer(queueReceiving); + for (int i = 0; i < NUMBER_OF_MESSAGES; i++) { + TextMessage message = (TextMessage)consumer.receive(5000); + Assert.assertNotNull(message); + Assert.assertEquals("hello " + i, message.getText()); + } + Assert.assertNull(consumer.receiveNoWait()); + } + + } +} \ No newline at end of file diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/AmqpFullyQualifiedNameTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/AmqpFullyQualifiedNameTest.java index ba1a3aa9cd..3dea97517f 100644 --- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/AmqpFullyQualifiedNameTest.java +++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/AmqpFullyQualifiedNameTest.java @@ -36,6 +36,7 @@ import org.apache.activemq.artemis.core.postoffice.Bindings; import org.apache.activemq.artemis.core.postoffice.impl.LocalQueueBinding; import org.apache.activemq.artemis.core.server.ActiveMQServer; import org.apache.activemq.artemis.core.server.QueueQueryResult; +import org.apache.activemq.artemis.core.server.impl.AddressInfo; import org.apache.activemq.artemis.core.settings.impl.AddressSettings; import org.apache.activemq.artemis.tests.util.Wait; import org.apache.activemq.artemis.utils.CompositeAddress; @@ -278,7 +279,12 @@ public class AmqpFullyQualifiedNameTest extends JMSClientTestSupport { @Test public void testQueue() throws Exception { - server.getAddressSettingsRepository().addMatch("#", new AddressSettings().setAutoCreateQueues(true).setAutoCreateAddresses(true)); + server.getAddressSettingsRepository().addMatch("#", new AddressSettings().setAutoCreateQueues(false).setAutoCreateAddresses(false)); + + server.addAddressInfo(new AddressInfo(anycastAddress).addRoutingType(RoutingType.ANYCAST).setAutoCreated(false)); + server.createQueue(new QueueConfiguration(anycastQ1).setAddress(anycastAddress).setRoutingType(RoutingType.ANYCAST).setDurable(true)); + server.createQueue(new QueueConfiguration(anycastQ2).setAddress(anycastAddress).setRoutingType(RoutingType.ANYCAST).setDurable(true)); + server.createQueue(new QueueConfiguration(anycastQ3).setAddress(anycastAddress).setRoutingType(RoutingType.ANYCAST).setDurable(true)); Connection connection = createConnection(); try { diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/connect/QpidDispatchPeerTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/connect/QpidDispatchPeerTest.java index 100e1c957b..507b3d3591 100644 --- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/connect/QpidDispatchPeerTest.java +++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/connect/QpidDispatchPeerTest.java @@ -81,17 +81,22 @@ public class QpidDispatchPeerTest extends AmqpClientTestSupport { qpidProcess.kill(); } + @Test(timeout = 60_000) + public void testWithMatchingDifferentNamesOnQueue() throws Exception { + internalMultipleQueues(true, true); + } + @Test(timeout = 60_000) public void testWithMatching() throws Exception { - internalMultipleQueues(true); + internalMultipleQueues(true, false); } @Test(timeout = 60_000) public void testwithQueueName() throws Exception { - internalMultipleQueues(false); + internalMultipleQueues(false, true); } - private void internalMultipleQueues(boolean useMatching) throws Exception { + private void internalMultipleQueues(boolean useMatching, boolean distinctNaming) throws Exception { final int numberOfMessages = 100; final int numberOfQueues = 10; AMQPBrokerConnectConfiguration amqpConnection = new AMQPBrokerConnectConfiguration("test", "tcp://localhost:24621").setRetryInterval(10).setReconnectAttempts(-1); @@ -99,14 +104,14 @@ public class QpidDispatchPeerTest extends AmqpClientTestSupport { amqpConnection.addElement(new AMQPBrokerConnectionElement().setMatchAddress("queue.#").setType(AMQPBrokerConnectionAddressType.PEER)); } else { for (int i = 0; i < numberOfQueues; i++) { - amqpConnection.addElement(new AMQPBrokerConnectionElement().setQueueName("queue.test" + i).setType(AMQPBrokerConnectionAddressType.PEER)); + amqpConnection.addElement(new AMQPBrokerConnectionElement().setQueueName(createQueueName(i, distinctNaming)).setType(AMQPBrokerConnectionAddressType.PEER)); } } server.getConfiguration().addAMQPConnection(amqpConnection); server.start(); for (int i = 0; i < numberOfQueues; i++) { server.addAddressInfo(new AddressInfo("queue.test" + i).addRoutingType(RoutingType.ANYCAST).setAutoCreated(false).setTemporary(false)); - server.createQueue(new QueueConfiguration("queue.test" + i).setAddress("queue.test" + i).setRoutingType(RoutingType.ANYCAST)); + server.createQueue(new QueueConfiguration(createQueueName(i, distinctNaming)).setAddress("queue.test" + i).setRoutingType(RoutingType.ANYCAST)); } for (int dest = 0; dest < numberOfQueues; dest++) { @@ -120,7 +125,7 @@ public class QpidDispatchPeerTest extends AmqpClientTestSupport { MessageProducer producer = session.createProducer(queue); producer.setDeliveryMode(DeliveryMode.NON_PERSISTENT); - org.apache.activemq.artemis.core.server.Queue testQueueOnServer = server.locateQueue("queue.test" + dest); + org.apache.activemq.artemis.core.server.Queue testQueueOnServer = server.locateQueue(createQueueName(dest, distinctNaming)); for (int i = 0; i < numberOfMessages; i++) { producer.send(session.createTextMessage("hello " + i)); @@ -130,8 +135,6 @@ public class QpidDispatchPeerTest extends AmqpClientTestSupport { connection.close(); } - System.out.println("*******************************************************************************************************************************"); - System.out.println("Creating consumer"); for (int dest = 0; dest < numberOfQueues; dest++) { ConnectionFactory factoryConsumer = CFUtil.createConnectionFactory("AMQP", "tcp://localhost:24622"); @@ -151,7 +154,6 @@ public class QpidDispatchPeerTest extends AmqpClientTestSupport { System.out.println("*******************************************************************************************************************************"); } Assert.assertNotNull(received); - System.out.println("message " + received.getText()); Assert.assertEquals("hello " + i, received.getText()); } Assert.assertNull(consumer.receiveNoWait()); @@ -162,12 +164,20 @@ public class QpidDispatchPeerTest extends AmqpClientTestSupport { } } - org.apache.activemq.artemis.core.server.Queue testQueueOnServer = server.locateQueue("queue.test" + dest); + org.apache.activemq.artemis.core.server.Queue testQueueOnServer = server.locateQueue(createQueueName(dest, distinctNaming)); Wait.assertEquals(0, testQueueOnServer::getMessageCount); } } + private String createQueueName(int i, boolean useDistinctName) { + if (useDistinctName) { + return "distinct.test" + i; + } else { + return "queue.test" + i; + } + } + private Connection createConnectionDumbRetry(ConnectionFactory factoryProducer, Connection connection) throws InterruptedException { for (int i = 0; i < 100; i++) {