git-svn-id: https://svn.apache.org/repos/asf/activemq/trunk@1178398 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
Timothy A. Bish 2011-10-03 12:43:51 +00:00
parent 0890672e08
commit e6254c2f8a
1 changed files with 91 additions and 63 deletions
activemq-core/src/main/java/org/apache/activemq

View File

@ -19,8 +19,8 @@ package org.apache.activemq;
import java.io.InterruptedIOException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.concurrent.ConcurrentHashMap;
import javax.jms.JMSException;
import javax.jms.TransactionInProgressException;
@ -68,7 +68,8 @@ public class TransactionContext implements XAResource {
private static final Logger LOG = LoggerFactory.getLogger(TransactionContext.class);
// XATransactionId -> ArrayList of TransactionContext objects
private final static ConcurrentHashMap<TransactionId, List<TransactionContext>> ENDED_XA_TRANSACTION_CONTEXTS = new ConcurrentHashMap<TransactionId, List<TransactionContext>>();
private final static HashMap<TransactionId, List<TransactionContext>> ENDED_XA_TRANSACTION_CONTEXTS =
new HashMap<TransactionId, List<TransactionContext>>();
private final ActiveMQConnection connection;
private final LongSequenceGenerator localTransactionIdGenerator;
@ -88,8 +89,21 @@ public class TransactionContext implements XAResource {
}
public boolean isInXATransaction() {
return (transactionId != null && transactionId.isXATransaction()) ||
(!ENDED_XA_TRANSACTION_CONTEXTS.isEmpty() && ENDED_XA_TRANSACTION_CONTEXTS.containsValue(this));
if (transactionId != null && transactionId.isXATransaction()) {
return true;
} else {
if (!ENDED_XA_TRANSACTION_CONTEXTS.isEmpty()) {
synchronized(ENDED_XA_TRANSACTION_CONTEXTS) {
for(List<TransactionContext> transactions : ENDED_XA_TRANSACTION_CONTEXTS.values()) {
if (transactions.contains(this)) {
return true;
}
}
}
}
}
return false;
}
public boolean isInLocalTransaction() {
@ -437,32 +451,37 @@ public class TransactionContext implements XAResource {
IntegerResponse response = (IntegerResponse)syncSendPacketWithInterruptionHandling(info);
if (XAResource.XA_RDONLY == response.getResult()) {
// transaction stops now, may be syncs that need a callback
List<TransactionContext> l = ENDED_XA_TRANSACTION_CONTEXTS.remove(x);
if (l != null && !l.isEmpty()) {
if (LOG.isDebugEnabled()) {
LOG.debug("firing afterCommit callbacks on XA_RDONLY from prepare: " + xid);
}
for (TransactionContext ctx : l) {
ctx.afterCommit();
}
}
synchronized(ENDED_XA_TRANSACTION_CONTEXTS) {
List<TransactionContext> l = ENDED_XA_TRANSACTION_CONTEXTS.remove(x);
if (l != null && !l.isEmpty()) {
if (LOG.isDebugEnabled()) {
LOG.debug("firing afterCommit callbacks on XA_RDONLY from prepare: " + xid);
}
for (TransactionContext ctx : l) {
ctx.afterCommit();
}
}
}
}
return response.getResult();
} catch (JMSException e) {
LOG.warn("prepare of: " + x + " failed with: " + e, e);
List<TransactionContext> l = ENDED_XA_TRANSACTION_CONTEXTS.remove(x);
if (l != null && !l.isEmpty()) {
for (TransactionContext ctx : l) {
try {
ctx.afterRollback();
} catch (Throwable ignored) {
if (LOG.isDebugEnabled()) {
LOG.debug("failed to firing afterRollback callbacks on prepare failure, txid: " + x + ", context: " + ctx, ignored);
}
}
}
}
synchronized(ENDED_XA_TRANSACTION_CONTEXTS) {
List<TransactionContext> l = ENDED_XA_TRANSACTION_CONTEXTS.remove(x);
if (l != null && !l.isEmpty()) {
for (TransactionContext ctx : l) {
try {
ctx.afterRollback();
} catch (Throwable ignored) {
if (LOG.isDebugEnabled()) {
LOG.debug("failed to firing afterRollback callbacks on prepare failure, txid: " +
x + ", context: " + ctx, ignored);
}
}
}
}
}
throw toXAException(e);
}
}
@ -495,13 +514,14 @@ public class TransactionContext implements XAResource {
TransactionInfo info = new TransactionInfo(getConnectionId(), x, TransactionInfo.ROLLBACK);
syncSendPacketWithInterruptionHandling(info);
List<TransactionContext> l = ENDED_XA_TRANSACTION_CONTEXTS.remove(x);
if (l != null && !l.isEmpty()) {
for (TransactionContext ctx : l) {
ctx.afterRollback();
}
}
synchronized(ENDED_XA_TRANSACTION_CONTEXTS) {
List<TransactionContext> l = ENDED_XA_TRANSACTION_CONTEXTS.remove(x);
if (l != null && !l.isEmpty()) {
for (TransactionContext ctx : l) {
ctx.afterRollback();
}
}
}
} catch (JMSException e) {
throw toXAException(e);
}
@ -534,32 +554,36 @@ public class TransactionContext implements XAResource {
syncSendPacketWithInterruptionHandling(info);
List<TransactionContext> l = ENDED_XA_TRANSACTION_CONTEXTS.remove(x);
if (l != null && !l.isEmpty()) {
for (TransactionContext ctx : l) {
try {
ctx.afterCommit();
} catch (Exception ignored) {
LOG.debug("ignoring exception from after completion on ended transaction: " + ignored, ignored);
}
}
}
synchronized(ENDED_XA_TRANSACTION_CONTEXTS) {
List<TransactionContext> l = ENDED_XA_TRANSACTION_CONTEXTS.remove(x);
if (l != null && !l.isEmpty()) {
for (TransactionContext ctx : l) {
try {
ctx.afterCommit();
} catch (Exception ignored) {
LOG.debug("ignoring exception from after completion on ended transaction: " + ignored, ignored);
}
}
}
}
} catch (JMSException e) {
LOG.warn("commit of: " + x + " failed with: " + e, e);
if (onePhase) {
List<TransactionContext> l = ENDED_XA_TRANSACTION_CONTEXTS.remove(x);
if (l != null && !l.isEmpty()) {
for (TransactionContext ctx : l) {
try {
ctx.afterRollback();
} catch (Throwable ignored) {
if (LOG.isDebugEnabled()) {
LOG.debug("failed to firing afterRollback callbacks commit failure, txid: " + x + ", context: " + ctx, ignored);
}
}
}
}
synchronized(ENDED_XA_TRANSACTION_CONTEXTS) {
List<TransactionContext> l = ENDED_XA_TRANSACTION_CONTEXTS.remove(x);
if (l != null && !l.isEmpty()) {
for (TransactionContext ctx : l) {
try {
ctx.afterRollback();
} catch (Throwable ignored) {
if (LOG.isDebugEnabled()) {
LOG.debug("failed to firing afterRollback callbacks commit failure, txid: " + x + ", context: " + ctx, ignored);
}
}
}
}
}
}
throw toXAException(e);
}
@ -592,7 +616,9 @@ public class TransactionContext implements XAResource {
} catch (JMSException e) {
throw toXAException(e);
}
ENDED_XA_TRANSACTION_CONTEXTS.remove(x);
synchronized(ENDED_XA_TRANSACTION_CONTEXTS) {
ENDED_XA_TRANSACTION_CONTEXTS.remove(x);
}
}
public boolean isSameRM(XAResource xaResource) throws XAException {
@ -691,14 +717,16 @@ public class TransactionContext implements XAResource {
// Add our self to the list of contexts that are interested in
// post commit/rollback events.
List<TransactionContext> l = ENDED_XA_TRANSACTION_CONTEXTS.get(transactionId);
if (l == null) {
l = new ArrayList<TransactionContext>(3);
ENDED_XA_TRANSACTION_CONTEXTS.put(transactionId, l);
l.add(this);
} else if (!l.contains(this)) {
l.add(this);
}
synchronized(ENDED_XA_TRANSACTION_CONTEXTS) {
List<TransactionContext> l = ENDED_XA_TRANSACTION_CONTEXTS.get(transactionId);
if (l == null) {
l = new ArrayList<TransactionContext>(3);
ENDED_XA_TRANSACTION_CONTEXTS.put(transactionId, l);
l.add(this);
} else if (!l.contains(this)) {
l.add(this);
}
}
}
// dis-associate