mirror of https://github.com/apache/activemq.git
Transactions, when recovered, need to be recovered in prepare order:
a) order determined by an unsorted map iterator might confuse people b) brokerSequence can get out of step for messages recovered then committed - e.g. recovery on startup git-svn-id: https://svn.apache.org/repos/asf/incubator/activemq/trunk@490792 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
parent
27ded73ca6
commit
3093ea0f6e
|
@ -40,6 +40,7 @@ import javax.transaction.xa.XAException;
|
|||
|
||||
import java.util.ArrayList;
|
||||
import java.util.Iterator;
|
||||
import java.util.LinkedHashMap;
|
||||
import java.util.Map;
|
||||
|
||||
/**
|
||||
|
@ -53,7 +54,7 @@ public class TransactionBroker extends BrokerFilter {
|
|||
|
||||
// The prepared XA transactions.
|
||||
private TransactionStore transactionStore;
|
||||
private ConcurrentHashMap xaTransactions = new ConcurrentHashMap();
|
||||
private Map xaTransactions = new LinkedHashMap();
|
||||
|
||||
public TransactionBroker(Broker next, TransactionStore transactionStore) {
|
||||
super(next);
|
||||
|
@ -70,7 +71,6 @@ public class TransactionBroker extends BrokerFilter {
|
|||
* Recovers any prepared transactions.
|
||||
*/
|
||||
public void start() throws Exception {
|
||||
next.start();
|
||||
transactionStore.start();
|
||||
try {
|
||||
final ConnectionContext context = new ConnectionContext();
|
||||
|
@ -99,6 +99,7 @@ public class TransactionBroker extends BrokerFilter {
|
|||
Throwable cause = e.getCause();
|
||||
throw IOExceptionSupport.create("Recovery Failed: "+cause.getMessage(), cause);
|
||||
}
|
||||
next.start();
|
||||
}
|
||||
|
||||
public void stop() throws Exception {
|
||||
|
@ -114,31 +115,34 @@ public class TransactionBroker extends BrokerFilter {
|
|||
//////////////////////////////////////////////////////////////////////////////
|
||||
public TransactionId[] getPreparedTransactions(ConnectionContext context) throws Exception {
|
||||
ArrayList txs = new ArrayList();
|
||||
synchronized(xaTransactions){
|
||||
for(Iterator iter=xaTransactions.values().iterator();iter.hasNext();){
|
||||
Transaction tx=(Transaction)iter.next();
|
||||
if(tx.isPrepared())
|
||||
txs.add(tx.getTransactionId());
|
||||
}
|
||||
}
|
||||
XATransactionId rc[] = new XATransactionId[txs.size()];
|
||||
txs.toArray(rc);
|
||||
return rc;
|
||||
}
|
||||
|
||||
public void beginTransaction(ConnectionContext context,TransactionId xid) throws Exception{
|
||||
|
||||
// the transaction may have already been started.
|
||||
if(xid.isXATransaction()){
|
||||
Transaction transaction = (Transaction)xaTransactions.get(xid);
|
||||
Transaction transaction=null;
|
||||
synchronized(xaTransactions){
|
||||
transaction=(Transaction)xaTransactions.get(xid);
|
||||
if(transaction!=null)
|
||||
return;
|
||||
transaction=new XATransaction(transactionStore,(XATransactionId)xid,this);
|
||||
xaTransactions.put(xid,transaction);
|
||||
}
|
||||
}else{
|
||||
Map transactionMap=context.getTransactions();
|
||||
Transaction transaction=(Transaction)transactionMap.get(xid);
|
||||
if(transaction!=null)
|
||||
throw new JMSException("Transaction '"+xid+"' has already been started.");
|
||||
|
||||
transaction=new LocalTransaction(transactionStore,(LocalTransactionId)xid,context);
|
||||
transactionMap.put(xid,transaction);
|
||||
}
|
||||
|
@ -215,13 +219,15 @@ public class TransactionBroker extends BrokerFilter {
|
|||
// Implementation help methods.
|
||||
//
|
||||
//////////////////////////////////////////////////////////////////////////////
|
||||
public Transaction getTransaction(ConnectionContext context, TransactionId xid, boolean mightBePrepared) throws JMSException, XAException {
|
||||
Map transactionMap = xid.isXATransaction() ? xaTransactions : context.getTransactions();
|
||||
public Transaction getTransaction(ConnectionContext context,TransactionId xid,boolean mightBePrepared)
|
||||
throws JMSException,XAException{
|
||||
Map transactionMap=null;
|
||||
synchronized(xaTransactions){
|
||||
transactionMap=xid.isXATransaction()?xaTransactions:context.getTransactions();
|
||||
}
|
||||
Transaction transaction=(Transaction)transactionMap.get(xid);
|
||||
|
||||
if(transaction!=null)
|
||||
return transaction;
|
||||
|
||||
if(xid.isXATransaction()){
|
||||
XAException e=new XAException("Transaction '"+xid+"' has not been started.");
|
||||
e.errorCode=XAException.XAER_NOTA;
|
||||
|
@ -232,7 +238,9 @@ public class TransactionBroker extends BrokerFilter {
|
|||
}
|
||||
|
||||
public void removeTransaction(XATransactionId xid){
|
||||
synchronized(xaTransactions){
|
||||
xaTransactions.remove(xid);
|
||||
}
|
||||
}
|
||||
|
||||
}
|
||||
|
|
Loading…
Reference in New Issue