diff --git a/activemq-core/src/main/java/org/apache/activemq/broker/TransactionBroker.java b/activemq-core/src/main/java/org/apache/activemq/broker/TransactionBroker.java index 1274161038..5ca92a441a 100755 --- a/activemq-core/src/main/java/org/apache/activemq/broker/TransactionBroker.java +++ b/activemq-core/src/main/java/org/apache/activemq/broker/TransactionBroker.java @@ -40,6 +40,7 @@ import javax.transaction.xa.XAException; import java.util.ArrayList; import java.util.Iterator; +import java.util.LinkedHashMap; import java.util.Map; /** @@ -53,7 +54,7 @@ public class TransactionBroker extends BrokerFilter { // The prepared XA transactions. private TransactionStore transactionStore; - private ConcurrentHashMap xaTransactions = new ConcurrentHashMap(); + private Map xaTransactions = new LinkedHashMap(); public TransactionBroker(Broker next, TransactionStore transactionStore) { super(next); @@ -70,8 +71,7 @@ public class TransactionBroker extends BrokerFilter { * Recovers any prepared transactions. */ public void start() throws Exception { - next.start(); - transactionStore.start(); + transactionStore.start(); try { final ConnectionContext context = new ConnectionContext(); context.setBroker(this); @@ -99,6 +99,7 @@ public class TransactionBroker extends BrokerFilter { Throwable cause = e.getCause(); throw IOExceptionSupport.create("Recovery Failed: "+cause.getMessage(), cause); } + next.start(); } public void stop() throws Exception { @@ -114,33 +115,36 @@ public class TransactionBroker extends BrokerFilter { ////////////////////////////////////////////////////////////////////////////// public TransactionId[] getPreparedTransactions(ConnectionContext context) throws Exception { ArrayList txs = new ArrayList(); - for (Iterator iter = xaTransactions.values().iterator(); iter.hasNext();) { - Transaction tx = (Transaction) iter.next(); - if( tx.isPrepared() ) - txs.add(tx.getTransactionId()); + synchronized(xaTransactions){ + for(Iterator iter=xaTransactions.values().iterator();iter.hasNext();){ + Transaction tx=(Transaction)iter.next(); + if(tx.isPrepared()) + txs.add(tx.getTransactionId()); + } } XATransactionId rc[] = new XATransactionId[txs.size()]; txs.toArray(rc); return rc; } - public void beginTransaction(ConnectionContext context, TransactionId xid) throws Exception { - + public void beginTransaction(ConnectionContext context,TransactionId xid) throws Exception{ // the transaction may have already been started. - if( xid.isXATransaction() ) { - Transaction transaction = (Transaction)xaTransactions.get(xid); - if( transaction != null ) - return; - transaction = new XATransaction(transactionStore, (XATransactionId)xid, this); - xaTransactions.put(xid, transaction); - } else { - Map transactionMap = context.getTransactions(); - Transaction transaction = (Transaction)transactionMap.get(xid); - if( transaction != null ) + if(xid.isXATransaction()){ + Transaction transaction=null; + synchronized(xaTransactions){ + transaction=(Transaction)xaTransactions.get(xid); + if(transaction!=null) + return; + transaction=new XATransaction(transactionStore,(XATransactionId)xid,this); + xaTransactions.put(xid,transaction); + } + }else{ + Map transactionMap=context.getTransactions(); + Transaction transaction=(Transaction)transactionMap.get(xid); + if(transaction!=null) throw new JMSException("Transaction '"+xid+"' has already been started."); - - transaction = new LocalTransaction(transactionStore, (LocalTransactionId)xid, context); - transactionMap.put(xid, transaction); + transaction=new LocalTransaction(transactionStore,(LocalTransactionId)xid,context); + transactionMap.put(xid,transaction); } } @@ -215,24 +219,28 @@ public class TransactionBroker extends BrokerFilter { // Implementation help methods. // ////////////////////////////////////////////////////////////////////////////// - public Transaction getTransaction(ConnectionContext context, TransactionId xid, boolean mightBePrepared) throws JMSException, XAException { - Map transactionMap = xid.isXATransaction() ? xaTransactions : context.getTransactions(); - Transaction transaction = (Transaction)transactionMap.get(xid); - - if( transaction != null ) + public Transaction getTransaction(ConnectionContext context,TransactionId xid,boolean mightBePrepared) + throws JMSException,XAException{ + Map transactionMap=null; + synchronized(xaTransactions){ + transactionMap=xid.isXATransaction()?xaTransactions:context.getTransactions(); + } + Transaction transaction=(Transaction)transactionMap.get(xid); + if(transaction!=null) return transaction; - - if( xid.isXATransaction() ) { - XAException e = new XAException("Transaction '" + xid + "' has not been started."); - e.errorCode = XAException.XAER_NOTA; + if(xid.isXATransaction()){ + XAException e=new XAException("Transaction '"+xid+"' has not been started."); + e.errorCode=XAException.XAER_NOTA; throw e; - } else { - throw new JMSException("Transaction '" + xid + "' has not been started."); + }else{ + throw new JMSException("Transaction '"+xid+"' has not been started."); } } - public void removeTransaction(XATransactionId xid) { - xaTransactions.remove(xid); + public void removeTransaction(XATransactionId xid){ + synchronized(xaTransactions){ + xaTransactions.remove(xid); + } } }