diff --git a/activemq-core/src/main/java/org/apache/activemq/store/kahadb/KahaDBPersistenceAdapter.java b/activemq-core/src/main/java/org/apache/activemq/store/kahadb/KahaDBPersistenceAdapter.java index aafc14fe3d..7d6b735c5c 100644 --- a/activemq-core/src/main/java/org/apache/activemq/store/kahadb/KahaDBPersistenceAdapter.java +++ b/activemq-core/src/main/java/org/apache/activemq/store/kahadb/KahaDBPersistenceAdapter.java @@ -349,4 +349,19 @@ public class KahaDBPersistenceAdapter implements PersistenceAdapter { this.letter.setIgnoreMissingJournalfiles(ignoreMissingJournalfiles); } + public boolean isChecksumJournalFiles() { + return letter.isChecksumJournalFiles(); + } + + public boolean isCheckForCorruptJournalFiles() { + return letter.isCheckForCorruptJournalFiles(); + } + + public void setChecksumJournalFiles(boolean checksumJournalFiles) { + letter.setChecksumJournalFiles(checksumJournalFiles); + } + + public void setCheckForCorruptJournalFiles(boolean checkForCorruptJournalFiles) { + letter.setCheckForCorruptJournalFiles(checkForCorruptJournalFiles); + } } diff --git a/activemq-core/src/main/java/org/apache/activemq/store/kahadb/MessageDatabase.java b/activemq-core/src/main/java/org/apache/activemq/store/kahadb/MessageDatabase.java index e53802b6fc..266a8e5e33 100644 --- a/activemq-core/src/main/java/org/apache/activemq/store/kahadb/MessageDatabase.java +++ b/activemq-core/src/main/java/org/apache/activemq/store/kahadb/MessageDatabase.java @@ -22,16 +22,7 @@ import java.io.File; import java.io.IOException; import java.io.InputStream; import java.io.OutputStream; -import java.util.ArrayList; -import java.util.Date; -import java.util.HashMap; -import java.util.HashSet; -import java.util.Iterator; -import java.util.LinkedHashMap; -import java.util.List; -import java.util.SortedSet; -import java.util.TreeMap; -import java.util.TreeSet; +import java.util.*; import java.util.Map.Entry; import java.util.concurrent.atomic.AtomicBoolean; @@ -60,6 +51,7 @@ import org.apache.kahadb.index.BTreeIndex; import org.apache.kahadb.index.BTreeVisitor; import org.apache.kahadb.journal.Journal; import org.apache.kahadb.journal.Location; +import org.apache.kahadb.journal.DataFile; import org.apache.kahadb.page.Page; import org.apache.kahadb.page.PageFile; import org.apache.kahadb.page.Transaction; @@ -151,6 +143,8 @@ public class MessageDatabase { private LockFile lockFile; private boolean ignoreMissingJournalfiles = false; private int indexCacheSize = 100; + private boolean checkForCorruptJournalFiles = false; + private boolean checksumJournalFiles = false; public MessageDatabase() { } @@ -406,6 +400,15 @@ public class MessageDatabase { } } + long end = System.currentTimeMillis(); + if( undoCounter > 0 ) { + // The rolledback operations are basically in flight journal writes. To avoid getting these the end user + // should do sync writes to the journal. + LOG.info("Rolled back " + undoCounter + " messages from the index in " + ((end - start) / 1000.0f) + " seconds."); + } + + undoCounter = 0; + start = System.currentTimeMillis(); // Lets be extra paranoid here and verify that all the datafiles being referenced // by the indexes still exists. @@ -445,40 +448,64 @@ public class MessageDatabase { missingJournalFiles.removeAll( journal.getFileMap().keySet() ); if( !missingJournalFiles.isEmpty() ) { - if( ignoreMissingJournalfiles ) { + LOG.info("Some journal files are missing: "+missingJournalFiles); + } - for (StoredDestination sd : storedDestinations.values()) { + ArrayList> missingPredicates = new ArrayList>(); + for (Integer missing : missingJournalFiles) { + missingPredicates.add(new BTreeVisitor.BetweenVisitor(new Location(missing,0), new Location(missing+1,0))); + } - final ArrayList matches = new ArrayList(); - for (Integer missing : missingJournalFiles) { - sd.locationIndex.visit(tx, new BTreeVisitor.BetweenVisitor(new Location(missing,0), new Location(missing+1,0)) { - @Override - protected void matched(Location key, Long value) { - matches.add(value); - } - }); - } - - - for (Long sequenceId : matches) { - MessageKeys keys = sd.orderIndex.remove(tx, sequenceId); - sd.locationIndex.remove(tx, keys.location); - sd.messageIdIndex.remove(tx, keys.messageId); - undoCounter++; - // TODO: do we need to modify the ack positions for the pub sub case? - } + if ( checkForCorruptJournalFiles ) { + Collection dataFiles = journal.getFileMap().values(); + for (DataFile dataFile : dataFiles) { + int id = dataFile.getDataFileId(); + missingPredicates.add(new BTreeVisitor.BetweenVisitor(new Location(id,dataFile.getLength()), new Location(id+1,0))); + Sequence seq = dataFile.getCorruptedBlocks().getHead(); + while( seq!=null ) { + missingPredicates.add(new BTreeVisitor.BetweenVisitor(new Location(id, (int) seq.getFirst()), new Location(id, (int) seq.getLast()+1))); + seq = seq.getNext(); } - - } else { - throw new IOException("Detected missing journal files: "+missingJournalFiles); } } - long end = System.currentTimeMillis(); + if( !missingPredicates.isEmpty() ) { + for (StoredDestination sd : storedDestinations.values()) { + + final ArrayList matches = new ArrayList(); + sd.locationIndex.visit(tx, new BTreeVisitor.OrVisitor(missingPredicates) { + protected void matched(Location key, Long value) { + matches.add(value); + } + }); + + // If somes message references are affected by the missing data files... + if( !matches.isEmpty() ) { + + // We either 'gracefully' recover dropping the missing messages or + // we error out. + if( ignoreMissingJournalfiles ) { + // Update the index to remove the references to the missing data + for (Long sequenceId : matches) { + MessageKeys keys = sd.orderIndex.remove(tx, sequenceId); + sd.locationIndex.remove(tx, keys.location); + sd.messageIdIndex.remove(tx, keys.messageId); + undoCounter++; + // TODO: do we need to modify the ack positions for the pub sub case? + } + + } else { + throw new IOException("Detected missing/corrupt journal files. "+matches.size()+" messages affected."); + } + } + } + } + + end = System.currentTimeMillis(); if( undoCounter > 0 ) { // The rolledback operations are basically in flight journal writes. To avoid getting these the end user // should do sync writes to the journal. - LOG.info("Rolled back " + undoCounter + " operations from the index in " + ((end - start) / 1000.0f) + " seconds."); + LOG.info("Detected missing/corrupt journal files. Dropped " + undoCounter + " messages from the index in " + ((end - start) / 1000.0f) + " seconds."); } } @@ -1339,6 +1366,8 @@ public class MessageDatabase { Journal manager = new Journal(); manager.setDirectory(directory); manager.setMaxFileLength(getJournalMaxFileLength()); + manager.setCheckForCorruptionOnStartup(checkForCorruptJournalFiles); + manager.setChecksum(checksumJournalFiles || checkForCorruptJournalFiles); manager.setWriteBatchSize(getJournalMaxWriteBatchSize()); return manager; } @@ -1452,4 +1481,20 @@ public class MessageDatabase { public void setIndexCacheSize(int indexCacheSize) { this.indexCacheSize = indexCacheSize; } + + public boolean isCheckForCorruptJournalFiles() { + return checkForCorruptJournalFiles; + } + + public void setCheckForCorruptJournalFiles(boolean checkForCorruptJournalFiles) { + this.checkForCorruptJournalFiles = checkForCorruptJournalFiles; + } + + public boolean isChecksumJournalFiles() { + return checksumJournalFiles; + } + + public void setChecksumJournalFiles(boolean checksumJournalFiles) { + this.checksumJournalFiles = checksumJournalFiles; + } }