Correct handling of rejected outcome to archive the message in the DLQ
(cherry picked from commit a9f9d4a4d2)
This commit is contained in:
Timothy Bish 2016-10-14 09:35:17 -04:00
parent 70728e97da
commit d126afc8fe
2 changed files with 8 additions and 14 deletions

View File

@ -270,10 +270,11 @@ public class AmqpSender extends AmqpAbstractLink<Sender> {
} }
settle(delivery, MessageAck.INDIVIDUAL_ACK_TYPE); settle(delivery, MessageAck.INDIVIDUAL_ACK_TYPE);
} else if (state instanceof Rejected) { } else if (state instanceof Rejected) {
// re-deliver /w incremented delivery counter. // Rejection is a terminal outcome, we poison the message for dispatch to
md.setRedeliveryCounter(md.getRedeliveryCounter() + 1); // the DLQ. If a custom redelivery policy is used on the broker the message
LOG.trace("onDelivery: Rejected state = {}, delivery count now {}", state, md.getRedeliveryCounter()); // can still be redelivered based on the configation of that policy.
settle(delivery, -1); LOG.trace("onDelivery: Rejected state = {}, message poisoned.", state, md.getRedeliveryCounter());
settle(delivery, MessageAck.POSION_ACK_TYPE);
} else if (state instanceof Released) { } else if (state instanceof Released) {
LOG.trace("onDelivery: Released state = {}", state); LOG.trace("onDelivery: Released state = {}", state);
// re-deliver && don't increment the counter. // re-deliver && don't increment the counter.

View File

@ -514,16 +514,9 @@ public class AmqpReceiverTest extends AmqpClientTestSupport {
message.reject(); message.reject();
// Read the message again and validate its state // Attempt to read the message again but should not get it.
message = receiver.receive(2, TimeUnit.SECONDS);
message = receiver.receive(10, TimeUnit.SECONDS); assertNull("shoudl not receive message again", message);
assertNotNull("did not receive message again", message);
message.accept();
protonMessage = message.getWrappedMessage();
assertNotNull(protonMessage);
assertEquals("Unexpected updated value for AMQP delivery-count", 1, protonMessage.getDeliveryCount());
connection.close(); connection.close();
} }