From 7a8b7e9cfba8d4a8adfe96ee99d8a9df1273ac18 Mon Sep 17 00:00:00 2001 From: Timothy Bish Date: Tue, 11 Oct 2016 12:16:28 -0400 Subject: [PATCH] ARTEMIS-792 Add additional tests for AMQP protocol Adds several tests for AMQP expectations in various use cases. --- .../amqp/AmqpClientTestSupport.java | 67 ++--- .../amqp/AmqpDeliveryAnnotationsTest.java | 64 ++++ .../amqp/AmqpDescribedTypePayloadTest.java | 151 ++++++++++ .../amqp/AmqpReceiverDrainTest.java | 165 +++++++++++ .../amqp/AmqpScheduledMessageTest.java | 136 +++++++++ .../integration/amqp/AmqpSendReceiveTest.java | 279 +++++++++++++++++- 6 files changed, 823 insertions(+), 39 deletions(-) create mode 100644 tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/AmqpDeliveryAnnotationsTest.java create mode 100644 tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/AmqpDescribedTypePayloadTest.java create mode 100644 tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/AmqpReceiverDrainTest.java create mode 100644 tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/AmqpScheduledMessageTest.java diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/AmqpClientTestSupport.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/AmqpClientTestSupport.java index 2c7ce6fef5..14f9b610ff 100644 --- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/AmqpClientTestSupport.java +++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/AmqpClientTestSupport.java @@ -21,8 +21,12 @@ import java.net.URI; import java.util.LinkedList; import org.apache.activemq.artemis.api.core.SimpleString; +import org.apache.activemq.artemis.core.config.Configuration; import org.apache.activemq.artemis.core.server.ActiveMQServer; import org.apache.activemq.artemis.core.server.Queue; +import org.apache.activemq.artemis.core.settings.impl.AddressSettings; +import org.apache.activemq.artemis.jms.server.JMSServerManager; +import org.apache.activemq.artemis.jms.server.impl.JMSServerManagerImpl; import org.apache.activemq.artemis.tests.util.ActiveMQTestBase; import org.apache.activemq.transport.amqp.client.AmqpClient; import org.apache.activemq.transport.amqp.client.AmqpConnection; @@ -35,9 +39,11 @@ import org.junit.Before; */ public class AmqpClientTestSupport extends ActiveMQTestBase { - ActiveMQServer server; + private boolean useSSL; - LinkedList connections = new LinkedList<>(); + protected JMSServerManager serverManager; + protected ActiveMQServer server; + protected LinkedList connections = new LinkedList<>(); protected AmqpConnection addConnection(AmqpConnection connection) { connections.add(connection); @@ -48,9 +54,7 @@ public class AmqpClientTestSupport extends ActiveMQTestBase { @Override public void setUp() throws Exception { super.setUp(); - - server = createServer(true, true); - server.start(); + server = createServer(); } @After @@ -63,18 +67,36 @@ public class AmqpClientTestSupport extends ActiveMQTestBase { ignored.printStackTrace(); } } + + if (serverManager != null) { + try { + serverManager.stop(); + } catch (Throwable ignored) { + ignored.printStackTrace(); + } + serverManager = null; + } + server.stop(); super.tearDown(); } + protected ActiveMQServer createServer() throws Exception { + ActiveMQServer server = createServer(true, true); + serverManager = new JMSServerManagerImpl(server); + Configuration serverConfig = server.getConfiguration(); + serverConfig.getAddressesSettings().put("jms.queue.#", new AddressSettings().setAutoCreateJmsQueues(true).setDeadLetterAddress(new SimpleString("jms.queue.ActiveMQ.DLQ"))); + serverConfig.setSecurityEnabled(false); + serverManager.start(); + server.start(); + return server; + } + public Queue getProxyToQueue(String queueName) { return server.locateQueue(SimpleString.toSimpleString(queueName)); } - private String connectorScheme = "amqp"; - private boolean useSSL; - public String getTestName() { return "jms.queue." + getName(); } @@ -83,14 +105,9 @@ public class AmqpClientTestSupport extends ActiveMQTestBase { } public AmqpClientTestSupport(String connectorScheme, boolean useSSL) { - this.connectorScheme = connectorScheme; this.useSSL = useSSL; } - public String getConnectorScheme() { - return connectorScheme; - } - public boolean isUseSSL() { return useSSL; } @@ -99,30 +116,6 @@ public class AmqpClientTestSupport extends ActiveMQTestBase { return ""; } - protected boolean isUseTcpConnector() { - return !isUseSSL() && !connectorScheme.contains("nio") && !connectorScheme.contains("ws"); - } - - protected boolean isUseSslConnector() { - return isUseSSL() && !connectorScheme.contains("nio") && !connectorScheme.contains("wss"); - } - - protected boolean isUseNioConnector() { - return !isUseSSL() && connectorScheme.contains("nio"); - } - - protected boolean isUseNioPlusSslConnector() { - return isUseSSL() && connectorScheme.contains("nio"); - } - - protected boolean isUseWsConnector() { - return !isUseSSL() && connectorScheme.contains("ws"); - } - - protected boolean isUseWssConnector() { - return isUseSSL() && connectorScheme.contains("wss"); - } - public URI getBrokerAmqpConnectionURI() { boolean webSocket = false; diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/AmqpDeliveryAnnotationsTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/AmqpDeliveryAnnotationsTest.java new file mode 100644 index 0000000000..93ff22b341 --- /dev/null +++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/AmqpDeliveryAnnotationsTest.java @@ -0,0 +1,64 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.activemq.artemis.tests.integration.amqp; + +import java.util.concurrent.TimeUnit; + +import org.apache.activemq.artemis.core.server.Queue; +import org.apache.activemq.transport.amqp.client.AmqpClient; +import org.apache.activemq.transport.amqp.client.AmqpConnection; +import org.apache.activemq.transport.amqp.client.AmqpMessage; +import org.apache.activemq.transport.amqp.client.AmqpReceiver; +import org.apache.activemq.transport.amqp.client.AmqpSender; +import org.apache.activemq.transport.amqp.client.AmqpSession; +import org.junit.Test; + +/** + * Test around the handling of Deliver Annotations in messages sent and received. + */ +public class AmqpDeliveryAnnotationsTest extends AmqpClientTestSupport { + + private final String DELIVERY_ANNOTATION_NAME = "TEST-DELIVERY-ANNOTATION"; + + @Test(timeout = 60000) + public void testDeliveryAnnotationsStrippedFromIncoming() throws Exception { + AmqpClient client = createAmqpClient(); + AmqpConnection connection = addConnection(client.connect()); + AmqpSession session = connection.createSession(); + + AmqpSender sender = session.createSender(getTestName()); + AmqpReceiver receiver = session.createReceiver(getTestName()); + + AmqpMessage message = new AmqpMessage(); + + message.setText("Test-Message"); + message.setDeliveryAnnotation(DELIVERY_ANNOTATION_NAME, getTestName()); + + sender.send(message); + receiver.flow(1); + + Queue queue = getProxyToQueue(getTestName()); + assertEquals(1, queue.getMessageCount()); + + AmqpMessage received = receiver.receive(5, TimeUnit.SECONDS); + assertNotNull(received); + assertNull(received.getDeliveryAnnotation(DELIVERY_ANNOTATION_NAME)); + + sender.close(); + connection.close(); + } +} diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/AmqpDescribedTypePayloadTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/AmqpDescribedTypePayloadTest.java new file mode 100644 index 0000000000..bbb9c266a5 --- /dev/null +++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/AmqpDescribedTypePayloadTest.java @@ -0,0 +1,151 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.activemq.artemis.tests.integration.amqp; + +import java.util.concurrent.TimeUnit; + +import javax.jms.BytesMessage; +import javax.jms.Connection; +import javax.jms.Destination; +import javax.jms.Message; +import javax.jms.MessageConsumer; +import javax.jms.MessageProducer; +import javax.jms.Session; + +import org.apache.activemq.ActiveMQConnectionFactory; +import org.apache.activemq.artemis.core.server.Queue; +import org.apache.activemq.transport.amqp.client.AmqpClient; +import org.apache.activemq.transport.amqp.client.AmqpConnection; +import org.apache.activemq.transport.amqp.client.AmqpMessage; +import org.apache.activemq.transport.amqp.client.AmqpNoLocalFilter; +import org.apache.activemq.transport.amqp.client.AmqpReceiver; +import org.apache.activemq.transport.amqp.client.AmqpSender; +import org.apache.activemq.transport.amqp.client.AmqpSession; +import org.junit.Test; + +/** + * Test that the broker can pass through an AMQP message with a described type in the message + * body regardless of transformer in use. + */ +public class AmqpDescribedTypePayloadTest extends AmqpClientTestSupport { + + @Test(timeout = 60000) + public void testSendMessageWithDescribedTypeInBody() throws Exception { + AmqpClient client = createAmqpClient(); + AmqpConnection connection = addConnection(client.connect()); + AmqpSession session = connection.createSession(); + + AmqpSender sender = session.createSender(getTestName()); + AmqpMessage message = new AmqpMessage(); + message.setDescribedType(new AmqpNoLocalFilter()); + sender.send(message); + sender.close(); + + Queue queue = getProxyToQueue(getTestName()); + assertEquals(1, queue.getMessageCount()); + + AmqpReceiver receiver = session.createReceiver(getTestName()); + receiver.flow(1); + AmqpMessage received = receiver.receive(5, TimeUnit.SECONDS); + assertNotNull(received); + assertNotNull(received.getDescribedType()); + receiver.close(); + + connection.close(); + } + + @Test(timeout = 60000) + public void testSendMessageWithDescribedTypeInBodyReceiveOverOpenWire() throws Exception { + + AmqpClient client = createAmqpClient(); + AmqpConnection connection = addConnection(client.connect()); + AmqpSession session = connection.createSession(); + + AmqpSender sender = session.createSender(getTestName()); + AmqpMessage message = new AmqpMessage(); + message.setDescribedType(new AmqpNoLocalFilter()); + sender.send(message); + sender.close(); + connection.close(); + + Queue queue = getProxyToQueue(getTestName()); + assertEquals(1, queue.getMessageCount()); + + ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory("tcp://localhost:61616"); + Connection jmsConnection = factory.createConnection(); + try { + Session jmsSession = jmsConnection.createSession(false, Session.AUTO_ACKNOWLEDGE); + Destination destination = jmsSession.createQueue(getName()); + MessageConsumer jmsConsumer = jmsSession.createConsumer(destination); + jmsConnection.start(); + + Message received = jmsConsumer.receive(5000); + assertNotNull(received); + assertTrue(received instanceof BytesMessage); + } finally { + jmsConnection.close(); + } + } + + @Test(timeout = 60000) + public void testDescribedTypeMessageRoundTrips() throws Exception { + + AmqpClient client = createAmqpClient(); + AmqpConnection connection = addConnection(client.connect()); + AmqpSession session = connection.createSession(); + + // Send with AMQP client. + AmqpSender sender = session.createSender(getTestName()); + AmqpMessage message = new AmqpMessage(); + message.setDescribedType(new AmqpNoLocalFilter()); + sender.send(message); + sender.close(); + + Queue queue = getProxyToQueue(getTestName()); + assertEquals(1, queue.getMessageCount()); + + // Receive and resend with OpenWire JMS client + ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory("tcp://localhost:61616"); + Connection jmsConnection = factory.createConnection(); + try { + Session jmsSession = jmsConnection.createSession(false, Session.AUTO_ACKNOWLEDGE); + Destination destination = jmsSession.createQueue(getName()); + MessageConsumer jmsConsumer = jmsSession.createConsumer(destination); + jmsConnection.start(); + + Message received = jmsConsumer.receive(5000); + assertNotNull(received); + assertTrue(received instanceof BytesMessage); + + MessageProducer jmsProducer = jmsSession.createProducer(destination); + jmsProducer.send(received); + } finally { + jmsConnection.close(); + } + + assertEquals(1, queue.getMessageCount()); + + // Now lets receive it with AMQP and see that we get back what we expected. + AmqpReceiver receiver = session.createReceiver(getTestName()); + receiver.flow(1); + AmqpMessage returned = receiver.receive(5, TimeUnit.SECONDS); + assertNotNull(returned); + assertNotNull(returned.getDescribedType()); + receiver.close(); + connection.close(); + } +} diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/AmqpReceiverDrainTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/AmqpReceiverDrainTest.java new file mode 100644 index 0000000000..1af9028a42 --- /dev/null +++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/AmqpReceiverDrainTest.java @@ -0,0 +1,165 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.activemq.artemis.tests.integration.amqp; + +import java.util.concurrent.TimeUnit; + +import org.apache.activemq.artemis.core.server.Queue; +import org.apache.activemq.transport.amqp.client.AmqpClient; +import org.apache.activemq.transport.amqp.client.AmqpConnection; +import org.apache.activemq.transport.amqp.client.AmqpMessage; +import org.apache.activemq.transport.amqp.client.AmqpReceiver; +import org.apache.activemq.transport.amqp.client.AmqpSender; +import org.apache.activemq.transport.amqp.client.AmqpSession; +import org.junit.Test; + +/** + * Tests various behaviors of broker side drain support. + */ +public class AmqpReceiverDrainTest extends AmqpClientTestSupport { + + @Test(timeout = 60000) + public void testReceiverCanDrainMessages() throws Exception { + int MSG_COUNT = 20; + sendMessages(getTestName(), MSG_COUNT); + + AmqpClient client = createAmqpClient(); + AmqpConnection connection = client.connect(); + AmqpSession session = connection.createSession(); + + AmqpReceiver receiver = session.createReceiver(getTestName()); + + Queue queueView = getProxyToQueue(getTestName()); + assertEquals(MSG_COUNT, queueView.getMessageCount()); + + receiver.drain(MSG_COUNT); + for (int i = 0; i < MSG_COUNT; ++i) { + AmqpMessage message = receiver.receive(5, TimeUnit.SECONDS); + assertNotNull(message); + message.accept(); + } + receiver.close(); + + assertEquals(0, queueView.getMessageCount()); + + connection.close(); + } + + @Test(timeout = 60000) + public void testPullWithNoMessageGetDrained() throws Exception { + + AmqpClient client = createAmqpClient(); + AmqpConnection connection = client.connect(); + AmqpSession session = connection.createSession(); + + AmqpReceiver receiver = session.createReceiver(getTestName()); + + receiver.flow(10); + + Queue queueView = getProxyToQueue(getTestName()); + assertEquals(0, queueView.getMessageCount()); + assertEquals(0, queueView.getDeliveringCount()); + + assertEquals(10, receiver.getReceiver().getRemoteCredit()); + + assertNull(receiver.pull(1, TimeUnit.SECONDS)); + + assertEquals(0, receiver.getReceiver().getRemoteCredit()); + + connection.close(); + } + + @Test(timeout = 60000) + public void testPullOneFromRemote() throws Exception { + int MSG_COUNT = 20; + sendMessages(getTestName(), MSG_COUNT); + + AmqpClient client = createAmqpClient(); + AmqpConnection connection = client.connect(); + AmqpSession session = connection.createSession(); + + AmqpReceiver receiver = session.createReceiver(getTestName()); + + Queue queueView = getProxyToQueue(getTestName()); + assertEquals(MSG_COUNT, queueView.getMessageCount()); + + assertEquals(0, receiver.getReceiver().getRemoteCredit()); + + AmqpMessage message = receiver.pull(5, TimeUnit.SECONDS); + assertNotNull(message); + message.accept(); + + assertEquals(0, receiver.getReceiver().getRemoteCredit()); + + receiver.close(); + + assertEquals(MSG_COUNT - 1, queueView.getMessageCount()); + assertEquals(1, queueView.getMessagesAcknowledged()); + + connection.close(); + } + + @Test(timeout = 60000) + public void testMultipleZeroResultPulls() throws Exception { + AmqpClient client = createAmqpClient(); + AmqpConnection connection = client.connect(); + AmqpSession session = connection.createSession(); + + AmqpReceiver receiver = session.createReceiver(getTestName()); + + receiver.flow(10); + + Queue queueView = getProxyToQueue(getTestName()); + assertEquals(0, queueView.getMessageCount()); + + assertEquals(10, receiver.getReceiver().getRemoteCredit()); + + assertNull(receiver.pull(1, TimeUnit.SECONDS)); + + assertEquals(0, receiver.getReceiver().getRemoteCredit()); + + assertNull(receiver.pull(1, TimeUnit.SECONDS)); + assertNull(receiver.pull(1, TimeUnit.SECONDS)); + + assertEquals(0, receiver.getReceiver().getRemoteCredit()); + + connection.close(); + } + + public void sendMessages(String destinationName, int count) throws Exception { + AmqpClient client = createAmqpClient(); + AmqpConnection connection = null; + + try { + connection = client.connect(); + AmqpSession session = connection.createSession(); + AmqpSender sender = session.createSender(destinationName); + + for (int i = 0; i < count; ++i) { + AmqpMessage message = new AmqpMessage(); + message.setText("Test-Message-" + i); + sender.send(message); + } + + sender.close(); + } finally { + if (connection != null) { + connection.close(); + } + } + } +} diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/AmqpScheduledMessageTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/AmqpScheduledMessageTest.java new file mode 100644 index 0000000000..689c23c519 --- /dev/null +++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/AmqpScheduledMessageTest.java @@ -0,0 +1,136 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.activemq.artemis.tests.integration.amqp; + +import java.util.concurrent.TimeUnit; + +import org.apache.activemq.artemis.core.server.Queue; +import org.apache.activemq.transport.amqp.client.AmqpClient; +import org.apache.activemq.transport.amqp.client.AmqpConnection; +import org.apache.activemq.transport.amqp.client.AmqpMessage; +import org.apache.activemq.transport.amqp.client.AmqpReceiver; +import org.apache.activemq.transport.amqp.client.AmqpSender; +import org.apache.activemq.transport.amqp.client.AmqpSession; +import org.junit.Test; + +/** + * Test for scheduled message support using AMQP message annotations. + */ +public class AmqpScheduledMessageTest extends AmqpClientTestSupport { + + @Test(timeout = 60000) + public void testSendWithDeliveryTimeIsScheduled() throws Exception { + AmqpClient client = createAmqpClient(); + AmqpConnection connection = addConnection(client.connect()); + AmqpSession session = connection.createSession(); + + AmqpSender sender = session.createSender(getTestName()); + + // Get the Queue View early to avoid racing the delivery. + final Queue queueView = getProxyToQueue(getTestName()); + assertNotNull(queueView); + + AmqpMessage message = new AmqpMessage(); + long deliveryTime = System.currentTimeMillis() + TimeUnit.MINUTES.toMillis(2); + message.setMessageAnnotation("x-opt-delivery-time", deliveryTime); + message.setText("Test-Message"); + sender.send(message); + sender.close(); + + assertEquals(1, queueView.getScheduledCount()); + + // Now try and get the message + AmqpReceiver receiver = session.createReceiver(getTestName()); + receiver.flow(1); + AmqpMessage received = receiver.receive(5, TimeUnit.SECONDS); + assertNull(received); + + connection.close(); + } + + @Test(timeout = 60000) + public void testSendRecvWithDeliveryTime() throws Exception { + AmqpClient client = createAmqpClient(); + AmqpConnection connection = addConnection(client.connect()); + AmqpSession session = connection.createSession(); + + AmqpSender sender = session.createSender(getTestName()); + + // Get the Queue View early to avoid racing the delivery. + final Queue queueView = getProxyToQueue(getTestName()); + assertNotNull(queueView); + + AmqpMessage message = new AmqpMessage(); + long deliveryTime = System.currentTimeMillis() + 6000; + message.setMessageAnnotation("x-opt-delivery-time", deliveryTime); + message.setText("Test-Message"); + sender.send(message); + sender.close(); + + assertEquals(1, queueView.getScheduledCount()); + + AmqpReceiver receiver = session.createReceiver(getTestName()); + receiver.flow(1); + + // Now try and get the message, should not due to being scheduled. + AmqpMessage received = receiver.receive(2, TimeUnit.SECONDS); + assertNull(received); + + // Now try and get the message, should get it now + received = receiver.receive(10, TimeUnit.SECONDS); + assertNotNull(received); + received.accept(); + + connection.close(); + } + + @Test + public void testScheduleWithDelay() throws Exception { + AmqpClient client = createAmqpClient(); + AmqpConnection connection = addConnection(client.connect()); + AmqpSession session = connection.createSession(); + + AmqpSender sender = session.createSender(getTestName()); + + // Get the Queue View early to avoid racing the delivery. + final Queue queueView = getProxyToQueue(getTestName()); + assertNotNull(queueView); + + AmqpMessage message = new AmqpMessage(); + long delay = 6000; + message.setMessageAnnotation("x-opt-delivery-delay", delay); + message.setText("Test-Message"); + sender.send(message); + sender.close(); + + assertEquals(1, queueView.getScheduledCount()); + + AmqpReceiver receiver = session.createReceiver(getTestName()); + receiver.flow(1); + + // Now try and get the message, should not due to being scheduled. + AmqpMessage received = receiver.receive(2, TimeUnit.SECONDS); + assertNull(received); + + // Now try and get the message, should get it now + received = receiver.receive(10, TimeUnit.SECONDS); + assertNotNull(received); + received.accept(); + + connection.close(); + } +} diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/AmqpSendReceiveTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/AmqpSendReceiveTest.java index 6597a6249c..6c50b86639 100644 --- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/AmqpSendReceiveTest.java +++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/AmqpSendReceiveTest.java @@ -16,22 +16,31 @@ */ package org.apache.activemq.artemis.tests.integration.amqp; +import static org.apache.activemq.transport.amqp.AmqpSupport.JMS_SELECTOR_FILTER_IDS; +import static org.apache.activemq.transport.amqp.AmqpSupport.NO_LOCAL_FILTER_IDS; +import static org.apache.activemq.transport.amqp.AmqpSupport.findFilter; + import java.util.ArrayList; import java.util.LinkedList; import java.util.List; +import java.util.Map; import java.util.concurrent.CountDownLatch; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import java.util.concurrent.TimeUnit; import org.apache.activemq.artemis.core.server.Queue; -import org.apache.activemq.artemis.tests.integration.mqtt.imported.util.Wait; +import org.apache.activemq.artemis.tests.util.Wait; import org.apache.activemq.transport.amqp.client.AmqpClient; import org.apache.activemq.transport.amqp.client.AmqpConnection; import org.apache.activemq.transport.amqp.client.AmqpMessage; import org.apache.activemq.transport.amqp.client.AmqpReceiver; import org.apache.activemq.transport.amqp.client.AmqpSender; import org.apache.activemq.transport.amqp.client.AmqpSession; +import org.apache.activemq.transport.amqp.client.AmqpValidator; +import org.apache.qpid.proton.amqp.Symbol; +import org.apache.qpid.proton.amqp.messaging.Source; +import org.apache.qpid.proton.engine.Receiver; import org.junit.Test; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -43,6 +52,255 @@ public class AmqpSendReceiveTest extends AmqpClientTestSupport { protected static final Logger LOG = LoggerFactory.getLogger(AmqpSendReceiveTest.class); + @Test(timeout = 60000) + public void testCreateQueueReceiver() throws Exception { + AmqpClient client = createAmqpClient(); + AmqpConnection connection = addConnection(client.connect()); + AmqpSession session = connection.createSession(); + + AmqpReceiver receiver = session.createReceiver(getTestName()); + + Queue queue = getProxyToQueue(getTestName()); + assertNotNull(queue); + + receiver.close(); + connection.close(); + } + + @Test(timeout = 60000) + public void testCreateQueueReceiverWithJMSSelector() throws Exception { + AmqpClient client = createAmqpClient(); + + client.setValidator(new AmqpValidator() { + + @SuppressWarnings("unchecked") + @Override + public void inspectOpenedResource(Receiver receiver) { + + if (receiver.getRemoteSource() == null) { + markAsInvalid("Link opened with null source."); + } + + Source source = (Source) receiver.getRemoteSource(); + Map filters = source.getFilter(); + + if (findFilter(filters, JMS_SELECTOR_FILTER_IDS) == null) { + markAsInvalid("Broker did not return the JMS Filter on Attach"); + } + } + }); + + AmqpConnection connection = addConnection(client.connect()); + AmqpSession session = connection.createSession(); + + session.createReceiver(getTestName(), "JMSPriority > 8"); + + connection.getStateInspector().assertValid(); + connection.close(); + } + + @Test(timeout = 60000) + public void testCreateQueueReceiverWithNoLocalSet() throws Exception { + AmqpClient client = createAmqpClient(); + + client.setValidator(new AmqpValidator() { + + @SuppressWarnings("unchecked") + @Override + public void inspectOpenedResource(Receiver receiver) { + + if (receiver.getRemoteSource() == null) { + markAsInvalid("Link opened with null source."); + } + + Source source = (Source) receiver.getRemoteSource(); + Map filters = source.getFilter(); + + // Currently don't support noLocal on a Queue + if (findFilter(filters, NO_LOCAL_FILTER_IDS) != null) { + markAsInvalid("Broker did not return the NoLocal Filter on Attach"); + } + } + }); + + AmqpConnection connection = addConnection(client.connect()); + AmqpSession session = connection.createSession(); + + session.createReceiver(getTestName(), null, true); + + connection.getStateInspector().assertValid(); + connection.close(); + } + + @Test(timeout = 60000) + public void testQueueReceiverReadMessage() throws Exception { + sendMessages(getTestName(), 1); + + AmqpClient client = createAmqpClient(); + AmqpConnection connection = addConnection(client.connect()); + AmqpSession session = connection.createSession(); + + AmqpReceiver receiver = session.createReceiver(getTestName()); + + Queue queueView = getProxyToQueue(getTestName()); + assertEquals(1, queueView.getMessageCount()); + + receiver.flow(1); + assertNotNull(receiver.receive(5, TimeUnit.SECONDS)); + receiver.close(); + + assertEquals(1, queueView.getMessageCount()); + + connection.close(); + } + + @Test(timeout = 60000) + public void testTwoQueueReceiversOnSameConnectionReadMessagesNoDispositions() throws Exception { + int MSG_COUNT = 4; + sendMessages(getTestName(), MSG_COUNT); + + AmqpClient client = createAmqpClient(); + AmqpConnection connection = addConnection(client.connect()); + AmqpSession session = connection.createSession(); + + AmqpReceiver receiver1 = session.createReceiver(getTestName()); + + Queue queueView = getProxyToQueue(getTestName()); + assertEquals(MSG_COUNT, queueView.getMessageCount()); + + receiver1.flow(2); + assertNotNull(receiver1.receive(5, TimeUnit.SECONDS)); + assertNotNull(receiver1.receive(5, TimeUnit.SECONDS)); + + AmqpReceiver receiver2 = session.createReceiver(getTestName()); + + assertEquals(2, server.getTotalConsumerCount()); + + receiver2.flow(2); + assertNotNull(receiver2.receive(5, TimeUnit.SECONDS)); + assertNotNull(receiver2.receive(5, TimeUnit.SECONDS)); + + assertEquals(0, queueView.getMessagesAcknowledged()); + + receiver1.close(); + receiver2.close(); + + assertEquals(MSG_COUNT, queueView.getMessageCount()); + + connection.close(); + } + + @Test(timeout = 60000) + public void testTwoQueueReceiversOnSameConnectionReadMessagesAcceptOnEach() throws Exception { + int MSG_COUNT = 4; + sendMessages(getTestName(), MSG_COUNT); + + AmqpClient client = createAmqpClient(); + AmqpConnection connection = addConnection(client.connect()); + AmqpSession session = connection.createSession(); + + AmqpReceiver receiver1 = session.createReceiver(getTestName()); + + final Queue queueView = getProxyToQueue(getTestName()); + assertEquals(MSG_COUNT, queueView.getMessageCount()); + + receiver1.flow(2); + AmqpMessage message = receiver1.receive(5, TimeUnit.SECONDS); + assertNotNull(message); + message.accept(); + message = receiver1.receive(5, TimeUnit.SECONDS); + assertNotNull(message); + message.accept(); + + assertTrue("Should have ack'd two", Wait.waitFor(new Wait.Condition() { + + @Override + public boolean isSatisfied() throws Exception { + return queueView.getMessagesAcknowledged() == 2; + } + }, TimeUnit.SECONDS.toMillis(5), TimeUnit.MILLISECONDS.toMillis(50))); + + AmqpReceiver receiver2 = session.createReceiver(getTestName()); + + assertEquals(2, server.getTotalConsumerCount()); + + receiver2.flow(2); + message = receiver2.receive(5, TimeUnit.SECONDS); + assertNotNull(message); + message.accept(); + message = receiver2.receive(5, TimeUnit.SECONDS); + assertNotNull(message); + message.accept(); + + assertTrue("Queue should be empty now", Wait.waitFor(new Wait.Condition() { + + @Override + public boolean isSatisfied() throws Exception { + return queueView.getMessagesAcknowledged() == 4; + } + }, TimeUnit.SECONDS.toMillis(15), TimeUnit.MILLISECONDS.toMillis(10))); + + receiver1.close(); + receiver2.close(); + + assertEquals(0, queueView.getMessageCount()); + + connection.close(); + } + + @Test(timeout = 60000) + public void testSecondReceiverOnQueueGetsAllUnconsumedMessages() throws Exception { + int MSG_COUNT = 20; + sendMessages(getTestName(), MSG_COUNT); + + AmqpClient client = createAmqpClient(); + AmqpConnection connection = addConnection(client.connect()); + AmqpSession session = connection.createSession(); + + AmqpReceiver receiver1 = session.createReceiver(getTestName()); + + final Queue queueView = getProxyToQueue(getTestName()); + assertEquals(MSG_COUNT, queueView.getMessageCount()); + + receiver1.flow(20); + + assertTrue("Should have dispatch to prefetch", Wait.waitFor(new Wait.Condition() { + + @Override + public boolean isSatisfied() throws Exception { + return queueView.getDeliveringCount() >= 2; + } + }, TimeUnit.SECONDS.toMillis(5), TimeUnit.MILLISECONDS.toMillis(50))); + + receiver1.close(); + + AmqpReceiver receiver2 = session.createReceiver(getTestName()); + + assertEquals(1, server.getTotalConsumerCount()); + + receiver2.flow(MSG_COUNT * 2); + AmqpMessage message = receiver2.receive(5, TimeUnit.SECONDS); + assertNotNull(message); + message.accept(); + message = receiver2.receive(5, TimeUnit.SECONDS); + assertNotNull(message); + message.accept(); + + assertTrue("Should have ack'd two", Wait.waitFor(new Wait.Condition() { + + @Override + public boolean isSatisfied() throws Exception { + return queueView.getMessagesAcknowledged() == 2; + } + }, TimeUnit.SECONDS.toMillis(5), TimeUnit.MILLISECONDS.toMillis(50))); + + receiver2.close(); + + assertEquals(MSG_COUNT - 2, queueView.getMessageCount()); + + connection.close(); + } + @Test(timeout = 60000) public void testSimpleSendOneReceiveOne() throws Exception { @@ -476,7 +734,7 @@ public class AmqpSendReceiveTest extends AmqpClientTestSupport { assertTrue("Should be no inflight messages: " + destinationView.getDeliveringCount(), Wait.waitFor(new Wait.Condition() { @Override - public boolean isSatisified() throws Exception { + public boolean isSatisfied() throws Exception { return destinationView.getDeliveringCount() == 0; } })); @@ -554,4 +812,21 @@ public class AmqpSendReceiveTest extends AmqpClientTestSupport { connection.close(); } + + public void sendMessages(String destinationName, int count) throws Exception { + AmqpClient client = createAmqpClient(); + AmqpConnection connection = addConnection(client.connect()); + try { + AmqpSession session = connection.createSession(); + AmqpSender sender = session.createSender(destinationName); + + for (int i = 0; i < count; ++i) { + AmqpMessage message = new AmqpMessage(); + message.setMessageId("MessageID:" + i); + sender.send(message); + } + } finally { + connection.close(); + } + } }