AMQ-2405: Adding more robust/graceful disk corruption recovery logic.

git-svn-id: https://svn.apache.org/repos/asf/activemq/trunk@818155 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
Hiram R. Chirino 2009-09-23 16:16:39 +00:00
parent 785454a366
commit c9500f45ef
2 changed files with 95 additions and 35 deletions

View File

@ -349,4 +349,19 @@ public class KahaDBPersistenceAdapter implements PersistenceAdapter {
this.letter.setIgnoreMissingJournalfiles(ignoreMissingJournalfiles);
}
public boolean isChecksumJournalFiles() {
return letter.isChecksumJournalFiles();
}
public boolean isCheckForCorruptJournalFiles() {
return letter.isCheckForCorruptJournalFiles();
}
public void setChecksumJournalFiles(boolean checksumJournalFiles) {
letter.setChecksumJournalFiles(checksumJournalFiles);
}
public void setCheckForCorruptJournalFiles(boolean checkForCorruptJournalFiles) {
letter.setCheckForCorruptJournalFiles(checkForCorruptJournalFiles);
}
}

View File

@ -22,16 +22,7 @@ import java.io.File;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.util.ArrayList;
import java.util.Date;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.SortedSet;
import java.util.TreeMap;
import java.util.TreeSet;
import java.util.*;
import java.util.Map.Entry;
import java.util.concurrent.atomic.AtomicBoolean;
@ -60,6 +51,7 @@ import org.apache.kahadb.index.BTreeIndex;
import org.apache.kahadb.index.BTreeVisitor;
import org.apache.kahadb.journal.Journal;
import org.apache.kahadb.journal.Location;
import org.apache.kahadb.journal.DataFile;
import org.apache.kahadb.page.Page;
import org.apache.kahadb.page.PageFile;
import org.apache.kahadb.page.Transaction;
@ -151,6 +143,8 @@ public class MessageDatabase {
private LockFile lockFile;
private boolean ignoreMissingJournalfiles = false;
private int indexCacheSize = 100;
private boolean checkForCorruptJournalFiles = false;
private boolean checksumJournalFiles = false;
public MessageDatabase() {
}
@ -406,6 +400,15 @@ public class MessageDatabase {
}
}
long end = System.currentTimeMillis();
if( undoCounter > 0 ) {
// The rolledback operations are basically in flight journal writes. To avoid getting these the end user
// should do sync writes to the journal.
LOG.info("Rolled back " + undoCounter + " messages from the index in " + ((end - start) / 1000.0f) + " seconds.");
}
undoCounter = 0;
start = System.currentTimeMillis();
// Lets be extra paranoid here and verify that all the datafiles being referenced
// by the indexes still exists.
@ -445,40 +448,64 @@ public class MessageDatabase {
missingJournalFiles.removeAll( journal.getFileMap().keySet() );
if( !missingJournalFiles.isEmpty() ) {
if( ignoreMissingJournalfiles ) {
LOG.info("Some journal files are missing: "+missingJournalFiles);
}
for (StoredDestination sd : storedDestinations.values()) {
ArrayList<BTreeVisitor.Predicate<Location>> missingPredicates = new ArrayList<BTreeVisitor.Predicate<Location>>();
for (Integer missing : missingJournalFiles) {
missingPredicates.add(new BTreeVisitor.BetweenVisitor<Location, Long>(new Location(missing,0), new Location(missing+1,0)));
}
final ArrayList<Long> matches = new ArrayList<Long>();
for (Integer missing : missingJournalFiles) {
sd.locationIndex.visit(tx, new BTreeVisitor.BetweenVisitor<Location, Long>(new Location(missing,0), new Location(missing+1,0)) {
@Override
protected void matched(Location key, Long value) {
matches.add(value);
}
});
}
for (Long sequenceId : matches) {
MessageKeys keys = sd.orderIndex.remove(tx, sequenceId);
sd.locationIndex.remove(tx, keys.location);
sd.messageIdIndex.remove(tx, keys.messageId);
undoCounter++;
// TODO: do we need to modify the ack positions for the pub sub case?
}
if ( checkForCorruptJournalFiles ) {
Collection<DataFile> dataFiles = journal.getFileMap().values();
for (DataFile dataFile : dataFiles) {
int id = dataFile.getDataFileId();
missingPredicates.add(new BTreeVisitor.BetweenVisitor<Location, Long>(new Location(id,dataFile.getLength()), new Location(id+1,0)));
Sequence seq = dataFile.getCorruptedBlocks().getHead();
while( seq!=null ) {
missingPredicates.add(new BTreeVisitor.BetweenVisitor<Location, Long>(new Location(id, (int) seq.getFirst()), new Location(id, (int) seq.getLast()+1)));
seq = seq.getNext();
}
} else {
throw new IOException("Detected missing journal files: "+missingJournalFiles);
}
}
long end = System.currentTimeMillis();
if( !missingPredicates.isEmpty() ) {
for (StoredDestination sd : storedDestinations.values()) {
final ArrayList<Long> matches = new ArrayList<Long>();
sd.locationIndex.visit(tx, new BTreeVisitor.OrVisitor<Location, Long>(missingPredicates) {
protected void matched(Location key, Long value) {
matches.add(value);
}
});
// If somes message references are affected by the missing data files...
if( !matches.isEmpty() ) {
// We either 'gracefully' recover dropping the missing messages or
// we error out.
if( ignoreMissingJournalfiles ) {
// Update the index to remove the references to the missing data
for (Long sequenceId : matches) {
MessageKeys keys = sd.orderIndex.remove(tx, sequenceId);
sd.locationIndex.remove(tx, keys.location);
sd.messageIdIndex.remove(tx, keys.messageId);
undoCounter++;
// TODO: do we need to modify the ack positions for the pub sub case?
}
} else {
throw new IOException("Detected missing/corrupt journal files. "+matches.size()+" messages affected.");
}
}
}
}
end = System.currentTimeMillis();
if( undoCounter > 0 ) {
// The rolledback operations are basically in flight journal writes. To avoid getting these the end user
// should do sync writes to the journal.
LOG.info("Rolled back " + undoCounter + " operations from the index in " + ((end - start) / 1000.0f) + " seconds.");
LOG.info("Detected missing/corrupt journal files. Dropped " + undoCounter + " messages from the index in " + ((end - start) / 1000.0f) + " seconds.");
}
}
@ -1339,6 +1366,8 @@ public class MessageDatabase {
Journal manager = new Journal();
manager.setDirectory(directory);
manager.setMaxFileLength(getJournalMaxFileLength());
manager.setCheckForCorruptionOnStartup(checkForCorruptJournalFiles);
manager.setChecksum(checksumJournalFiles || checkForCorruptJournalFiles);
manager.setWriteBatchSize(getJournalMaxWriteBatchSize());
return manager;
}
@ -1452,4 +1481,20 @@ public class MessageDatabase {
public void setIndexCacheSize(int indexCacheSize) {
this.indexCacheSize = indexCacheSize;
}
public boolean isCheckForCorruptJournalFiles() {
return checkForCorruptJournalFiles;
}
public void setCheckForCorruptJournalFiles(boolean checkForCorruptJournalFiles) {
this.checkForCorruptJournalFiles = checkForCorruptJournalFiles;
}
public boolean isChecksumJournalFiles() {
return checksumJournalFiles;
}
public void setChecksumJournalFiles(boolean checksumJournalFiles) {
this.checksumJournalFiles = checksumJournalFiles;
}
}