mirror of https://github.com/apache/activemq.git
This closes #143 Applying patch with thanks to Erik Wramner
This commit is contained in:
commit
450a5226e0
|
@ -98,13 +98,11 @@ public class TransactionContext implements XAResource {
|
||||||
if (transactionId != null && transactionId.isXATransaction()) {
|
if (transactionId != null && transactionId.isXATransaction()) {
|
||||||
return true;
|
return true;
|
||||||
} else {
|
} else {
|
||||||
if (!ENDED_XA_TRANSACTION_CONTEXTS.isEmpty()) {
|
synchronized(ENDED_XA_TRANSACTION_CONTEXTS) {
|
||||||
synchronized(ENDED_XA_TRANSACTION_CONTEXTS) {
|
for(List<TransactionContext> transactions : ENDED_XA_TRANSACTION_CONTEXTS.values()) {
|
||||||
for(List<TransactionContext> transactions : ENDED_XA_TRANSACTION_CONTEXTS.values()) {
|
if (transactions.contains(this)) {
|
||||||
if (transactions.contains(this)) {
|
return true;
|
||||||
return true;
|
}
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -470,9 +468,14 @@ public class TransactionContext implements XAResource {
|
||||||
IntegerResponse response = (IntegerResponse)syncSendPacketWithInterruptionHandling(info);
|
IntegerResponse response = (IntegerResponse)syncSendPacketWithInterruptionHandling(info);
|
||||||
if (XAResource.XA_RDONLY == response.getResult()) {
|
if (XAResource.XA_RDONLY == response.getResult()) {
|
||||||
// transaction stops now, may be syncs that need a callback
|
// transaction stops now, may be syncs that need a callback
|
||||||
|
List<TransactionContext> l;
|
||||||
synchronized(ENDED_XA_TRANSACTION_CONTEXTS) {
|
synchronized(ENDED_XA_TRANSACTION_CONTEXTS) {
|
||||||
List<TransactionContext> l = ENDED_XA_TRANSACTION_CONTEXTS.remove(x);
|
l = ENDED_XA_TRANSACTION_CONTEXTS.remove(x);
|
||||||
if (l != null && !l.isEmpty()) {
|
}
|
||||||
|
// After commit may be expensive and can deadlock, do it outside global synch block
|
||||||
|
// No risk for concurrent updates as we own the list now
|
||||||
|
if (l != null) {
|
||||||
|
if(! l.isEmpty()) {
|
||||||
LOG.debug("firing afterCommit callbacks on XA_RDONLY from prepare: {}", xid);
|
LOG.debug("firing afterCommit callbacks on XA_RDONLY from prepare: {}", xid);
|
||||||
for (TransactionContext ctx : l) {
|
for (TransactionContext ctx : l) {
|
||||||
ctx.afterCommit();
|
ctx.afterCommit();
|
||||||
|
@ -484,16 +487,19 @@ public class TransactionContext implements XAResource {
|
||||||
|
|
||||||
} catch (JMSException e) {
|
} catch (JMSException e) {
|
||||||
LOG.warn("prepare of: " + x + " failed with: " + e, e);
|
LOG.warn("prepare of: " + x + " failed with: " + e, e);
|
||||||
|
List<TransactionContext> l;
|
||||||
synchronized(ENDED_XA_TRANSACTION_CONTEXTS) {
|
synchronized(ENDED_XA_TRANSACTION_CONTEXTS) {
|
||||||
List<TransactionContext> l = ENDED_XA_TRANSACTION_CONTEXTS.remove(x);
|
l = ENDED_XA_TRANSACTION_CONTEXTS.remove(x);
|
||||||
if (l != null && !l.isEmpty()) {
|
}
|
||||||
for (TransactionContext ctx : l) {
|
// After rollback may be expensive and can deadlock, do it outside global synch block
|
||||||
try {
|
// No risk for concurrent updates as we own the list now
|
||||||
ctx.afterRollback();
|
if (l != null) {
|
||||||
} catch (Throwable ignored) {
|
for (TransactionContext ctx : l) {
|
||||||
LOG.debug("failed to firing afterRollback callbacks on prepare " +
|
try {
|
||||||
"failure, txid: {}, context: {}", x, ctx, ignored);
|
ctx.afterRollback();
|
||||||
}
|
} catch (Throwable ignored) {
|
||||||
|
LOG.debug("failed to firing afterRollback callbacks on prepare " +
|
||||||
|
"failure, txid: {}, context: {}", x, ctx, ignored);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -530,12 +536,15 @@ public class TransactionContext implements XAResource {
|
||||||
TransactionInfo info = new TransactionInfo(getConnectionId(), x, TransactionInfo.ROLLBACK);
|
TransactionInfo info = new TransactionInfo(getConnectionId(), x, TransactionInfo.ROLLBACK);
|
||||||
syncSendPacketWithInterruptionHandling(info);
|
syncSendPacketWithInterruptionHandling(info);
|
||||||
|
|
||||||
|
List<TransactionContext> l;
|
||||||
synchronized(ENDED_XA_TRANSACTION_CONTEXTS) {
|
synchronized(ENDED_XA_TRANSACTION_CONTEXTS) {
|
||||||
List<TransactionContext> l = ENDED_XA_TRANSACTION_CONTEXTS.remove(x);
|
l = ENDED_XA_TRANSACTION_CONTEXTS.remove(x);
|
||||||
if (l != null && !l.isEmpty()) {
|
}
|
||||||
for (TransactionContext ctx : l) {
|
// After rollback may be expensive and can deadlock, do it outside global synch block
|
||||||
ctx.afterRollback();
|
// No risk for concurrent updates as we own the list now
|
||||||
}
|
if (l != null) {
|
||||||
|
for (TransactionContext ctx : l) {
|
||||||
|
ctx.afterRollback();
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
} catch (JMSException e) {
|
} catch (JMSException e) {
|
||||||
|
@ -574,15 +583,18 @@ public class TransactionContext implements XAResource {
|
||||||
|
|
||||||
syncSendPacketWithInterruptionHandling(info);
|
syncSendPacketWithInterruptionHandling(info);
|
||||||
|
|
||||||
|
List<TransactionContext> l;
|
||||||
synchronized(ENDED_XA_TRANSACTION_CONTEXTS) {
|
synchronized(ENDED_XA_TRANSACTION_CONTEXTS) {
|
||||||
List<TransactionContext> l = ENDED_XA_TRANSACTION_CONTEXTS.remove(x);
|
l = ENDED_XA_TRANSACTION_CONTEXTS.remove(x);
|
||||||
if (l != null && !l.isEmpty()) {
|
}
|
||||||
for (TransactionContext ctx : l) {
|
// After commit may be expensive and can deadlock, do it outside global synch block
|
||||||
try {
|
// No risk for concurrent updates as we own the list now
|
||||||
ctx.afterCommit();
|
if (l != null) {
|
||||||
} catch (Exception ignored) {
|
for (TransactionContext ctx : l) {
|
||||||
LOG.debug("ignoring exception from after completion on ended transaction: {}", ignored, ignored);
|
try {
|
||||||
}
|
ctx.afterCommit();
|
||||||
|
} catch (Exception ignored) {
|
||||||
|
LOG.debug("ignoring exception from after completion on ended transaction: {}", ignored, ignored);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -590,15 +602,18 @@ public class TransactionContext implements XAResource {
|
||||||
} catch (JMSException e) {
|
} catch (JMSException e) {
|
||||||
LOG.warn("commit of: " + x + " failed with: " + e, e);
|
LOG.warn("commit of: " + x + " failed with: " + e, e);
|
||||||
if (onePhase) {
|
if (onePhase) {
|
||||||
|
List<TransactionContext> l;
|
||||||
synchronized(ENDED_XA_TRANSACTION_CONTEXTS) {
|
synchronized(ENDED_XA_TRANSACTION_CONTEXTS) {
|
||||||
List<TransactionContext> l = ENDED_XA_TRANSACTION_CONTEXTS.remove(x);
|
l = ENDED_XA_TRANSACTION_CONTEXTS.remove(x);
|
||||||
if (l != null && !l.isEmpty()) {
|
}
|
||||||
for (TransactionContext ctx : l) {
|
// After rollback may be expensive and can deadlock, do it outside global synch block
|
||||||
try {
|
// No risk for concurrent updates as we own the list now
|
||||||
ctx.afterRollback();
|
if (l != null) {
|
||||||
} catch (Throwable ignored) {
|
for (TransactionContext ctx : l) {
|
||||||
LOG.debug("failed to firing afterRollback callbacks commit failure, txid: {}, context: {}", x, ctx, ignored);
|
try {
|
||||||
}
|
ctx.afterRollback();
|
||||||
|
} catch (Throwable ignored) {
|
||||||
|
LOG.debug("failed to firing afterRollback callbacks commit failure, txid: {}, context: {}", x, ctx, ignored);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -735,13 +750,14 @@ public class TransactionContext implements XAResource {
|
||||||
|
|
||||||
// Add our self to the list of contexts that are interested in
|
// Add our self to the list of contexts that are interested in
|
||||||
// post commit/rollback events.
|
// post commit/rollback events.
|
||||||
|
List<TransactionContext> l;
|
||||||
synchronized(ENDED_XA_TRANSACTION_CONTEXTS) {
|
synchronized(ENDED_XA_TRANSACTION_CONTEXTS) {
|
||||||
List<TransactionContext> l = ENDED_XA_TRANSACTION_CONTEXTS.get(transactionId);
|
l = ENDED_XA_TRANSACTION_CONTEXTS.get(transactionId);
|
||||||
if (l == null) {
|
if (l == null) {
|
||||||
l = new ArrayList<TransactionContext>(3);
|
l = new ArrayList<TransactionContext>(3);
|
||||||
ENDED_XA_TRANSACTION_CONTEXTS.put(transactionId, l);
|
ENDED_XA_TRANSACTION_CONTEXTS.put(transactionId, l);
|
||||||
l.add(this);
|
}
|
||||||
} else if (!l.contains(this)) {
|
if (!l.contains(this)) {
|
||||||
l.add(this);
|
l.add(this);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
Loading…
Reference in New Issue