diff --git a/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/protocol/AmqpSender.java b/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/protocol/AmqpSender.java index 4cbf744f4c..e0e7276ffa 100644 --- a/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/protocol/AmqpSender.java +++ b/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/protocol/AmqpSender.java @@ -227,7 +227,7 @@ public class AmqpSender extends AmqpAbstractLink { settle(delivery, -1); } else if (state instanceof Modified) { Modified modified = (Modified) state; - if (modified.getDeliveryFailed()) { + if (Boolean.TRUE.equals(modified.getDeliveryFailed())) { // increment delivery counter.. md.setRedeliveryCounter(md.getRedeliveryCounter() + 1); } diff --git a/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/client/AmqpMessage.java b/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/client/AmqpMessage.java index 0acd1c676a..e8ad7937fd 100644 --- a/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/client/AmqpMessage.java +++ b/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/client/AmqpMessage.java @@ -140,43 +140,21 @@ public class AmqpMessage { } /** - * Rejects the message, marking it as not deliverable here and failed to deliver. + * Marks the message as Modified, indicating whether it failed to deliver and is not deliverable here. * - * @throws Exception if an error occurs during the reject. - */ - public void reject() throws Exception { - reject(true, true); - } - - /** - * Rejects the message, marking it as failed to deliver and applying the given value - * to the undeliverable here tag. - * - * @param undeliverableHere - * marks the delivery as not being able to be process by link it was sent to. - * - * @throws Exception if an error occurs during the reject. - */ - public void reject(boolean undeliverableHere) throws Exception { - reject(undeliverableHere, true); - } - - /** - * Rejects the message, marking it as not deliverable here and failed to deliver. - * - * @param undeliverableHere - * marks the delivery as not being able to be process by link it was sent to. * @param deliveryFailed * indicates that the delivery failed for some reason. + * @param undeliverableHere + * marks the delivery as not being able to be process by link it was sent to. * - * @throws Exception if an error occurs during the reject. + * @throws Exception if an error occurs during the process. */ - public void reject(boolean undeliverableHere, boolean deliveryFailed) throws Exception { + public void modified(Boolean deliveryFailed, Boolean undeliverableHere) throws Exception { if (receiver == null) { - throw new IllegalStateException("Can't reject non-received message."); + throw new IllegalStateException("Can't modify non-received message."); } - receiver.reject(delivery, undeliverableHere, deliveryFailed); + receiver.modified(delivery, deliveryFailed, undeliverableHere); } /** diff --git a/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/client/AmqpReceiver.java b/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/client/AmqpReceiver.java index 049fe4d1bf..98241cd947 100644 --- a/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/client/AmqpReceiver.java +++ b/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/client/AmqpReceiver.java @@ -316,18 +316,17 @@ public class AmqpReceiver extends AmqpAbstractResource { } /** - * Reject a message that was dispatched under the given Delivery instance. + * Mark a message that was dispatched under the given Delivery instance as Modified. * * @param delivery - * the Delivery instance to reject. - * @param undeliverableHere - * marks the delivery as not being able to be process by link it was sent to. + * the Delivery instance to mark modified. * @param deliveryFailed * indicates that the delivery failed for some reason. - * + * @param undeliverableHere + * marks the delivery as not being able to be process by link it was sent to. * @throws IOException if an error occurs while sending the reject. */ - public void reject(final Delivery delivery, final boolean undeliverableHere, final boolean deliveryFailed) throws IOException { + public void modified(final Delivery delivery, final Boolean deliveryFailed, final Boolean undeliverableHere) throws IOException { checkClosed(); if (delivery == null) { diff --git a/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/interop/AmqpReceiverTest.java b/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/interop/AmqpReceiverTest.java index 30571f7290..1502bdaaf9 100644 --- a/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/interop/AmqpReceiverTest.java +++ b/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/interop/AmqpReceiverTest.java @@ -21,6 +21,7 @@ import static org.apache.activemq.transport.amqp.AmqpSupport.NO_LOCAL_FILTER_IDS import static org.apache.activemq.transport.amqp.AmqpSupport.findFilter; import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertNotNull; +import static org.junit.Assert.assertNull; import static org.junit.Assert.assertTrue; import java.util.HashMap; @@ -43,6 +44,7 @@ import org.apache.qpid.proton.amqp.messaging.Source; import org.apache.qpid.proton.amqp.messaging.TerminusDurability; import org.apache.qpid.proton.amqp.messaging.TerminusExpiryPolicy; import org.apache.qpid.proton.engine.Receiver; +import org.apache.qpid.proton.message.Message; import org.junit.Test; /** @@ -412,4 +414,67 @@ public class AmqpReceiverTest extends AmqpClientTestSupport { connection.getStateInspector().assertValid(); connection.close(); } + + @Test(timeout = 30000) + public void testModifiedDispositionWithDeliveryFailedWithoutUndeliverableHereFieldsSet() throws Exception { + doModifiedDispositionTestImpl(Boolean.TRUE, null); + } + + @Test(timeout = 30000) + public void testModifiedDispositionWithoutDeliveryFailedWithoutUndeliverableHereFieldsSet() throws Exception { + doModifiedDispositionTestImpl(null, null); + } + + @Test(timeout = 30000) + public void testModifiedDispositionWithoutDeliveryFailedWithUndeliverableHereFieldsSet() throws Exception { + doModifiedDispositionTestImpl(null, Boolean.TRUE); + } + + @Test(timeout = 30000) + public void testModifiedDispositionWithDeliveryFailedWithUndeliverableHereFieldsSet() throws Exception { + doModifiedDispositionTestImpl(Boolean.TRUE, Boolean.TRUE); + } + + private void doModifiedDispositionTestImpl(Boolean deliveryFailed, Boolean undeliverableHere) throws Exception { + int msgCount = 1; + sendMessages(getTestName(), msgCount, false); + + AmqpClient client = createAmqpClient(); + AmqpConnection connection = client.connect(); + AmqpSession session = connection.createSession(); + + AmqpReceiver receiver = session.createReceiver("queue://" + getTestName()); + receiver.flow(2 * msgCount); + + AmqpMessage message = receiver.receive(5, TimeUnit.SECONDS); + assertNotNull("did not receive message first time", message); + + Message protonMessage = message.getWrappedMessage(); + assertNotNull(protonMessage); + assertEquals("Unexpected initial value for AMQP delivery-count", 0, protonMessage.getDeliveryCount()); + + message.modified(deliveryFailed, undeliverableHere); + + if(Boolean.TRUE.equals(undeliverableHere)) { + message = receiver.receive(250, TimeUnit.MILLISECONDS); + assertNull("Should not receive message again", message); + } else { + message = receiver.receive(5, TimeUnit.SECONDS); + assertNotNull("did not receive message again", message); + + int expectedDeliveryCount = 0; + if(Boolean.TRUE.equals(deliveryFailed)) { + expectedDeliveryCount = 1; + } + + message.accept(); + + Message protonMessage2 = message.getWrappedMessage(); + assertNotNull(protonMessage2); + assertEquals("Unexpected updated value for AMQP delivery-count", expectedDeliveryCount, protonMessage2.getDeliveryCount()); + } + + receiver.close(); + connection.close(); + } }