ARTEMIS-1080 Implementing AMQP::reject

This commit is contained in:
Clebert Suconic 2017-03-28 17:12:02 -04:00 committed by Justin Bertram
parent 13a272b37b
commit 746220e11e
6 changed files with 35 additions and 3 deletions

View File

@ -336,6 +336,15 @@ public class AMQPSessionCallback implements SessionCallback {
}
}
public void reject(Object brokerConsumer, Message message) throws Exception {
recoverContext();
try {
((ServerConsumer) brokerConsumer).reject(message.getMessageID());
} finally {
resetContext();
}
}
public void resumeDelivery(Object consumer) {
((ServerConsumer) consumer).receiveCredits(-1);
}

View File

@ -546,7 +546,7 @@ public class ProtonServerSenderContext extends ProtonInitializable implements Pr
}
} else if (remoteState instanceof Rejected) {
try {
sessionSPI.cancel(brokerConsumer, message, true);
sessionSPI.reject(brokerConsumer, message);
} catch (Exception e) {
throw ActiveMQAMQPProtocolMessageBundle.BUNDLE.errorCancellingMessage(message.toString(), e.getMessage());
}

View File

@ -92,6 +92,8 @@ public interface ServerConsumer extends Consumer {
void individualAcknowledge(Transaction tx, long messageID) throws Exception;
void reject(final long messageID) throws Exception;
void individualCancel(final long messageID, boolean failed) throws Exception;
void forceDelivery(long sequence);

View File

@ -910,6 +910,22 @@ public class ServerConsumerImpl implements ServerConsumer, ReadyListener {
ref.getQueue().cancel(ref, System.currentTimeMillis());
}
@Override
public synchronized void reject(final long messageID) throws Exception {
if (browseOnly) {
return;
}
MessageReference ref = removeReferenceByID(messageID);
if (ref == null) {
return; // nothing to be done
}
ref.getQueue().sendToDeadLetterAddress(null, ref);
}
@Override
public synchronized void backToDelivering(MessageReference reference) {
deliveringRefs.addFirst(reference);

View File

@ -97,13 +97,13 @@ public class AmqpReceiverDispositionTest extends AmqpClientTestSupport {
// Reject is a terminal outcome and should not be redelivered to the rejecting receiver
// or any other as it should move to the archived state.
receiver1.flow(1);
message = receiver1.receive(1, TimeUnit.SECONDS);
message = receiver1.receiveNoWait();
assertNull("Should not receive message again", message);
// Attempt to Read the message again with another receiver to validate it is archived.
AmqpReceiver receiver2 = session.createReceiver(getTestName());
receiver2.flow(1);
assertNull(receiver2.receive(3, TimeUnit.SECONDS));
assertNull(receiver2.receiveNoWait());
connection.close();
}

View File

@ -122,6 +122,11 @@ public class DummyServerConsumer implements ServerConsumer {
return null;
}
@Override
public void reject(long messageID) throws Exception {
}
@Override
public void acknowledge(Transaction tx, long messageID) throws Exception {