mirror of https://github.com/apache/activemq.git
Changed the cleanup algorithm used in KahaDB; it should now be much faster.
Fixed a bug in BTreeNode.visit() that was causing most of the pages to be loaded instead of letting the visitor filter down to just a few.

git-svn-id: https://svn.apache.org/repos/asf/activemq/trunk@741212 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
parent c0594251e2
commit 8262ef7903
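At a high level, the new cleanup inverts the old bookkeeping: instead of collecting an "in use" set and asking the journal to consolidate everything not in it, the checkpoint now starts from the full set of journal data file ids and strikes out everything that is still referenced; whatever survives gets removed. The following standalone sketch illustrates that elimination strategy with made-up file ids and set contents; only the general shape mirrors the diff below, none of it is the committed code.

import java.util.Arrays;
import java.util.HashSet;
import java.util.Set;
import java.util.TreeSet;

// Standalone illustration of the candidate-elimination cleanup (not the committed code).
public class CleanupSketch {
    public static void main(String[] args) {
        // Hypothetical journal state: data files 1..6 exist on disk, all start as GC candidates.
        TreeSet<Integer> gcCandidateSet = new TreeSet<Integer>(Arrays.asList(1, 2, 3, 4, 5, 6));

        // Files still being replicated must not be GC'd.
        Set<Integer> journalFilesBeingReplicated = new HashSet<Integer>(Arrays.asList(2));
        gcCandidateSet.removeAll(journalFilesBeingReplicated);

        // Nothing at or after the file holding the first in-progress transaction may be GC'd.
        int firstInProgressTxFileId = 5;
        gcCandidateSet.tailSet(firstInProgressTxFileId).clear();

        // Each destination's location index strikes out the files it still references
        // (the real code does this with a BTreeVisitor so it only loads the pages it needs).
        Set<Integer> referencedByIndexes = new HashSet<Integer>(Arrays.asList(3));
        gcCandidateSet.removeAll(referencedByIndexes);

        // Whatever survives is unreferenced; the real code hands it to journal.removeDataFiles().
        System.out.println("Removable journal files: " + gcCandidateSet); // prints [1, 4]
    }
}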
MessageDatabase.java

@@ -29,7 +29,9 @@ import java.util.HashSet;
 import java.util.Iterator;
 import java.util.LinkedHashMap;
 import java.util.List;
+import java.util.SortedSet;
 import java.util.TreeMap;
+import java.util.TreeSet;
 import java.util.Map.Entry;
 import java.util.concurrent.atomic.AtomicBoolean;
 
@@ -443,7 +445,6 @@ public class MessageDatabase {
                     checkpointUpdate(tx, false);
                 }
             });
-            pageFile.flush();
             closure.execute();
         }
     }
@@ -774,46 +775,82 @@ public class MessageDatabase {
         pageFile.flush();
 
         if( cleanup ) {
-            // Find empty journal files to remove.
-            final HashSet<Integer> inUseFiles = new HashSet<Integer>();
+            final TreeSet<Integer> gcCandidateSet = new TreeSet<Integer>(journal.getFileMap().keySet());
 
+            // Don't GC files under replication
+            if( journalFilesBeingReplicated!=null ) {
+                gcCandidateSet.removeAll(journalFilesBeingReplicated);
+            }
+
+            // Don't GC files after the first in progress tx
+            Location firstTxLocation = metadata.lastUpdate;
+            if( metadata.firstInProgressTransactionLocation!=null ) {
+                firstTxLocation = metadata.firstInProgressTransactionLocation;
+            }
+
+            if( firstTxLocation!=null ) {
+                while( !gcCandidateSet.isEmpty() ) {
+                    Integer last = gcCandidateSet.last();
+                    if( last >= firstTxLocation.getDataFileId() ) {
+                        gcCandidateSet.remove(last);
+                    } else {
+                        break;
+                    }
+                }
+            }
+
+            // Go through all the destinations to see if any of them can remove GC candidates.
             for (StoredDestination sd : storedDestinations.values()) {
+                if( gcCandidateSet.isEmpty() ) {
+                    break;
+                }
+
                 // Use a visitor to cut down the number of pages that we load
                 sd.locationIndex.visit(tx, new BTreeVisitor<Location, Long>() {
                     int last=-1;
                     public boolean isInterestedInKeysBetween(Location first, Location second) {
-                        if( second!=null ) {
-                            if( last+1 == second.getDataFileId() ) {
-                                last++;
-                                inUseFiles.add(last);
-                            }
-                            if( last == second.getDataFileId() ) {
-                                return false;
-                            }
-                        }
-                        return true;
+                        if( first==null ) {
+                            SortedSet<Integer> subset = gcCandidateSet.headSet(second.getDataFileId()+1);
+                            if( !subset.isEmpty() && subset.last() == second.getDataFileId() ) {
+                                subset.remove(second.getDataFileId());
+                            }
+                            return !subset.isEmpty();
+                        } else if( second==null ) {
+                            SortedSet<Integer> subset = gcCandidateSet.tailSet(first.getDataFileId());
+                            if( !subset.isEmpty() && subset.first() == first.getDataFileId() ) {
+                                subset.remove(first.getDataFileId());
+                            }
+                            return !subset.isEmpty();
+                        } else {
+                            SortedSet<Integer> subset = gcCandidateSet.subSet(first.getDataFileId(), second.getDataFileId()+1);
+                            if( !subset.isEmpty() && subset.first() == first.getDataFileId() ) {
+                                subset.remove(first.getDataFileId());
+                            }
+                            if( !subset.isEmpty() && subset.last() == second.getDataFileId() ) {
+                                subset.remove(second.getDataFileId());
+                            }
+                            return !subset.isEmpty();
+                        }
                     }
 
                     public void visit(List<Location> keys, List<Long> values) {
-                        for (int i = 0; i < keys.size(); i++) {
-                            if( last != keys.get(i).getDataFileId() ) {
-                                inUseFiles.add(keys.get(i).getDataFileId());
-                                last = keys.get(i).getDataFileId();
+                        for (Location l : keys) {
+                            int fileId = l.getDataFileId();
+                            if( last != fileId ) {
+                                gcCandidateSet.remove(fileId);
+                                last = fileId;
                             }
                         }
                     }
 
                 });
             }
-            inUseFiles.addAll(journalFilesBeingReplicated);
-            Location l = metadata.lastUpdate;
-            if( metadata.firstInProgressTransactionLocation!=null ) {
-                l = metadata.firstInProgressTransactionLocation;
+
+            if( !gcCandidateSet.isEmpty() ) {
+                LOG.debug("Cleanup removing the data files: "+gcCandidateSet);
+                journal.removeDataFiles(gcCandidateSet);
             }
 
-            LOG.debug("In use files: "+inUseFiles+", lastUpdate: "+l);
-            journal.consolidateDataFilesNotIn(inUseFiles, l==null?null:l.getDataFileId());
         }
 
         LOG.debug("Checkpoint done.");
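The speed-up in the visitor comes from isInterestedInKeysBetween(): it answers purely from TreeSet range views over gcCandidateSet, so whole subtrees whose key range cannot contain a remaining candidate are never loaded. Below is a self-contained illustration of that range check with hypothetical file ids; the boundary handling mirrors the subSet/headSet/tailSet calls above, but this is a sketch, not the committed code.

import java.util.SortedSet;
import java.util.TreeSet;

// Standalone illustration (hypothetical values) of the range check used by the cleanup
// visitor: a child subtree is only worth loading if the candidate set still contains a
// file id inside the child's key range after the boundary ids have been struck out.
public class RangeCheckSketch {
    public static void main(String[] args) {
        TreeSet<Integer> gcCandidateSet = new TreeSet<Integer>();
        for (int id = 1; id <= 10; id++) {
            gcCandidateSet.add(id);
        }

        int firstId = 3;   // data file id of the lower boundary key
        int secondId = 7;  // data file id of the upper boundary key

        // subSet(first, second+1) is a live *view* of the candidates in [3, 8) == {3, 4, 5, 6, 7}.
        SortedSet<Integer> subset = gcCandidateSet.subSet(firstId, secondId + 1);

        // A candidate equal to a boundary key's file id is evidently still referenced
        // (the boundary key itself points into that file), so it is dropped right here;
        // removing through the view also updates gcCandidateSet.
        if (!subset.isEmpty() && subset.first() == firstId) {
            subset.remove(firstId);
        }
        if (!subset.isEmpty() && subset.last() == secondId) {
            subset.remove(secondId);
        }

        // Candidates 4, 5 and 6 are still in range, so the visitor would report interest
        // and descend into this child; an empty view would let it skip the page entirely.
        System.out.println("Interested: " + !subset.isEmpty());        // true
        System.out.println("Remaining candidates: " + gcCandidateSet); // [1, 2, 4, 5, 6, 8, 9, 10]
    }
}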
VerifySteadyEnqueueRate.java

@@ -150,12 +150,12 @@ public class VerifySteadyEnqueueRate extends TestCase {
         } else {
             KahaDBStore kaha = new KahaDBStore();
             kaha.setDirectory(new File("target/activemq-data/kahadb"));
-            kaha.deleteAllMessages();
-            kaha.setCleanupInterval(1000 * 60 * 60 * 60);
             // The setEnableJournalDiskSyncs(false) setting is a little dangerous right now, as I have not verified
             // what happens if the index is updated but a journal update is lost.
             // Index is going to be inconsistent, but can it be repaired?
             kaha.setEnableJournalDiskSyncs(false);
+            // Using a bigger journal file size results in fewer spikes as it is not switching files as often.
+            kaha.getJournal().setMaxFileLength(1024*1024*100);
             kaha.getPageFile().setWriteBatchSize(100);
             kaha.getPageFile().setEnableWriteThread(true);
             broker.setPersistenceAdapter(kaha);
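For reference, the test's persistence tuning can be pulled out into a small standalone setup. The snippet below is a hedged sketch: the setter calls are the ones visible in the hunk above, while the package locations of BrokerService and KahaDBStore are my assumption about this code base.

import java.io.File;
import org.apache.activemq.broker.BrokerService;
import org.apache.activemq.store.kahadb.KahaDBStore;

// Sketch of the test's KahaDB setup as a standalone snippet (assumed class locations).
public class KahaDbTuningSketch {
    public static void main(String[] args) throws Exception {
        BrokerService broker = new BrokerService();

        KahaDBStore kaha = new KahaDBStore();
        kaha.setDirectory(new File("target/activemq-data/kahadb"));

        // Risky, as the hunk's own comment notes: if the index is updated but the matching
        // journal write is lost, the index can end up inconsistent.
        kaha.setEnableJournalDiskSyncs(false);

        // Bigger journal files mean fewer file rollovers, and therefore fewer latency spikes.
        kaha.getJournal().setMaxFileLength(1024 * 1024 * 100);

        kaha.getPageFile().setWriteBatchSize(100);
        kaha.getPageFile().setEnableWriteThread(true);

        broker.setPersistenceAdapter(kaha);
        broker.start();
    }
}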
BTreeNode.java

@@ -524,7 +524,7 @@ public final class BTreeNode<Key,Value> {
                 }
                 Key key2 = null;
                 if( i!=this.children.length-1 ) {
-                    key1 = keys[i];
+                    key2 = keys[i];
                 }
                 if( visitor.isInterestedInKeysBetween(key1, key2) ) {
                     BTreeNode<Key, Value> child = getChild(tx, i);
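The one-character fix above matters because BTreeNode.visit() offers each child a (key1, key2) window and consults visitor.isInterestedInKeysBetween(key1, key2) before loading the child page. With key2 never assigned, the upper bound stayed null, every window looked unbounded, and the visitor could not decline, so most pages got loaded, which is what the commit message describes. The sketch below illustrates that contract; the small interface is a stand-in written so the snippet compiles on its own, not the real org.apache.kahadb one.

import java.util.List;

// Minimal stand-in for the visitor contract exercised by BTreeNode.visit(); a null
// bound means the window is open on that side, as for the first and last children.
interface Visitor<Key extends Comparable<Key>, Value> {
    boolean isInterestedInKeysBetween(Key first, Key second);
    void visit(List<Key> keys, List<Value> values);
}

public class VisitorBoundsSketch {
    public static void main(String[] args) {
        // A visitor that only cares about windows that could contain the key 5.
        Visitor<Integer, String> onlyAroundFive = new Visitor<Integer, String>() {
            public boolean isInterestedInKeysBetween(Integer first, Integer second) {
                return (first == null || first < 5) && (second == null || second >= 5);
            }
            public void visit(List<Integer> keys, List<String> values) {
                System.out.println("visiting leaf with keys " + keys);
            }
        };
        // A correctly bounded window such as (10, 20) can be skipped without loading the child...
        System.out.println(onlyAroundFive.isInterestedInKeysBetween(10, 20));     // false
        // ...whereas an effectively unbounded window, as produced by the bug, never can be.
        System.out.println(onlyAroundFive.isInterestedInKeysBetween(null, null)); // true
    }
}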
Journal.java

@@ -16,17 +16,12 @@
  */
 package org.apache.kahadb.journal;
 
-import java.io.ByteArrayInputStream;
-import java.io.ByteArrayOutputStream;
-import java.io.DataInputStream;
-import java.io.DataOutputStream;
 import java.io.File;
 import java.io.FilenameFilter;
 import java.io.IOException;
 import java.util.ArrayList;
 import java.util.Collections;
 import java.util.HashMap;
-import java.util.HashSet;
 import java.util.Iterator;
 import java.util.LinkedHashMap;
 import java.util.List;

@@ -42,7 +37,6 @@ import org.apache.commons.logging.LogFactory;
 import org.apache.kahadb.journal.DataFileAppender.WriteCommand;
 import org.apache.kahadb.journal.DataFileAppender.WriteKey;
 import org.apache.kahadb.util.ByteSequence;
-import org.apache.kahadb.util.IOHelper;
 import org.apache.kahadb.util.LinkedNodeList;
 import org.apache.kahadb.util.Scheduler;

@@ -299,18 +293,10 @@ public class Journal {
         return result;
     }
 
-    public synchronized void consolidateDataFilesNotIn(Set<Integer> inUse, Integer lastFile) throws IOException {
-        Set<Integer> unUsed = new HashSet<Integer>(fileMap.keySet());
-        unUsed.removeAll(inUse);
-
-        for (Integer key : unUsed) {
-            // Don't remove files that come after the lastFile
-            if (lastFile !=null && key >= lastFile ) {
-                continue;
-            }
+    public synchronized void removeDataFiles(Set<Integer> files) throws IOException {
+        for (Integer key : files) {
             DataFile dataFile = fileMap.get(key);
-            // Can't remove the last file either.
+            // Can't remove the last file.
             if( dataFile == dataFiles.getTail() ) {
                 continue;
             }
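The journal-side change is mostly an API simplification: the caller now supplies the exact set of removable file ids, and the only check kept inside the method is the refusal to delete the data file at the tail of the list (presumably the one still being written). The following is a hypothetical standalone rendering of that contract, with a plain map standing in for the journal's fileMap; it is illustrative only, not the committed code.

import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.Set;
import java.util.TreeSet;

// Self-contained illustration (hypothetical data) of the removeDataFiles() contract:
// remove exactly the ids the caller passed in, except the current tail data file.
public class RemoveDataFilesSketch {
    public static void main(String[] args) {
        // Stand-in for the journal's fileMap: id -> data file name.
        Map<Integer, String> fileMap = new LinkedHashMap<Integer, String>();
        for (int id = 1; id <= 4; id++) {
            fileMap.put(id, "db-" + id + ".log");
        }
        int tailId = 4; // the file the appender is currently writing

        Set<Integer> files = new TreeSet<Integer>(Arrays.asList(2, 4));
        for (Integer key : files) {
            if (key == tailId) {        // "Can't remove the last file."
                continue;
            }
            String removed = fileMap.remove(key);
            System.out.println("removed " + removed);
        }
        System.out.println("remaining: " + fileMap.keySet()); // [1, 3, 4]
    }
}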