diff --git a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/proton/transaction/ProtonTransactionHandler.java b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/proton/transaction/ProtonTransactionHandler.java index bf2e575355..28573e0664 100644 --- a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/proton/transaction/ProtonTransactionHandler.java +++ b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/proton/transaction/ProtonTransactionHandler.java @@ -101,17 +101,29 @@ public class ProtonTransactionHandler implements ProtonDeliveryHandler { } Object action = ((AmqpValue) msg.getBody()).getValue(); - if (action instanceof Declare) { Binary txID = sessionSPI.newTransaction(); Declared declared = new Declared(); declared.setTxnId(txID); - connection.lock(); - try { - delivery.disposition(declared); - } finally { - connection.unlock(); - } + IOCallback ioAction = new IOCallback() { + @Override + public void done() { + connection.lock(); + try { + delivery.settle(); + delivery.disposition(declared); + } finally { + connection.unlock(); + connection.flush(); + } + } + + @Override + public void onError(int errorCode, String errorMessage) { + + } + }; + sessionSPI.afterIO(ioAction); } else if (action instanceof Discharge) { Discharge discharge = (Discharge) action; @@ -124,9 +136,11 @@ public class ProtonTransactionHandler implements ProtonDeliveryHandler { public void done() { connection.lock(); try { + delivery.settle(); delivery.disposition(new Accepted()); } finally { connection.unlock(); + connection.flush(); } } @@ -148,6 +162,7 @@ public class ProtonTransactionHandler implements ProtonDeliveryHandler { log.warn(amqpE.getMessage(), amqpE); connection.lock(); try { + delivery.settle(); delivery.disposition(createRejected(amqpE.getAmqpError(), amqpE.getMessage())); } finally { connection.unlock(); @@ -157,29 +172,12 @@ public class ProtonTransactionHandler implements ProtonDeliveryHandler { log.warn(e.getMessage(), e); connection.lock(); try { + delivery.settle(); delivery.disposition(createRejected(Symbol.getSymbol("failed"), e.getMessage())); } finally { connection.unlock(); } connection.flush(); - } finally { - sessionSPI.afterIO(new IOCallback() { - @Override - public void done() { - connection.lock(); - try { - delivery.settle(); - } finally { - connection.unlock(); - } - connection.flush(); - } - - @Override - public void onError(int errorCode, String errorMessage) { - - } - }); } }