diff --git a/tests/artemis-test-support/src/main/java/org/apache/activemq/transport/amqp/client/AmqpConnection.java b/tests/artemis-test-support/src/main/java/org/apache/activemq/transport/amqp/client/AmqpConnection.java index 76717fde0b..a30bae12b4 100644 --- a/tests/artemis-test-support/src/main/java/org/apache/activemq/transport/amqp/client/AmqpConnection.java +++ b/tests/artemis-test-support/src/main/java/org/apache/activemq/transport/amqp/client/AmqpConnection.java @@ -87,6 +87,7 @@ public class AmqpConnection extends AmqpAbstractResource implements private final String password; private final URI remoteURI; private final String connectionId; + private List desiredCapabilities = Collections.emptyList(); private List offeredCapabilities = Collections.emptyList(); private Map offeredProperties = Collections.emptyMap(); @@ -146,6 +147,9 @@ public class AmqpConnection extends AmqpAbstractResource implements getEndpoint().setContainer(safeGetContainerId()); } getEndpoint().setHostname(remoteURI.getHost()); + if (!getDesiredCapabilities().isEmpty()) { + getEndpoint().setDesiredCapabilities(getDesiredCapabilities().toArray(new Symbol[0])); + } if (!getOfferedCapabilities().isEmpty()) { getEndpoint().setOfferedCapabilities(getOfferedCapabilities().toArray(new Symbol[0])); } @@ -393,12 +397,24 @@ public class AmqpConnection extends AmqpAbstractResource implements this.drainTimeout = drainTimeout; } + public List getDesiredCapabilities() { + return desiredCapabilities; + } + + public void setDesiredCapabilities(List desiredCapabilities) { + if (desiredCapabilities == null) { + desiredCapabilities = Collections.emptyList(); + } + + this.desiredCapabilities = desiredCapabilities; + } + public List getOfferedCapabilities() { return offeredCapabilities; } public void setOfferedCapabilities(List offeredCapabilities) { - if (offeredCapabilities != null) { + if (offeredCapabilities == null) { offeredCapabilities = Collections.emptyList(); } @@ -410,7 +426,7 @@ public class AmqpConnection extends AmqpAbstractResource implements } public void setOfferedProperties(Map offeredProperties) { - if (offeredProperties != null) { + if (offeredProperties == null) { offeredProperties = Collections.emptyMap(); } diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/AmqpAnonymousRelayTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/AmqpAnonymousRelayTest.java new file mode 100644 index 0000000000..1743624ffd --- /dev/null +++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/AmqpAnonymousRelayTest.java @@ -0,0 +1,126 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.activemq.artemis.tests.integration.amqp; + +import java.util.concurrent.TimeUnit; + +import org.apache.activemq.transport.amqp.client.AmqpClient; +import org.apache.activemq.transport.amqp.client.AmqpConnection; +import org.apache.activemq.transport.amqp.client.AmqpMessage; +import org.apache.activemq.transport.amqp.client.AmqpReceiver; +import org.apache.activemq.transport.amqp.client.AmqpSender; +import org.apache.activemq.transport.amqp.client.AmqpSession; +import org.junit.Test; + +public class AmqpAnonymousRelayTest extends AmqpClientTestSupport { + + @Override + protected boolean isAutoCreateQueues() { + return false; + } + + @Override + protected boolean isAutoCreateAddresses() { + return false; + } + + @Test(timeout = 60000) + public void testSendMessageOnAnonymousRelayLinkUsingMessageTo() throws Exception { + AmqpClient client = createAmqpClient(); + AmqpConnection connection = addConnection(client.connect()); + + try { + AmqpSession session = connection.createSession(); + + AmqpSender sender = session.createAnonymousSender(); + AmqpMessage message = new AmqpMessage(); + + message.setAddress(getQueueName()); + message.setMessageId("msg" + 1); + message.setText("Test-Message"); + + sender.send(message); + sender.close(); + + AmqpReceiver receiver = session.createReceiver(getQueueName()); + receiver.flow(1); + AmqpMessage received = receiver.receive(10, TimeUnit.SECONDS); + assertNotNull("Should have read message", received); + assertEquals("msg1", received.getMessageId()); + received.accept(); + + receiver.close(); + } finally { + connection.close(); + } + } + + @Test(timeout = 60000) + public void testSendMessageFailsOnAnonymousRelayLinkWhenNoToValueSet() throws Exception { + AmqpClient client = createAmqpClient(); + AmqpConnection connection = addConnection(client.connect()); + + try { + AmqpSession session = connection.createSession(); + + AmqpSender sender = session.createAnonymousSender(); + AmqpMessage message = new AmqpMessage(); + + message.setMessageId("msg" + 1); + message.setText("Test-Message"); + + try { + sender.send(message); + fail("Should not be able to send, message should be rejected"); + } catch (Exception ex) { + ex.printStackTrace(); + } finally { + sender.close(); + } + } finally { + connection.close(); + } + } + + @Test(timeout = 60000) + public void testSendMessageFailsOnAnonymousRelayWhenToFieldHasNonExistingAddress() throws Exception { + AmqpClient client = createAmqpClient(); + AmqpConnection connection = addConnection(client.connect()); + + try { + AmqpSession session = connection.createSession(); + + AmqpSender sender = session.createAnonymousSender(); + AmqpMessage message = new AmqpMessage(); + + message.setAddress("exampleQueu-not-in-service"); + message.setMessageId("msg" + 1); + message.setText("Test-Message"); + + try { + sender.send(message); + fail("Should not be able to send, message should be rejected"); + } catch (Exception ex) { + ex.printStackTrace(); + } finally { + sender.close(); + } + } finally { + connection.close(); + } + } +} diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/AmqpClientTestSupport.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/AmqpClientTestSupport.java index 99ce4dbee5..02b1b99a57 100644 --- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/AmqpClientTestSupport.java +++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/AmqpClientTestSupport.java @@ -1,4 +1,4 @@ -/** +/* * Licensed to the Apache Software Foundation (ASF) under one or more * contributor license agreements. See the NOTICE file distributed with * this work for additional information regarding copyright ownership. @@ -14,11 +14,11 @@ * See the License for the specific language governing permissions and * limitations under the License. */ - package org.apache.activemq.artemis.tests.integration.amqp; import java.util.Set; +import org.apache.activemq.artemis.api.core.RoutingType; import org.apache.activemq.artemis.api.core.SimpleString; import org.apache.activemq.artemis.api.core.TransportConfiguration; import org.apache.activemq.artemis.core.config.Configuration; @@ -26,7 +26,6 @@ import org.apache.activemq.artemis.core.config.CoreAddressConfiguration; import org.apache.activemq.artemis.core.config.CoreQueueConfiguration; import org.apache.activemq.artemis.core.server.ActiveMQServer; import org.apache.activemq.artemis.core.server.Queue; -import org.apache.activemq.artemis.api.core.RoutingType; import org.apache.activemq.artemis.core.settings.impl.AddressSettings; import org.apache.activemq.artemis.jms.server.JMSServerManager; import org.apache.activemq.artemis.jms.server.impl.JMSServerManagerImpl; @@ -48,9 +47,9 @@ public class AmqpClientTestSupport extends AmqpTestSupport { protected static Symbol SHARED = Symbol.getSymbol("shared"); protected static Symbol GLOBAL = Symbol.getSymbol("global"); - protected JMSServerManager serverManager; protected ActiveMQServer server; + @Before @Override public void setUp() throws Exception { @@ -84,6 +83,22 @@ public class AmqpClientTestSupport extends AmqpTestSupport { super.tearDown(); } + protected boolean isAutoCreateQueues() { + return true; + } + + protected boolean isAutoCreateAddresses() { + return true; + } + + protected String getDeadLetterAddress() { + return "ActiveMQ.DLQ"; + } + + protected int getPrecreatedQueueSize() { + return 10; + } + protected ActiveMQServer createServer() throws Exception { ActiveMQServer server = createServer(true, true); serverManager = new JMSServerManagerImpl(server); @@ -91,21 +106,30 @@ public class AmqpClientTestSupport extends AmqpTestSupport { // Address 1 CoreAddressConfiguration address = new CoreAddressConfiguration(); - address.setName(getTestName()).getRoutingTypes().add(RoutingType.ANYCAST); + address.setName(getQueueName()).getRoutingTypes().add(RoutingType.ANYCAST); CoreQueueConfiguration queueConfig = new CoreQueueConfiguration(); - queueConfig.setName(getTestName()).setAddress(getTestName()).setRoutingType(RoutingType.ANYCAST); + queueConfig.setName(getQueueName()).setAddress(getQueueName()).setRoutingType(RoutingType.ANYCAST); address.getQueueConfigurations().add(queueConfig); serverConfig.addAddressConfiguration(address); - // Address 2 - CoreAddressConfiguration address2 = new CoreAddressConfiguration(); - address2.setName(getTestName2()).getRoutingTypes().add(RoutingType.ANYCAST); - CoreQueueConfiguration queueConfig2 = new CoreQueueConfiguration(); - queueConfig2.setName(getTestName2()).setAddress(getTestName2()).setRoutingType(RoutingType.ANYCAST); - address2.getQueueConfigurations().add(queueConfig2); - serverConfig.addAddressConfiguration(address2); + // Address 1....N + for (int i = 0; i < getPrecreatedQueueSize(); ++i) { + CoreAddressConfiguration address2 = new CoreAddressConfiguration(); + address2.setName(getQueueName(i)).getRoutingTypes().add(RoutingType.ANYCAST); + CoreQueueConfiguration queueConfig2 = new CoreQueueConfiguration(); + queueConfig2.setName(getQueueName(i)).setAddress(getQueueName(i)).setRoutingType(RoutingType.ANYCAST); + address2.getQueueConfigurations().add(queueConfig2); + serverConfig.addAddressConfiguration(address2); + } - serverConfig.getAddressesSettings().put("#", new AddressSettings().setAutoCreateQueues(true).setAutoCreateAddresses(true).setDeadLetterAddress(new SimpleString("ActiveMQ.DLQ"))); + // Address configuration + AddressSettings addressSettings = new AddressSettings(); + + addressSettings.setAutoCreateQueues(isAutoCreateQueues()); + addressSettings.setAutoCreateAddresses(isAutoCreateQueues()); + addressSettings.setDeadLetterAddress(new SimpleString(getDeadLetterAddress())); + + serverConfig.getAddressesSettings().put("#", addressSettings); serverConfig.setSecurityEnabled(false); Set acceptors = serverConfig.getAcceptorConfigurations(); for (TransportConfiguration tc : acceptors) { @@ -127,8 +151,12 @@ public class AmqpClientTestSupport extends AmqpTestSupport { return getName(); } - public String getTestName2() { - return getName() + "2"; + public String getQueueName() { + return getName(); + } + + public String getQueueName(int index) { + return getName() + "-" + index; } public AmqpClientTestSupport() { diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/AmqpDeliveryAnnotationsTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/AmqpDeliveryAnnotationsTest.java index 93ff22b341..61787dd6f0 100644 --- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/AmqpDeliveryAnnotationsTest.java +++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/AmqpDeliveryAnnotationsTest.java @@ -40,18 +40,18 @@ public class AmqpDeliveryAnnotationsTest extends AmqpClientTestSupport { AmqpConnection connection = addConnection(client.connect()); AmqpSession session = connection.createSession(); - AmqpSender sender = session.createSender(getTestName()); - AmqpReceiver receiver = session.createReceiver(getTestName()); + AmqpSender sender = session.createSender(getQueueName()); + AmqpReceiver receiver = session.createReceiver(getQueueName()); AmqpMessage message = new AmqpMessage(); message.setText("Test-Message"); - message.setDeliveryAnnotation(DELIVERY_ANNOTATION_NAME, getTestName()); + message.setDeliveryAnnotation(DELIVERY_ANNOTATION_NAME, getQueueName()); sender.send(message); receiver.flow(1); - Queue queue = getProxyToQueue(getTestName()); + Queue queue = getProxyToQueue(getQueueName()); assertEquals(1, queue.getMessageCount()); AmqpMessage received = receiver.receive(5, TimeUnit.SECONDS); diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/AmqpDescribedTypePayloadTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/AmqpDescribedTypePayloadTest.java index 138f3cc222..4b9213be09 100644 --- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/AmqpDescribedTypePayloadTest.java +++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/AmqpDescribedTypePayloadTest.java @@ -51,16 +51,16 @@ public class AmqpDescribedTypePayloadTest extends AmqpClientTestSupport { AmqpConnection connection = addConnection(client.connect()); AmqpSession session = connection.createSession(); - AmqpSender sender = session.createSender(getTestName()); + AmqpSender sender = session.createSender(getQueueName()); AmqpMessage message = new AmqpMessage(); message.setDescribedType(new AmqpNoLocalFilter()); sender.send(message); sender.close(); - Queue queue = getProxyToQueue(getTestName()); + Queue queue = getProxyToQueue(getQueueName()); assertEquals(1, queue.getMessageCount()); - AmqpReceiver receiver = session.createReceiver(getTestName()); + AmqpReceiver receiver = session.createReceiver(getQueueName()); receiver.flow(1); AmqpMessage received = receiver.receive(5, TimeUnit.SECONDS); assertNotNull(received); @@ -77,14 +77,14 @@ public class AmqpDescribedTypePayloadTest extends AmqpClientTestSupport { AmqpConnection connection = addConnection(client.connect()); AmqpSession session = connection.createSession(); - AmqpSender sender = session.createSender(getTestName()); + AmqpSender sender = session.createSender(getQueueName()); AmqpMessage message = new AmqpMessage(); message.setDescribedType(new AmqpNoLocalFilter()); sender.send(message); sender.close(); connection.close(); - Queue queue = getProxyToQueue(getTestName()); + Queue queue = getProxyToQueue(getQueueName()); assertEquals(1, queue.getMessageCount()); ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory("tcp://localhost:61616"); @@ -111,13 +111,13 @@ public class AmqpDescribedTypePayloadTest extends AmqpClientTestSupport { AmqpSession session = connection.createSession(); // Send with AMQP client. - AmqpSender sender = session.createSender(getTestName()); + AmqpSender sender = session.createSender(getQueueName()); AmqpMessage message = new AmqpMessage(); message.setDescribedType(new AmqpNoLocalFilter()); sender.send(message); sender.close(); - Queue queue = getProxyToQueue(getTestName()); + Queue queue = getProxyToQueue(getQueueName()); assertEquals(1, queue.getMessageCount()); // Receive and resend with OpenWire JMS client @@ -142,7 +142,7 @@ public class AmqpDescribedTypePayloadTest extends AmqpClientTestSupport { assertEquals(1, queue.getMessageCount()); // Now lets receive it with AMQP and see that we get back what we expected. - AmqpReceiver receiver = session.createReceiver(getTestName()); + AmqpReceiver receiver = session.createReceiver(getQueueName()); receiver.flow(1); AmqpMessage returned = receiver.receive(5, TimeUnit.SECONDS); assertNotNull(returned); diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/AmqpExpiredMessageTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/AmqpExpiredMessageTest.java index 047d88d2ce..440de12534 100644 --- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/AmqpExpiredMessageTest.java +++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/AmqpExpiredMessageTest.java @@ -35,10 +35,10 @@ public class AmqpExpiredMessageTest extends AmqpClientTestSupport { AmqpConnection connection = addConnection(client.connect()); AmqpSession session = connection.createSession(); - AmqpSender sender = session.createSender(getTestName()); + AmqpSender sender = session.createSender(getQueueName()); // Get the Queue View early to avoid racing the delivery. - final Queue queueView = getProxyToQueue(getTestName()); + final Queue queueView = getProxyToQueue(getQueueName()); assertNotNull(queueView); AmqpMessage message = new AmqpMessage(); @@ -50,7 +50,7 @@ public class AmqpExpiredMessageTest extends AmqpClientTestSupport { assertEquals(1, queueView.getMessageCount()); // Now try and get the message - AmqpReceiver receiver = session.createReceiver(getTestName()); + AmqpReceiver receiver = session.createReceiver(getQueueName()); receiver.flow(1); AmqpMessage received = receiver.receive(1, TimeUnit.SECONDS); assertNull(received); @@ -66,10 +66,10 @@ public class AmqpExpiredMessageTest extends AmqpClientTestSupport { AmqpConnection connection = addConnection(client.connect()); AmqpSession session = connection.createSession(); - AmqpSender sender = session.createSender(getTestName()); + AmqpSender sender = session.createSender(getQueueName()); // Get the Queue View early to avoid racing the delivery. - final Queue queueView = getProxyToQueue(getTestName()); + final Queue queueView = getProxyToQueue(getQueueName()); assertNotNull(queueView); AmqpMessage message = new AmqpMessage(); @@ -81,7 +81,7 @@ public class AmqpExpiredMessageTest extends AmqpClientTestSupport { assertEquals(1, queueView.getMessageCount()); // Now try and get the message - AmqpReceiver receiver = session.createReceiver(getTestName()); + AmqpReceiver receiver = session.createReceiver(getQueueName()); receiver.flow(1); AmqpMessage received = receiver.receive(5, TimeUnit.SECONDS); assertNotNull(received); @@ -97,10 +97,10 @@ public class AmqpExpiredMessageTest extends AmqpClientTestSupport { AmqpConnection connection = addConnection(client.connect()); AmqpSession session = connection.createSession(); - AmqpSender sender = session.createSender(getTestName()); + AmqpSender sender = session.createSender(getQueueName()); // Get the Queue View early to avoid racing the delivery. - final Queue queueView = getProxyToQueue(getTestName()); + final Queue queueView = getProxyToQueue(getQueueName()); assertNotNull(queueView); AmqpMessage message = new AmqpMessage(); @@ -114,7 +114,7 @@ public class AmqpExpiredMessageTest extends AmqpClientTestSupport { assertEquals(1, queueView.getMessageCount()); // Now try and get the message - AmqpReceiver receiver = session.createReceiver(getTestName()); + AmqpReceiver receiver = session.createReceiver(getQueueName()); receiver.flow(1); AmqpMessage received = receiver.receive(1, TimeUnit.SECONDS); assertNull(received); @@ -130,10 +130,10 @@ public class AmqpExpiredMessageTest extends AmqpClientTestSupport { AmqpConnection connection = addConnection(client.connect()); AmqpSession session = connection.createSession(); - AmqpSender sender = session.createSender(getTestName()); + AmqpSender sender = session.createSender(getQueueName()); // Get the Queue View early to avoid racing the delivery. - final Queue queueView = getProxyToQueue(getTestName()); + final Queue queueView = getProxyToQueue(getQueueName()); assertNotNull(queueView); AmqpMessage message = new AmqpMessage(); @@ -149,7 +149,7 @@ public class AmqpExpiredMessageTest extends AmqpClientTestSupport { Thread.sleep(1000); // Now try and get the message - AmqpReceiver receiver = session.createReceiver(getTestName()); + AmqpReceiver receiver = session.createReceiver(getQueueName()); receiver.flow(1); AmqpMessage received = receiver.receive(1, TimeUnit.SECONDS); assertNull(received); @@ -165,10 +165,10 @@ public class AmqpExpiredMessageTest extends AmqpClientTestSupport { AmqpConnection connection = addConnection(client.connect()); AmqpSession session = connection.createSession(); - AmqpSender sender = session.createSender(getTestName()); + AmqpSender sender = session.createSender(getQueueName()); // Get the Queue View early to avoid racing the delivery. - final Queue queueView = getProxyToQueue(getTestName()); + final Queue queueView = getProxyToQueue(getQueueName()); assertNotNull(queueView); AmqpMessage message = new AmqpMessage(); @@ -184,7 +184,7 @@ public class AmqpExpiredMessageTest extends AmqpClientTestSupport { assertEquals(1, queueView.getMessageCount()); // Now try and get the message - AmqpReceiver receiver = session.createReceiver(getTestName()); + AmqpReceiver receiver = session.createReceiver(getQueueName()); receiver.flow(1); AmqpMessage received = receiver.receive(5, TimeUnit.SECONDS); assertNotNull(received); @@ -200,10 +200,10 @@ public class AmqpExpiredMessageTest extends AmqpClientTestSupport { AmqpConnection connection = addConnection(client.connect()); AmqpSession session = connection.createSession(); - AmqpSender sender = session.createSender(getTestName()); + AmqpSender sender = session.createSender(getQueueName()); // Get the Queue View early to avoid racing the delivery. - final Queue queueView = getProxyToQueue(getTestName()); + final Queue queueView = getProxyToQueue(getQueueName()); assertNotNull(queueView); AmqpMessage message = new AmqpMessage(); @@ -215,7 +215,7 @@ public class AmqpExpiredMessageTest extends AmqpClientTestSupport { assertEquals(1, queueView.getMessageCount()); // Now try and get the message - AmqpReceiver receiver = session.createReceiver(getTestName()); + AmqpReceiver receiver = session.createReceiver(getQueueName()); receiver.flow(1); AmqpMessage received = receiver.receive(5, TimeUnit.SECONDS); assertNotNull(received); @@ -231,10 +231,10 @@ public class AmqpExpiredMessageTest extends AmqpClientTestSupport { AmqpConnection connection = addConnection(client.connect()); AmqpSession session = connection.createSession(); - AmqpSender sender = session.createSender(getTestName()); + AmqpSender sender = session.createSender(getQueueName()); // Get the Queue View early to avoid racing the delivery. - final Queue queueView = getProxyToQueue(getTestName()); + final Queue queueView = getProxyToQueue(getQueueName()); assertNotNull(queueView); AmqpMessage message = new AmqpMessage(); @@ -248,7 +248,7 @@ public class AmqpExpiredMessageTest extends AmqpClientTestSupport { assertEquals(1, queueView.getMessageCount()); // Now try and get the message - AmqpReceiver receiver = session.createReceiver(getTestName()); + AmqpReceiver receiver = session.createReceiver(getQueueName()); receiver.flow(1); AmqpMessage received = receiver.receive(1, TimeUnit.SECONDS); assertNull(received); diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/AmqpNettyFailoverTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/AmqpFailoverEndpointDiscoveryTest.java similarity index 92% rename from tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/AmqpNettyFailoverTest.java rename to tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/AmqpFailoverEndpointDiscoveryTest.java index 16cd7c3663..81c28855ef 100644 --- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/AmqpNettyFailoverTest.java +++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/AmqpFailoverEndpointDiscoveryTest.java @@ -16,6 +16,17 @@ */ package org.apache.activemq.artemis.tests.integration.amqp; +import java.util.Arrays; +import java.util.Collection; +import java.util.HashMap; +import java.util.Map; + +import javax.jms.Connection; +import javax.jms.MessageConsumer; +import javax.jms.MessageProducer; +import javax.jms.Session; +import javax.jms.TextMessage; + import org.apache.activemq.artemis.api.core.TransportConfiguration; import org.apache.activemq.artemis.core.remoting.impl.netty.TransportConstants; import org.apache.activemq.artemis.tests.integration.cluster.failover.FailoverTestBase; @@ -25,34 +36,22 @@ import org.junit.Test; import org.junit.runner.RunWith; import org.junit.runners.Parameterized; -import javax.jms.Connection; -import javax.jms.MessageConsumer; -import javax.jms.MessageProducer; -import javax.jms.Session; -import javax.jms.TextMessage; -import java.util.Arrays; -import java.util.Collection; -import java.util.HashMap; -import java.util.Map; - @RunWith(Parameterized.class) -public class AmqpNettyFailoverTest extends FailoverTestBase { - +public class AmqpFailoverEndpointDiscoveryTest extends FailoverTestBase { // this will ensure that all tests in this class are run twice, // once with "true" passed to the class' constructor and once with "false" @Parameterized.Parameters(name = "{0}") - public static Collection getParameters() { + public static Collection getParameters() { // these 3 are for comparison return Arrays.asList(new Object[][]{{"NON_SSL", 0} /*, {"SSL", 1} */ }); } - private final int protocol; - public AmqpNettyFailoverTest(String name, int protocol) { + public AmqpFailoverEndpointDiscoveryTest(String name, int protocol) { this.protocol = protocol; } @@ -66,7 +65,6 @@ public class AmqpNettyFailoverTest extends FailoverTestBase { return getNettyConnectorTransportConfig(live); } - @Test(timeout = 120000) public void testFailoverListWithAMQP() throws Exception { JmsConnectionFactory factory = getJmsConnectionFactory(); @@ -94,7 +92,6 @@ public class AmqpNettyFailoverTest extends FailoverTestBase { } else { String keystore = this.getClass().getClassLoader().getResource("client-side-keystore.jks").getFile(); String truststore = this.getClass().getClassLoader().getResource("client-side-truststore.jks").getFile(); - // return new JmsConnectionFactory("amqps://localhost:61616?transport.keyStoreLocation=" + keystore + "&transport.keyStorePassword=secureexample&transport.trustStoreLocation=" + truststore + "&transport.trustStorePassword=secureexample&transport.verifyHost=false"); return new JmsConnectionFactory("failover:(amqps://localhost:61616?transport.keyStoreLocation=" + keystore + "&transport.keyStorePassword=secureexample&transport.trustStoreLocation=" + truststore + "&transport.trustStorePassword=secureexample&transport.verifyHost=false)"); } } @@ -108,14 +105,12 @@ public class AmqpNettyFailoverTest extends FailoverTestBase { server1Params.put(TransportConstants.KEYSTORE_PASSWORD_PROP_NAME, "secureexample"); server1Params.put(TransportConstants.TRUSTSTORE_PATH_PROP_NAME, "server-side-truststore.jks"); server1Params.put(TransportConstants.TRUSTSTORE_PASSWORD_PROP_NAME, "secureexample"); - //server1Params.put(TransportConstants.NEED_CLIENT_AUTH_PROP_NAME, true); - } + if (live) { return new TransportConfiguration(NETTY_ACCEPTOR_FACTORY, server1Params); } - server1Params.put(org.apache.activemq.artemis.core.remoting.impl.netty.TransportConstants.PORT_PROP_NAME, org.apache.activemq.artemis.core.remoting.impl.netty.TransportConstants.DEFAULT_PORT + 1); return new TransportConfiguration(NETTY_ACCEPTOR_FACTORY, server1Params); @@ -137,5 +132,4 @@ public class AmqpNettyFailoverTest extends FailoverTestBase { server1Params.put(org.apache.activemq.artemis.core.remoting.impl.netty.TransportConstants.PORT_PROP_NAME, org.apache.activemq.artemis.core.remoting.impl.netty.TransportConstants.DEFAULT_PORT + 1); return new TransportConfiguration(NETTY_CONNECTOR_FACTORY, server1Params); } - } diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/AmqpInboundConnectionTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/AmqpInboundConnectionTest.java new file mode 100644 index 0000000000..79bdf590b9 --- /dev/null +++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/AmqpInboundConnectionTest.java @@ -0,0 +1,255 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.activemq.artemis.tests.integration.amqp; + +import static org.apache.activemq.artemis.protocol.amqp.proton.AmqpSupport.ANONYMOUS_RELAY; +import static org.apache.activemq.artemis.protocol.amqp.proton.AmqpSupport.CONNECTION_OPEN_FAILED; +import static org.apache.activemq.artemis.protocol.amqp.proton.AmqpSupport.CONTAINER_ID; +import static org.apache.activemq.artemis.protocol.amqp.proton.AmqpSupport.DELAYED_DELIVERY; +import static org.apache.activemq.artemis.protocol.amqp.proton.AmqpSupport.INVALID_FIELD; +import static org.apache.activemq.artemis.protocol.amqp.proton.AmqpSupport.PRODUCT; +import static org.apache.activemq.artemis.protocol.amqp.proton.AmqpSupport.VERSION; +import static org.apache.activemq.artemis.protocol.amqp.proton.AmqpSupport.contains; + +import java.util.ArrayList; +import java.util.List; +import java.util.Map; + +import org.apache.activemq.artemis.protocol.amqp.proton.AmqpSupport; +import org.apache.activemq.artemis.tests.util.Wait; +import org.apache.activemq.artemis.utils.VersionLoader; +import org.apache.activemq.transport.amqp.client.AmqpClient; +import org.apache.activemq.transport.amqp.client.AmqpConnection; +import org.apache.activemq.transport.amqp.client.AmqpValidator; +import org.apache.qpid.proton.amqp.Symbol; +import org.apache.qpid.proton.amqp.transport.AmqpError; +import org.apache.qpid.proton.amqp.transport.ErrorCondition; +import org.apache.qpid.proton.engine.Connection; +import org.junit.Test; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * Tests for behaviors expected of the broker when clients connect to the broker + */ +public class AmqpInboundConnectionTest extends AmqpClientTestSupport { + + private static final Logger LOG = LoggerFactory.getLogger(AmqpInboundConnectionTest.class); + + private static final String BROKER_NAME = "localhost"; + private static final String PRODUCT_NAME = "apache-activemq-artemis"; + + @Test + public void testBrokerContainerId() throws Exception { + AmqpClient client = createAmqpClient(); + assertNotNull(client); + + client.setValidator(new AmqpValidator() { + + @Override + public void inspectOpenedResource(Connection connection) { + if (!BROKER_NAME.equals(connection.getRemoteContainer())) { + markAsInvalid("Broker did not send the expected container ID"); + } + } + }); + + AmqpConnection connection = addConnection(client.connect()); + try { + assertNotNull(connection); + connection.getStateInspector().assertValid(); + } finally { + connection.close(); + } + } + + @Test + public void testBrokerConnectionProperties() throws Exception { + AmqpClient client = createAmqpClient(); + + client.setValidator(new AmqpValidator() { + + @Override + public void inspectOpenedResource(Connection connection) { + + Map properties = connection.getRemoteProperties(); + if (!properties.containsKey(PRODUCT)) { + markAsInvalid("Broker did not send a queue product name value"); + return; + } + + if (!properties.containsKey(VERSION)) { + markAsInvalid("Broker did not send a queue version value"); + return; + } + + if (!PRODUCT_NAME.equals(properties.get(PRODUCT))) { + markAsInvalid("Broker did not send a the expected product name"); + return; + } + + String brokerVersion = VersionLoader.getVersion().getFullVersion(); + if (!brokerVersion.equals(properties.get(VERSION))) { + markAsInvalid("Broker did not send a the expected product version"); + return; + } + } + }); + + AmqpConnection connection = addConnection(client.connect()); + try { + assertNotNull(connection); + connection.getStateInspector().assertValid(); + } finally { + connection.close(); + } + } + + @Test(timeout = 60000) + public void testConnectionCarriesExpectedCapabilities() throws Exception { + AmqpClient client = createAmqpClient(); + assertNotNull(client); + + client.setValidator(new AmqpValidator() { + + @Override + public void inspectOpenedResource(Connection connection) { + + Symbol[] offered = connection.getRemoteOfferedCapabilities(); + + if (!contains(offered, ANONYMOUS_RELAY)) { + markAsInvalid("Broker did not indicate it support anonymous relay"); + return; + } + + if (!contains(offered, DELAYED_DELIVERY)) { + markAsInvalid("Broker did not indicate it support delayed message delivery"); + return; + } + } + }); + + AmqpConnection connection = addConnection(client.connect()); + try { + assertNotNull(connection); + connection.getStateInspector().assertValid(); + } finally { + connection.close(); + } + } + + @Test(timeout = 60000) + public void testCanConnectWithDifferentContainerIds() throws Exception { + AmqpClient client = createAmqpClient(); + assertNotNull(client); + + AmqpConnection connection1 = addConnection(client.createConnection()); + AmqpConnection connection2 = addConnection(client.createConnection()); + + connection1.setContainerId(getTestName() + "-Client:1"); + connection2.setContainerId(getTestName() + "-Client:2"); + + connection1.connect(); + assertEquals(1, server.getConnectionCount()); + + connection2.connect(); + assertEquals(2, server.getConnectionCount()); + + connection1.close(); + assertEquals(1, server.getConnectionCount()); + + connection2.close(); + assertEquals(0, server.getConnectionCount()); + } + + @Test(timeout = 60000) + public void testCannotConnectWithSameContainerId() throws Exception { + AmqpClient client = createAmqpClient(); + + List desiredCapabilities = new ArrayList<>(1); + desiredCapabilities.add(AmqpSupport.SOLE_CONNECTION_CAPABILITY); + + assertNotNull(client); + + AmqpConnection connection1 = addConnection(client.createConnection()); + AmqpConnection connection2 = addConnection(client.createConnection()); + + connection1.setDesiredCapabilities(desiredCapabilities); + connection2.setDesiredCapabilities(desiredCapabilities); + + connection1.setContainerId(getTestName()); + connection2.setContainerId(getTestName()); + + connection1.connect(); + assertEquals(1, server.getConnectionCount()); + + connection2.setStateInspector(new AmqpValidator() { + + @Override + public void inspectOpenedResource(Connection connection) { + if (!connection.getRemoteProperties().containsKey(CONNECTION_OPEN_FAILED)) { + markAsInvalid("Broker did not set connection establishment failed property"); + } + } + + @Override + public void inspectClosedResource(Connection connection) { + ErrorCondition remoteError = connection.getRemoteCondition(); + if (remoteError == null || remoteError.getCondition() == null) { + markAsInvalid("Broker did not add error condition for duplicate client ID"); + } else { + if (!remoteError.getCondition().equals(AmqpError.INVALID_FIELD)) { + markAsInvalid("Broker did not set condition to " + AmqpError.INVALID_FIELD); + } + + if (!remoteError.getCondition().equals(AmqpError.INVALID_FIELD)) { + markAsInvalid("Broker did not set condition to " + AmqpError.INVALID_FIELD); + } + } + + // Validate the info map contains a hint that the container/client id was the + // problem + Map infoMap = remoteError.getInfo(); + if (infoMap == null) { + markAsInvalid("Broker did not set an info map on condition"); + } else if (!infoMap.containsKey(INVALID_FIELD)) { + markAsInvalid("Info map does not contain expected key"); + } else { + Object value = infoMap.get(INVALID_FIELD); + if (!CONTAINER_ID.equals(value)) { + markAsInvalid("Info map does not contain expected value: " + value); + } + } + } + }); + + try { + connection2.connect(); + fail("Should not be able to connect with same container Id."); + } catch (Exception ex) { + LOG.info("Second connection with same container Id failed as expected."); + } + + connection2.getStateInspector().assertValid(); + connection2.close(); + + assertTrue(Wait.waitFor(() -> server.getConnectionCount() == 1)); + + connection1.close(); + assertEquals(0, server.getConnectionCount()); + } +} diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/AmqpManagementTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/AmqpManagementTest.java new file mode 100644 index 0000000000..c84a590595 --- /dev/null +++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/AmqpManagementTest.java @@ -0,0 +1,68 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.activemq.artemis.tests.integration.amqp; + +import java.util.concurrent.TimeUnit; + +import org.apache.activemq.artemis.api.core.management.ResourceNames; +import org.apache.activemq.transport.amqp.client.AmqpClient; +import org.apache.activemq.transport.amqp.client.AmqpConnection; +import org.apache.activemq.transport.amqp.client.AmqpMessage; +import org.apache.activemq.transport.amqp.client.AmqpReceiver; +import org.apache.activemq.transport.amqp.client.AmqpSender; +import org.apache.activemq.transport.amqp.client.AmqpSession; +import org.apache.qpid.proton.amqp.messaging.AmqpValue; +import org.junit.Assert; +import org.junit.Test; + +public class AmqpManagementTest extends AmqpClientTestSupport { + + @Test + public void testManagementQueryOverAMQP() throws Throwable { + AmqpClient client = createAmqpClient(); + AmqpConnection connection = addConnection(client.connect()); + + try { + String destinationAddress = getQueueName(1); + AmqpSession session = connection.createSession(); + AmqpSender sender = session.createSender("activemq.management"); + AmqpReceiver receiver = session.createReceiver(destinationAddress); + receiver.flow(10); + + // Create request message for getQueueNames query + AmqpMessage request = new AmqpMessage(); + request.setApplicationProperty("_AMQ_ResourceName", ResourceNames.BROKER); + request.setApplicationProperty("_AMQ_OperationName", "getQueueNames"); + request.setReplyToAddress(destinationAddress); + request.setText("[]"); + + sender.send(request); + AmqpMessage response = receiver.receive(5, TimeUnit.SECONDS); + Assert.assertNotNull(response); + assertNotNull(response); + Object section = response.getWrappedMessage().getBody(); + assertTrue(section instanceof AmqpValue); + Object value = ((AmqpValue) section).getValue(); + assertTrue(value instanceof String); + assertTrue(((String) value).length() > 0); + assertTrue(((String) value).contains(destinationAddress)); + response.accept(); + } finally { + connection.close(); + } + } +} diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/AmqpMessagePriorityTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/AmqpMessagePriorityTest.java index 2b573546cf..097d9bf5bb 100644 --- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/AmqpMessagePriorityTest.java +++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/AmqpMessagePriorityTest.java @@ -42,7 +42,7 @@ public class AmqpMessagePriorityTest extends AmqpClientTestSupport { AmqpConnection connection = addConnection(client.connect()); AmqpSession session = connection.createSession(); - AmqpSender sender = session.createSender(getTestName()); + AmqpSender sender = session.createSender(getQueueName()); AmqpMessage message = new AmqpMessage(); message.setMessageId("MessageID:1"); @@ -51,9 +51,9 @@ public class AmqpMessagePriorityTest extends AmqpClientTestSupport { sender.send(message); sender.close(); - AmqpReceiver receiver = session.createReceiver(getTestName()); + AmqpReceiver receiver = session.createReceiver(getQueueName()); - Queue queueView = getProxyToQueue(getTestName()); + Queue queueView = getProxyToQueue(getQueueName()); assertEquals(1, queueView.getMessageCount()); receiver.flow(1); @@ -73,7 +73,7 @@ public class AmqpMessagePriorityTest extends AmqpClientTestSupport { AmqpConnection connection = addConnection(client.connect()); AmqpSession session = connection.createSession(); - AmqpSender sender = session.createSender(getTestName()); + AmqpSender sender = session.createSender(getQueueName()); AmqpMessage message = new AmqpMessage(); message.setDurable(true); @@ -91,9 +91,9 @@ public class AmqpMessagePriorityTest extends AmqpClientTestSupport { connection = addConnection(client.connect()); session = connection.createSession(); - AmqpReceiver receiver = session.createReceiver(getTestName()); + AmqpReceiver receiver = session.createReceiver(getQueueName()); - Queue queueView = getProxyToQueue(getTestName()); + Queue queueView = getProxyToQueue(getQueueName()); assertEquals(1, queueView.getMessageCount()); receiver.flow(1); @@ -114,7 +114,7 @@ public class AmqpMessagePriorityTest extends AmqpClientTestSupport { AmqpConnection connection = addConnection(client.connect()); AmqpSession session = connection.createSession(); - AmqpSender sender = session.createSender(getTestName()); + AmqpSender sender = session.createSender(getQueueName()); AmqpMessage message = new AmqpMessage(); message.setMessageId("MessageID:1"); @@ -123,9 +123,9 @@ public class AmqpMessagePriorityTest extends AmqpClientTestSupport { sender.send(message); sender.close(); - AmqpReceiver receiver = session.createReceiver(getTestName()); + AmqpReceiver receiver = session.createReceiver(getQueueName()); - Queue queueView = getProxyToQueue(getTestName()); + Queue queueView = getProxyToQueue(getQueueName()); assertEquals(1, queueView.getMessageCount()); receiver.flow(1); @@ -146,7 +146,7 @@ public class AmqpMessagePriorityTest extends AmqpClientTestSupport { AmqpConnection connection = addConnection(client.connect()); AmqpSession session = connection.createSession(); - AmqpSender sender = session.createSender(getTestName()); + AmqpSender sender = session.createSender(getQueueName()); AmqpMessage message = new AmqpMessage(); message.setMessageId("MessageID:1"); @@ -155,9 +155,9 @@ public class AmqpMessagePriorityTest extends AmqpClientTestSupport { sender.send(message); sender.close(); - AmqpReceiver receiver = session.createReceiver(getTestName()); + AmqpReceiver receiver = session.createReceiver(getQueueName()); - Queue queueView = getProxyToQueue(getTestName()); + Queue queueView = getProxyToQueue(getQueueName()); assertEquals(1, queueView.getMessageCount()); receiver.flow(1); @@ -178,7 +178,7 @@ public class AmqpMessagePriorityTest extends AmqpClientTestSupport { AmqpConnection connection = addConnection(client.connect()); AmqpSession session = connection.createSession(); - AmqpSender sender = session.createSender(getTestName()); + AmqpSender sender = session.createSender(getQueueName()); AmqpMessage message = new AmqpMessage(); message.setMessageId("MessageID:1"); @@ -186,9 +186,9 @@ public class AmqpMessagePriorityTest extends AmqpClientTestSupport { sender.send(message); sender.close(); - AmqpReceiver receiver = session.createReceiver(getTestName()); + AmqpReceiver receiver = session.createReceiver(getQueueName()); - Queue queueView = getProxyToQueue(getTestName()); + Queue queueView = getProxyToQueue(getQueueName()); assertEquals(1, queueView.getMessageCount()); receiver.flow(1); @@ -208,7 +208,7 @@ public class AmqpMessagePriorityTest extends AmqpClientTestSupport { AmqpConnection connection = addConnection(client.connect()); AmqpSession session = connection.createSession(); - AmqpSender sender = session.createSender(getTestName()); + AmqpSender sender = session.createSender(getQueueName()); for (short i = 0; i <= 9; ++i) { AmqpMessage message = new AmqpMessage(); @@ -219,9 +219,9 @@ public class AmqpMessagePriorityTest extends AmqpClientTestSupport { sender.close(); - AmqpReceiver receiver = session.createReceiver(getTestName()); + AmqpReceiver receiver = session.createReceiver(getQueueName()); - Queue queueView = getProxyToQueue(getTestName()); + Queue queueView = getProxyToQueue(getQueueName()); assertEquals(10, queueView.getMessageCount()); receiver.flow(10); diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/AmqpPresettledReceiverTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/AmqpPresettledReceiverTest.java index 657aff702d..422e23e236 100644 --- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/AmqpPresettledReceiverTest.java +++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/AmqpPresettledReceiverTest.java @@ -35,16 +35,16 @@ public class AmqpPresettledReceiverTest extends AmqpClientTestSupport { @Test(timeout = 60000) public void testPresettledReceiverAndNonPresettledReceiverOnSameQueue() throws Exception { final int MSG_COUNT = 2; - sendMessages(getTestName(), MSG_COUNT); + sendMessages(getQueueName(), MSG_COUNT); AmqpClient client = createAmqpClient(); AmqpConnection connection = addConnection(client.connect()); AmqpSession session = connection.createSession(); - AmqpReceiver receiver1 = session.createReceiver(getTestName(), null, false, true); - AmqpReceiver receiver2 = session.createReceiver(getTestName()); + AmqpReceiver receiver1 = session.createReceiver(getQueueName(), null, false, true); + AmqpReceiver receiver2 = session.createReceiver(getQueueName()); - final Queue queueView = getProxyToQueue(getTestName()); + final Queue queueView = getProxyToQueue(getQueueName()); assertEquals(MSG_COUNT, queueView.getMessageCount()); receiver1.flow(1); @@ -68,7 +68,7 @@ public class AmqpPresettledReceiverTest extends AmqpClientTestSupport { System.out.println("Message Count after all consumed: " + queueView.getMessageCount()); // Should be nothing left on the Queue - AmqpReceiver receiver3 = session.createReceiver(getTestName()); + AmqpReceiver receiver3 = session.createReceiver(getQueueName()); receiver3.flow(1); AmqpMessage received = receiver3.receive(5, TimeUnit.SECONDS); @@ -85,15 +85,15 @@ public class AmqpPresettledReceiverTest extends AmqpClientTestSupport { @Test(timeout = 60000) public void testPresettledReceiverReadsAllMessages() throws Exception { final int MSG_COUNT = 100; - sendMessages(getTestName(), MSG_COUNT); + sendMessages(getQueueName(), MSG_COUNT); AmqpClient client = createAmqpClient(); AmqpConnection connection = addConnection(client.connect()); AmqpSession session = connection.createSession(); - AmqpReceiver receiver = session.createReceiver(getTestName(), null, false, true); + AmqpReceiver receiver = session.createReceiver(getQueueName(), null, false, true); - final Queue queueView = getProxyToQueue(getTestName()); + final Queue queueView = getProxyToQueue(getQueueName()); assertEquals(MSG_COUNT, queueView.getMessageCount()); receiver.flow(MSG_COUNT); @@ -105,7 +105,7 @@ public class AmqpPresettledReceiverTest extends AmqpClientTestSupport { System.out.println("Message Count after all consumed: " + queueView.getMessageCount()); // Open a new receiver and see if any message are left on the Queue - receiver = session.createReceiver(getTestName()); + receiver = session.createReceiver(getQueueName()); receiver.flow(1); AmqpMessage received = receiver.receive(5, TimeUnit.SECONDS); if (received != null) { @@ -121,15 +121,15 @@ public class AmqpPresettledReceiverTest extends AmqpClientTestSupport { @Test(timeout = 60000) public void testPresettledReceiverReadsAllMessagesInWhenReadInBatches() throws Exception { final int MSG_COUNT = 100; - sendMessages(getTestName(), MSG_COUNT); + sendMessages(getQueueName(), MSG_COUNT); AmqpClient client = createAmqpClient(); AmqpConnection connection = addConnection(client.connect()); AmqpSession session = connection.createSession(); - AmqpReceiver receiver = session.createReceiver(getTestName(), null, false, true); + AmqpReceiver receiver = session.createReceiver(getQueueName(), null, false, true); - final Queue queueView = getProxyToQueue(getTestName()); + final Queue queueView = getProxyToQueue(getQueueName()); assertEquals(MSG_COUNT, queueView.getMessageCount()); // Consume all 100 but do so in batches by flowing only limited credit. @@ -157,7 +157,7 @@ public class AmqpPresettledReceiverTest extends AmqpClientTestSupport { System.out.println("Message Count after all consumed: " + queueView.getMessageCount()); // Open a new receiver and see if any message are left on the Queue - receiver = session.createReceiver(getTestName()); + receiver = session.createReceiver(getQueueName()); receiver.flow(1); AmqpMessage received = receiver.receive(5, TimeUnit.SECONDS); if (received != null) { @@ -185,8 +185,8 @@ public class AmqpPresettledReceiverTest extends AmqpClientTestSupport { AmqpConnection connection = addConnection(client.connect()); AmqpSession session = connection.createSession(); - AmqpSender sender = session.createSender(getTestName()); - final Queue queue = getProxyToQueue(getTestName()); + AmqpSender sender = session.createSender(getQueueName()); + final Queue queue = getProxyToQueue(getQueueName()); AmqpMessage message = new AmqpMessage(); message.setText("Test-Message"); @@ -194,7 +194,7 @@ public class AmqpPresettledReceiverTest extends AmqpClientTestSupport { assertEquals(1, queue.getMessageCount()); - AmqpReceiver receiver = session.createReceiver(getTestName(), null, false, true); + AmqpReceiver receiver = session.createReceiver(getQueueName(), null, false, true); session.begin(); @@ -221,8 +221,8 @@ public class AmqpPresettledReceiverTest extends AmqpClientTestSupport { AmqpConnection connection = addConnection(client.connect()); AmqpSession session = connection.createSession(); - AmqpSender sender = session.createSender(getTestName()); - final Queue queue = getProxyToQueue(getTestName()); + AmqpSender sender = session.createSender(getQueueName()); + final Queue queue = getProxyToQueue(getQueueName()); AmqpMessage message = new AmqpMessage(); message.setText("Test-Message"); @@ -230,7 +230,7 @@ public class AmqpPresettledReceiverTest extends AmqpClientTestSupport { assertEquals(1, queue.getMessageCount()); - AmqpReceiver receiver = session.createReceiver(getTestName(), null, false, true); + AmqpReceiver receiver = session.createReceiver(getQueueName(), null, false, true); session.begin(); diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/AmqpReceiverDispositionTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/AmqpReceiverDispositionTest.java index b347d3755f..9cd8f50f96 100644 --- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/AmqpReceiverDispositionTest.java +++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/AmqpReceiverDispositionTest.java @@ -34,19 +34,18 @@ public class AmqpReceiverDispositionTest extends AmqpClientTestSupport { @Test(timeout = 30000) public void testReleasedDisposition() throws Exception { - sendMessages(getTestName(), 1); + sendMessages(getQueueName(), 1); AmqpClient client = createAmqpClient(); AmqpConnection connection = addConnection(client.connect()); AmqpSession session = connection.createSession(); - AmqpReceiver receiver1 = session.createReceiver(getTestName()); + AmqpReceiver receiver1 = session.createReceiver(getQueueName()); receiver1.flow(1); AmqpMessage message = receiver1.receive(5, TimeUnit.SECONDS); - AmqpReceiver receiver2 = session.createReceiver(getTestName()); - + AmqpReceiver receiver2 = session.createReceiver(getQueueName()); assertNotNull("did not receive message first time", message); assertEquals("MessageID:0", message.getMessageId()); @@ -75,13 +74,13 @@ public class AmqpReceiverDispositionTest extends AmqpClientTestSupport { @Test(timeout = 30000) public void testRejectedDisposition() throws Exception { - sendMessages(getTestName(), 1); + sendMessages(getQueueName(), 1); AmqpClient client = createAmqpClient(); AmqpConnection connection = addConnection(client.connect()); AmqpSession session = connection.createSession(); - AmqpReceiver receiver1 = session.createReceiver(getTestName()); + AmqpReceiver receiver1 = session.createReceiver(getQueueName()); receiver1.flow(1); AmqpMessage message = receiver1.receive(5, TimeUnit.SECONDS); @@ -101,7 +100,7 @@ public class AmqpReceiverDispositionTest extends AmqpClientTestSupport { assertNull("Should not receive message again", message); // Attempt to Read the message again with another receiver to validate it is archived. - AmqpReceiver receiver2 = session.createReceiver(getTestName()); + AmqpReceiver receiver2 = session.createReceiver(getQueueName()); receiver2.flow(1); assertNull(receiver2.receiveNoWait()); @@ -129,13 +128,13 @@ public class AmqpReceiverDispositionTest extends AmqpClientTestSupport { } private void doModifiedDispositionTestImpl(Boolean deliveryFailed, Boolean undeliverableHere) throws Exception { - sendMessages(getTestName(), 1); + sendMessages(getQueueName(), 1); AmqpClient client = createAmqpClient(); AmqpConnection connection = addConnection(client.connect()); AmqpSession session = connection.createSession(); - AmqpReceiver receiver1 = session.createReceiver(getTestName()); + AmqpReceiver receiver1 = session.createReceiver(getQueueName()); receiver1.flow(1); AmqpMessage message = receiver1.receive(5, TimeUnit.SECONDS); @@ -154,7 +153,7 @@ public class AmqpReceiverDispositionTest extends AmqpClientTestSupport { assertNull("Should not receive message again", message); } - AmqpReceiver receiver2 = session.createReceiver(getTestName()); + AmqpReceiver receiver2 = session.createReceiver(getQueueName()); receiver2.flow(1); message = receiver2.receive(5, TimeUnit.SECONDS); diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/AmqpReceiverDrainTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/AmqpReceiverDrainTest.java index 1af9028a42..681ffbdf4b 100644 --- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/AmqpReceiverDrainTest.java +++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/AmqpReceiverDrainTest.java @@ -35,15 +35,15 @@ public class AmqpReceiverDrainTest extends AmqpClientTestSupport { @Test(timeout = 60000) public void testReceiverCanDrainMessages() throws Exception { int MSG_COUNT = 20; - sendMessages(getTestName(), MSG_COUNT); + sendMessages(getQueueName(), MSG_COUNT); AmqpClient client = createAmqpClient(); AmqpConnection connection = client.connect(); AmqpSession session = connection.createSession(); - AmqpReceiver receiver = session.createReceiver(getTestName()); + AmqpReceiver receiver = session.createReceiver(getQueueName()); - Queue queueView = getProxyToQueue(getTestName()); + Queue queueView = getProxyToQueue(getQueueName()); assertEquals(MSG_COUNT, queueView.getMessageCount()); receiver.drain(MSG_COUNT); @@ -66,11 +66,11 @@ public class AmqpReceiverDrainTest extends AmqpClientTestSupport { AmqpConnection connection = client.connect(); AmqpSession session = connection.createSession(); - AmqpReceiver receiver = session.createReceiver(getTestName()); + AmqpReceiver receiver = session.createReceiver(getQueueName()); receiver.flow(10); - Queue queueView = getProxyToQueue(getTestName()); + Queue queueView = getProxyToQueue(getQueueName()); assertEquals(0, queueView.getMessageCount()); assertEquals(0, queueView.getDeliveringCount()); @@ -86,15 +86,15 @@ public class AmqpReceiverDrainTest extends AmqpClientTestSupport { @Test(timeout = 60000) public void testPullOneFromRemote() throws Exception { int MSG_COUNT = 20; - sendMessages(getTestName(), MSG_COUNT); + sendMessages(getQueueName(), MSG_COUNT); AmqpClient client = createAmqpClient(); AmqpConnection connection = client.connect(); AmqpSession session = connection.createSession(); - AmqpReceiver receiver = session.createReceiver(getTestName()); + AmqpReceiver receiver = session.createReceiver(getQueueName()); - Queue queueView = getProxyToQueue(getTestName()); + Queue queueView = getProxyToQueue(getQueueName()); assertEquals(MSG_COUNT, queueView.getMessageCount()); assertEquals(0, receiver.getReceiver().getRemoteCredit()); @@ -119,11 +119,11 @@ public class AmqpReceiverDrainTest extends AmqpClientTestSupport { AmqpConnection connection = client.connect(); AmqpSession session = connection.createSession(); - AmqpReceiver receiver = session.createReceiver(getTestName()); + AmqpReceiver receiver = session.createReceiver(getQueueName()); receiver.flow(10); - Queue queueView = getProxyToQueue(getTestName()); + Queue queueView = getProxyToQueue(getQueueName()); assertEquals(0, queueView.getMessageCount()); assertEquals(10, receiver.getReceiver().getRemoteCredit()); diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/AmqpReceiverWithFiltersTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/AmqpReceiverWithFiltersTest.java index f8f726a570..1e140c70c6 100644 --- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/AmqpReceiverWithFiltersTest.java +++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/AmqpReceiverWithFiltersTest.java @@ -74,7 +74,7 @@ public class AmqpReceiverWithFiltersTest extends AmqpClientTestSupport { filters.put(AmqpUnknownFilterType.UNKNOWN_FILTER_NAME, AmqpUnknownFilterType.UNKNOWN_FILTER); Source source = new Source(); - source.setAddress(getTestName()); + source.setAddress(getQueueName()); source.setFilter(filters); source.setDurable(TerminusDurability.NONE); source.setExpiryPolicy(TerminusExpiryPolicy.LINK_DETACH); @@ -116,13 +116,12 @@ public class AmqpReceiverWithFiltersTest extends AmqpClientTestSupport { AmqpConnection connection = addConnection(client.connect()); AmqpSession session = connection.createSession(); - session.createReceiver(getTestName(), "color = red"); + session.createReceiver(getQueueName(), "color = red"); connection.getStateInspector().assertValid(); connection.close(); } - @Test(timeout = 60000) public void testReceivedUnsignedFilter() throws Exception { final int NUM_MESSAGES = 100; @@ -131,10 +130,9 @@ public class AmqpReceiverWithFiltersTest extends AmqpClientTestSupport { AmqpConnection connection = client.connect(); try { - // Normal Session which won't create an TXN itself AmqpSession session = connection.createSession(); - AmqpSender sender = session.createSender(getTestName()); + AmqpSender sender = session.createSender(getQueueName()); for (int i = 0; i < NUM_MESSAGES + 1; ++i) { AmqpMessage message = new AmqpMessage(); @@ -144,7 +142,7 @@ public class AmqpReceiverWithFiltersTest extends AmqpClientTestSupport { } // Read all messages from the Queue, do not accept them yet. - AmqpReceiver receiver = session.createReceiver(getTestName(), "myNewID < " + (NUM_MESSAGES / 2)); + AmqpReceiver receiver = session.createReceiver(getQueueName(), "myNewID < " + (NUM_MESSAGES / 2)); ArrayList messages = new ArrayList<>(NUM_MESSAGES); receiver.flow((NUM_MESSAGES + 2) * 2); for (int i = 0; i < NUM_MESSAGES / 2; ++i) { @@ -161,6 +159,4 @@ public class AmqpReceiverWithFiltersTest extends AmqpClientTestSupport { connection.close(); } } - - } diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/AmqpScheduledMessageTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/AmqpScheduledMessageTest.java index 689c23c519..748f10afa1 100644 --- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/AmqpScheduledMessageTest.java +++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/AmqpScheduledMessageTest.java @@ -36,101 +36,170 @@ public class AmqpScheduledMessageTest extends AmqpClientTestSupport { public void testSendWithDeliveryTimeIsScheduled() throws Exception { AmqpClient client = createAmqpClient(); AmqpConnection connection = addConnection(client.connect()); - AmqpSession session = connection.createSession(); - AmqpSender sender = session.createSender(getTestName()); + try { + AmqpSession session = connection.createSession(); - // Get the Queue View early to avoid racing the delivery. - final Queue queueView = getProxyToQueue(getTestName()); - assertNotNull(queueView); + AmqpSender sender = session.createSender(getQueueName()); - AmqpMessage message = new AmqpMessage(); - long deliveryTime = System.currentTimeMillis() + TimeUnit.MINUTES.toMillis(2); - message.setMessageAnnotation("x-opt-delivery-time", deliveryTime); - message.setText("Test-Message"); - sender.send(message); - sender.close(); + // Get the Queue View early to avoid racing the delivery. + final Queue queueView = getProxyToQueue(getQueueName()); + assertNotNull(queueView); - assertEquals(1, queueView.getScheduledCount()); + AmqpMessage message = new AmqpMessage(); + long deliveryTime = System.currentTimeMillis() + TimeUnit.MINUTES.toMillis(2); + message.setMessageAnnotation("x-opt-delivery-time", deliveryTime); + message.setText("Test-Message"); + sender.send(message); + sender.close(); - // Now try and get the message - AmqpReceiver receiver = session.createReceiver(getTestName()); - receiver.flow(1); - AmqpMessage received = receiver.receive(5, TimeUnit.SECONDS); - assertNull(received); + assertEquals(1, queueView.getScheduledCount()); - connection.close(); + // Now try and get the message + AmqpReceiver receiver = session.createReceiver(getQueueName()); + receiver.flow(1); + AmqpMessage received = receiver.receive(5, TimeUnit.SECONDS); + assertNull(received); + } finally { + connection.close(); + } } @Test(timeout = 60000) public void testSendRecvWithDeliveryTime() throws Exception { AmqpClient client = createAmqpClient(); AmqpConnection connection = addConnection(client.connect()); - AmqpSession session = connection.createSession(); - AmqpSender sender = session.createSender(getTestName()); + try { + AmqpSession session = connection.createSession(); - // Get the Queue View early to avoid racing the delivery. - final Queue queueView = getProxyToQueue(getTestName()); - assertNotNull(queueView); + AmqpSender sender = session.createSender(getQueueName()); - AmqpMessage message = new AmqpMessage(); - long deliveryTime = System.currentTimeMillis() + 6000; - message.setMessageAnnotation("x-opt-delivery-time", deliveryTime); - message.setText("Test-Message"); - sender.send(message); - sender.close(); + // Get the Queue View early to avoid racing the delivery. + final Queue queueView = getProxyToQueue(getQueueName()); + assertNotNull(queueView); - assertEquals(1, queueView.getScheduledCount()); + AmqpMessage message = new AmqpMessage(); + long deliveryTime = System.currentTimeMillis() + 6000; + message.setMessageAnnotation("x-opt-delivery-time", deliveryTime); + message.setText("Test-Message"); + sender.send(message); + sender.close(); - AmqpReceiver receiver = session.createReceiver(getTestName()); - receiver.flow(1); + assertEquals(1, queueView.getScheduledCount()); - // Now try and get the message, should not due to being scheduled. - AmqpMessage received = receiver.receive(2, TimeUnit.SECONDS); - assertNull(received); + AmqpReceiver receiver = session.createReceiver(getQueueName()); + receiver.flow(1); - // Now try and get the message, should get it now - received = receiver.receive(10, TimeUnit.SECONDS); - assertNotNull(received); - received.accept(); + // Now try and get the message, should not due to being scheduled. + AmqpMessage received = receiver.receive(2, TimeUnit.SECONDS); + assertNull(received); - connection.close(); + // Now try and get the message, should get it now + received = receiver.receive(10, TimeUnit.SECONDS); + assertNotNull(received); + received.accept(); + } finally { + connection.close(); + } } @Test public void testScheduleWithDelay() throws Exception { AmqpClient client = createAmqpClient(); AmqpConnection connection = addConnection(client.connect()); - AmqpSession session = connection.createSession(); - AmqpSender sender = session.createSender(getTestName()); + try { + AmqpSession session = connection.createSession(); - // Get the Queue View early to avoid racing the delivery. - final Queue queueView = getProxyToQueue(getTestName()); - assertNotNull(queueView); + AmqpSender sender = session.createSender(getQueueName()); - AmqpMessage message = new AmqpMessage(); - long delay = 6000; - message.setMessageAnnotation("x-opt-delivery-delay", delay); - message.setText("Test-Message"); - sender.send(message); - sender.close(); + // Get the Queue View early to avoid racing the delivery. + final Queue queueView = getProxyToQueue(getQueueName()); + assertNotNull(queueView); - assertEquals(1, queueView.getScheduledCount()); + AmqpMessage message = new AmqpMessage(); + long delay = 6000; + message.setMessageAnnotation("x-opt-delivery-delay", delay); + message.setText("Test-Message"); + sender.send(message); + sender.close(); - AmqpReceiver receiver = session.createReceiver(getTestName()); - receiver.flow(1); + assertEquals(1, queueView.getScheduledCount()); - // Now try and get the message, should not due to being scheduled. - AmqpMessage received = receiver.receive(2, TimeUnit.SECONDS); - assertNull(received); + AmqpReceiver receiver = session.createReceiver(getQueueName()); + receiver.flow(1); - // Now try and get the message, should get it now - received = receiver.receive(10, TimeUnit.SECONDS); - assertNotNull(received); - received.accept(); + // Now try and get the message, should not due to being scheduled. + AmqpMessage received = receiver.receive(2, TimeUnit.SECONDS); + assertNull(received); - connection.close(); + // Now try and get the message, should get it now + received = receiver.receive(10, TimeUnit.SECONDS); + assertNotNull(received); + received.accept(); + } finally { + connection.close(); + } + } + + @Test(timeout = 60000) + public void testSendWithDeliveryTimeHoldsMessage() throws Exception { + AmqpClient client = createAmqpClient(); + assertNotNull(client); + + AmqpConnection connection = addConnection(client.connect()); + try { + AmqpSession session = connection.createSession(); + + AmqpSender sender = session.createSender(getQueueName()); + AmqpReceiver receiver = session.createReceiver(getQueueName()); + + AmqpMessage message = new AmqpMessage(); + long deliveryTime = System.currentTimeMillis() + TimeUnit.MINUTES.toMillis(5); + message.setMessageAnnotation("x-opt-delivery-time", deliveryTime); + message.setText("Test-Message"); + sender.send(message); + + // Now try and get the message + receiver.flow(1); + + // Shouldn't get this since we delayed the message. + assertNull(receiver.receive(1, TimeUnit.SECONDS)); + } finally { + connection.close(); + } + } + + @Test(timeout = 60000) + public void testSendWithDeliveryTimeDeliversMessageAfterDelay() throws Exception { + AmqpClient client = createAmqpClient(); + assertNotNull(client); + + AmqpConnection connection = addConnection(client.connect()); + try { + AmqpSession session = connection.createSession(); + + AmqpSender sender = session.createSender(getQueueName()); + AmqpReceiver receiver = session.createReceiver(getQueueName()); + + AmqpMessage message = new AmqpMessage(); + long deliveryTime = System.currentTimeMillis() + 2000; + message.setMessageAnnotation("x-opt-delivery-time", deliveryTime); + message.setText("Test-Message"); + sender.send(message); + + // Now try and get the message + receiver.flow(1); + + AmqpMessage received = receiver.receive(10, TimeUnit.SECONDS); + assertNotNull(received); + received.accept(); + Long msgDeliveryTime = (Long) received.getMessageAnnotation("x-opt-delivery-time"); + assertNotNull(msgDeliveryTime); + assertEquals(deliveryTime, msgDeliveryTime.longValue()); + } finally { + connection.close(); + } } } diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/AmqpSecurityTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/AmqpSecurityTest.java index e2b80f5716..8e41d71cc7 100644 --- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/AmqpSecurityTest.java +++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/AmqpSecurityTest.java @@ -1,4 +1,4 @@ -/** +/* * Licensed to the Apache Software Foundation (ASF) under one or more * contributor license agreements. See the NOTICE file distributed with * this work for additional information regarding copyright ownership. @@ -22,11 +22,11 @@ import java.util.Set; import java.util.concurrent.CountDownLatch; import java.util.concurrent.TimeUnit; +import org.apache.activemq.artemis.api.core.RoutingType; import org.apache.activemq.artemis.api.core.SimpleString; import org.apache.activemq.artemis.core.config.Configuration; import org.apache.activemq.artemis.core.security.Role; import org.apache.activemq.artemis.core.server.ActiveMQServer; -import org.apache.activemq.artemis.api.core.RoutingType; import org.apache.activemq.artemis.core.server.impl.AddressInfo; import org.apache.activemq.artemis.core.settings.HierarchicalRepository; import org.apache.activemq.artemis.core.settings.impl.AddressSettings; @@ -57,7 +57,7 @@ public class AmqpSecurityTest extends AmqpClientTestSupport { HierarchicalRepository> securityRepository = server.getSecurityRepository(); HashSet value = new HashSet<>(); value.add(new Role("none", false, true, true, true, true, true, true, true)); - securityRepository.addMatch(getTestName(), value); + securityRepository.addMatch(getQueueName(), value); serverManager = new JMSServerManagerImpl(server); Configuration serverConfig = server.getConfiguration(); @@ -135,7 +135,7 @@ public class AmqpSecurityTest extends AmqpClientTestSupport { connection = addConnection(client.connect()); AmqpSession session = connection.createSession(); - AmqpSender sender = session.createSender(getTestName()); + AmqpSender sender = session.createSender(getQueueName()); AmqpMessage message = new AmqpMessage(); message.setMessageId("msg" + 1); @@ -154,8 +154,8 @@ public class AmqpSecurityTest extends AmqpClientTestSupport { @Test(timeout = 60000) public void testSendMessageFailsOnAnonymousRelayWhenNotAuthorizedToSendToAddress() throws Exception { - server.addAddressInfo(new AddressInfo(SimpleString.toSimpleString(getTestName()), RoutingType.ANYCAST)); - server.createQueue(new SimpleString(getTestName()), RoutingType.ANYCAST, new SimpleString(getTestName()), null, true, false); + server.addAddressInfo(new AddressInfo(SimpleString.toSimpleString(getQueueName()), RoutingType.ANYCAST)); + server.createQueue(new SimpleString(getQueueName()), RoutingType.ANYCAST, new SimpleString(getQueueName()), null, true, false); AmqpClient client = createAmqpClient(user1, password1); AmqpConnection connection = client.connect(); @@ -165,7 +165,7 @@ public class AmqpSecurityTest extends AmqpClientTestSupport { AmqpSender sender = session.createAnonymousSender(); AmqpMessage message = new AmqpMessage(); - message.setAddress(getTestName()); + message.setAddress(getQueueName()); message.setMessageId("msg" + 1); message.setText("Test-Message"); diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/AmqpSendReceiveTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/AmqpSendReceiveTest.java index 54b361ca53..9cf256a679 100644 --- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/AmqpSendReceiveTest.java +++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/AmqpSendReceiveTest.java @@ -69,9 +69,9 @@ public class AmqpSendReceiveTest extends AmqpClientTestSupport { AmqpConnection connection = addConnection(client.connect()); AmqpSession session = connection.createSession(); - AmqpReceiver receiver = session.createReceiver(getTestName()); + AmqpReceiver receiver = session.createReceiver(getQueueName()); - Queue queue = getProxyToQueue(getTestName()); + Queue queue = getProxyToQueue(getQueueName()); assertNotNull(queue); receiver.close(); @@ -84,9 +84,9 @@ public class AmqpSendReceiveTest extends AmqpClientTestSupport { AmqpConnection connection = addConnection(client.connect()); AmqpSession session = connection.createSession(); - AmqpReceiver receiver = session.createReceiver(getTestName()); + AmqpReceiver receiver = session.createReceiver(getQueueName()); - sendMessages(getTestName(), 10); + sendMessages(getQueueName(), 10); for (int i = 0; i < 10; i++) { receiver.flow(1); @@ -98,7 +98,7 @@ public class AmqpSendReceiveTest extends AmqpClientTestSupport { receiver.close(); connection.close(); - Queue queue = getProxyToQueue(getTestName()); + Queue queue = getProxyToQueue(getQueueName()); assertNotNull(queue); assertEquals(0, queue.getMessageCount()); } @@ -130,7 +130,7 @@ public class AmqpSendReceiveTest extends AmqpClientTestSupport { AmqpConnection connection = addConnection(client.connect()); AmqpSession session = connection.createSession(); - session.createReceiver(getTestName(), "JMSPriority > 8"); + session.createReceiver(getQueueName(), "JMSPriority > 8"); connection.getStateInspector().assertValid(); connection.close(); @@ -163,7 +163,7 @@ public class AmqpSendReceiveTest extends AmqpClientTestSupport { AmqpConnection connection = addConnection(client.connect()); AmqpSession session = connection.createSession(); - session.createReceiver(getTestName(), null, true); + session.createReceiver(getQueueName(), null, true); connection.getStateInspector().assertValid(); connection.close(); @@ -177,7 +177,7 @@ public class AmqpSendReceiveTest extends AmqpClientTestSupport { AmqpSession session = connection.createSession(); try { - session.createReceiver(getTestName(), "null = 'f''", true); + session.createReceiver(getQueueName(), "null = 'f''", true); fail("should throw exception"); } catch (Exception e) { assertTrue(e.getCause() instanceof JMSException); @@ -189,15 +189,15 @@ public class AmqpSendReceiveTest extends AmqpClientTestSupport { @Test(timeout = 60000) public void testQueueReceiverReadMessage() throws Exception { - sendMessages(getTestName(), 1); + sendMessages(getQueueName(), 1); AmqpClient client = createAmqpClient(); AmqpConnection connection = addConnection(client.connect()); AmqpSession session = connection.createSession(); - AmqpReceiver receiver = session.createReceiver(getTestName()); + AmqpReceiver receiver = session.createReceiver(getQueueName()); - Queue queueView = getProxyToQueue(getTestName()); + Queue queueView = getProxyToQueue(getQueueName()); assertEquals(1, queueView.getMessageCount()); receiver.flow(1); @@ -211,11 +211,11 @@ public class AmqpSendReceiveTest extends AmqpClientTestSupport { @Test(timeout = 60000) public void testQueueReceiverReadMessageWithDivert() throws Exception { - final String forwardingAddress = getTestName() + "Divert"; + final String forwardingAddress = getQueueName() + "Divert"; final SimpleString simpleForwardingAddress = SimpleString.toSimpleString(forwardingAddress); server.createQueue(simpleForwardingAddress, RoutingType.ANYCAST, simpleForwardingAddress, null, true, false); - server.getActiveMQServerControl().createDivert("name", "routingName", getTestName(), forwardingAddress, true, null, null, DivertConfigurationRoutingType.ANYCAST.toString()); - sendMessages(getTestName(), 1); + server.getActiveMQServerControl().createDivert("name", "routingName", getQueueName(), forwardingAddress, true, null, null, DivertConfigurationRoutingType.ANYCAST.toString()); + sendMessages(getQueueName(), 1); AmqpClient client = createAmqpClient(); AmqpConnection connection = addConnection(client.connect()); @@ -313,15 +313,15 @@ public class AmqpSendReceiveTest extends AmqpClientTestSupport { @Test(timeout = 60000) public void testMessageDurableFalse() throws Exception { - sendMessages(getTestName(), 1, false); + sendMessages(getQueueName(), 1, false); AmqpClient client = createAmqpClient(); AmqpConnection connection = addConnection(client.connect()); AmqpSession session = connection.createSession(); - AmqpReceiver receiver = session.createReceiver(getTestName()); + AmqpReceiver receiver = session.createReceiver(getQueueName()); - Queue queueView = getProxyToQueue(getTestName()); + Queue queueView = getProxyToQueue(getQueueName()); assertEquals(1, queueView.getMessageCount()); receiver.flow(1); @@ -337,15 +337,15 @@ public class AmqpSendReceiveTest extends AmqpClientTestSupport { @Test(timeout = 60000) public void testMessageDurableTrue() throws Exception { - sendMessages(getTestName(), 1, true); + sendMessages(getQueueName(), 1, true); AmqpClient client = createAmqpClient(); AmqpConnection connection = addConnection(client.connect()); AmqpSession session = connection.createSession(); - AmqpReceiver receiver = session.createReceiver(getTestName()); + AmqpReceiver receiver = session.createReceiver(getQueueName()); - Queue queueView = getProxyToQueue(getTestName()); + Queue queueView = getProxyToQueue(getQueueName()); assertEquals(1, queueView.getMessageCount()); receiver.flow(1); @@ -362,22 +362,22 @@ public class AmqpSendReceiveTest extends AmqpClientTestSupport { @Test(timeout = 60000) public void testTwoQueueReceiversOnSameConnectionReadMessagesNoDispositions() throws Exception { int MSG_COUNT = 4; - sendMessages(getTestName(), MSG_COUNT); + sendMessages(getQueueName(), MSG_COUNT); AmqpClient client = createAmqpClient(); AmqpConnection connection = addConnection(client.connect()); AmqpSession session = connection.createSession(); - AmqpReceiver receiver1 = session.createReceiver(getTestName()); + AmqpReceiver receiver1 = session.createReceiver(getQueueName()); - Queue queueView = getProxyToQueue(getTestName()); + Queue queueView = getProxyToQueue(getQueueName()); assertEquals(MSG_COUNT, queueView.getMessageCount()); receiver1.flow(2); assertNotNull(receiver1.receive(5, TimeUnit.SECONDS)); assertNotNull(receiver1.receive(5, TimeUnit.SECONDS)); - AmqpReceiver receiver2 = session.createReceiver(getTestName()); + AmqpReceiver receiver2 = session.createReceiver(getQueueName()); assertEquals(2, server.getTotalConsumerCount()); @@ -398,15 +398,15 @@ public class AmqpSendReceiveTest extends AmqpClientTestSupport { @Test(timeout = 60000) public void testTwoQueueReceiversOnSameConnectionReadMessagesAcceptOnEach() throws Exception { int MSG_COUNT = 4; - sendMessages(getTestName(), MSG_COUNT); + sendMessages(getQueueName(), MSG_COUNT); AmqpClient client = createAmqpClient(); AmqpConnection connection = addConnection(client.connect()); AmqpSession session = connection.createSession(); - AmqpReceiver receiver1 = session.createReceiver(getTestName()); + AmqpReceiver receiver1 = session.createReceiver(getQueueName()); - final Queue queueView = getProxyToQueue(getTestName()); + final Queue queueView = getProxyToQueue(getQueueName()); assertEquals(MSG_COUNT, queueView.getMessageCount()); receiver1.flow(2); @@ -425,7 +425,7 @@ public class AmqpSendReceiveTest extends AmqpClientTestSupport { } }, TimeUnit.SECONDS.toMillis(5), TimeUnit.MILLISECONDS.toMillis(50))); - AmqpReceiver receiver2 = session.createReceiver(getTestName()); + AmqpReceiver receiver2 = session.createReceiver(getQueueName()); assertEquals(2, server.getTotalConsumerCount()); @@ -456,15 +456,15 @@ public class AmqpSendReceiveTest extends AmqpClientTestSupport { @Test(timeout = 60000) public void testSecondReceiverOnQueueGetsAllUnconsumedMessages() throws Exception { int MSG_COUNT = 20; - sendMessages(getTestName(), MSG_COUNT); + sendMessages(getQueueName(), MSG_COUNT); AmqpClient client = createAmqpClient(); AmqpConnection connection = addConnection(client.connect()); AmqpSession session = connection.createSession(); - AmqpReceiver receiver1 = session.createReceiver(getTestName()); + AmqpReceiver receiver1 = session.createReceiver(getQueueName()); - final Queue queueView = getProxyToQueue(getTestName()); + final Queue queueView = getProxyToQueue(getQueueName()); assertEquals(MSG_COUNT, queueView.getMessageCount()); receiver1.flow(20); @@ -479,7 +479,7 @@ public class AmqpSendReceiveTest extends AmqpClientTestSupport { receiver1.close(); - AmqpReceiver receiver2 = session.createReceiver(getTestName()); + AmqpReceiver receiver2 = session.createReceiver(getQueueName()); assertEquals(1, server.getTotalConsumerCount()); @@ -513,7 +513,7 @@ public class AmqpSendReceiveTest extends AmqpClientTestSupport { AmqpConnection connection = addConnection(client.connect()); AmqpSession session = connection.createSession(); - AmqpSender sender = session.createSender(getTestName()); + AmqpSender sender = session.createSender(getQueueName()); AmqpMessage message = new AmqpMessage(); @@ -525,7 +525,7 @@ public class AmqpSendReceiveTest extends AmqpClientTestSupport { sender.close(); LOG.info("Attempting to read message with receiver"); - AmqpReceiver receiver = session.createReceiver(getTestName()); + AmqpReceiver receiver = session.createReceiver(getQueueName()); receiver.flow(2); AmqpMessage received = receiver.receive(10, TimeUnit.SECONDS); assertNotNull("Should have read message", received); @@ -544,7 +544,7 @@ public class AmqpSendReceiveTest extends AmqpClientTestSupport { AmqpClient client = createAmqpClient(); AmqpConnection connection = addConnection(client.connect()); AmqpSession session = connection.createSession(); - AmqpSender sender = session.createSender(getTestName()); + AmqpSender sender = session.createSender(getQueueName()); for (int i = 0; i < MSG_COUNT; i++) { AmqpMessage message = new AmqpMessage(); @@ -560,17 +560,17 @@ public class AmqpSendReceiveTest extends AmqpClientTestSupport { sender.close(); - Queue queue = getProxyToQueue(getTestName()); + Queue queue = getProxyToQueue(getQueueName()); assertEquals(MSG_COUNT, queue.getMessageCount()); - AmqpReceiver receiver1 = session.createReceiver(getTestName()); + AmqpReceiver receiver1 = session.createReceiver(getQueueName()); receiver1.flow(MSG_COUNT); AmqpMessage received = receiver1.receive(5, TimeUnit.SECONDS); assertNotNull("Should have got a message", received); assertEquals("msg0", received.getMessageId()); receiver1.close(); - AmqpReceiver receiver2 = session.createReceiver(getTestName()); + AmqpReceiver receiver2 = session.createReceiver(getQueueName()); receiver2.flow(200); for (int i = 0; i < MSG_COUNT; ++i) { received = receiver2.receive(5, TimeUnit.SECONDS); @@ -597,12 +597,12 @@ public class AmqpSendReceiveTest extends AmqpClientTestSupport { message2.setGroupId("hijklm"); message2.setApplicationProperty("sn", 200); - AmqpSender sender = session.createSender(getTestName()); + AmqpSender sender = session.createSender(getQueueName()); sender.send(message1); sender.send(message2); sender.close(); - AmqpReceiver receiver = session.createReceiver(getTestName(), "sn = 100"); + AmqpReceiver receiver = session.createReceiver(getQueueName(), "sn = 100"); receiver.flow(2); AmqpMessage received = receiver.receive(5, TimeUnit.SECONDS); assertNotNull("Should have read a message", received); @@ -624,7 +624,7 @@ public class AmqpSendReceiveTest extends AmqpClientTestSupport { AmqpConnection connection = addConnection(client.connect()); AmqpSession session = connection.createSession(); - AmqpSender sender = session.createSender(getTestName()); + AmqpSender sender = session.createSender(getQueueName()); for (int i = 0; i < MSG_COUNT; i++) { AmqpMessage message = new AmqpMessage(); @@ -639,7 +639,7 @@ public class AmqpSendReceiveTest extends AmqpClientTestSupport { sender.close(); LOG.info("Attempting to read first two messages with receiver #1"); - AmqpReceiver receiver1 = session.createReceiver(getTestName()); + AmqpReceiver receiver1 = session.createReceiver(getQueueName()); receiver1.flow(2); AmqpMessage message1 = receiver1.receive(10, TimeUnit.SECONDS); AmqpMessage message2 = receiver1.receive(10, TimeUnit.SECONDS); @@ -651,7 +651,7 @@ public class AmqpSendReceiveTest extends AmqpClientTestSupport { message2.accept(); LOG.info("Attempting to read next two messages with receiver #2"); - AmqpReceiver receiver2 = session.createReceiver(getTestName()); + AmqpReceiver receiver2 = session.createReceiver(getQueueName()); receiver2.flow(2); AmqpMessage message3 = receiver2.receive(10, TimeUnit.SECONDS); AmqpMessage message4 = receiver2.receive(10, TimeUnit.SECONDS); @@ -685,7 +685,7 @@ public class AmqpSendReceiveTest extends AmqpClientTestSupport { AmqpConnection connection = addConnection(client.connect()); AmqpSession session = connection.createSession(); - AmqpSender sender = session.createSender(getTestName()); + AmqpSender sender = session.createSender(getQueueName()); for (int i = 0; i < MSG_COUNT; i++) { AmqpMessage message = new AmqpMessage(); @@ -699,10 +699,10 @@ public class AmqpSendReceiveTest extends AmqpClientTestSupport { sender.close(); - AmqpReceiver receiver1 = session.createReceiver(getTestName()); + AmqpReceiver receiver1 = session.createReceiver(getQueueName()); receiver1.flow(1); - AmqpReceiver receiver2 = session.createReceiver(getTestName()); + AmqpReceiver receiver2 = session.createReceiver(getQueueName()); receiver2.flow(1); AmqpMessage message1 = receiver1.receive(10, TimeUnit.SECONDS); @@ -759,7 +759,7 @@ public class AmqpSendReceiveTest extends AmqpClientTestSupport { AmqpConnection connection = addConnection(client.connect()); AmqpSession session = connection.createSession(); - final String address = getTestName(); + final String address = getQueueName(); AmqpReceiver receiver = session.createReceiver(address); AmqpSender sender = session.createSender(address); @@ -793,7 +793,7 @@ public class AmqpSendReceiveTest extends AmqpClientTestSupport { final CountDownLatch receiverReady = new CountDownLatch(1); ExecutorService executorService = Executors.newCachedThreadPool(); - final String address = getTestName(); + final String address = getQueueName(); executorService.submit(new Runnable() { @Override @@ -858,10 +858,10 @@ public class AmqpSendReceiveTest extends AmqpClientTestSupport { AmqpConnection connection = addConnection(client.connect()); AmqpSession session = connection.createSession(); - AmqpSender sender = session.createSender(getTestName()); - AmqpReceiver receiver1 = session.createReceiver(getTestName()); + AmqpSender sender = session.createSender(getQueueName()); + AmqpReceiver receiver1 = session.createReceiver(getQueueName()); - Queue queue = getProxyToQueue(getTestName()); + Queue queue = getProxyToQueue(getQueueName()); // Create default message that should be sent as non-durable AmqpMessage message1 = new AmqpMessage(); @@ -904,7 +904,7 @@ public class AmqpSendReceiveTest extends AmqpClientTestSupport { AmqpConnection connection = addConnection(client.connect()); AmqpSession session = connection.createSession(); - final String address = getTestName(); + final String address = getQueueName(); AmqpReceiver receiver = session.createReceiver(address); AmqpSender sender = session.createSender(address); @@ -957,7 +957,7 @@ public class AmqpSendReceiveTest extends AmqpClientTestSupport { AmqpConnection connection = addConnection(client.connect()); AmqpSession session = connection.createSession(); - final String address = getTestName(); + final String address = getQueueName(); AmqpSender sender = session.createSender(address); AmqpReceiver receiver1 = session.createReceiver(address, null, false, true); @@ -1036,7 +1036,7 @@ public class AmqpSendReceiveTest extends AmqpClientTestSupport { AmqpConnection connection = addConnection(client.connect()); AmqpSession session = connection.createSession(); - AmqpSender sender = session.createSender("queue://" + getTestName(), new Symbol[] {AmqpSupport.DELAYED_DELIVERY}); + AmqpSender sender = session.createSender("queue://" + getQueueName(), new Symbol[] {AmqpSupport.DELAYED_DELIVERY}); assertNotNull(sender); connection.getStateInspector().assertValid(); @@ -1047,7 +1047,7 @@ public class AmqpSendReceiveTest extends AmqpClientTestSupport { @Test(timeout = 60000) public void testMessageWithToFieldSetToSenderAddress() throws Exception { - doTestMessageWithToFieldSet(false, getTestName()); + doTestMessageWithToFieldSet(false, getQueueName()); } @Test(timeout = 60000) @@ -1067,7 +1067,7 @@ public class AmqpSendReceiveTest extends AmqpClientTestSupport { @Test(timeout = 60000) public void testMessageWithToFieldSetWithAnonymousSender() throws Exception { - doTestMessageWithToFieldSet(true, getTestName()); + doTestMessageWithToFieldSet(true, getQueueName()); } private void doTestMessageWithToFieldSet(boolean anonymous, String expected) throws Exception { @@ -1075,7 +1075,7 @@ public class AmqpSendReceiveTest extends AmqpClientTestSupport { AmqpConnection connection = addConnection(client.connect()); AmqpSession session = connection.createSession(); - final String address = getTestName(); + final String address = getQueueName(); AmqpSender sender = session.createSender(anonymous ? null : address); diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/AmqpTestSupport.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/AmqpTestSupport.java index 8f8945213a..a360eb817e 100644 --- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/AmqpTestSupport.java +++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/AmqpTestSupport.java @@ -1,4 +1,4 @@ -/** +/* * Licensed to the Apache Software Foundation (ASF) under one or more * contributor license agreements. See the NOTICE file distributed with * this work for additional information regarding copyright ownership. @@ -14,7 +14,6 @@ * See the License for the specific language governing permissions and * limitations under the License. */ - package org.apache.activemq.artemis.tests.integration.amqp; import java.net.URI; @@ -25,8 +24,12 @@ import org.apache.activemq.transport.amqp.client.AmqpClient; import org.apache.activemq.transport.amqp.client.AmqpConnection; import org.junit.After; -/** This will only add methods to support AMQP Testing without creating servers or anything */ +/** + * Base test support class providing client support methods to aid in + * creating and configuration the AMQP test client. + */ public class AmqpTestSupport extends ActiveMQTestBase { + protected LinkedList connections = new LinkedList<>(); protected boolean useSSL; @@ -121,7 +124,4 @@ public class AmqpTestSupport extends ActiveMQTestBase { public AmqpClient createAmqpClient(URI brokerURI, String username, String password) throws Exception { return new AmqpClient(brokerURI, username, password); } - - - } diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/AmqpTransactionTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/AmqpTransactionTest.java index 3a9d498f92..3b231fa09f 100644 --- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/AmqpTransactionTest.java +++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/AmqpTransactionTest.java @@ -94,7 +94,7 @@ public class AmqpTransactionTest extends AmqpClientTestSupport { AmqpSession session = connection.createSession(); assertNotNull(session); - AmqpSender sender = session.createSender(getTestName()); + AmqpSender sender = session.createSender(getQueueName()); sender.setStateInspector(new AmqpValidator() { @Override @@ -148,8 +148,8 @@ public class AmqpTransactionTest extends AmqpClientTestSupport { AmqpConnection connection = addConnection(client.connect()); AmqpSession session = connection.createSession(); - AmqpSender sender = session.createSender(getTestName()); - final Queue queue = getProxyToQueue(getTestName()); + AmqpSender sender = session.createSender(getQueueName()); + final Queue queue = getProxyToQueue(getQueueName()); session.begin(); @@ -173,8 +173,8 @@ public class AmqpTransactionTest extends AmqpClientTestSupport { AmqpConnection connection = addConnection(client.connect()); AmqpSession session = connection.createSession(); - AmqpSender sender = session.createSender(getTestName()); - final Queue queue = getProxyToQueue(getTestName()); + AmqpSender sender = session.createSender(getQueueName()); + final Queue queue = getProxyToQueue(getQueueName()); session.begin(); @@ -198,8 +198,8 @@ public class AmqpTransactionTest extends AmqpClientTestSupport { AmqpConnection connection = addConnection(client.connect()); AmqpSession session = connection.createSession(); - AmqpSender sender = session.createSender(getTestName()); - final Queue queue = getProxyToQueue(getTestName()); + AmqpSender sender = session.createSender(getQueueName()); + final Queue queue = getProxyToQueue(getQueueName()); AmqpMessage message = new AmqpMessage(); message.setText("Test-Message"); @@ -207,7 +207,7 @@ public class AmqpTransactionTest extends AmqpClientTestSupport { assertEquals(1, queue.getMessageCount()); - AmqpReceiver receiver = session.createReceiver(getTestName()); + AmqpReceiver receiver = session.createReceiver(getQueueName()); session.begin(); @@ -230,8 +230,8 @@ public class AmqpTransactionTest extends AmqpClientTestSupport { AmqpConnection connection = addConnection(client.connect()); AmqpSession session = connection.createSession(); - AmqpSender sender = session.createSender(getTestName()); - final Queue queue = getProxyToQueue(getTestName()); + AmqpSender sender = session.createSender(getQueueName()); + final Queue queue = getProxyToQueue(getQueueName()); AmqpMessage message = new AmqpMessage(); message.setText("Test-Message"); @@ -239,7 +239,7 @@ public class AmqpTransactionTest extends AmqpClientTestSupport { assertEquals(1, queue.getMessageCount()); - AmqpReceiver receiver = session.createReceiver(getTestName()); + AmqpReceiver receiver = session.createReceiver(getQueueName()); session.begin(); @@ -253,7 +253,7 @@ public class AmqpTransactionTest extends AmqpClientTestSupport { connection = addConnection(client.connect()); session = connection.createSession(); - receiver = session.createReceiver(getTestName()); + receiver = session.createReceiver(getQueueName()); session.begin(); receiver.flow(1); @@ -274,8 +274,8 @@ public class AmqpTransactionTest extends AmqpClientTestSupport { AmqpConnection connection = addConnection(client.connect()); AmqpSession session = connection.createSession(); - AmqpSender sender = session.createSender(getTestName()); - final Queue queue = getProxyToQueue(getTestName()); + AmqpSender sender = session.createSender(getQueueName()); + final Queue queue = getProxyToQueue(getQueueName()); AmqpMessage message = new AmqpMessage(); message.setText("Test-Message"); @@ -283,7 +283,7 @@ public class AmqpTransactionTest extends AmqpClientTestSupport { assertEquals(1, queue.getMessageCount()); - AmqpReceiver receiver = session.createReceiver(getTestName()); + AmqpReceiver receiver = session.createReceiver(getQueueName()); session.begin(); @@ -308,7 +308,7 @@ public class AmqpTransactionTest extends AmqpClientTestSupport { // Load up the Queue with some messages { AmqpSession session = connection.createSession(); - AmqpSender sender = session.createSender(getTestName()); + AmqpSender sender = session.createSender(getQueueName()); AmqpMessage message = new AmqpMessage(); message.setText("Test-Message"); sender.send(message); @@ -326,11 +326,11 @@ public class AmqpTransactionTest extends AmqpClientTestSupport { AmqpSession session3 = connection.createSession(); // Sender linked to each session - AmqpReceiver receiver1 = session1.createReceiver(getTestName()); - AmqpReceiver receiver2 = session2.createReceiver(getTestName()); - AmqpReceiver receiver3 = session3.createReceiver(getTestName()); + AmqpReceiver receiver1 = session1.createReceiver(getQueueName()); + AmqpReceiver receiver2 = session2.createReceiver(getQueueName()); + AmqpReceiver receiver3 = session3.createReceiver(getQueueName()); - final Queue queue = getProxyToQueue(getTestName()); + final Queue queue = getProxyToQueue(getQueueName()); assertEquals(3, queue.getMessageCount()); // Begin the transaction that all senders will operate in. @@ -365,7 +365,7 @@ public class AmqpTransactionTest extends AmqpClientTestSupport { // Load up the Queue with some messages { AmqpSession session = connection.createSession(); - AmqpSender sender = session.createSender(getTestName()); + AmqpSender sender = session.createSender(getQueueName()); AmqpMessage message = new AmqpMessage(); message.setText("Test-Message"); sender.send(message); @@ -383,11 +383,11 @@ public class AmqpTransactionTest extends AmqpClientTestSupport { AmqpSession session3 = connection.createSession(); // Sender linked to each session - AmqpReceiver receiver1 = session1.createReceiver(getTestName()); - AmqpReceiver receiver2 = session2.createReceiver(getTestName()); - AmqpReceiver receiver3 = session3.createReceiver(getTestName()); + AmqpReceiver receiver1 = session1.createReceiver(getQueueName()); + AmqpReceiver receiver2 = session2.createReceiver(getQueueName()); + AmqpReceiver receiver3 = session3.createReceiver(getQueueName()); - final Queue queue = getProxyToQueue(getTestName()); + final Queue queue = getProxyToQueue(getQueueName()); assertEquals(3, queue.getMessageCount()); // Begin the transaction that all senders will operate in. @@ -428,11 +428,11 @@ public class AmqpTransactionTest extends AmqpClientTestSupport { AmqpSession session3 = connection.createSession(); // Sender linked to each session - AmqpSender sender1 = session1.createSender(getTestName()); - AmqpSender sender2 = session2.createSender(getTestName()); - AmqpSender sender3 = session3.createSender(getTestName()); + AmqpSender sender1 = session1.createSender(getQueueName()); + AmqpSender sender2 = session2.createSender(getQueueName()); + AmqpSender sender3 = session3.createSender(getQueueName()); - final Queue queue = getProxyToQueue(getTestName()); + final Queue queue = getProxyToQueue(getQueueName()); assertEquals(0, queue.getMessageCount()); // Begin the transaction that all senders will operate in. @@ -468,11 +468,11 @@ public class AmqpTransactionTest extends AmqpClientTestSupport { AmqpSession session3 = connection.createSession(); // Sender linked to each session - AmqpSender sender1 = session1.createSender(getTestName()); - AmqpSender sender2 = session2.createSender(getTestName()); - AmqpSender sender3 = session3.createSender(getTestName()); + AmqpSender sender1 = session1.createSender(getQueueName()); + AmqpSender sender2 = session2.createSender(getQueueName()); + AmqpSender sender3 = session3.createSender(getQueueName()); - final Queue queue = getProxyToQueue(getTestName()); + final Queue queue = getProxyToQueue(getQueueName()); assertEquals(0, queue.getMessageCount()); // Begin the transaction that all senders will operate in. @@ -509,7 +509,7 @@ public class AmqpTransactionTest extends AmqpClientTestSupport { // Normal Session which won't create an TXN itself AmqpSession session = connection.createSession(); - AmqpSender sender = session.createSender(getTestName()); + AmqpSender sender = session.createSender(getQueueName()); // Commit TXN work from a sender. txnSession.begin(); @@ -538,7 +538,7 @@ public class AmqpTransactionTest extends AmqpClientTestSupport { } txnSession.commit(); - AmqpReceiver receiver = session.createReceiver(getTestName()); + AmqpReceiver receiver = session.createReceiver(getQueueName()); receiver.flow(NUM_MESSAGES * 2); for (int i = 0; i < NUM_MESSAGES * 2; ++i) { AmqpMessage message = receiver.receive(5, TimeUnit.SECONDS); @@ -563,7 +563,7 @@ public class AmqpTransactionTest extends AmqpClientTestSupport { // Normal Session which won't create an TXN itself AmqpSession session = connection.createSession(); - AmqpSender sender = session.createSender(getTestName()); + AmqpSender sender = session.createSender(getQueueName()); for (int i = 0; i < NUM_MESSAGES + 1; ++i) { AmqpMessage message = new AmqpMessage(); @@ -573,7 +573,7 @@ public class AmqpTransactionTest extends AmqpClientTestSupport { } // Read all messages from the Queue, do not accept them yet. - AmqpReceiver receiver = session.createReceiver(getTestName()); + AmqpReceiver receiver = session.createReceiver(getQueueName()); ArrayList messages = new ArrayList<>(NUM_MESSAGES); receiver.flow((NUM_MESSAGES + 2) * 2); for (int i = 0; i < NUM_MESSAGES; ++i) { @@ -629,7 +629,7 @@ public class AmqpTransactionTest extends AmqpClientTestSupport { // Normal Session which won't create an TXN itself AmqpSession session = connection.createSession(); - AmqpSender sender = session.createSender(getTestName()); + AmqpSender sender = session.createSender(getQueueName()); for (int i = 0; i < NUM_MESSAGES; ++i) { AmqpMessage message = new AmqpMessage(); @@ -639,7 +639,7 @@ public class AmqpTransactionTest extends AmqpClientTestSupport { } // Read all messages from the Queue, do not accept them yet. - AmqpReceiver receiver = session.createReceiver(getTestName()); + AmqpReceiver receiver = session.createReceiver(getQueueName()); receiver.flow(2); AmqpMessage message1 = receiver.receive(5, TimeUnit.SECONDS); AmqpMessage message2 = receiver.receive(5, TimeUnit.SECONDS); @@ -700,7 +700,7 @@ public class AmqpTransactionTest extends AmqpClientTestSupport { // Normal Session which won't create an TXN itself AmqpSession session = connection.createSession(); - AmqpSender sender = session.createSender(getTestName()); + AmqpSender sender = session.createSender(getQueueName()); for (int i = 0; i < NUM_MESSAGES + 1; ++i) { AmqpMessage message = new AmqpMessage(); @@ -710,7 +710,7 @@ public class AmqpTransactionTest extends AmqpClientTestSupport { } // Read all messages from the Queue, do not accept them yet. - AmqpReceiver receiver = session.createReceiver(getTestName()); + AmqpReceiver receiver = session.createReceiver(getQueueName()); ArrayList messages = new ArrayList<>(NUM_MESSAGES); receiver.flow((NUM_MESSAGES + 2) * 2); for (int i = 0; i < NUM_MESSAGES; ++i) { @@ -787,7 +787,7 @@ public class AmqpTransactionTest extends AmqpClientTestSupport { // Normal Session which won't create an TXN itself AmqpSession session = connection.createSession(); - AmqpSender sender = session.createSender(getTestName()); + AmqpSender sender = session.createSender(getQueueName()); for (int i = 0; i < NUM_MESSAGES; ++i) { AmqpMessage message = new AmqpMessage(); @@ -797,7 +797,7 @@ public class AmqpTransactionTest extends AmqpClientTestSupport { } // Read all messages from the Queue, do not accept them yet. - AmqpReceiver receiver = session.createReceiver(getTestName()); + AmqpReceiver receiver = session.createReceiver(getQueueName()); receiver.flow(2); AmqpMessage message1 = receiver.receive(5, TimeUnit.SECONDS); AmqpMessage message2 = receiver.receive(5, TimeUnit.SECONDS); @@ -930,12 +930,12 @@ public class AmqpTransactionTest extends AmqpClientTestSupport { AmqpSession session = connection.createSession(); assertNotNull(session); - AmqpSender sender = session.createSender(getTestName()); + AmqpSender sender = session.createSender(getQueueName()); AmqpMessage message = new AmqpMessage(); message.setText("Test-Message"); sender.send(message); - AmqpReceiver receiver = session.createReceiver(getTestName()); + AmqpReceiver receiver = session.createReceiver(getQueueName()); receiver.setStateInspector(new AmqpValidator() { @Override diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/ProtonTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/ProtonTest.java index 4ee94c2a9d..7933fecae1 100644 --- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/ProtonTest.java +++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/ProtonTest.java @@ -16,6 +16,22 @@ */ package org.apache.activemq.artemis.tests.integration.amqp; +import java.io.IOException; +import java.io.Serializable; +import java.net.URI; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Collection; +import java.util.Collections; +import java.util.Enumeration; +import java.util.LinkedHashMap; +import java.util.Map; +import java.util.Optional; +import java.util.Random; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicInteger; + import javax.jms.BytesMessage; import javax.jms.Connection; import javax.jms.ConnectionFactory; @@ -41,26 +57,10 @@ import javax.jms.TopicSession; import javax.jms.TopicSubscriber; import javax.management.MBeanServer; import javax.management.MBeanServerFactory; -import java.io.IOException; -import java.io.Serializable; -import java.net.URI; -import java.util.ArrayList; -import java.util.Arrays; -import java.util.Collection; -import java.util.Collections; -import java.util.Enumeration; -import java.util.LinkedHashMap; -import java.util.Map; -import java.util.Optional; -import java.util.Random; -import java.util.concurrent.CountDownLatch; -import java.util.concurrent.TimeUnit; -import java.util.concurrent.atomic.AtomicInteger; import org.apache.activemq.artemis.api.core.RoutingType; import org.apache.activemq.artemis.api.core.SimpleString; import org.apache.activemq.artemis.api.core.management.AddressControl; -import org.apache.activemq.artemis.api.core.management.ResourceNames; import org.apache.activemq.artemis.core.config.Configuration; import org.apache.activemq.artemis.core.postoffice.Bindings; import org.apache.activemq.artemis.core.remoting.CloseListener; @@ -84,17 +84,14 @@ import org.apache.activemq.artemis.utils.ByteUtil; import org.apache.activemq.artemis.utils.RandomUtil; import org.apache.activemq.artemis.utils.TimeUtils; import org.apache.activemq.artemis.utils.UUIDGenerator; -import org.apache.activemq.artemis.utils.VersionLoader; import org.apache.activemq.transport.amqp.client.AmqpClient; import org.apache.activemq.transport.amqp.client.AmqpConnection; import org.apache.activemq.transport.amqp.client.AmqpMessage; import org.apache.activemq.transport.amqp.client.AmqpReceiver; import org.apache.activemq.transport.amqp.client.AmqpSender; import org.apache.activemq.transport.amqp.client.AmqpSession; -import org.apache.activemq.transport.amqp.client.AmqpValidator; import org.apache.qpid.jms.JmsConnectionFactory; import org.apache.qpid.proton.amqp.Symbol; -import org.apache.qpid.proton.amqp.messaging.AmqpValue; import org.apache.qpid.proton.amqp.messaging.Source; import org.apache.qpid.proton.amqp.messaging.TerminusDurability; import org.junit.After; @@ -104,18 +101,12 @@ import org.junit.Test; import org.junit.runner.RunWith; import org.junit.runners.Parameterized; -import static org.apache.activemq.artemis.protocol.amqp.proton.AmqpSupport.DELAYED_DELIVERY; -import static org.apache.activemq.artemis.protocol.amqp.proton.AmqpSupport.PRODUCT; -import static org.apache.activemq.artemis.protocol.amqp.proton.AmqpSupport.VERSION; -import static org.apache.activemq.artemis.protocol.amqp.proton.AmqpSupport.contains; - @RunWith(Parameterized.class) public class ProtonTest extends ProtonTestBase { private static final String amqpConnectionUri = "amqp://localhost:5672"; private static final String tcpAmqpConnectionUri = "tcp://localhost:5672"; - private static final String brokerName = "localhost"; private static final long maxSizeBytes = 1 * 1024 * 1024; @@ -370,132 +361,6 @@ public class ProtonTest extends ProtonTestBase { } } - @Test - public void testBrokerContainerId() throws Exception { - AmqpClient client = new AmqpClient(new URI(tcpAmqpConnectionUri), userName, password); - AmqpConnection amqpConnection = client.connect(); - try { - assertTrue(brokerName.equals(amqpConnection.getEndpoint().getRemoteContainer())); - } finally { - amqpConnection.close(); - } - } - - @Test - public void testBrokerConnectionProperties() throws Exception { - AmqpClient client = new AmqpClient(new URI(tcpAmqpConnectionUri), userName, password); - AmqpConnection amqpConnection = client.connect(); - try { - Map properties = amqpConnection.getEndpoint().getRemoteProperties(); - assertTrue(properties != null); - if (properties != null) { - assertTrue("apache-activemq-artemis".equals(properties.get(Symbol.valueOf("product")))); - assertTrue(VersionLoader.getVersion().getFullVersion().equals(properties.get(Symbol.valueOf("version")))); - } - } finally { - amqpConnection.close(); - } - } - - @Test(timeout = 60000) - public void testConnectionCarriesExpectedCapabilities() throws Exception { - AmqpClient client = new AmqpClient(new URI(tcpAmqpConnectionUri), userName, password); - assertNotNull(client); - - client.setValidator(new AmqpValidator() { - - @Override - public void inspectOpenedResource(org.apache.qpid.proton.engine.Connection connection) { - - Symbol[] offered = connection.getRemoteOfferedCapabilities(); - - if (!contains(offered, DELAYED_DELIVERY)) { - markAsInvalid("Broker did not indicate it support delayed message delivery"); - return; - } - - Map properties = connection.getRemoteProperties(); - if (!properties.containsKey(PRODUCT)) { - markAsInvalid("Broker did not send a queue product name value"); - return; - } - - if (!properties.containsKey(VERSION)) { - markAsInvalid("Broker did not send a queue version value"); - return; - } - } - }); - - AmqpConnection connection = client.connect(); - try { - assertNotNull(connection); - connection.getStateInspector().assertValid(); - } finally { - connection.close(); - } - } - - @Test(timeout = 60000) - public void testSendWithDeliveryTimeHoldsMessage() throws Exception { - AmqpClient client = new AmqpClient(new URI(tcpAmqpConnectionUri), userName, password); - assertNotNull(client); - - AmqpConnection connection = client.connect(); - try { - AmqpSession session = connection.createSession(); - - AmqpSender sender = session.createSender(address); - AmqpReceiver receiver = session.createReceiver(address); - - AmqpMessage message = new AmqpMessage(); - long deliveryTime = System.currentTimeMillis() + TimeUnit.MINUTES.toMillis(5); - message.setMessageAnnotation("x-opt-delivery-time", deliveryTime); - message.setText("Test-Message"); - sender.send(message); - - // Now try and get the message - receiver.flow(1); - - // Shouldn't get this since we delayed the message. - assertNull(receiver.receive(1, TimeUnit.SECONDS)); - } finally { - connection.close(); - } - } - - @Test(timeout = 60000) - public void testSendWithDeliveryTimeDeliversMessageAfterDelay() throws Exception { - AmqpClient client = new AmqpClient(new URI(tcpAmqpConnectionUri), userName, password); - assertNotNull(client); - - AmqpConnection connection = client.connect(); - try { - AmqpSession session = connection.createSession(); - - AmqpSender sender = session.createSender(address); - AmqpReceiver receiver = session.createReceiver(address); - - AmqpMessage message = new AmqpMessage(); - long deliveryTime = System.currentTimeMillis() + 2000; - message.setMessageAnnotation("x-opt-delivery-time", deliveryTime); - message.setText("Test-Message"); - sender.send(message); - - // Now try and get the message - receiver.flow(1); - - AmqpMessage received = receiver.receive(10, TimeUnit.SECONDS); - assertNotNull(received); - received.accept(); - Long msgDeliveryTime = (Long) received.getMessageAnnotation("x-opt-delivery-time"); - assertNotNull(msgDeliveryTime); - assertEquals(deliveryTime, msgDeliveryTime.longValue()); - } finally { - connection.close(); - } - } - @Test public void testCreditsAreAllocatedOnlyOnceOnLinkCreate() throws Exception { @@ -983,41 +848,6 @@ public class ProtonTest extends ProtonTestBase { amqpConnection.close(); } - @Test - public void testManagementQueryOverAMQP() throws Throwable { - - AmqpClient client = new AmqpClient(new URI(tcpAmqpConnectionUri), userName, password); - AmqpConnection amqpConnection = client.connect(); - try { - String destinationAddress = address + 1; - AmqpSession session = amqpConnection.createSession(); - AmqpSender sender = session.createSender("activemq.management"); - AmqpReceiver receiver = session.createReceiver(destinationAddress); - receiver.flow(10); - - //create request message for getQueueNames query - AmqpMessage request = new AmqpMessage(); - request.setApplicationProperty("_AMQ_ResourceName", ResourceNames.BROKER); - request.setApplicationProperty("_AMQ_OperationName", "getQueueNames"); - request.setReplyToAddress(destinationAddress); - request.setText("[]"); - - sender.send(request); - AmqpMessage response = receiver.receive(5, TimeUnit.SECONDS); - Assert.assertNotNull(response); - assertNotNull(response); - Object section = response.getWrappedMessage().getBody(); - assertTrue(section instanceof AmqpValue); - Object value = ((AmqpValue) section).getValue(); - assertTrue(value instanceof String); - assertTrue(((String) value).length() > 0); - assertTrue(((String) value).contains(destinationAddress)); - response.accept(); - } finally { - amqpConnection.close(); - } - } - @Test public void testReplyTo() throws Throwable { Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); @@ -1792,93 +1622,6 @@ public class ProtonTest extends ProtonTestBase { } } - @Test(timeout = 60000) - public void testSendMessageOnAnonymousRelayLinkUsingMessageTo() throws Exception { - - AmqpClient client = new AmqpClient(new URI(tcpAmqpConnectionUri), userName, password); - AmqpConnection connection = client.connect(); - - try { - AmqpSession session = connection.createSession(); - - AmqpSender sender = session.createAnonymousSender(); - AmqpMessage message = new AmqpMessage(); - - message.setAddress(address); - message.setMessageId("msg" + 1); - message.setText("Test-Message"); - - sender.send(message); - sender.close(); - - AmqpReceiver receiver = session.createReceiver(address); - receiver.flow(1); - AmqpMessage received = receiver.receive(10, TimeUnit.SECONDS); - assertNotNull("Should have read message", received); - assertEquals("msg1", received.getMessageId()); - received.accept(); - - receiver.close(); - } finally { - connection.close(); - } - } - - @Test(timeout = 60000) - public void testSendMessageFailsOnAnonymousRelayLinkWhenNoToValueSet() throws Exception { - - AmqpClient client = new AmqpClient(new URI(tcpAmqpConnectionUri), userName, password); - AmqpConnection connection = client.connect(); - try { - AmqpSession session = connection.createSession(); - - AmqpSender sender = session.createAnonymousSender(); - AmqpMessage message = new AmqpMessage(); - - message.setMessageId("msg" + 1); - message.setText("Test-Message"); - - try { - sender.send(message); - fail("Should not be able to send, message should be rejected"); - } catch (Exception ex) { - ex.printStackTrace(); - } finally { - sender.close(); - } - } finally { - connection.close(); - } - } - - @Test(timeout = 60000) - public void testSendMessageFailsOnAnonymousRelayWhenToFieldHasNonExistingAddress() throws Exception { - - AmqpClient client = new AmqpClient(new URI(tcpAmqpConnectionUri), userName, password); - AmqpConnection connection = client.connect(); - try { - AmqpSession session = connection.createSession(); - - AmqpSender sender = session.createAnonymousSender(); - AmqpMessage message = new AmqpMessage(); - - message.setAddress(address + "-not-in-service"); - message.setMessageId("msg" + 1); - message.setText("Test-Message"); - - try { - sender.send(message); - fail("Should not be able to send, message should be rejected"); - } catch (Exception ex) { - ex.printStackTrace(); - } finally { - sender.close(); - } - } finally { - connection.close(); - } - } - private javax.jms.Queue createQueue(String address) throws Exception { Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); try {