From abcde1c1f3674e4f8696b68ddfbb59498a2118d8 Mon Sep 17 00:00:00 2001 From: Andy Taylor Date: Tue, 28 Jul 2015 09:06:40 +0100 Subject: [PATCH] added Openwire XA end call --- .../core/protocol/openwire/OpenWireConnection.java | 1 + .../protocol/openwire/OpenWireProtocolManager.java | 11 +++++++++++ .../core/protocol/openwire/amq/AMQSession.java | 13 +++++++++++++ 3 files changed, 25 insertions(+) diff --git a/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/OpenWireConnection.java b/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/OpenWireConnection.java index d943af16af..6c6bff312b 100644 --- a/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/OpenWireConnection.java +++ b/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/OpenWireConnection.java @@ -1356,6 +1356,7 @@ public class OpenWireConnection implements RemotingConnection, CommandVisitor @Override public Response processEndTransaction(TransactionInfo info) throws Exception { + protocolManager.endTransaction(info); TransactionId txId = info.getTransactionId(); if (!txMap.containsKey(txId)) diff --git a/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/OpenWireProtocolManager.java b/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/OpenWireProtocolManager.java index 15cb9e2be0..d2459856cc 100644 --- a/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/OpenWireProtocolManager.java +++ b/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/OpenWireProtocolManager.java @@ -711,6 +711,17 @@ public class OpenWireProtocolManager implements ProtocolManager, No server.destroyQueue(new SimpleString(q)); } + + public void endTransaction(TransactionInfo info) throws Exception + { + AMQSession txSession = transactions.get(info.getTransactionId()); + + if (txSession != null) + { + txSession.endTransaction(info); + } + } + public void commitTransactionOnePhase(TransactionInfo info) throws Exception { AMQSession txSession = transactions.get(info.getTransactionId()); diff --git a/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/amq/AMQSession.java b/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/amq/AMQSession.java index 4f951fe95c..ef64b6c230 100644 --- a/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/amq/AMQSession.java +++ b/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/amq/AMQSession.java @@ -396,6 +396,19 @@ public class AMQSession implements SessionCallback this.isTx = true; } + + public void endTransaction(TransactionInfo info) throws Exception + { + checkTx(info.getTransactionId()); + + if (txId.isXATransaction()) + { + XATransactionId xid = (XATransactionId) txId; + XidImpl coreXid = new XidImpl(xid.getBranchQualifier(), xid.getFormatId(), xid.getGlobalTransactionId()); + this.coreSession.xaEnd(coreXid); + } + } + public void commitOnePhase(TransactionInfo info) throws Exception { checkTx(info.getTransactionId());