Transactions, when recovered, need to be recovered in prepare order:

a)  order determined by an unsorted map iterator might confuse people
b) brokerSequence can get out of step for messages recovered then committed - e.g. 
recovery on startup

git-svn-id: https://svn.apache.org/repos/asf/incubator/activemq/trunk@490795 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
Robert Davies 2006-12-28 20:51:22 +00:00
parent b1c94aa00b
commit 4a16f4527a
1 changed files with 113 additions and 77 deletions

View File

@ -21,6 +21,9 @@ package org.apache.activemq.store.journal;
import java.io.IOException; import java.io.IOException;
import java.util.ArrayList; import java.util.ArrayList;
import java.util.Iterator; import java.util.Iterator;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import javax.transaction.xa.XAException; import javax.transaction.xa.XAException;
@ -34,15 +37,14 @@ import org.apache.activemq.command.XATransactionId;
import org.apache.activemq.store.TransactionRecoveryListener; import org.apache.activemq.store.TransactionRecoveryListener;
import org.apache.activemq.store.TransactionStore; import org.apache.activemq.store.TransactionStore;
import java.util.concurrent.ConcurrentHashMap;
/** /**
*/ */
public class JournalTransactionStore implements TransactionStore { public class JournalTransactionStore implements TransactionStore {
private final JournalPersistenceAdapter peristenceAdapter; private final JournalPersistenceAdapter peristenceAdapter;
ConcurrentHashMap inflightTransactions = new ConcurrentHashMap(); Map inflightTransactions = new LinkedHashMap();
ConcurrentHashMap preparedTransactions = new ConcurrentHashMap(); Map preparedTransactions = new LinkedHashMap();
private boolean doingRecover; private boolean doingRecover;
@ -128,30 +130,43 @@ public class JournalTransactionStore implements TransactionStore {
* @throws IOException * @throws IOException
* @see org.apache.activemq.store.TransactionStore#prepare(TransactionId) * @see org.apache.activemq.store.TransactionStore#prepare(TransactionId)
*/ */
public void prepare(TransactionId txid) throws IOException { public void prepare(TransactionId txid) throws IOException{
Tx tx = (Tx) inflightTransactions.remove(txid); Tx tx=null;
if (tx == null) synchronized(inflightTransactions){
tx=(Tx)inflightTransactions.remove(txid);
}
if(tx==null)
return; return;
peristenceAdapter.writeCommand(new JournalTransaction(JournalTransaction.XA_PREPARE, txid, false), true); peristenceAdapter.writeCommand(new JournalTransaction(JournalTransaction.XA_PREPARE,txid,false),true);
preparedTransactions.put(txid, tx); synchronized(preparedTransactions){
preparedTransactions.put(txid,tx);
}
} }
/** /**
* @throws IOException * @throws IOException
* @see org.apache.activemq.store.TransactionStore#prepare(TransactionId) * @see org.apache.activemq.store.TransactionStore#prepare(TransactionId)
*/ */
public void replayPrepare(TransactionId txid) throws IOException { public void replayPrepare(TransactionId txid) throws IOException{
Tx tx = (Tx) inflightTransactions.remove(txid); Tx tx=null;
if (tx == null) synchronized(inflightTransactions){
tx=(Tx)inflightTransactions.remove(txid);
}
if(tx==null)
return; return;
preparedTransactions.put(txid, tx); synchronized(preparedTransactions){
preparedTransactions.put(txid,tx);
}
} }
public Tx getTx(Object txid, RecordLocation location) { public Tx getTx(Object txid,RecordLocation location){
Tx tx = (Tx) inflightTransactions.get(txid); Tx tx=null;
if (tx == null) { synchronized(inflightTransactions){
tx = new Tx(location); tx=(Tx)inflightTransactions.get(txid);
inflightTransactions.put(txid, tx); }
if(tx==null){
tx=new Tx(location);
inflightTransactions.put(txid,tx);
} }
return tx; return tx;
} }
@ -160,22 +175,23 @@ public class JournalTransactionStore implements TransactionStore {
* @throws XAException * @throws XAException
* @see org.apache.activemq.store.TransactionStore#commit(org.apache.activemq.service.Transaction) * @see org.apache.activemq.store.TransactionStore#commit(org.apache.activemq.service.Transaction)
*/ */
public void commit(TransactionId txid, boolean wasPrepared) throws IOException { public void commit(TransactionId txid,boolean wasPrepared) throws IOException{
Tx tx; Tx tx;
if (wasPrepared) { if(wasPrepared){
tx = (Tx) preparedTransactions.remove(txid); synchronized(preparedTransactions){
} else { tx=(Tx)preparedTransactions.remove(txid);
tx = (Tx) inflightTransactions.remove(txid); }
}else{
synchronized(inflightTransactions){
tx=(Tx)inflightTransactions.remove(txid);
}
} }
if(tx==null)
if (tx == null)
return; return;
if(txid.isXATransaction()){
if (txid.isXATransaction()) { peristenceAdapter.writeCommand(new JournalTransaction(JournalTransaction.XA_COMMIT,txid,wasPrepared),true);
peristenceAdapter.writeCommand(new JournalTransaction(JournalTransaction.XA_COMMIT, txid, wasPrepared), }else{
true); peristenceAdapter.writeCommand(new JournalTransaction(JournalTransaction.LOCAL_COMMIT,txid,wasPrepared),
} else {
peristenceAdapter.writeCommand(new JournalTransaction(JournalTransaction.LOCAL_COMMIT, txid, wasPrepared),
true); true);
} }
} }
@ -184,11 +200,15 @@ public class JournalTransactionStore implements TransactionStore {
* @throws XAException * @throws XAException
* @see org.apache.activemq.store.TransactionStore#commit(org.apache.activemq.service.Transaction) * @see org.apache.activemq.store.TransactionStore#commit(org.apache.activemq.service.Transaction)
*/ */
public Tx replayCommit(TransactionId txid, boolean wasPrepared) throws IOException { public Tx replayCommit(TransactionId txid,boolean wasPrepared) throws IOException{
if (wasPrepared) { if(wasPrepared){
return (Tx) preparedTransactions.remove(txid); synchronized(preparedTransactions){
} else { return (Tx)preparedTransactions.remove(txid);
return (Tx) inflightTransactions.remove(txid); }
}else{
synchronized(inflightTransactions){
return (Tx)inflightTransactions.remove(txid);
}
} }
} }
@ -196,31 +216,39 @@ public class JournalTransactionStore implements TransactionStore {
* @throws IOException * @throws IOException
* @see org.apache.activemq.store.TransactionStore#rollback(TransactionId) * @see org.apache.activemq.store.TransactionStore#rollback(TransactionId)
*/ */
public void rollback(TransactionId txid) throws IOException { public void rollback(TransactionId txid) throws IOException{
Tx tx=null;
Tx tx = (Tx) inflightTransactions.remove(txid); synchronized(inflightTransactions){
if (tx != null) tx=(Tx)inflightTransactions.remove(txid);
tx = (Tx) preparedTransactions.remove(txid); }
if(tx!=null)
if (tx != null) { synchronized(preparedTransactions){
if (txid.isXATransaction()) { tx=(Tx)preparedTransactions.remove(txid);
peristenceAdapter.writeCommand(new JournalTransaction(JournalTransaction.XA_ROLLBACK, txid, false), }
true); if(tx!=null){
} else { if(txid.isXATransaction()){
peristenceAdapter.writeCommand(new JournalTransaction(JournalTransaction.LOCAL_ROLLBACK, txid, false), peristenceAdapter.writeCommand(new JournalTransaction(JournalTransaction.XA_ROLLBACK,txid,false),true);
}else{
peristenceAdapter.writeCommand(new JournalTransaction(JournalTransaction.LOCAL_ROLLBACK,txid,false),
true); true);
} }
} }
} }
/** /**
* @throws IOException * @throws IOException
* @see org.apache.activemq.store.TransactionStore#rollback(TransactionId) * @see org.apache.activemq.store.TransactionStore#rollback(TransactionId)
*/ */
public void replayRollback(TransactionId txid) throws IOException { public void replayRollback(TransactionId txid) throws IOException{
if (inflightTransactions.remove(txid) != null) boolean inflight=false;
preparedTransactions.remove(txid); synchronized(inflightTransactions){
inflight=inflightTransactions.remove(txid)!=null;
}
if(inflight){
synchronized(preparedTransactions){
preparedTransactions.remove(txid);
}
}
} }
public void start() throws Exception { public void start() throws Exception {
@ -229,18 +257,24 @@ public class JournalTransactionStore implements TransactionStore {
public void stop() throws Exception { public void stop() throws Exception {
} }
synchronized public void recover(TransactionRecoveryListener listener) throws IOException { synchronized public void recover(TransactionRecoveryListener listener) throws IOException{
// All the in-flight transactions get rolled back.. // All the in-flight transactions get rolled back..
inflightTransactions.clear(); synchronized(inflightTransactions){
this.doingRecover = true; inflightTransactions.clear();
try { }
for (Iterator iter = preparedTransactions.keySet().iterator(); iter.hasNext();) { this.doingRecover=true;
Object txid = (Object) iter.next(); try{
Tx tx = (Tx) preparedTransactions.get(txid); Map txs=null;
listener.recover((XATransactionId) txid, tx.getMessages(), tx.getAcks()); synchronized(preparedTransactions){
txs=new LinkedHashMap(preparedTransactions);
} }
} finally { for(Iterator iter=txs.keySet().iterator();iter.hasNext();){
this.doingRecover = false; Object txid=(Object)iter.next();
Tx tx=(Tx)txs.get(txid);
listener.recover((XATransactionId)txid,tx.getMessages(),tx.getAcks());
}
}finally{
this.doingRecover=false;
} }
} }
@ -269,30 +303,32 @@ public class JournalTransactionStore implements TransactionStore {
} }
public RecordLocation checkpoint() throws IOException { public RecordLocation checkpoint() throws IOException{
// Nothing really to checkpoint.. since, we don't // Nothing really to checkpoint.. since, we don't
// checkpoint tx operations in to long term store until they are committed. // checkpoint tx operations in to long term store until they are committed.
// But we keep track of the first location of an operation // But we keep track of the first location of an operation
// that was associated with an active tx. The journal can not // that was associated with an active tx. The journal can not
// roll over active tx records. // roll over active tx records.
RecordLocation rc = null; RecordLocation rc=null;
for (Iterator iter = inflightTransactions.values().iterator(); iter.hasNext();) { synchronized(inflightTransactions){
Tx tx = (Tx) iter.next(); for(Iterator iter=inflightTransactions.values().iterator();iter.hasNext();){
RecordLocation location = tx.location; Tx tx=(Tx)iter.next();
if (rc == null || rc.compareTo(location) < 0) { RecordLocation location=tx.location;
rc = location; if(rc==null||rc.compareTo(location)<0){
rc=location;
}
} }
} }
for (Iterator iter = preparedTransactions.values().iterator(); iter.hasNext();) { synchronized(preparedTransactions){
Tx tx = (Tx) iter.next(); for(Iterator iter=preparedTransactions.values().iterator();iter.hasNext();){
RecordLocation location = tx.location; Tx tx=(Tx)iter.next();
if (rc == null || rc.compareTo(location) < 0) { RecordLocation location=tx.location;
rc = location; if(rc==null||rc.compareTo(location)<0){
rc=location;
}
} }
return rc;
} }
return rc;
} }
public boolean isDoingRecover() { public boolean isDoingRecover() {