[AMQ-8400] Add synchronization handling for Transaction to prevent CME (#720)

This commit is contained in:
Matt Pavlovich 2021-12-20 08:37:56 -06:00 committed by GitHub
parent ee768a28d6
commit b196e9a88a
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
1 changed files with 34 additions and 24 deletions

View File

@ -52,15 +52,15 @@ public abstract class Transaction {
public Object call() throws Exception { public Object call() throws Exception {
doPreCommit(); doPreCommit();
return null; return null;
} }
}); });
protected FutureTask<?> postCommitTask = new FutureTask<Object>(new Callable<Object>() { protected FutureTask<?> postCommitTask = new FutureTask<Object>(new Callable<Object>() {
public Object call() throws Exception { public Object call() throws Exception {
doPostCommit(); doPostCommit();
return null; return null;
} }
}); });
public byte getState() { public byte getState() {
return state; return state;
} }
@ -87,15 +87,19 @@ public abstract class Transaction {
} }
public Synchronization findMatching(Synchronization r) { public Synchronization findMatching(Synchronization r) {
int existing = synchronizations.indexOf(r); synchronized(synchronizations) {
if (existing != -1) { int existing = synchronizations.indexOf(r);
return synchronizations.get(existing); if (existing != -1) {
} return synchronizations.get(existing);
}
}
return null; return null;
} }
public void removeSynchronization(Synchronization r) { public void removeSynchronization(Synchronization r) {
synchronizations.remove(r); synchronized(synchronizations) {
synchronizations.remove(r);
}
} }
public void prePrepare() throws Exception { public void prePrepare() throws Exception {
@ -119,26 +123,32 @@ public abstract class Transaction {
throw xae; throw xae;
} }
} }
protected void fireBeforeCommit() throws Exception { protected void fireBeforeCommit() throws Exception {
for (Iterator<Synchronization> iter = synchronizations.iterator(); iter.hasNext();) { synchronized(synchronizations) {
Synchronization s = iter.next(); for (Iterator<Synchronization> iter = synchronizations.iterator(); iter.hasNext();) {
s.beforeCommit(); Synchronization s = iter.next();
s.beforeCommit();
}
} }
} }
protected void fireAfterCommit() throws Exception { protected void fireAfterCommit() throws Exception {
for (Iterator<Synchronization> iter = synchronizations.iterator(); iter.hasNext();) { synchronized(synchronizations) {
Synchronization s = iter.next(); for (Iterator<Synchronization> iter = synchronizations.iterator(); iter.hasNext();) {
s.afterCommit(); Synchronization s = iter.next();
s.afterCommit();
}
} }
} }
public void fireAfterRollback() throws Exception { public void fireAfterRollback() throws Exception {
Collections.reverse(synchronizations); synchronized(synchronizations) {
for (Iterator<Synchronization> iter = synchronizations.iterator(); iter.hasNext();) { Collections.reverse(synchronizations);
Synchronization s = iter.next(); for (Iterator<Synchronization> iter = synchronizations.iterator(); iter.hasNext();) {
s.afterRollback(); Synchronization s = iter.next();
s.afterRollback();
}
} }
} }
@ -156,15 +166,15 @@ public abstract class Transaction {
public abstract TransactionId getTransactionId(); public abstract TransactionId getTransactionId();
public abstract Logger getLog(); public abstract Logger getLog();
public boolean isPrepared() { public boolean isPrepared() {
return getState() == PREPARED_STATE; return getState() == PREPARED_STATE;
} }
public int size() { public int size() {
return synchronizations.size(); return synchronizations.size();
} }
protected void waitPostCommitDone(FutureTask<?> postCommitTask) throws XAException, IOException { protected void waitPostCommitDone(FutureTask<?> postCommitTask) throws XAException, IOException {
try { try {
postCommitTask.get(); postCommitTask.get();
@ -179,9 +189,9 @@ public abstract class Transaction {
} else { } else {
throw new XAException(e.toString()); throw new XAException(e.toString());
} }
} }
} }
protected void doPreCommit() throws XAException { protected void doPreCommit() throws XAException {
try { try {
fireBeforeCommit(); fireBeforeCommit();