git-svn-id: https://svn.apache.org/repos/asf/activemq/trunk@813927 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
Robert Davies 2009-09-11 17:10:49 +00:00
parent 42282810a8
commit e01acf44ca
1 changed files with 34 additions and 6 deletions

View File

@ -16,6 +16,7 @@
*/
package org.apache.activemq;
import java.io.InterruptedIOException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
@ -27,11 +28,13 @@ import javax.transaction.xa.XAException;
import javax.transaction.xa.XAResource;
import javax.transaction.xa.Xid;
import org.apache.activemq.command.Command;
import org.apache.activemq.command.ConnectionId;
import org.apache.activemq.command.DataArrayResponse;
import org.apache.activemq.command.DataStructure;
import org.apache.activemq.command.IntegerResponse;
import org.apache.activemq.command.LocalTransactionId;
import org.apache.activemq.command.Response;
import org.apache.activemq.command.TransactionId;
import org.apache.activemq.command.TransactionInfo;
import org.apache.activemq.command.XATransactionId;
@ -280,7 +283,7 @@ public class TransactionContext implements XAResource {
TransactionInfo info = new TransactionInfo(getConnectionId(), transactionId, TransactionInfo.COMMIT_ONE_PHASE);
this.transactionId = null;
// Notify the listener that the tx was committed back
this.connection.syncSendPacket(info);
syncSendPacketWithInterruptionHandling(info);
if (localTransactionEventListener != null) {
localTransactionEventListener.commitEvent();
}
@ -399,7 +402,7 @@ public class TransactionContext implements XAResource {
TransactionInfo info = new TransactionInfo(getConnectionId(), x, TransactionInfo.PREPARE);
// Find out if the server wants to commit or rollback.
IntegerResponse response = (IntegerResponse)this.connection.syncSendPacket(info);
IntegerResponse response = (IntegerResponse)syncSendPacketWithInterruptionHandling(info);
return response.getResult();
} catch (JMSException e) {
@ -433,7 +436,7 @@ public class TransactionContext implements XAResource {
// Let the server know that the tx is rollback.
TransactionInfo info = new TransactionInfo(getConnectionId(), x, TransactionInfo.ROLLBACK);
this.connection.syncSendPacket(info);
syncSendPacketWithInterruptionHandling(info);
List<TransactionContext> l = ENDED_XA_TRANSACTION_CONTEXTS.remove(x);
if (l != null && !l.isEmpty()) {
@ -472,7 +475,7 @@ public class TransactionContext implements XAResource {
// Notify the server that the tx was committed back
TransactionInfo info = new TransactionInfo(getConnectionId(), x, onePhase ? TransactionInfo.COMMIT_ONE_PHASE : TransactionInfo.COMMIT_TWO_PHASE);
this.connection.syncSendPacket(info);
syncSendPacketWithInterruptionHandling(info);
List<TransactionContext> l = ENDED_XA_TRANSACTION_CONTEXTS.remove(x);
if (l != null && !l.isEmpty()) {
@ -509,7 +512,7 @@ public class TransactionContext implements XAResource {
try {
// Tell the server to forget the transaction.
this.connection.syncSendPacket(info);
syncSendPacketWithInterruptionHandling(info);
} catch (JMSException e) {
throw toXAException(e);
}
@ -601,7 +604,7 @@ public class TransactionContext implements XAResource {
if (transactionId != null) {
TransactionInfo info = new TransactionInfo(connectionId, transactionId, TransactionInfo.END);
try {
this.connection.syncSendPacket(info);
syncSendPacketWithInterruptionHandling(info);
if (LOG.isDebugEnabled()) {
LOG.debug("Ended XA transaction: " + transactionId);
}
@ -627,6 +630,31 @@ public class TransactionContext implements XAResource {
}
}
/**
* Sends the given command. Also sends the command in case of interruption,
* so that important commands like rollback and commit are never interrupted.
* If interruption occurred, set the interruption state of the current
* after performing the action again.
*
* @return the response
*/
private Response syncSendPacketWithInterruptionHandling(Command command) throws JMSException {
try {
return this.connection.syncSendPacket(command);
} catch (JMSException e) {
if (e.getLinkedException() instanceof InterruptedIOException) {
try {
Thread.interrupted();
return this.connection.syncSendPacket(command);
} finally {
Thread.currentThread().interrupt();
}
}
throw e;
}
}
/**
* Converts a JMSException from the server to an XAException. if the
* JMSException contained a linked XAException that is returned instead.