This closes #1495
This commit is contained in:
commit
0610a2349c
|
@ -541,94 +541,94 @@ public class ProtonServerSenderContext extends ProtonInitializable implements Pr
|
|||
}
|
||||
|
||||
boolean settleImmediate = true;
|
||||
if (remoteState != null) {
|
||||
// If we are transactional then we need ack if the msg has been accepted
|
||||
if (remoteState instanceof TransactionalState) {
|
||||
if (remoteState instanceof Accepted) {
|
||||
// this can happen in the twice ack mode, that is the receiver accepts and settles separately
|
||||
// acking again would show an exception but would have no negative effect but best to handle anyway.
|
||||
if (delivery.isSettled()) {
|
||||
return;
|
||||
}
|
||||
// we have to individual ack as we can't guarantee we will get the delivery updates
|
||||
// (including acks) in order
|
||||
// from dealer, a perf hit but a must
|
||||
try {
|
||||
sessionSPI.ack(null, brokerConsumer, message);
|
||||
} catch (Exception e) {
|
||||
log.warn(e.toString(), e);
|
||||
throw ActiveMQAMQPProtocolMessageBundle.BUNDLE.errorAcknowledgingMessage(message.toString(), e.getMessage());
|
||||
}
|
||||
} else if (remoteState instanceof TransactionalState) {
|
||||
// When the message arrives with a TransactionState disposition the ack should
|
||||
// enlist the message into the transaction associated with the given txn ID.
|
||||
TransactionalState txState = (TransactionalState) remoteState;
|
||||
ProtonTransactionImpl tx = (ProtonTransactionImpl) this.sessionSPI.getTransaction(txState.getTxnId(), false);
|
||||
|
||||
TransactionalState txState = (TransactionalState) remoteState;
|
||||
ProtonTransactionImpl tx = (ProtonTransactionImpl) this.sessionSPI.getTransaction(txState.getTxnId(), false);
|
||||
|
||||
if (txState.getOutcome() != null) {
|
||||
settleImmediate = false;
|
||||
Outcome outcome = txState.getOutcome();
|
||||
if (outcome instanceof Accepted) {
|
||||
if (!delivery.remotelySettled()) {
|
||||
TransactionalState txAccepted = new TransactionalState();
|
||||
txAccepted.setOutcome(Accepted.getInstance());
|
||||
txAccepted.setTxnId(txState.getTxnId());
|
||||
connection.lock();
|
||||
try {
|
||||
delivery.disposition(txAccepted);
|
||||
} finally {
|
||||
connection.unlock();
|
||||
}
|
||||
}
|
||||
// we have to individual ack as we can't guarantee we will get the delivery
|
||||
// updates (including acks) in order
|
||||
// from dealer, a perf hit but a must
|
||||
if (txState.getOutcome() != null) {
|
||||
settleImmediate = false;
|
||||
Outcome outcome = txState.getOutcome();
|
||||
if (outcome instanceof Accepted) {
|
||||
if (!delivery.remotelySettled()) {
|
||||
TransactionalState txAccepted = new TransactionalState();
|
||||
txAccepted.setOutcome(Accepted.getInstance());
|
||||
txAccepted.setTxnId(txState.getTxnId());
|
||||
connection.lock();
|
||||
try {
|
||||
sessionSPI.ack(tx, brokerConsumer, message);
|
||||
tx.addDelivery(delivery, this);
|
||||
} catch (Exception e) {
|
||||
throw ActiveMQAMQPProtocolMessageBundle.BUNDLE.errorAcknowledgingMessage(message.toString(), e.getMessage());
|
||||
delivery.disposition(txAccepted);
|
||||
} finally {
|
||||
connection.unlock();
|
||||
}
|
||||
}
|
||||
// we have to individual ack as we can't guarantee we will get the delivery
|
||||
// updates (including acks) in order
|
||||
// from dealer, a perf hit but a must
|
||||
try {
|
||||
sessionSPI.ack(tx, brokerConsumer, message);
|
||||
tx.addDelivery(delivery, this);
|
||||
} catch (Exception e) {
|
||||
throw ActiveMQAMQPProtocolMessageBundle.BUNDLE.errorAcknowledgingMessage(message.toString(), e.getMessage());
|
||||
}
|
||||
}
|
||||
} else if (remoteState instanceof Accepted) {
|
||||
//this can happen in the twice ack mode, that is the receiver accepts and settles separately
|
||||
//acking again would show an exception but would have no negative effect but best to handle anyway.
|
||||
if (delivery.isSettled()) {
|
||||
return;
|
||||
}
|
||||
} else if (remoteState instanceof Released) {
|
||||
try {
|
||||
sessionSPI.cancel(brokerConsumer, message, false);
|
||||
} catch (Exception e) {
|
||||
throw ActiveMQAMQPProtocolMessageBundle.BUNDLE.errorCancellingMessage(message.toString(), e.getMessage());
|
||||
}
|
||||
} else if (remoteState instanceof Rejected) {
|
||||
try {
|
||||
sessionSPI.reject(brokerConsumer, message);
|
||||
} catch (Exception e) {
|
||||
throw ActiveMQAMQPProtocolMessageBundle.BUNDLE.errorCancellingMessage(message.toString(), e.getMessage());
|
||||
}
|
||||
} else if (remoteState instanceof Modified) {
|
||||
try {
|
||||
Modified modification = (Modified) remoteState;
|
||||
|
||||
if (Boolean.TRUE.equals(modification.getUndeliverableHere())) {
|
||||
message.rejectConsumer(brokerConsumer.sequentialID());
|
||||
}
|
||||
// we have to individual ack as we can't guarantee we will get the delivery updates
|
||||
// (including acks) in order
|
||||
// from dealer, a perf hit but a must
|
||||
try {
|
||||
sessionSPI.ack(null, brokerConsumer, message);
|
||||
} catch (Exception e) {
|
||||
log.warn(e.toString(), e);
|
||||
throw ActiveMQAMQPProtocolMessageBundle.BUNDLE.errorAcknowledgingMessage(message.toString(), e.getMessage());
|
||||
}
|
||||
} else if (remoteState instanceof Released) {
|
||||
try {
|
||||
|
||||
if (Boolean.TRUE.equals(modification.getDeliveryFailed())) {
|
||||
sessionSPI.cancel(brokerConsumer, message, true);
|
||||
} else {
|
||||
sessionSPI.cancel(brokerConsumer, message, false);
|
||||
} catch (Exception e) {
|
||||
throw ActiveMQAMQPProtocolMessageBundle.BUNDLE.errorCancellingMessage(message.toString(), e.getMessage());
|
||||
}
|
||||
} else if (remoteState instanceof Rejected) {
|
||||
try {
|
||||
sessionSPI.reject(brokerConsumer, message);
|
||||
} catch (Exception e) {
|
||||
throw ActiveMQAMQPProtocolMessageBundle.BUNDLE.errorCancellingMessage(message.toString(), e.getMessage());
|
||||
}
|
||||
} else if (remoteState instanceof Modified) {
|
||||
try {
|
||||
Modified modification = (Modified) remoteState;
|
||||
|
||||
if (Boolean.TRUE.equals(modification.getUndeliverableHere())) {
|
||||
message.rejectConsumer(brokerConsumer.sequentialID());
|
||||
}
|
||||
|
||||
if (Boolean.TRUE.equals(modification.getDeliveryFailed())) {
|
||||
sessionSPI.cancel(brokerConsumer, message, true);
|
||||
} else {
|
||||
sessionSPI.cancel(brokerConsumer, message, false);
|
||||
}
|
||||
} catch (Exception e) {
|
||||
throw ActiveMQAMQPProtocolMessageBundle.BUNDLE.errorCancellingMessage(message.toString(), e.getMessage());
|
||||
}
|
||||
} catch (Exception e) {
|
||||
throw ActiveMQAMQPProtocolMessageBundle.BUNDLE.errorCancellingMessage(message.toString(), e.getMessage());
|
||||
}
|
||||
|
||||
if (!preSettle) {
|
||||
protonSession.replaceTag(delivery.getTag());
|
||||
}
|
||||
|
||||
if (settleImmediate)
|
||||
settle(delivery);
|
||||
|
||||
} else {
|
||||
// todo not sure if we need to do anything here
|
||||
log.debug("Received null or unknown disposition for delivery update: " + remoteState);
|
||||
return;
|
||||
}
|
||||
|
||||
if (!preSettle) {
|
||||
protonSession.replaceTag(delivery.getTag());
|
||||
}
|
||||
|
||||
if (settleImmediate) {
|
||||
settle(delivery);
|
||||
}
|
||||
|
||||
} finally {
|
||||
sessionSPI.afterIO(new IOCallback() {
|
||||
@Override
|
||||
|
|
Loading…
Reference in New Issue