further reduction in contention on inflight transacitons in kahaDB store

git-svn-id: https://svn.apache.org/repos/asf/activemq/trunk@963674 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
Gary Tully 2010-07-13 10:54:59 +00:00
parent 584d52c1f0
commit b1d7a78c0a
3 changed files with 23 additions and 46 deletions

View File

@ -956,14 +956,6 @@ public class KahaDBStore extends MessageDatabase implements PersistenceAdapter {
}
KahaTransactionInfo rc = new KahaTransactionInfo();
// Link it up to the previous record that was part of the transaction.
synchronized (inflightTransactions) {
ArrayList<Operation> tx = inflightTransactions.get(txid);
if (tx != null) {
rc.setPreviousEntry(convert(tx.get(tx.size() - 1).location));
}
}
if (txid.isLocalTransaction()) {
LocalTransactionId t = (LocalTransactionId) txid;
KahaLocalTransactionId kahaTxId = new KahaLocalTransactionId();

View File

@ -289,7 +289,7 @@ public class KahaDBTransactionStore implements TransactionStore {
public synchronized void recover(TransactionRecoveryListener listener) throws IOException {
// All the inflight transactions get rolled back..
// inflightTransactions.clear();
for (Map.Entry<TransactionId, ArrayList<Operation>> entry : theStore.preparedTransactions.entrySet()) {
for (Map.Entry<TransactionId, List<Operation>> entry : theStore.preparedTransactions.entrySet()) {
XATransactionId xid = (XATransactionId) entry.getKey();
ArrayList<Message> messageList = new ArrayList<Message>();
ArrayList<MessageAck> ackList = new ArrayList<MessageAck>();

View File

@ -26,17 +26,7 @@ import java.io.InputStream;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.OutputStream;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Date;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.SortedSet;
import java.util.TreeMap;
import java.util.TreeSet;
import java.util.*;
import java.util.Map.Entry;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicLong;
@ -302,10 +292,7 @@ public class MessageDatabase extends ServiceSupport implements BrokerServiceAwar
checkpointThread.setDaemon(true);
checkpointThread.start();
}
/**
* @throws IOException
*/
public void open() throws IOException {
if( opened.compareAndSet(false, true) ) {
getJournal().start();
@ -731,7 +718,6 @@ public class MessageDatabase extends ServiceSupport implements BrokerServiceAwar
* to a JournalMessage which is logged to the journal and then the data from
* the JournalMessage is used to update the index just like it would be done
* during a recovery process.
* @param done
*/
public Location store(JournalCommand<?> data, boolean sync, Runnable before,Runnable after) throws IOException {
if (before != null) {
@ -860,10 +846,8 @@ public class MessageDatabase extends ServiceSupport implements BrokerServiceAwar
protected void process(final KahaAddMessageCommand command, final Location location) throws IOException {
if (command.hasTransactionInfo()) {
synchronized (inflightTransactions) {
ArrayList<Operation> inflightTx = getInflightTx(command.getTransactionInfo(), location);
inflightTx.add(new AddOpperation(command, location));
}
List<Operation> inflightTx = getInflightTx(command.getTransactionInfo(), location);
inflightTx.add(new AddOpperation(command, location));
} else {
this.indexLock.writeLock().lock();
try {
@ -880,10 +864,8 @@ public class MessageDatabase extends ServiceSupport implements BrokerServiceAwar
protected void process(final KahaRemoveMessageCommand command, final Location location) throws IOException {
if (command.hasTransactionInfo()) {
synchronized (inflightTransactions) {
ArrayList<Operation> inflightTx = getInflightTx(command.getTransactionInfo(), location);
inflightTx.add(new RemoveOpperation(command, location));
}
List<Operation> inflightTx = getInflightTx(command.getTransactionInfo(), location);
inflightTx.add(new RemoveOpperation(command, location));
} else {
this.indexLock.writeLock().lock();
try {
@ -927,7 +909,7 @@ public class MessageDatabase extends ServiceSupport implements BrokerServiceAwar
protected void process(KahaCommitCommand command, Location location) throws IOException {
TransactionId key = key(command.getTransactionInfo());
ArrayList<Operation> inflightTx = null;
List<Operation> inflightTx;
synchronized (inflightTransactions) {
inflightTx = inflightTransactions.remove(key);
if (inflightTx == null) {
@ -938,7 +920,7 @@ public class MessageDatabase extends ServiceSupport implements BrokerServiceAwar
return;
}
final ArrayList<Operation> messagingTx = inflightTx;
final List<Operation> messagingTx = inflightTx;
this.indexLock.writeLock().lock();
try {
pageFile.tx().execute(new Transaction.Closure<IOException>() {
@ -954,9 +936,9 @@ public class MessageDatabase extends ServiceSupport implements BrokerServiceAwar
}
protected void process(KahaPrepareCommand command, Location location) {
TransactionId key = key(command.getTransactionInfo());
synchronized (inflightTransactions) {
TransactionId key = key(command.getTransactionInfo());
ArrayList<Operation> tx = inflightTransactions.remove(key);
List<Operation> tx = inflightTransactions.remove(key);
if (tx != null) {
preparedTransactions.put(key, tx);
}
@ -964,9 +946,9 @@ public class MessageDatabase extends ServiceSupport implements BrokerServiceAwar
}
protected void process(KahaRollbackCommand command, Location location) {
TransactionId key = key(command.getTransactionInfo());
synchronized (inflightTransactions) {
TransactionId key = key(command.getTransactionInfo());
ArrayList<Operation> tx = inflightTransactions.remove(key);
List<Operation> tx = inflightTransactions.remove(key);
if (tx == null) {
preparedTransactions.remove(key);
}
@ -1496,15 +1478,18 @@ public class MessageDatabase extends ServiceSupport implements BrokerServiceAwar
// /////////////////////////////////////////////////////////////////
// Transaction related implementation methods.
// /////////////////////////////////////////////////////////////////
protected final LinkedHashMap<TransactionId, ArrayList<Operation>> inflightTransactions = new LinkedHashMap<TransactionId, ArrayList<Operation>>();
protected final LinkedHashMap<TransactionId, ArrayList<Operation>> preparedTransactions = new LinkedHashMap<TransactionId, ArrayList<Operation>>();
protected final LinkedHashMap<TransactionId, List<Operation>> inflightTransactions = new LinkedHashMap<TransactionId, List<Operation>>();
protected final LinkedHashMap<TransactionId, List<Operation>> preparedTransactions = new LinkedHashMap<TransactionId, List<Operation>>();
private ArrayList<Operation> getInflightTx(KahaTransactionInfo info, Location location) {
private List<Operation> getInflightTx(KahaTransactionInfo info, Location location) {
TransactionId key = key(info);
ArrayList<Operation> tx = inflightTransactions.get(key);
if (tx == null) {
tx = new ArrayList<Operation>();
inflightTransactions.put(key, tx);
List<Operation> tx;
synchronized (inflightTransactions) {
tx = inflightTransactions.get(key);
if (tx == null) {
tx = Collections.synchronizedList(new ArrayList<Operation>());
inflightTransactions.put(key, tx);
}
}
return tx;
}