diff --git a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/proton/ProtonServerSenderContext.java b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/proton/ProtonServerSenderContext.java index 8d42fe90cd..938c6cacd2 100644 --- a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/proton/ProtonServerSenderContext.java +++ b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/proton/ProtonServerSenderContext.java @@ -521,12 +521,23 @@ public class ProtonServerSenderContext extends ProtonInitializable implements Pr } catch (Exception e) { throw ActiveMQAMQPProtocolMessageBundle.BUNDLE.errorCancellingMessage(message.toString(), e.getMessage()); } - } else if (remoteState instanceof Rejected || remoteState instanceof Modified) { + } else if (remoteState instanceof Rejected) { try { sessionSPI.cancel(brokerConsumer, message, true); } catch (Exception e) { throw ActiveMQAMQPProtocolMessageBundle.BUNDLE.errorCancellingMessage(message.toString(), e.getMessage()); } + } else if (remoteState instanceof Modified) { + try { + Modified modification = (Modified) remoteState; + if (Boolean.TRUE.equals(modification.getDeliveryFailed())) { + sessionSPI.cancel(brokerConsumer, message, true); + } else { + sessionSPI.cancel(brokerConsumer, message, false); + } + } catch (Exception e) { + throw ActiveMQAMQPProtocolMessageBundle.BUNDLE.errorCancellingMessage(message.toString(), e.getMessage()); + } } // todo add tag caching if (!preSettle) { diff --git a/tests/artemis-test-support/src/main/java/org/apache/activemq/transport/amqp/client/AmqpMessage.java b/tests/artemis-test-support/src/main/java/org/apache/activemq/transport/amqp/client/AmqpMessage.java index 5cf2c0aa12..bd893773a1 100644 --- a/tests/artemis-test-support/src/main/java/org/apache/activemq/transport/amqp/client/AmqpMessage.java +++ b/tests/artemis-test-support/src/main/java/org/apache/activemq/transport/amqp/client/AmqpMessage.java @@ -191,7 +191,7 @@ public class AmqpMessage { /** * Release the message, remote can redeliver it elsewhere. * - * @throws Exception if an error occurs during the reject. + * @throws Exception if an error occurs during the release. */ public void release() throws Exception { if (receiver == null) { @@ -201,6 +201,19 @@ public class AmqpMessage { receiver.release(delivery); } + /** + * Reject the message, remote can redeliver it elsewhere. + * + * @throws Exception if an error occurs during the reject. + */ + public void reject() throws Exception { + if (receiver == null) { + throw new IllegalStateException("Can't release non-received message."); + } + + receiver.reject(delivery); + } + //----- Convenience methods for constructing outbound messages -----------// /** diff --git a/tests/artemis-test-support/src/main/java/org/apache/activemq/transport/amqp/client/AmqpReceiver.java b/tests/artemis-test-support/src/main/java/org/apache/activemq/transport/amqp/client/AmqpReceiver.java index b6d2ba1915..cd76501e3b 100644 --- a/tests/artemis-test-support/src/main/java/org/apache/activemq/transport/amqp/client/AmqpReceiver.java +++ b/tests/artemis-test-support/src/main/java/org/apache/activemq/transport/amqp/client/AmqpReceiver.java @@ -578,6 +578,41 @@ public class AmqpReceiver extends AmqpAbstractResource { request.sync(); } + /** + * Reject a message that was dispatched under the given Delivery instance. + * + * @param delivery the Delivery instance to reject. + * @throws IOException if an error occurs while sending the release. + */ + public void reject(final Delivery delivery) throws IOException { + checkClosed(); + + if (delivery == null) { + throw new IllegalArgumentException("Delivery to release cannot be null"); + } + + final ClientFuture request = new ClientFuture(); + session.getScheduler().execute(new Runnable() { + + @Override + public void run() { + checkClosed(); + try { + if (!delivery.isSettled()) { + delivery.disposition(new Rejected()); + delivery.settle(); + session.pumpToProtonTransport(request); + } + request.onSuccess(); + } catch (Exception e) { + request.onFailure(e); + } + } + }); + + request.sync(); + } + /** * @return an unmodifiable view of the underlying Receiver instance. */ diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/AmqpReceiverDispositionTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/AmqpReceiverDispositionTest.java new file mode 100644 index 0000000000..d92fa0feec --- /dev/null +++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/AmqpReceiverDispositionTest.java @@ -0,0 +1,190 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.activemq.artemis.tests.integration.amqp; + +import java.util.concurrent.TimeUnit; + +import org.apache.activemq.transport.amqp.client.AmqpClient; +import org.apache.activemq.transport.amqp.client.AmqpConnection; +import org.apache.activemq.transport.amqp.client.AmqpMessage; +import org.apache.activemq.transport.amqp.client.AmqpReceiver; +import org.apache.activemq.transport.amqp.client.AmqpSender; +import org.apache.activemq.transport.amqp.client.AmqpSession; +import org.apache.qpid.proton.message.Message; +import org.junit.Test; + +/** + * Test various behaviors of AMQP receivers with the broker. + */ +public class AmqpReceiverDispositionTest extends AmqpClientTestSupport { + + @Test(timeout = 30000) + public void testReleasedDisposition() throws Exception { + sendMessages(getTestName(), 1); + + AmqpClient client = createAmqpClient(); + AmqpConnection connection = addConnection(client.connect()); + AmqpSession session = connection.createSession(); + + AmqpReceiver receiver1 = session.createReceiver(getTestName()); + receiver1.flow(1); + + AmqpMessage message = receiver1.receive(5, TimeUnit.SECONDS); + assertNotNull("did not receive message first time", message); + assertEquals("MessageID:0", message.getMessageId()); + + Message protonMessage = message.getWrappedMessage(); + assertNotNull(protonMessage); + assertEquals("Unexpected initial value for AMQP delivery-count", 0, protonMessage.getDeliveryCount()); + + message.release(); + + // Read the message again and validate its state + + AmqpReceiver receiver2 = session.createReceiver(getTestName()); + receiver2.flow(1); + message = receiver2.receive(10, TimeUnit.SECONDS); + assertNotNull("did not receive message again", message); + assertEquals("MessageID:0", message.getMessageId()); + + message.accept(); + + protonMessage = message.getWrappedMessage(); + assertNotNull(protonMessage); + assertEquals("Unexpected updated value for AMQP delivery-count", 0, protonMessage.getDeliveryCount()); + + connection.close(); + } + + @Test(timeout = 30000) + public void testRejectedDisposition() throws Exception { + sendMessages(getTestName(), 1); + + AmqpClient client = createAmqpClient(); + AmqpConnection connection = addConnection(client.connect()); + AmqpSession session = connection.createSession(); + + AmqpReceiver receiver1 = session.createReceiver(getTestName()); + receiver1.flow(1); + + AmqpMessage message = receiver1.receive(5, TimeUnit.SECONDS); + assertNotNull("did not receive message first time", message); + assertEquals("MessageID:0", message.getMessageId()); + + Message protonMessage = message.getWrappedMessage(); + assertNotNull(protonMessage); + assertEquals("Unexpected initial value for AMQP delivery-count", 0, protonMessage.getDeliveryCount()); + + message.reject(); + + // Reject is a terminal outcome and should not be redelivered to the rejecting receiver + // or any other as it should move to the archived state. + receiver1.flow(1); + message = receiver1.receive(1, TimeUnit.SECONDS); + assertNull("Should not receive message again", message); + + // Attempt to Read the message again with another receiver to validate it is archived. + AmqpReceiver receiver2 = session.createReceiver(getTestName()); + receiver2.flow(1); + assertNull(receiver2.receive(3, TimeUnit.SECONDS)); + + connection.close(); + } + + @Test(timeout = 30000) + public void testModifiedDispositionWithDeliveryFailedWithoutUndeliverableHereFieldsSet() throws Exception { + doModifiedDispositionTestImpl(Boolean.TRUE, null); + } + + @Test(timeout = 30000) + public void testModifiedDispositionWithoutDeliveryFailedWithoutUndeliverableHereFieldsSet() throws Exception { + doModifiedDispositionTestImpl(null, null); + } + + @Test(timeout = 30000) + public void testModifiedDispositionWithoutDeliveryFailedWithUndeliverableHereFieldsSet() throws Exception { + doModifiedDispositionTestImpl(null, Boolean.TRUE); + } + + @Test(timeout = 30000) + public void testModifiedDispositionWithDeliveryFailedWithUndeliverableHereFieldsSet() throws Exception { + doModifiedDispositionTestImpl(Boolean.TRUE, Boolean.TRUE); + } + + private void doModifiedDispositionTestImpl(Boolean deliveryFailed, Boolean undeliverableHere) throws Exception { + sendMessages(getTestName(), 1); + + AmqpClient client = createAmqpClient(); + AmqpConnection connection = addConnection(client.connect()); + AmqpSession session = connection.createSession(); + + AmqpReceiver receiver1 = session.createReceiver(getTestName()); + receiver1.flow(1); + + AmqpMessage message = receiver1.receive(5, TimeUnit.SECONDS); + assertNotNull("did not receive message first time", message); + + Message protonMessage = message.getWrappedMessage(); + assertNotNull(protonMessage); + assertEquals("Unexpected initial value for AMQP delivery-count", 0, protonMessage.getDeliveryCount()); + + message.modified(deliveryFailed, undeliverableHere); + + // Remote must not redispatch to the client if undeliverable here is true + if (Boolean.TRUE.equals(undeliverableHere)) { + receiver1.flow(1); + message = receiver1.receive(1, TimeUnit.SECONDS); + assertNull("Should not receive message again", message); + } + + AmqpReceiver receiver2 = session.createReceiver(getTestName()); + receiver2.flow(1); + + message = receiver2.receive(5, TimeUnit.SECONDS); + assertNotNull("did not receive message again", message); + + int expectedDeliveryCount = 0; + if (Boolean.TRUE.equals(deliveryFailed)) { + expectedDeliveryCount = 1; + } + + message.accept(); + + Message protonMessage2 = message.getWrappedMessage(); + assertNotNull(protonMessage2); + assertEquals("Unexpected updated value for AMQP delivery-count", expectedDeliveryCount, protonMessage2.getDeliveryCount()); + + connection.close(); + } + + public void sendMessages(String destinationName, int count) throws Exception { + AmqpClient client = createAmqpClient(); + AmqpConnection connection = addConnection(client.connect()); + try { + AmqpSession session = connection.createSession(); + AmqpSender sender = session.createSender(destinationName); + + for (int i = 0; i < count; ++i) { + AmqpMessage message = new AmqpMessage(); + message.setMessageId("MessageID:" + i); + sender.send(message); + } + } finally { + connection.close(); + } + } +}