From b9bf5d54f25f08bcd692efb2d3f81c4373f53807 Mon Sep 17 00:00:00 2001 From: Timothy Bish Date: Tue, 11 Oct 2016 18:39:46 -0400 Subject: [PATCH] ARTEMIS-794 Add tests for handling of various outcomes for deliveries Adds tests for handling of Rejected, Released and Modified outcomes for a delivery sent to a receiver. Tests show that for the Modified outcome the broker is redelivering the message to the same receiver when the undeliverable here value is set which violates the AMQP 1.0 specified handling of that field. Also for Rejected outcome the broker should be sending the rejected message to the DLQ as Rejected is supposed to be a terminal outcome. Small fix included to not adjust the delivery count if the Modified outcome does not indicate that the delivery failed. --- .../proton/ProtonServerSenderContext.java | 13 +- .../transport/amqp/client/AmqpMessage.java | 15 +- .../transport/amqp/client/AmqpReceiver.java | 35 ++++ .../amqp/AmqpReceiverDispositionTest.java | 190 ++++++++++++++++++ 4 files changed, 251 insertions(+), 2 deletions(-) create mode 100644 tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/AmqpReceiverDispositionTest.java diff --git a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/proton/ProtonServerSenderContext.java b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/proton/ProtonServerSenderContext.java index 8d42fe90cd..938c6cacd2 100644 --- a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/proton/ProtonServerSenderContext.java +++ b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/proton/ProtonServerSenderContext.java @@ -521,12 +521,23 @@ public class ProtonServerSenderContext extends ProtonInitializable implements Pr } catch (Exception e) { throw ActiveMQAMQPProtocolMessageBundle.BUNDLE.errorCancellingMessage(message.toString(), e.getMessage()); } - } else if (remoteState instanceof Rejected || remoteState instanceof Modified) { + } else if (remoteState instanceof Rejected) { try { sessionSPI.cancel(brokerConsumer, message, true); } catch (Exception e) { throw ActiveMQAMQPProtocolMessageBundle.BUNDLE.errorCancellingMessage(message.toString(), e.getMessage()); } + } else if (remoteState instanceof Modified) { + try { + Modified modification = (Modified) remoteState; + if (Boolean.TRUE.equals(modification.getDeliveryFailed())) { + sessionSPI.cancel(brokerConsumer, message, true); + } else { + sessionSPI.cancel(brokerConsumer, message, false); + } + } catch (Exception e) { + throw ActiveMQAMQPProtocolMessageBundle.BUNDLE.errorCancellingMessage(message.toString(), e.getMessage()); + } } // todo add tag caching if (!preSettle) { diff --git a/tests/artemis-test-support/src/main/java/org/apache/activemq/transport/amqp/client/AmqpMessage.java b/tests/artemis-test-support/src/main/java/org/apache/activemq/transport/amqp/client/AmqpMessage.java index 5cf2c0aa12..bd893773a1 100644 --- a/tests/artemis-test-support/src/main/java/org/apache/activemq/transport/amqp/client/AmqpMessage.java +++ b/tests/artemis-test-support/src/main/java/org/apache/activemq/transport/amqp/client/AmqpMessage.java @@ -191,7 +191,7 @@ public class AmqpMessage { /** * Release the message, remote can redeliver it elsewhere. * - * @throws Exception if an error occurs during the reject. + * @throws Exception if an error occurs during the release. */ public void release() throws Exception { if (receiver == null) { @@ -201,6 +201,19 @@ public class AmqpMessage { receiver.release(delivery); } + /** + * Reject the message, remote can redeliver it elsewhere. + * + * @throws Exception if an error occurs during the reject. + */ + public void reject() throws Exception { + if (receiver == null) { + throw new IllegalStateException("Can't release non-received message."); + } + + receiver.reject(delivery); + } + //----- Convenience methods for constructing outbound messages -----------// /** diff --git a/tests/artemis-test-support/src/main/java/org/apache/activemq/transport/amqp/client/AmqpReceiver.java b/tests/artemis-test-support/src/main/java/org/apache/activemq/transport/amqp/client/AmqpReceiver.java index b6d2ba1915..cd76501e3b 100644 --- a/tests/artemis-test-support/src/main/java/org/apache/activemq/transport/amqp/client/AmqpReceiver.java +++ b/tests/artemis-test-support/src/main/java/org/apache/activemq/transport/amqp/client/AmqpReceiver.java @@ -578,6 +578,41 @@ public class AmqpReceiver extends AmqpAbstractResource { request.sync(); } + /** + * Reject a message that was dispatched under the given Delivery instance. + * + * @param delivery the Delivery instance to reject. + * @throws IOException if an error occurs while sending the release. + */ + public void reject(final Delivery delivery) throws IOException { + checkClosed(); + + if (delivery == null) { + throw new IllegalArgumentException("Delivery to release cannot be null"); + } + + final ClientFuture request = new ClientFuture(); + session.getScheduler().execute(new Runnable() { + + @Override + public void run() { + checkClosed(); + try { + if (!delivery.isSettled()) { + delivery.disposition(new Rejected()); + delivery.settle(); + session.pumpToProtonTransport(request); + } + request.onSuccess(); + } catch (Exception e) { + request.onFailure(e); + } + } + }); + + request.sync(); + } + /** * @return an unmodifiable view of the underlying Receiver instance. */ diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/AmqpReceiverDispositionTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/AmqpReceiverDispositionTest.java new file mode 100644 index 0000000000..d92fa0feec --- /dev/null +++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/AmqpReceiverDispositionTest.java @@ -0,0 +1,190 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.activemq.artemis.tests.integration.amqp; + +import java.util.concurrent.TimeUnit; + +import org.apache.activemq.transport.amqp.client.AmqpClient; +import org.apache.activemq.transport.amqp.client.AmqpConnection; +import org.apache.activemq.transport.amqp.client.AmqpMessage; +import org.apache.activemq.transport.amqp.client.AmqpReceiver; +import org.apache.activemq.transport.amqp.client.AmqpSender; +import org.apache.activemq.transport.amqp.client.AmqpSession; +import org.apache.qpid.proton.message.Message; +import org.junit.Test; + +/** + * Test various behaviors of AMQP receivers with the broker. + */ +public class AmqpReceiverDispositionTest extends AmqpClientTestSupport { + + @Test(timeout = 30000) + public void testReleasedDisposition() throws Exception { + sendMessages(getTestName(), 1); + + AmqpClient client = createAmqpClient(); + AmqpConnection connection = addConnection(client.connect()); + AmqpSession session = connection.createSession(); + + AmqpReceiver receiver1 = session.createReceiver(getTestName()); + receiver1.flow(1); + + AmqpMessage message = receiver1.receive(5, TimeUnit.SECONDS); + assertNotNull("did not receive message first time", message); + assertEquals("MessageID:0", message.getMessageId()); + + Message protonMessage = message.getWrappedMessage(); + assertNotNull(protonMessage); + assertEquals("Unexpected initial value for AMQP delivery-count", 0, protonMessage.getDeliveryCount()); + + message.release(); + + // Read the message again and validate its state + + AmqpReceiver receiver2 = session.createReceiver(getTestName()); + receiver2.flow(1); + message = receiver2.receive(10, TimeUnit.SECONDS); + assertNotNull("did not receive message again", message); + assertEquals("MessageID:0", message.getMessageId()); + + message.accept(); + + protonMessage = message.getWrappedMessage(); + assertNotNull(protonMessage); + assertEquals("Unexpected updated value for AMQP delivery-count", 0, protonMessage.getDeliveryCount()); + + connection.close(); + } + + @Test(timeout = 30000) + public void testRejectedDisposition() throws Exception { + sendMessages(getTestName(), 1); + + AmqpClient client = createAmqpClient(); + AmqpConnection connection = addConnection(client.connect()); + AmqpSession session = connection.createSession(); + + AmqpReceiver receiver1 = session.createReceiver(getTestName()); + receiver1.flow(1); + + AmqpMessage message = receiver1.receive(5, TimeUnit.SECONDS); + assertNotNull("did not receive message first time", message); + assertEquals("MessageID:0", message.getMessageId()); + + Message protonMessage = message.getWrappedMessage(); + assertNotNull(protonMessage); + assertEquals("Unexpected initial value for AMQP delivery-count", 0, protonMessage.getDeliveryCount()); + + message.reject(); + + // Reject is a terminal outcome and should not be redelivered to the rejecting receiver + // or any other as it should move to the archived state. + receiver1.flow(1); + message = receiver1.receive(1, TimeUnit.SECONDS); + assertNull("Should not receive message again", message); + + // Attempt to Read the message again with another receiver to validate it is archived. + AmqpReceiver receiver2 = session.createReceiver(getTestName()); + receiver2.flow(1); + assertNull(receiver2.receive(3, TimeUnit.SECONDS)); + + connection.close(); + } + + @Test(timeout = 30000) + public void testModifiedDispositionWithDeliveryFailedWithoutUndeliverableHereFieldsSet() throws Exception { + doModifiedDispositionTestImpl(Boolean.TRUE, null); + } + + @Test(timeout = 30000) + public void testModifiedDispositionWithoutDeliveryFailedWithoutUndeliverableHereFieldsSet() throws Exception { + doModifiedDispositionTestImpl(null, null); + } + + @Test(timeout = 30000) + public void testModifiedDispositionWithoutDeliveryFailedWithUndeliverableHereFieldsSet() throws Exception { + doModifiedDispositionTestImpl(null, Boolean.TRUE); + } + + @Test(timeout = 30000) + public void testModifiedDispositionWithDeliveryFailedWithUndeliverableHereFieldsSet() throws Exception { + doModifiedDispositionTestImpl(Boolean.TRUE, Boolean.TRUE); + } + + private void doModifiedDispositionTestImpl(Boolean deliveryFailed, Boolean undeliverableHere) throws Exception { + sendMessages(getTestName(), 1); + + AmqpClient client = createAmqpClient(); + AmqpConnection connection = addConnection(client.connect()); + AmqpSession session = connection.createSession(); + + AmqpReceiver receiver1 = session.createReceiver(getTestName()); + receiver1.flow(1); + + AmqpMessage message = receiver1.receive(5, TimeUnit.SECONDS); + assertNotNull("did not receive message first time", message); + + Message protonMessage = message.getWrappedMessage(); + assertNotNull(protonMessage); + assertEquals("Unexpected initial value for AMQP delivery-count", 0, protonMessage.getDeliveryCount()); + + message.modified(deliveryFailed, undeliverableHere); + + // Remote must not redispatch to the client if undeliverable here is true + if (Boolean.TRUE.equals(undeliverableHere)) { + receiver1.flow(1); + message = receiver1.receive(1, TimeUnit.SECONDS); + assertNull("Should not receive message again", message); + } + + AmqpReceiver receiver2 = session.createReceiver(getTestName()); + receiver2.flow(1); + + message = receiver2.receive(5, TimeUnit.SECONDS); + assertNotNull("did not receive message again", message); + + int expectedDeliveryCount = 0; + if (Boolean.TRUE.equals(deliveryFailed)) { + expectedDeliveryCount = 1; + } + + message.accept(); + + Message protonMessage2 = message.getWrappedMessage(); + assertNotNull(protonMessage2); + assertEquals("Unexpected updated value for AMQP delivery-count", expectedDeliveryCount, protonMessage2.getDeliveryCount()); + + connection.close(); + } + + public void sendMessages(String destinationName, int count) throws Exception { + AmqpClient client = createAmqpClient(); + AmqpConnection connection = addConnection(client.connect()); + try { + AmqpSession session = connection.createSession(); + AmqpSender sender = session.createSender(destinationName); + + for (int i = 0; i < count; ++i) { + AmqpMessage message = new AmqpMessage(); + message.setMessageId("MessageID:" + i); + sender.send(message); + } + } finally { + connection.close(); + } + } +}