diff --git a/activemq-core/src/main/java/org/apache/activemq/store/kahadb/MessageDatabase.java b/activemq-core/src/main/java/org/apache/activemq/store/kahadb/MessageDatabase.java
index 0e4a2c6d9e..9a0b6ebb75 100644
--- a/activemq-core/src/main/java/org/apache/activemq/store/kahadb/MessageDatabase.java
+++ b/activemq-core/src/main/java/org/apache/activemq/store/kahadb/MessageDatabase.java
@@ -29,7 +29,9 @@ import java.util.HashSet;
 import java.util.Iterator;
 import java.util.LinkedHashMap;
 import java.util.List;
+import java.util.SortedSet;
 import java.util.TreeMap;
+import java.util.TreeSet;
 import java.util.Map.Entry;
 import java.util.concurrent.atomic.AtomicBoolean;
 
@@ -443,7 +445,6 @@ public class MessageDatabase {
                     checkpointUpdate(tx, false);
                 }
             });
-            pageFile.flush();
             closure.execute();
         }
     }
@@ -774,46 +775,82 @@ public class MessageDatabase {
         pageFile.flush();
 
         if( cleanup ) {
-            // Find empty journal files to remove.
-            final HashSet<Integer> inUseFiles = new HashSet<Integer>();
+
+            final TreeSet<Integer> gcCandidateSet = new TreeSet<Integer>(journal.getFileMap().keySet());
+
+            // Don't GC files under replication
+            if( journalFilesBeingReplicated!=null ) {
+                gcCandidateSet.removeAll(journalFilesBeingReplicated);
+            }
+
+            // Don't GC files after the first in progress tx
+            Location firstTxLocation = metadata.lastUpdate;
+            if( metadata.firstInProgressTransactionLocation!=null ) {
+                firstTxLocation = metadata.firstInProgressTransactionLocation;
+            }
+
+            if( firstTxLocation!=null ) {
+                while( !gcCandidateSet.isEmpty() ) {
+                    Integer last = gcCandidateSet.last();
+                    if( last >= firstTxLocation.getDataFileId() ) {
+                        gcCandidateSet.remove(last);
+                    } else {
+                        break;
+                    }
+                }
+            }
+
+            // Go through all the destinations to see if any of them can remove GC candidates.
             for (StoredDestination sd : storedDestinations.values()) {
+                if( gcCandidateSet.isEmpty() ) {
+                    break;
+                }
 
                 // Use a visitor to cut down the number of pages that we load
                 sd.locationIndex.visit(tx, new BTreeVisitor<Location, Long>() {
                     int last=-1;
                     public boolean isInterestedInKeysBetween(Location first, Location second) {
-                        if( second!=null ) {
-                            if( last+1 == second.getDataFileId() ) {
-                                last++;
-                                inUseFiles.add(last);
-                            }
-                            if( last == second.getDataFileId() ) {
-                                return false;
-                            }
-                        }
-                        return true;
+                        if( first==null ) {
+                            SortedSet<Integer> subset = gcCandidateSet.headSet(second.getDataFileId()+1);
+                            if( !subset.isEmpty() && subset.last() == second.getDataFileId() ) {
+                                subset.remove(second.getDataFileId());
+                            }
+                            return !subset.isEmpty();
+                        } else if( second==null ) {
+                            SortedSet<Integer> subset = gcCandidateSet.tailSet(first.getDataFileId());
+                            if( !subset.isEmpty() && subset.first() == first.getDataFileId() ) {
+                                subset.remove(first.getDataFileId());
+                            }
+                            return !subset.isEmpty();
+                        } else {
+                            SortedSet<Integer> subset = gcCandidateSet.subSet(first.getDataFileId(), second.getDataFileId()+1);
+                            if( !subset.isEmpty() && subset.first() == first.getDataFileId() ) {
+                                subset.remove(first.getDataFileId());
+                            }
+                            if( !subset.isEmpty() && subset.last() == second.getDataFileId() ) {
+                                subset.remove(second.getDataFileId());
+                            }
+                            return !subset.isEmpty();
+                        }
                     }
 
                     public void visit(List<Location> keys, List<Long> values) {
-                        for (int i = 0; i < keys.size(); i++) {
-                            if( last != keys.get(i).getDataFileId() ) {
-                                inUseFiles.add(keys.get(i).getDataFileId());
-                                last = keys.get(i).getDataFileId();
+                        for (Location l : keys) {
+                            int fileId = l.getDataFileId();
+                            if( last != fileId ) {
+                                gcCandidateSet.remove(fileId);
+                                last = fileId;
                             }
-                        }
-
+                        }
                     }
                 });
             }
 
-            inUseFiles.addAll(journalFilesBeingReplicated);
-            Location l = metadata.lastUpdate;
-            if( metadata.firstInProgressTransactionLocation!=null ) {
-                l = metadata.firstInProgressTransactionLocation;
+
+            if( !gcCandidateSet.isEmpty() ) {
+                LOG.debug("Cleanup removing the data files: "+gcCandidateSet);
+                journal.removeDataFiles(gcCandidateSet);
             }
-
-            LOG.debug("In use files: "+inUseFiles+", lastUpdate: "+l);
-            journal.consolidateDataFilesNotIn(inUseFiles, l==null?null:l.getDataFileId());
         }
 
         LOG.debug("Checkpoint done.");
diff --git a/activemq-core/src/test/java/org/apache/activemq/bugs/VerifySteadyEnqueueRate.java b/activemq-core/src/test/java/org/apache/activemq/bugs/VerifySteadyEnqueueRate.java
index 95ae2598f7..0098e7fb1e 100644
--- a/activemq-core/src/test/java/org/apache/activemq/bugs/VerifySteadyEnqueueRate.java
+++ b/activemq-core/src/test/java/org/apache/activemq/bugs/VerifySteadyEnqueueRate.java
@@ -150,12 +150,12 @@ public class VerifySteadyEnqueueRate extends TestCase {
         } else {
             KahaDBStore kaha = new KahaDBStore();
             kaha.setDirectory(new File("target/activemq-data/kahadb"));
-            kaha.deleteAllMessages();
-            kaha.setCleanupInterval(1000 * 60 * 60 * 60);
             // The setEnableJournalDiskSyncs(false) setting is a little dangerous right now, as I have not verified
             // what happens if the index is updated but a journal update is lost.
             // Index is going to be in consistent, but can it be repaired?
             kaha.setEnableJournalDiskSyncs(false);
+            // Using a bigger journal file size makes it take fewer spikes as it is not switching files as often.
+            kaha.getJournal().setMaxFileLength(1024*1024*100);
             kaha.getPageFile().setWriteBatchSize(100);
             kaha.getPageFile().setEnableWriteThread(true);
             broker.setPersistenceAdapter(kaha);
diff --git a/kahadb/src/main/java/org/apache/kahadb/index/BTreeNode.java b/kahadb/src/main/java/org/apache/kahadb/index/BTreeNode.java
index 0635d1fb06..f9fe25ce16 100644
--- a/kahadb/src/main/java/org/apache/kahadb/index/BTreeNode.java
+++ b/kahadb/src/main/java/org/apache/kahadb/index/BTreeNode.java
@@ -524,7 +524,7 @@ public final class BTreeNode<Key,Value> {
                 }
                 Key key2 = null;
                 if( i!=this.children.length-1 ) {
-                    key1 = keys[i];
+                    key2 = keys[i];
                 }
                 if( visitor.isInterestedInKeysBetween(key1, key2) ) {
                     BTreeNode<Key,Value> child = getChild(tx, i);
diff --git a/kahadb/src/main/java/org/apache/kahadb/journal/Journal.java b/kahadb/src/main/java/org/apache/kahadb/journal/Journal.java
index df39470c53..3d4c312a57 100644
--- a/kahadb/src/main/java/org/apache/kahadb/journal/Journal.java
+++ b/kahadb/src/main/java/org/apache/kahadb/journal/Journal.java
@@ -16,17 +16,12 @@
  */
 package org.apache.kahadb.journal;
 
-import java.io.ByteArrayInputStream;
-import java.io.ByteArrayOutputStream;
-import java.io.DataInputStream;
-import java.io.DataOutputStream;
 import java.io.File;
 import java.io.FilenameFilter;
 import java.io.IOException;
 import java.util.ArrayList;
 import java.util.Collections;
 import java.util.HashMap;
-import java.util.HashSet;
 import java.util.Iterator;
 import java.util.LinkedHashMap;
 import java.util.List;
@@ -42,7 +37,6 @@ import org.apache.commons.logging.LogFactory;
 import org.apache.kahadb.journal.DataFileAppender.WriteCommand;
 import org.apache.kahadb.journal.DataFileAppender.WriteKey;
 import org.apache.kahadb.util.ByteSequence;
-import org.apache.kahadb.util.IOHelper;
 import org.apache.kahadb.util.LinkedNodeList;
 import org.apache.kahadb.util.Scheduler;
 
@@ -299,18 +293,10 @@ public class Journal {
         return result;
     }
 
-    public synchronized void consolidateDataFilesNotIn(Set<Integer> inUse, Integer lastFile) throws IOException {
-        Set<Integer> unUsed = new HashSet<Integer>(fileMap.keySet());
-        unUsed.removeAll(inUse);
-
-        for (Integer key : unUsed) {
-            // Don't remove files that come after the lastFile
-            if (lastFile !=null && key >= lastFile ) {
-                continue;
-            }
+    public synchronized void removeDataFiles(Set<Integer> files) throws IOException {
+        for (Integer key : files) {
             DataFile dataFile = fileMap.get(key);
-
-            // Can't remove the last file either.
+            // Can't remove the last file.
             if( dataFile == dataFiles.getTail() ) {
                 continue;
             }
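Editor's note (not part of the patch): the new cleanup pass starts from the full set of journal file ids as GC candidates and lets each destination's location index knock candidates out, using the BTreeVisitor range callbacks to skip B-tree subtrees that cannot affect the remaining candidates. The standalone sketch below illustrates that pruning idea with plain integer file ids and a TreeSet; the class and method names are invented for illustration and do not exist in ActiveMQ or KahaDB.

// GcCandidatePruneSketch.java -- illustrative only, not ActiveMQ/KahaDB code.
import java.util.Arrays;
import java.util.SortedSet;
import java.util.TreeSet;

public class GcCandidatePruneSketch {

    /**
     * Mirrors the shape of isInterestedInKeysBetween() in the patch: given the file ids of
     * the first and last keys of a B-tree range (either end may be null for an open range),
     * drop the boundary ids from the candidate set (messages exist in those files, so they
     * are in use) and report whether any candidate in between could still be cleared by
     * descending into that subtree.
     */
    static boolean worthVisiting(TreeSet<Integer> candidates, Integer first, Integer second) {
        if (first == null) {
            SortedSet<Integer> subset = candidates.headSet(second + 1);
            subset.remove(second);                  // view writes through to candidates
            return !subset.isEmpty();
        } else if (second == null) {
            SortedSet<Integer> subset = candidates.tailSet(first);
            subset.remove(first);
            return !subset.isEmpty();
        } else {
            SortedSet<Integer> subset = candidates.subSet(first, second + 1);
            subset.remove(first);
            subset.remove(second);
            return !subset.isEmpty();
        }
    }

    public static void main(String[] args) {
        // Journal files 1..5 start out as GC candidates.
        TreeSet<Integer> candidates = new TreeSet<Integer>(Arrays.asList(1, 2, 3, 4, 5));

        // A subtree whose keys span files 2..5: both boundary files drop out of the
        // candidate set, and the subtree is only visited if 3 or 4 might still be freed.
        boolean visit = worthVisiting(candidates, 2, 5);
        System.out.println(visit + " " + candidates);   // prints: true [1, 3, 4]

        // Whatever survives after every destination's index has been visited is safe to
        // delete, subject to the replication and in-progress-transaction checks the patch
        // applies before building the candidate set.
    }
}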