From 34a912b644d97c4c6c420095a0df587007355512 Mon Sep 17 00:00:00 2001 From: Andy Taylor Date: Tue, 5 Dec 2017 12:12:54 +0000 Subject: [PATCH] ARTEMIS-1535 - settle delivery in same lock as sending the disposition https://issues.apache.org/jira/browse/ARTEMIS-1535 --- .../transaction/ProtonTransactionHandler.java | 48 +++++++++---------- 1 file changed, 23 insertions(+), 25 deletions(-) diff --git a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/proton/transaction/ProtonTransactionHandler.java b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/proton/transaction/ProtonTransactionHandler.java index bf2e575355..28573e0664 100644 --- a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/proton/transaction/ProtonTransactionHandler.java +++ b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/proton/transaction/ProtonTransactionHandler.java @@ -101,17 +101,29 @@ public class ProtonTransactionHandler implements ProtonDeliveryHandler { } Object action = ((AmqpValue) msg.getBody()).getValue(); - if (action instanceof Declare) { Binary txID = sessionSPI.newTransaction(); Declared declared = new Declared(); declared.setTxnId(txID); - connection.lock(); - try { - delivery.disposition(declared); - } finally { - connection.unlock(); - } + IOCallback ioAction = new IOCallback() { + @Override + public void done() { + connection.lock(); + try { + delivery.settle(); + delivery.disposition(declared); + } finally { + connection.unlock(); + connection.flush(); + } + } + + @Override + public void onError(int errorCode, String errorMessage) { + + } + }; + sessionSPI.afterIO(ioAction); } else if (action instanceof Discharge) { Discharge discharge = (Discharge) action; @@ -124,9 +136,11 @@ public class ProtonTransactionHandler implements ProtonDeliveryHandler { public void done() { connection.lock(); try { + delivery.settle(); delivery.disposition(new Accepted()); } finally { connection.unlock(); + connection.flush(); } } @@ -148,6 +162,7 @@ public class ProtonTransactionHandler implements ProtonDeliveryHandler { log.warn(amqpE.getMessage(), amqpE); connection.lock(); try { + delivery.settle(); delivery.disposition(createRejected(amqpE.getAmqpError(), amqpE.getMessage())); } finally { connection.unlock(); @@ -157,29 +172,12 @@ public class ProtonTransactionHandler implements ProtonDeliveryHandler { log.warn(e.getMessage(), e); connection.lock(); try { + delivery.settle(); delivery.disposition(createRejected(Symbol.getSymbol("failed"), e.getMessage())); } finally { connection.unlock(); } connection.flush(); - } finally { - sessionSPI.afterIO(new IOCallback() { - @Override - public void done() { - connection.lock(); - try { - delivery.settle(); - } finally { - connection.unlock(); - } - connection.flush(); - } - - @Override - public void onError(int errorCode, String errorMessage) { - - } - }); } }