Improve concurrency

git-svn-id: https://svn.apache.org/repos/asf/activemq/trunk@961484 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
Robert Davies 2010-07-07 19:34:09 +00:00
parent 06b41bb759
commit b47da808d7
2 changed files with 112 additions and 29 deletions

View File

@ -389,7 +389,8 @@ public class KahaDBStore extends MessageDatabase implements PersistenceAdapter {
// operations... but for now we must
// externally synchronize...
Location location;
synchronized (indexMutex) {
indexLock.readLock().lock();
try {
location = pageFile.tx().execute(new Transaction.CallableClosure<Location, IOException>() {
public Location execute(Transaction tx) throws IOException {
StoredDestination sd = getStoredDestination(dest, tx);
@ -400,6 +401,8 @@ public class KahaDBStore extends MessageDatabase implements PersistenceAdapter {
return sd.orderIndex.get(tx, sequence).location;
}
});
}finally {
indexLock.readLock().unlock();
}
if (location == null) {
return null;
@ -411,7 +414,8 @@ public class KahaDBStore extends MessageDatabase implements PersistenceAdapter {
public int getMessageCount() throws IOException {
try {
lockAsyncJobQueue();
synchronized (indexMutex) {
indexLock.readLock().lock();
try {
return pageFile.tx().execute(new Transaction.CallableClosure<Integer, IOException>() {
public Integer execute(Transaction tx) throws IOException {
// Iterate through all index entries to get a count
@ -427,6 +431,8 @@ public class KahaDBStore extends MessageDatabase implements PersistenceAdapter {
return rc;
}
});
}finally {
indexLock.readLock().unlock();
}
} finally {
unlockAsyncJobQueue();
@ -435,7 +441,8 @@ public class KahaDBStore extends MessageDatabase implements PersistenceAdapter {
@Override
public boolean isEmpty() throws IOException {
synchronized (indexMutex) {
indexLock.readLock().lock();
try {
return pageFile.tx().execute(new Transaction.CallableClosure<Boolean, IOException>() {
public Boolean execute(Transaction tx) throws IOException {
// Iterate through all index entries to get a count of
@ -444,11 +451,14 @@ public class KahaDBStore extends MessageDatabase implements PersistenceAdapter {
return sd.locationIndex.isEmpty(tx);
}
});
}finally {
indexLock.readLock().unlock();
}
}
public void recover(final MessageRecoveryListener listener) throws Exception {
synchronized (indexMutex) {
indexLock.readLock().lock();
try {
pageFile.tx().execute(new Transaction.Closure<Exception>() {
public void execute(Transaction tx) throws Exception {
StoredDestination sd = getStoredDestination(dest, tx);
@ -459,13 +469,16 @@ public class KahaDBStore extends MessageDatabase implements PersistenceAdapter {
}
}
});
}finally {
indexLock.readLock().unlock();
}
}
long cursorPos = 0;
public void recoverNextMessages(final int maxReturned, final MessageRecoveryListener listener) throws Exception {
synchronized (indexMutex) {
indexLock.readLock().lock();
try {
pageFile.tx().execute(new Transaction.Closure<Exception>() {
public void execute(Transaction tx) throws Exception {
StoredDestination sd = getStoredDestination(dest, tx);
@ -486,6 +499,8 @@ public class KahaDBStore extends MessageDatabase implements PersistenceAdapter {
}
}
});
}finally {
indexLock.readLock().unlock();
}
}
@ -503,13 +518,16 @@ public class KahaDBStore extends MessageDatabase implements PersistenceAdapter {
// operations... but for now we must
// externally synchronize...
Long location;
synchronized (indexMutex) {
indexLock.readLock().lock();
try {
location = pageFile.tx().execute(new Transaction.CallableClosure<Long, IOException>() {
public Long execute(Transaction tx) throws IOException {
StoredDestination sd = getStoredDestination(dest, tx);
return sd.messageIdIndex.get(tx, key);
}
});
}finally {
indexLock.readLock().unlock();
}
if (location != null) {
cursorPos = location + 1;
@ -638,7 +656,8 @@ public class KahaDBStore extends MessageDatabase implements PersistenceAdapter {
public SubscriptionInfo[] getAllSubscriptions() throws IOException {
final ArrayList<SubscriptionInfo> subscriptions = new ArrayList<SubscriptionInfo>();
synchronized (indexMutex) {
indexLock.readLock().lock();
try {
pageFile.tx().execute(new Transaction.Closure<IOException>() {
public void execute(Transaction tx) throws IOException {
StoredDestination sd = getStoredDestination(dest, tx);
@ -652,6 +671,8 @@ public class KahaDBStore extends MessageDatabase implements PersistenceAdapter {
}
}
});
}finally {
indexLock.readLock().unlock();
}
SubscriptionInfo[] rc = new SubscriptionInfo[subscriptions.size()];
@ -661,7 +682,8 @@ public class KahaDBStore extends MessageDatabase implements PersistenceAdapter {
public SubscriptionInfo lookupSubscription(String clientId, String subscriptionName) throws IOException {
final String subscriptionKey = subscriptionKey(clientId, subscriptionName);
synchronized (indexMutex) {
indexLock.readLock().lock();
try {
return pageFile.tx().execute(new Transaction.CallableClosure<SubscriptionInfo, IOException>() {
public SubscriptionInfo execute(Transaction tx) throws IOException {
StoredDestination sd = getStoredDestination(dest, tx);
@ -673,13 +695,16 @@ public class KahaDBStore extends MessageDatabase implements PersistenceAdapter {
.getSubscriptionInfo().newInput()));
}
});
}finally {
indexLock.readLock().unlock();
}
}
public int getMessageCount(String clientId, String subscriptionName) throws IOException {
final String subscriptionKey = subscriptionKey(clientId, subscriptionName);
final SubscriptionInfo info = lookupSubscription(clientId, subscriptionName);
synchronized (indexMutex) {
indexLock.readLock().lock();
try {
return pageFile.tx().execute(new Transaction.CallableClosure<Integer, IOException>() {
public Integer execute(Transaction tx) throws IOException {
StoredDestination sd = getStoredDestination(dest, tx);
@ -716,13 +741,16 @@ public class KahaDBStore extends MessageDatabase implements PersistenceAdapter {
return counter;
}
});
}finally {
indexLock.readLock().unlock();
}
}
public void recoverSubscription(String clientId, String subscriptionName, final MessageRecoveryListener listener)
throws Exception {
final String subscriptionKey = subscriptionKey(clientId, subscriptionName);
synchronized (indexMutex) {
indexLock.readLock().lock();
try {
pageFile.tx().execute(new Transaction.Closure<Exception>() {
public void execute(Transaction tx) throws Exception {
StoredDestination sd = getStoredDestination(dest, tx);
@ -736,13 +764,16 @@ public class KahaDBStore extends MessageDatabase implements PersistenceAdapter {
}
}
});
}finally {
indexLock.readLock().unlock();
}
}
public void recoverNextMessages(String clientId, String subscriptionName, final int maxReturned,
final MessageRecoveryListener listener) throws Exception {
final String subscriptionKey = subscriptionKey(clientId, subscriptionName);
synchronized (indexMutex) {
indexLock.readLock().lock();
try {
pageFile.tx().execute(new Transaction.Closure<Exception>() {
public void execute(Transaction tx) throws Exception {
StoredDestination sd = getStoredDestination(dest, tx);
@ -768,19 +799,24 @@ public class KahaDBStore extends MessageDatabase implements PersistenceAdapter {
}
}
});
}finally {
indexLock.readLock().unlock();
}
}
public void resetBatching(String clientId, String subscriptionName) {
try {
final String subscriptionKey = subscriptionKey(clientId, subscriptionName);
synchronized (indexMutex) {
indexLock.writeLock().lock();
try {
pageFile.tx().execute(new Transaction.Closure<IOException>() {
public void execute(Transaction tx) throws IOException {
StoredDestination sd = getStoredDestination(dest, tx);
sd.subscriptionCursors.remove(subscriptionKey);
}
});
}finally {
indexLock.writeLock().unlock();
}
} catch (IOException e) {
throw new RuntimeException(e);
@ -827,7 +863,8 @@ public class KahaDBStore extends MessageDatabase implements PersistenceAdapter {
public Set<ActiveMQDestination> getDestinations() {
try {
final HashSet<ActiveMQDestination> rc = new HashSet<ActiveMQDestination>();
synchronized (indexMutex) {
indexLock.readLock().lock();
try {
pageFile.tx().execute(new Transaction.Closure<IOException>() {
public void execute(Transaction tx) throws IOException {
for (Iterator<Entry<String, StoredDestination>> iterator = metadata.destinations.iterator(tx); iterator
@ -852,6 +889,8 @@ public class KahaDBStore extends MessageDatabase implements PersistenceAdapter {
return isEmptyTopic;
}
});
}finally {
indexLock.readLock().unlock();
}
return rc;
} catch (IOException e) {

View File

@ -35,6 +35,9 @@ import java.util.TreeMap;
import java.util.TreeSet;
import java.util.Map.Entry;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
import java.util.concurrent.locks.ReentrantReadWriteLock;
import org.apache.activemq.broker.BrokerService;
import org.apache.activemq.broker.BrokerServiceAware;
import org.apache.activemq.command.ConnectionId;
@ -193,7 +196,8 @@ public class MessageDatabase extends ServiceSupport implements BrokerServiceAwar
}
private void loadPageFile() throws IOException {
synchronized (indexMutex) {
this.indexLock.writeLock().lock();
try {
final PageFile pageFile = getPageFile();
pageFile.load();
pageFile.tx().execute(new Transaction.Closure<IOException>() {
@ -232,6 +236,8 @@ public class MessageDatabase extends ServiceSupport implements BrokerServiceAwar
}
}
});
}finally {
this.indexLock.writeLock().unlock();
}
}
@ -307,7 +313,8 @@ public class MessageDatabase extends ServiceSupport implements BrokerServiceAwar
public void load() throws IOException {
synchronized (indexMutex) {
this.indexLock.writeLock().lock();
try {
lock();
if (deleteAllMessages) {
getJournal().start();
@ -321,7 +328,8 @@ public class MessageDatabase extends ServiceSupport implements BrokerServiceAwar
open();
store(new KahaTraceCommand().setMessage("LOADED " + new Date()));
}finally {
this.indexLock.writeLock().unlock();
}
}
@ -329,7 +337,8 @@ public class MessageDatabase extends ServiceSupport implements BrokerServiceAwar
public void close() throws IOException, InterruptedException {
if( opened.compareAndSet(true, false)) {
synchronized (indexMutex) {
this.indexLock.writeLock().lock();
try {
pageFile.tx().execute(new Transaction.Closure<IOException>() {
public void execute(Transaction tx) throws IOException {
checkpointUpdate(tx, true);
@ -337,6 +346,8 @@ public class MessageDatabase extends ServiceSupport implements BrokerServiceAwar
});
pageFile.unload();
metadata = new Metadata();
}finally {
this.indexLock.writeLock().unlock();
}
journal.close();
checkpointThread.join();
@ -346,7 +357,8 @@ public class MessageDatabase extends ServiceSupport implements BrokerServiceAwar
}
public void unload() throws IOException, InterruptedException {
synchronized (indexMutex) {
this.indexLock.writeLock().lock();
try {
if( pageFile != null && pageFile.isLoaded() ) {
metadata.state = CLOSED_STATE;
metadata.firstInProgressTransactionLocation = getFirstInProgressTxLocation();
@ -357,6 +369,8 @@ public class MessageDatabase extends ServiceSupport implements BrokerServiceAwar
}
});
}
}finally {
this.indexLock.writeLock().unlock();
}
close();
}
@ -389,7 +403,8 @@ public class MessageDatabase extends ServiceSupport implements BrokerServiceAwar
* @throws IllegalStateException
*/
private void recover() throws IllegalStateException, IOException {
synchronized (indexMutex) {
this.indexLock.writeLock().lock();
try {
long start = System.currentTimeMillis();
Location recoveryPosition = getRecoveryPosition();
@ -413,6 +428,8 @@ public class MessageDatabase extends ServiceSupport implements BrokerServiceAwar
recoverIndex(tx);
}
});
}finally {
this.indexLock.writeLock().unlock();
}
}
@ -559,7 +576,8 @@ public class MessageDatabase extends ServiceSupport implements BrokerServiceAwar
private Location lastRecoveryPosition;
public void incrementalRecover() throws IOException {
synchronized (indexMutex) {
this.indexLock.writeLock().lock();
try {
if( nextRecoveryPosition == null ) {
if( lastRecoveryPosition==null ) {
nextRecoveryPosition = getRecoveryPosition();
@ -574,6 +592,8 @@ public class MessageDatabase extends ServiceSupport implements BrokerServiceAwar
process(message, lastRecoveryPosition);
nextRecoveryPosition = journal.getNextLocation(lastRecoveryPosition);
}
}finally {
this.indexLock.writeLock().unlock();
}
}
@ -600,7 +620,8 @@ public class MessageDatabase extends ServiceSupport implements BrokerServiceAwar
protected void checkpointCleanup(final boolean cleanup) throws IOException {
long start;
synchronized (indexMutex) {
this.indexLock.writeLock().lock();
try {
start = System.currentTimeMillis();
if( !opened.get() ) {
return;
@ -610,6 +631,8 @@ public class MessageDatabase extends ServiceSupport implements BrokerServiceAwar
checkpointUpdate(tx, cleanup);
}
});
}finally {
this.indexLock.writeLock().unlock();
}
long end = System.currentTimeMillis();
if( LOG_SLOW_ACCESS_TIME>0 && end-start > LOG_SLOW_ACCESS_TIME) {
@ -619,13 +642,16 @@ public class MessageDatabase extends ServiceSupport implements BrokerServiceAwar
public void checkpoint(Callback closure) throws Exception {
synchronized (indexMutex) {
this.indexLock.writeLock().lock();
try {
pageFile.tx().execute(new Transaction.Closure<IOException>() {
public void execute(Transaction tx) throws IOException {
checkpointUpdate(tx, false);
}
});
closure.execute();
}finally {
this.indexLock.writeLock().unlock();
}
}
@ -662,8 +688,11 @@ public class MessageDatabase extends ServiceSupport implements BrokerServiceAwar
LOG.info("Slow KahaDB access: Journal append took: "+(start2-start)+" ms, Index update took "+(end-start2)+" ms");
}
synchronized (indexMutex) {
this.indexLock.writeLock().lock();
try {
metadata.lastUpdate = location;
}finally {
this.indexLock.writeLock().unlock();
}
if (!checkpointThread.isAlive()) {
LOG.info("KahaDB: Recovering checkpoint thread after exception");
@ -752,12 +781,15 @@ public class MessageDatabase extends ServiceSupport implements BrokerServiceAwar
inflightTx.add(new AddOpperation(command, location));
}
} else {
synchronized (indexMutex) {
this.indexLock.writeLock().lock();
try {
pageFile.tx().execute(new Transaction.Closure<IOException>() {
public void execute(Transaction tx) throws IOException {
upadateIndex(tx, command, location);
}
});
}finally {
this.indexLock.writeLock().unlock();
}
}
}
@ -769,34 +801,43 @@ public class MessageDatabase extends ServiceSupport implements BrokerServiceAwar
inflightTx.add(new RemoveOpperation(command, location));
}
} else {
synchronized (indexMutex) {
this.indexLock.writeLock().lock();
try {
pageFile.tx().execute(new Transaction.Closure<IOException>() {
public void execute(Transaction tx) throws IOException {
updateIndex(tx, command, location);
}
});
}finally {
this.indexLock.writeLock().unlock();
}
}
}
protected void process(final KahaRemoveDestinationCommand command, final Location location) throws IOException {
synchronized (indexMutex) {
this.indexLock.writeLock().lock();
try {
pageFile.tx().execute(new Transaction.Closure<IOException>() {
public void execute(Transaction tx) throws IOException {
updateIndex(tx, command, location);
}
});
}finally {
this.indexLock.writeLock().unlock();
}
}
protected void process(final KahaSubscriptionCommand command, final Location location) throws IOException {
synchronized (indexMutex) {
this.indexLock.writeLock().lock();
try {
pageFile.tx().execute(new Transaction.Closure<IOException>() {
public void execute(Transaction tx) throws IOException {
updateIndex(tx, command, location);
}
});
}finally {
this.indexLock.writeLock().unlock();
}
}
@ -814,7 +855,8 @@ public class MessageDatabase extends ServiceSupport implements BrokerServiceAwar
}
final ArrayList<Operation> messagingTx = inflightTx;
synchronized (indexMutex) {
this.indexLock.writeLock().lock();
try {
pageFile.tx().execute(new Transaction.Closure<IOException>() {
public void execute(Transaction tx) throws IOException {
for (Operation op : messagingTx) {
@ -822,6 +864,8 @@ public class MessageDatabase extends ServiceSupport implements BrokerServiceAwar
}
}
});
}finally {
this.indexLock.writeLock().unlock();
}
}
@ -849,7 +893,7 @@ public class MessageDatabase extends ServiceSupport implements BrokerServiceAwar
// These methods do the actual index updates.
// /////////////////////////////////////////////////////////////////
protected final Object indexMutex = new Object();
protected final ReentrantReadWriteLock indexLock = new ReentrantReadWriteLock();
private final HashSet<Integer> journalFilesBeingReplicated = new HashSet<Integer>();
void upadateIndex(Transaction tx, KahaAddMessageCommand command, Location location) throws IOException {