diff --git a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/proton/ProtonServerSenderContext.java b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/proton/ProtonServerSenderContext.java index c774b4d1f9..50d2ef4773 100644 --- a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/proton/ProtonServerSenderContext.java +++ b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/proton/ProtonServerSenderContext.java @@ -541,94 +541,94 @@ public class ProtonServerSenderContext extends ProtonInitializable implements Pr } boolean settleImmediate = true; - if (remoteState != null) { - // If we are transactional then we need ack if the msg has been accepted - if (remoteState instanceof TransactionalState) { + if (remoteState instanceof Accepted) { + // this can happen in the twice ack mode, that is the receiver accepts and settles separately + // acking again would show an exception but would have no negative effect but best to handle anyway. + if (delivery.isSettled()) { + return; + } + // we have to individual ack as we can't guarantee we will get the delivery updates + // (including acks) in order + // from dealer, a perf hit but a must + try { + sessionSPI.ack(null, brokerConsumer, message); + } catch (Exception e) { + log.warn(e.toString(), e); + throw ActiveMQAMQPProtocolMessageBundle.BUNDLE.errorAcknowledgingMessage(message.toString(), e.getMessage()); + } + } else if (remoteState instanceof TransactionalState) { + // When the message arrives with a TransactionState disposition the ack should + // enlist the message into the transaction associated with the given txn ID. + TransactionalState txState = (TransactionalState) remoteState; + ProtonTransactionImpl tx = (ProtonTransactionImpl) this.sessionSPI.getTransaction(txState.getTxnId(), false); - TransactionalState txState = (TransactionalState) remoteState; - ProtonTransactionImpl tx = (ProtonTransactionImpl) this.sessionSPI.getTransaction(txState.getTxnId(), false); - - if (txState.getOutcome() != null) { - settleImmediate = false; - Outcome outcome = txState.getOutcome(); - if (outcome instanceof Accepted) { - if (!delivery.remotelySettled()) { - TransactionalState txAccepted = new TransactionalState(); - txAccepted.setOutcome(Accepted.getInstance()); - txAccepted.setTxnId(txState.getTxnId()); - connection.lock(); - try { - delivery.disposition(txAccepted); - } finally { - connection.unlock(); - } - } - // we have to individual ack as we can't guarantee we will get the delivery - // updates (including acks) in order - // from dealer, a perf hit but a must + if (txState.getOutcome() != null) { + settleImmediate = false; + Outcome outcome = txState.getOutcome(); + if (outcome instanceof Accepted) { + if (!delivery.remotelySettled()) { + TransactionalState txAccepted = new TransactionalState(); + txAccepted.setOutcome(Accepted.getInstance()); + txAccepted.setTxnId(txState.getTxnId()); + connection.lock(); try { - sessionSPI.ack(tx, brokerConsumer, message); - tx.addDelivery(delivery, this); - } catch (Exception e) { - throw ActiveMQAMQPProtocolMessageBundle.BUNDLE.errorAcknowledgingMessage(message.toString(), e.getMessage()); + delivery.disposition(txAccepted); + } finally { + connection.unlock(); } } + // we have to individual ack as we can't guarantee we will get the delivery + // updates (including acks) in order + // from dealer, a perf hit but a must + try { + sessionSPI.ack(tx, brokerConsumer, message); + tx.addDelivery(delivery, this); + } catch (Exception e) { + throw ActiveMQAMQPProtocolMessageBundle.BUNDLE.errorAcknowledgingMessage(message.toString(), e.getMessage()); + } } - } else if (remoteState instanceof Accepted) { - //this can happen in the twice ack mode, that is the receiver accepts and settles separately - //acking again would show an exception but would have no negative effect but best to handle anyway. - if (delivery.isSettled()) { - return; + } + } else if (remoteState instanceof Released) { + try { + sessionSPI.cancel(brokerConsumer, message, false); + } catch (Exception e) { + throw ActiveMQAMQPProtocolMessageBundle.BUNDLE.errorCancellingMessage(message.toString(), e.getMessage()); + } + } else if (remoteState instanceof Rejected) { + try { + sessionSPI.reject(brokerConsumer, message); + } catch (Exception e) { + throw ActiveMQAMQPProtocolMessageBundle.BUNDLE.errorCancellingMessage(message.toString(), e.getMessage()); + } + } else if (remoteState instanceof Modified) { + try { + Modified modification = (Modified) remoteState; + + if (Boolean.TRUE.equals(modification.getUndeliverableHere())) { + message.rejectConsumer(brokerConsumer.sequentialID()); } - // we have to individual ack as we can't guarantee we will get the delivery updates - // (including acks) in order - // from dealer, a perf hit but a must - try { - sessionSPI.ack(null, brokerConsumer, message); - } catch (Exception e) { - log.warn(e.toString(), e); - throw ActiveMQAMQPProtocolMessageBundle.BUNDLE.errorAcknowledgingMessage(message.toString(), e.getMessage()); - } - } else if (remoteState instanceof Released) { - try { + + if (Boolean.TRUE.equals(modification.getDeliveryFailed())) { + sessionSPI.cancel(brokerConsumer, message, true); + } else { sessionSPI.cancel(brokerConsumer, message, false); - } catch (Exception e) { - throw ActiveMQAMQPProtocolMessageBundle.BUNDLE.errorCancellingMessage(message.toString(), e.getMessage()); - } - } else if (remoteState instanceof Rejected) { - try { - sessionSPI.reject(brokerConsumer, message); - } catch (Exception e) { - throw ActiveMQAMQPProtocolMessageBundle.BUNDLE.errorCancellingMessage(message.toString(), e.getMessage()); - } - } else if (remoteState instanceof Modified) { - try { - Modified modification = (Modified) remoteState; - - if (Boolean.TRUE.equals(modification.getUndeliverableHere())) { - message.rejectConsumer(brokerConsumer.sequentialID()); - } - - if (Boolean.TRUE.equals(modification.getDeliveryFailed())) { - sessionSPI.cancel(brokerConsumer, message, true); - } else { - sessionSPI.cancel(brokerConsumer, message, false); - } - } catch (Exception e) { - throw ActiveMQAMQPProtocolMessageBundle.BUNDLE.errorCancellingMessage(message.toString(), e.getMessage()); } + } catch (Exception e) { + throw ActiveMQAMQPProtocolMessageBundle.BUNDLE.errorCancellingMessage(message.toString(), e.getMessage()); } - - if (!preSettle) { - protonSession.replaceTag(delivery.getTag()); - } - - if (settleImmediate) - settle(delivery); - } else { - // todo not sure if we need to do anything here + log.debug("Received null or unknown disposition for delivery update: " + remoteState); + return; } + + if (!preSettle) { + protonSession.replaceTag(delivery.getTag()); + } + + if (settleImmediate) { + settle(delivery); + } + } finally { sessionSPI.afterIO(new IOCallback() { @Override