diff --git a/activemq-core/src/main/java/org/apache/activemq/store/journal/JournalTransactionStore.java b/activemq-core/src/main/java/org/apache/activemq/store/journal/JournalTransactionStore.java index 26468f9694..905bf418b8 100755 --- a/activemq-core/src/main/java/org/apache/activemq/store/journal/JournalTransactionStore.java +++ b/activemq-core/src/main/java/org/apache/activemq/store/journal/JournalTransactionStore.java @@ -21,6 +21,9 @@ package org.apache.activemq.store.journal; import java.io.IOException; import java.util.ArrayList; import java.util.Iterator; +import java.util.LinkedHashMap; +import java.util.List; +import java.util.Map; import javax.transaction.xa.XAException; @@ -34,15 +37,14 @@ import org.apache.activemq.command.XATransactionId; import org.apache.activemq.store.TransactionRecoveryListener; import org.apache.activemq.store.TransactionStore; -import java.util.concurrent.ConcurrentHashMap; /** */ public class JournalTransactionStore implements TransactionStore { private final JournalPersistenceAdapter peristenceAdapter; - ConcurrentHashMap inflightTransactions = new ConcurrentHashMap(); - ConcurrentHashMap preparedTransactions = new ConcurrentHashMap(); + Map inflightTransactions = new LinkedHashMap(); + Map preparedTransactions = new LinkedHashMap(); private boolean doingRecover; @@ -128,30 +130,43 @@ public class JournalTransactionStore implements TransactionStore { * @throws IOException * @see org.apache.activemq.store.TransactionStore#prepare(TransactionId) */ - public void prepare(TransactionId txid) throws IOException { - Tx tx = (Tx) inflightTransactions.remove(txid); - if (tx == null) + public void prepare(TransactionId txid) throws IOException{ + Tx tx=null; + synchronized(inflightTransactions){ + tx=(Tx)inflightTransactions.remove(txid); + } + if(tx==null) return; - peristenceAdapter.writeCommand(new JournalTransaction(JournalTransaction.XA_PREPARE, txid, false), true); - preparedTransactions.put(txid, tx); + peristenceAdapter.writeCommand(new JournalTransaction(JournalTransaction.XA_PREPARE,txid,false),true); + synchronized(preparedTransactions){ + preparedTransactions.put(txid,tx); + } } /** * @throws IOException * @see org.apache.activemq.store.TransactionStore#prepare(TransactionId) */ - public void replayPrepare(TransactionId txid) throws IOException { - Tx tx = (Tx) inflightTransactions.remove(txid); - if (tx == null) + public void replayPrepare(TransactionId txid) throws IOException{ + Tx tx=null; + synchronized(inflightTransactions){ + tx=(Tx)inflightTransactions.remove(txid); + } + if(tx==null) return; - preparedTransactions.put(txid, tx); + synchronized(preparedTransactions){ + preparedTransactions.put(txid,tx); + } } - public Tx getTx(Object txid, RecordLocation location) { - Tx tx = (Tx) inflightTransactions.get(txid); - if (tx == null) { - tx = new Tx(location); - inflightTransactions.put(txid, tx); + public Tx getTx(Object txid,RecordLocation location){ + Tx tx=null; + synchronized(inflightTransactions){ + tx=(Tx)inflightTransactions.get(txid); + } + if(tx==null){ + tx=new Tx(location); + inflightTransactions.put(txid,tx); } return tx; } @@ -160,22 +175,23 @@ public class JournalTransactionStore implements TransactionStore { * @throws XAException * @see org.apache.activemq.store.TransactionStore#commit(org.apache.activemq.service.Transaction) */ - public void commit(TransactionId txid, boolean wasPrepared) throws IOException { + public void commit(TransactionId txid,boolean wasPrepared) throws IOException{ Tx tx; - if (wasPrepared) { - tx = (Tx) preparedTransactions.remove(txid); - } else { - tx = (Tx) inflightTransactions.remove(txid); + if(wasPrepared){ + synchronized(preparedTransactions){ + tx=(Tx)preparedTransactions.remove(txid); + } + }else{ + synchronized(inflightTransactions){ + tx=(Tx)inflightTransactions.remove(txid); + } } - - if (tx == null) + if(tx==null) return; - - if (txid.isXATransaction()) { - peristenceAdapter.writeCommand(new JournalTransaction(JournalTransaction.XA_COMMIT, txid, wasPrepared), - true); - } else { - peristenceAdapter.writeCommand(new JournalTransaction(JournalTransaction.LOCAL_COMMIT, txid, wasPrepared), + if(txid.isXATransaction()){ + peristenceAdapter.writeCommand(new JournalTransaction(JournalTransaction.XA_COMMIT,txid,wasPrepared),true); + }else{ + peristenceAdapter.writeCommand(new JournalTransaction(JournalTransaction.LOCAL_COMMIT,txid,wasPrepared), true); } } @@ -184,11 +200,15 @@ public class JournalTransactionStore implements TransactionStore { * @throws XAException * @see org.apache.activemq.store.TransactionStore#commit(org.apache.activemq.service.Transaction) */ - public Tx replayCommit(TransactionId txid, boolean wasPrepared) throws IOException { - if (wasPrepared) { - return (Tx) preparedTransactions.remove(txid); - } else { - return (Tx) inflightTransactions.remove(txid); + public Tx replayCommit(TransactionId txid,boolean wasPrepared) throws IOException{ + if(wasPrepared){ + synchronized(preparedTransactions){ + return (Tx)preparedTransactions.remove(txid); + } + }else{ + synchronized(inflightTransactions){ + return (Tx)inflightTransactions.remove(txid); + } } } @@ -196,31 +216,39 @@ public class JournalTransactionStore implements TransactionStore { * @throws IOException * @see org.apache.activemq.store.TransactionStore#rollback(TransactionId) */ - public void rollback(TransactionId txid) throws IOException { - - Tx tx = (Tx) inflightTransactions.remove(txid); - if (tx != null) - tx = (Tx) preparedTransactions.remove(txid); - - if (tx != null) { - if (txid.isXATransaction()) { - peristenceAdapter.writeCommand(new JournalTransaction(JournalTransaction.XA_ROLLBACK, txid, false), - true); - } else { - peristenceAdapter.writeCommand(new JournalTransaction(JournalTransaction.LOCAL_ROLLBACK, txid, false), + public void rollback(TransactionId txid) throws IOException{ + Tx tx=null; + synchronized(inflightTransactions){ + tx=(Tx)inflightTransactions.remove(txid); + } + if(tx!=null) + synchronized(preparedTransactions){ + tx=(Tx)preparedTransactions.remove(txid); + } + if(tx!=null){ + if(txid.isXATransaction()){ + peristenceAdapter.writeCommand(new JournalTransaction(JournalTransaction.XA_ROLLBACK,txid,false),true); + }else{ + peristenceAdapter.writeCommand(new JournalTransaction(JournalTransaction.LOCAL_ROLLBACK,txid,false), true); } } - } /** * @throws IOException * @see org.apache.activemq.store.TransactionStore#rollback(TransactionId) */ - public void replayRollback(TransactionId txid) throws IOException { - if (inflightTransactions.remove(txid) != null) - preparedTransactions.remove(txid); + public void replayRollback(TransactionId txid) throws IOException{ + boolean inflight=false; + synchronized(inflightTransactions){ + inflight=inflightTransactions.remove(txid)!=null; + } + if(inflight){ + synchronized(preparedTransactions){ + preparedTransactions.remove(txid); + } + } } public void start() throws Exception { @@ -229,18 +257,24 @@ public class JournalTransactionStore implements TransactionStore { public void stop() throws Exception { } - synchronized public void recover(TransactionRecoveryListener listener) throws IOException { + synchronized public void recover(TransactionRecoveryListener listener) throws IOException{ // All the in-flight transactions get rolled back.. - inflightTransactions.clear(); - this.doingRecover = true; - try { - for (Iterator iter = preparedTransactions.keySet().iterator(); iter.hasNext();) { - Object txid = (Object) iter.next(); - Tx tx = (Tx) preparedTransactions.get(txid); - listener.recover((XATransactionId) txid, tx.getMessages(), tx.getAcks()); + synchronized(inflightTransactions){ + inflightTransactions.clear(); + } + this.doingRecover=true; + try{ + Map txs=null; + synchronized(preparedTransactions){ + txs=new LinkedHashMap(preparedTransactions); } - } finally { - this.doingRecover = false; + for(Iterator iter=txs.keySet().iterator();iter.hasNext();){ + Object txid=(Object)iter.next(); + Tx tx=(Tx)txs.get(txid); + listener.recover((XATransactionId)txid,tx.getMessages(),tx.getAcks()); + } + }finally{ + this.doingRecover=false; } } @@ -269,30 +303,32 @@ public class JournalTransactionStore implements TransactionStore { } - public RecordLocation checkpoint() throws IOException { - + public RecordLocation checkpoint() throws IOException{ // Nothing really to checkpoint.. since, we don't // checkpoint tx operations in to long term store until they are committed. - // But we keep track of the first location of an operation // that was associated with an active tx. The journal can not - // roll over active tx records. - RecordLocation rc = null; - for (Iterator iter = inflightTransactions.values().iterator(); iter.hasNext();) { - Tx tx = (Tx) iter.next(); - RecordLocation location = tx.location; - if (rc == null || rc.compareTo(location) < 0) { - rc = location; + // roll over active tx records. + RecordLocation rc=null; + synchronized(inflightTransactions){ + for(Iterator iter=inflightTransactions.values().iterator();iter.hasNext();){ + Tx tx=(Tx)iter.next(); + RecordLocation location=tx.location; + if(rc==null||rc.compareTo(location)<0){ + rc=location; + } } } - for (Iterator iter = preparedTransactions.values().iterator(); iter.hasNext();) { - Tx tx = (Tx) iter.next(); - RecordLocation location = tx.location; - if (rc == null || rc.compareTo(location) < 0) { - rc = location; + synchronized(preparedTransactions){ + for(Iterator iter=preparedTransactions.values().iterator();iter.hasNext();){ + Tx tx=(Tx)iter.next(); + RecordLocation location=tx.location; + if(rc==null||rc.compareTo(location)<0){ + rc=location; + } } + return rc; } - return rc; } public boolean isDoingRecover() {