Prevent deadlock by moving calls to afterCommit
and afterRollback out of synchronized blocks.

https://issues.apache.org/jira/browse/AMQ-5961
Lock list but keep global hashmap out for deadlock protection.

https://issues.apache.org/jira/browse/AMQ-5961
Lock safely in isInXATransaction.

https://issues.apache.org/jira/browse/AMQ-5961
Don't synchronize on list as we have an exclusive reference.
This commit is contained in:
erik-wramner 2015-09-10 18:44:36 +02:00 committed by Christopher L. Shannon (cshannon)
parent fc25535748
commit 9a78bc689f
1 changed files with 59 additions and 43 deletions

View File

@ -98,13 +98,11 @@ public class TransactionContext implements XAResource {
if (transactionId != null && transactionId.isXATransaction()) { if (transactionId != null && transactionId.isXATransaction()) {
return true; return true;
} else { } else {
if (!ENDED_XA_TRANSACTION_CONTEXTS.isEmpty()) { synchronized(ENDED_XA_TRANSACTION_CONTEXTS) {
synchronized(ENDED_XA_TRANSACTION_CONTEXTS) { for(List<TransactionContext> transactions : ENDED_XA_TRANSACTION_CONTEXTS.values()) {
for(List<TransactionContext> transactions : ENDED_XA_TRANSACTION_CONTEXTS.values()) { if (transactions.contains(this)) {
if (transactions.contains(this)) { return true;
return true; }
}
}
} }
} }
} }
@ -470,9 +468,14 @@ public class TransactionContext implements XAResource {
IntegerResponse response = (IntegerResponse)syncSendPacketWithInterruptionHandling(info); IntegerResponse response = (IntegerResponse)syncSendPacketWithInterruptionHandling(info);
if (XAResource.XA_RDONLY == response.getResult()) { if (XAResource.XA_RDONLY == response.getResult()) {
// transaction stops now, may be syncs that need a callback // transaction stops now, may be syncs that need a callback
List<TransactionContext> l;
synchronized(ENDED_XA_TRANSACTION_CONTEXTS) { synchronized(ENDED_XA_TRANSACTION_CONTEXTS) {
List<TransactionContext> l = ENDED_XA_TRANSACTION_CONTEXTS.remove(x); l = ENDED_XA_TRANSACTION_CONTEXTS.remove(x);
if (l != null && !l.isEmpty()) { }
// After commit may be expensive and can deadlock, do it outside global synch block
// No risk for concurrent updates as we own the list now
if (l != null) {
if(! l.isEmpty()) {
LOG.debug("firing afterCommit callbacks on XA_RDONLY from prepare: {}", xid); LOG.debug("firing afterCommit callbacks on XA_RDONLY from prepare: {}", xid);
for (TransactionContext ctx : l) { for (TransactionContext ctx : l) {
ctx.afterCommit(); ctx.afterCommit();
@ -484,16 +487,19 @@ public class TransactionContext implements XAResource {
} catch (JMSException e) { } catch (JMSException e) {
LOG.warn("prepare of: " + x + " failed with: " + e, e); LOG.warn("prepare of: " + x + " failed with: " + e, e);
List<TransactionContext> l;
synchronized(ENDED_XA_TRANSACTION_CONTEXTS) { synchronized(ENDED_XA_TRANSACTION_CONTEXTS) {
List<TransactionContext> l = ENDED_XA_TRANSACTION_CONTEXTS.remove(x); l = ENDED_XA_TRANSACTION_CONTEXTS.remove(x);
if (l != null && !l.isEmpty()) { }
for (TransactionContext ctx : l) { // After rollback may be expensive and can deadlock, do it outside global synch block
try { // No risk for concurrent updates as we own the list now
ctx.afterRollback(); if (l != null) {
} catch (Throwable ignored) { for (TransactionContext ctx : l) {
LOG.debug("failed to firing afterRollback callbacks on prepare " + try {
"failure, txid: {}, context: {}", x, ctx, ignored); ctx.afterRollback();
} } catch (Throwable ignored) {
LOG.debug("failed to firing afterRollback callbacks on prepare " +
"failure, txid: {}, context: {}", x, ctx, ignored);
} }
} }
} }
@ -530,12 +536,15 @@ public class TransactionContext implements XAResource {
TransactionInfo info = new TransactionInfo(getConnectionId(), x, TransactionInfo.ROLLBACK); TransactionInfo info = new TransactionInfo(getConnectionId(), x, TransactionInfo.ROLLBACK);
syncSendPacketWithInterruptionHandling(info); syncSendPacketWithInterruptionHandling(info);
List<TransactionContext> l;
synchronized(ENDED_XA_TRANSACTION_CONTEXTS) { synchronized(ENDED_XA_TRANSACTION_CONTEXTS) {
List<TransactionContext> l = ENDED_XA_TRANSACTION_CONTEXTS.remove(x); l = ENDED_XA_TRANSACTION_CONTEXTS.remove(x);
if (l != null && !l.isEmpty()) { }
for (TransactionContext ctx : l) { // After rollback may be expensive and can deadlock, do it outside global synch block
ctx.afterRollback(); // No risk for concurrent updates as we own the list now
} if (l != null) {
for (TransactionContext ctx : l) {
ctx.afterRollback();
} }
} }
} catch (JMSException e) { } catch (JMSException e) {
@ -574,15 +583,18 @@ public class TransactionContext implements XAResource {
syncSendPacketWithInterruptionHandling(info); syncSendPacketWithInterruptionHandling(info);
List<TransactionContext> l;
synchronized(ENDED_XA_TRANSACTION_CONTEXTS) { synchronized(ENDED_XA_TRANSACTION_CONTEXTS) {
List<TransactionContext> l = ENDED_XA_TRANSACTION_CONTEXTS.remove(x); l = ENDED_XA_TRANSACTION_CONTEXTS.remove(x);
if (l != null && !l.isEmpty()) { }
for (TransactionContext ctx : l) { // After commit may be expensive and can deadlock, do it outside global synch block
try { // No risk for concurrent updates as we own the list now
ctx.afterCommit(); if (l != null) {
} catch (Exception ignored) { for (TransactionContext ctx : l) {
LOG.debug("ignoring exception from after completion on ended transaction: {}", ignored, ignored); try {
} ctx.afterCommit();
} catch (Exception ignored) {
LOG.debug("ignoring exception from after completion on ended transaction: {}", ignored, ignored);
} }
} }
} }
@ -590,15 +602,18 @@ public class TransactionContext implements XAResource {
} catch (JMSException e) { } catch (JMSException e) {
LOG.warn("commit of: " + x + " failed with: " + e, e); LOG.warn("commit of: " + x + " failed with: " + e, e);
if (onePhase) { if (onePhase) {
List<TransactionContext> l;
synchronized(ENDED_XA_TRANSACTION_CONTEXTS) { synchronized(ENDED_XA_TRANSACTION_CONTEXTS) {
List<TransactionContext> l = ENDED_XA_TRANSACTION_CONTEXTS.remove(x); l = ENDED_XA_TRANSACTION_CONTEXTS.remove(x);
if (l != null && !l.isEmpty()) { }
for (TransactionContext ctx : l) { // After rollback may be expensive and can deadlock, do it outside global synch block
try { // No risk for concurrent updates as we own the list now
ctx.afterRollback(); if (l != null) {
} catch (Throwable ignored) { for (TransactionContext ctx : l) {
LOG.debug("failed to firing afterRollback callbacks commit failure, txid: {}, context: {}", x, ctx, ignored); try {
} ctx.afterRollback();
} catch (Throwable ignored) {
LOG.debug("failed to firing afterRollback callbacks commit failure, txid: {}, context: {}", x, ctx, ignored);
} }
} }
} }
@ -735,13 +750,14 @@ public class TransactionContext implements XAResource {
// Add our self to the list of contexts that are interested in // Add our self to the list of contexts that are interested in
// post commit/rollback events. // post commit/rollback events.
List<TransactionContext> l;
synchronized(ENDED_XA_TRANSACTION_CONTEXTS) { synchronized(ENDED_XA_TRANSACTION_CONTEXTS) {
List<TransactionContext> l = ENDED_XA_TRANSACTION_CONTEXTS.get(transactionId); l = ENDED_XA_TRANSACTION_CONTEXTS.get(transactionId);
if (l == null) { if (l == null) {
l = new ArrayList<TransactionContext>(3); l = new ArrayList<TransactionContext>(3);
ENDED_XA_TRANSACTION_CONTEXTS.put(transactionId, l); ENDED_XA_TRANSACTION_CONTEXTS.put(transactionId, l);
l.add(this); }
} else if (!l.contains(this)) { if (!l.contains(this)) {
l.add(this); l.add(this);
} }
} }