diff --git a/activemq-core/src/main/java/org/apache/activemq/TransactionContext.java b/activemq-core/src/main/java/org/apache/activemq/TransactionContext.java index 07d593f6f5..97d3871ef7 100755 --- a/activemq-core/src/main/java/org/apache/activemq/TransactionContext.java +++ b/activemq-core/src/main/java/org/apache/activemq/TransactionContext.java @@ -16,6 +16,7 @@ */ package org.apache.activemq; +import java.io.InterruptedIOException; import java.util.ArrayList; import java.util.Arrays; import java.util.List; @@ -27,11 +28,13 @@ import javax.transaction.xa.XAException; import javax.transaction.xa.XAResource; import javax.transaction.xa.Xid; +import org.apache.activemq.command.Command; import org.apache.activemq.command.ConnectionId; import org.apache.activemq.command.DataArrayResponse; import org.apache.activemq.command.DataStructure; import org.apache.activemq.command.IntegerResponse; import org.apache.activemq.command.LocalTransactionId; +import org.apache.activemq.command.Response; import org.apache.activemq.command.TransactionId; import org.apache.activemq.command.TransactionInfo; import org.apache.activemq.command.XATransactionId; @@ -280,7 +283,7 @@ public class TransactionContext implements XAResource { TransactionInfo info = new TransactionInfo(getConnectionId(), transactionId, TransactionInfo.COMMIT_ONE_PHASE); this.transactionId = null; // Notify the listener that the tx was committed back - this.connection.syncSendPacket(info); + syncSendPacketWithInterruptionHandling(info); if (localTransactionEventListener != null) { localTransactionEventListener.commitEvent(); } @@ -399,7 +402,7 @@ public class TransactionContext implements XAResource { TransactionInfo info = new TransactionInfo(getConnectionId(), x, TransactionInfo.PREPARE); // Find out if the server wants to commit or rollback. - IntegerResponse response = (IntegerResponse)this.connection.syncSendPacket(info); + IntegerResponse response = (IntegerResponse)syncSendPacketWithInterruptionHandling(info); return response.getResult(); } catch (JMSException e) { @@ -433,7 +436,7 @@ public class TransactionContext implements XAResource { // Let the server know that the tx is rollback. TransactionInfo info = new TransactionInfo(getConnectionId(), x, TransactionInfo.ROLLBACK); - this.connection.syncSendPacket(info); + syncSendPacketWithInterruptionHandling(info); List l = ENDED_XA_TRANSACTION_CONTEXTS.remove(x); if (l != null && !l.isEmpty()) { @@ -472,7 +475,7 @@ public class TransactionContext implements XAResource { // Notify the server that the tx was committed back TransactionInfo info = new TransactionInfo(getConnectionId(), x, onePhase ? TransactionInfo.COMMIT_ONE_PHASE : TransactionInfo.COMMIT_TWO_PHASE); - this.connection.syncSendPacket(info); + syncSendPacketWithInterruptionHandling(info); List l = ENDED_XA_TRANSACTION_CONTEXTS.remove(x); if (l != null && !l.isEmpty()) { @@ -509,7 +512,7 @@ public class TransactionContext implements XAResource { try { // Tell the server to forget the transaction. - this.connection.syncSendPacket(info); + syncSendPacketWithInterruptionHandling(info); } catch (JMSException e) { throw toXAException(e); } @@ -601,7 +604,7 @@ public class TransactionContext implements XAResource { if (transactionId != null) { TransactionInfo info = new TransactionInfo(connectionId, transactionId, TransactionInfo.END); try { - this.connection.syncSendPacket(info); + syncSendPacketWithInterruptionHandling(info); if (LOG.isDebugEnabled()) { LOG.debug("Ended XA transaction: " + transactionId); } @@ -627,6 +630,31 @@ public class TransactionContext implements XAResource { } } + /** + * Sends the given command. Also sends the command in case of interruption, + * so that important commands like rollback and commit are never interrupted. + * If interruption occurred, set the interruption state of the current + * after performing the action again. + * + * @return the response + */ + private Response syncSendPacketWithInterruptionHandling(Command command) throws JMSException { + try { + return this.connection.syncSendPacket(command); + } catch (JMSException e) { + if (e.getLinkedException() instanceof InterruptedIOException) { + try { + Thread.interrupted(); + return this.connection.syncSendPacket(command); + } finally { + Thread.currentThread().interrupt(); + } + } + + throw e; + } + } + /** * Converts a JMSException from the server to an XAException. if the * JMSException contained a linked XAException that is returned instead.