ARTEMIS-1377 Refactor the disposition handling code

Avoid null checking each disposition before then checking the type, also
account for not knowing the type.  Rearrange the handling code to
prioritize the most common case which is "Accepted"
This commit is contained in:
Timothy Bish 2017-08-29 14:26:38 -04:00
parent 56cbed7294
commit 5d1b7e0bea
1 changed files with 77 additions and 77 deletions

View File

@ -541,10 +541,24 @@ public class ProtonServerSenderContext extends ProtonInitializable implements Pr
}
boolean settleImmediate = true;
if (remoteState != null) {
// If we are transactional then we need ack if the msg has been accepted
if (remoteState instanceof TransactionalState) {
if (remoteState instanceof Accepted) {
// this can happen in the twice ack mode, that is the receiver accepts and settles separately
// acking again would show an exception but would have no negative effect but best to handle anyway.
if (delivery.isSettled()) {
return;
}
// we have to individual ack as we can't guarantee we will get the delivery updates
// (including acks) in order
// from dealer, a perf hit but a must
try {
sessionSPI.ack(null, brokerConsumer, message);
} catch (Exception e) {
log.warn(e.toString(), e);
throw ActiveMQAMQPProtocolMessageBundle.BUNDLE.errorAcknowledgingMessage(message.toString(), e.getMessage());
}
} else if (remoteState instanceof TransactionalState) {
// When the message arrives with a TransactionState disposition the ack should
// enlist the message into the transaction associated with the given txn ID.
TransactionalState txState = (TransactionalState) remoteState;
ProtonTransactionImpl tx = (ProtonTransactionImpl) this.sessionSPI.getTransaction(txState.getTxnId(), false);
@ -574,21 +588,6 @@ public class ProtonServerSenderContext extends ProtonInitializable implements Pr
}
}
}
} else if (remoteState instanceof Accepted) {
//this can happen in the twice ack mode, that is the receiver accepts and settles separately
//acking again would show an exception but would have no negative effect but best to handle anyway.
if (delivery.isSettled()) {
return;
}
// we have to individual ack as we can't guarantee we will get the delivery updates
// (including acks) in order
// from dealer, a perf hit but a must
try {
sessionSPI.ack(null, brokerConsumer, message);
} catch (Exception e) {
log.warn(e.toString(), e);
throw ActiveMQAMQPProtocolMessageBundle.BUNDLE.errorAcknowledgingMessage(message.toString(), e.getMessage());
}
} else if (remoteState instanceof Released) {
try {
sessionSPI.cancel(brokerConsumer, message, false);
@ -617,18 +616,19 @@ public class ProtonServerSenderContext extends ProtonInitializable implements Pr
} catch (Exception e) {
throw ActiveMQAMQPProtocolMessageBundle.BUNDLE.errorCancellingMessage(message.toString(), e.getMessage());
}
} else {
log.debug("Received null or unknown disposition for delivery update: " + remoteState);
return;
}
if (!preSettle) {
protonSession.replaceTag(delivery.getTag());
}
if (settleImmediate)
if (settleImmediate) {
settle(delivery);
} else {
// todo not sure if we need to do anything here
}
} finally {
sessionSPI.afterIO(new IOCallback() {
@Override