mirror of https://github.com/apache/activemq.git
git-svn-id: https://svn.apache.org/repos/asf/activemq/branches/activemq-5.3@818158 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
parent
4037e9db9c
commit
9f6d8d21c7
|
@ -349,4 +349,19 @@ public class KahaDBPersistenceAdapter implements PersistenceAdapter {
|
|||
this.letter.setIgnoreMissingJournalfiles(ignoreMissingJournalfiles);
|
||||
}
|
||||
|
||||
public boolean isChecksumJournalFiles() {
|
||||
return letter.isChecksumJournalFiles();
|
||||
}
|
||||
|
||||
public boolean isCheckForCorruptJournalFiles() {
|
||||
return letter.isCheckForCorruptJournalFiles();
|
||||
}
|
||||
|
||||
public void setChecksumJournalFiles(boolean checksumJournalFiles) {
|
||||
letter.setChecksumJournalFiles(checksumJournalFiles);
|
||||
}
|
||||
|
||||
public void setCheckForCorruptJournalFiles(boolean checkForCorruptJournalFiles) {
|
||||
letter.setCheckForCorruptJournalFiles(checkForCorruptJournalFiles);
|
||||
}
|
||||
}
|
||||
|
|
|
@ -22,16 +22,7 @@ import java.io.File;
|
|||
import java.io.IOException;
|
||||
import java.io.InputStream;
|
||||
import java.io.OutputStream;
|
||||
import java.util.ArrayList;
|
||||
import java.util.Date;
|
||||
import java.util.HashMap;
|
||||
import java.util.HashSet;
|
||||
import java.util.Iterator;
|
||||
import java.util.LinkedHashMap;
|
||||
import java.util.List;
|
||||
import java.util.SortedSet;
|
||||
import java.util.TreeMap;
|
||||
import java.util.TreeSet;
|
||||
import java.util.*;
|
||||
import java.util.Map.Entry;
|
||||
import java.util.concurrent.atomic.AtomicBoolean;
|
||||
|
||||
|
@ -60,6 +51,7 @@ import org.apache.kahadb.index.BTreeIndex;
|
|||
import org.apache.kahadb.index.BTreeVisitor;
|
||||
import org.apache.kahadb.journal.Journal;
|
||||
import org.apache.kahadb.journal.Location;
|
||||
import org.apache.kahadb.journal.DataFile;
|
||||
import org.apache.kahadb.page.Page;
|
||||
import org.apache.kahadb.page.PageFile;
|
||||
import org.apache.kahadb.page.Transaction;
|
||||
|
@ -151,6 +143,8 @@ public class MessageDatabase {
|
|||
private LockFile lockFile;
|
||||
private boolean ignoreMissingJournalfiles = false;
|
||||
private int indexCacheSize = 100;
|
||||
private boolean checkForCorruptJournalFiles = false;
|
||||
private boolean checksumJournalFiles = false;
|
||||
|
||||
public MessageDatabase() {
|
||||
}
|
||||
|
@ -406,6 +400,15 @@ public class MessageDatabase {
|
|||
}
|
||||
}
|
||||
|
||||
long end = System.currentTimeMillis();
|
||||
if( undoCounter > 0 ) {
|
||||
// The rolledback operations are basically in flight journal writes. To avoid getting these the end user
|
||||
// should do sync writes to the journal.
|
||||
LOG.info("Rolled back " + undoCounter + " messages from the index in " + ((end - start) / 1000.0f) + " seconds.");
|
||||
}
|
||||
|
||||
undoCounter = 0;
|
||||
start = System.currentTimeMillis();
|
||||
|
||||
// Lets be extra paranoid here and verify that all the datafiles being referenced
|
||||
// by the indexes still exists.
|
||||
|
@ -445,21 +448,44 @@ public class MessageDatabase {
|
|||
missingJournalFiles.removeAll( journal.getFileMap().keySet() );
|
||||
|
||||
if( !missingJournalFiles.isEmpty() ) {
|
||||
if( ignoreMissingJournalfiles ) {
|
||||
LOG.info("Some journal files are missing: "+missingJournalFiles);
|
||||
}
|
||||
|
||||
ArrayList<BTreeVisitor.Predicate<Location>> missingPredicates = new ArrayList<BTreeVisitor.Predicate<Location>>();
|
||||
for (Integer missing : missingJournalFiles) {
|
||||
missingPredicates.add(new BTreeVisitor.BetweenVisitor<Location, Long>(new Location(missing,0), new Location(missing+1,0)));
|
||||
}
|
||||
|
||||
if ( checkForCorruptJournalFiles ) {
|
||||
Collection<DataFile> dataFiles = journal.getFileMap().values();
|
||||
for (DataFile dataFile : dataFiles) {
|
||||
int id = dataFile.getDataFileId();
|
||||
missingPredicates.add(new BTreeVisitor.BetweenVisitor<Location, Long>(new Location(id,dataFile.getLength()), new Location(id+1,0)));
|
||||
Sequence seq = dataFile.getCorruptedBlocks().getHead();
|
||||
while( seq!=null ) {
|
||||
missingPredicates.add(new BTreeVisitor.BetweenVisitor<Location, Long>(new Location(id, (int) seq.getFirst()), new Location(id, (int) seq.getLast()+1)));
|
||||
seq = seq.getNext();
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
if( !missingPredicates.isEmpty() ) {
|
||||
for (StoredDestination sd : storedDestinations.values()) {
|
||||
|
||||
final ArrayList<Long> matches = new ArrayList<Long>();
|
||||
for (Integer missing : missingJournalFiles) {
|
||||
sd.locationIndex.visit(tx, new BTreeVisitor.BetweenVisitor<Location, Long>(new Location(missing,0), new Location(missing+1,0)) {
|
||||
@Override
|
||||
sd.locationIndex.visit(tx, new BTreeVisitor.OrVisitor<Location, Long>(missingPredicates) {
|
||||
protected void matched(Location key, Long value) {
|
||||
matches.add(value);
|
||||
}
|
||||
});
|
||||
}
|
||||
|
||||
// If somes message references are affected by the missing data files...
|
||||
if( !matches.isEmpty() ) {
|
||||
|
||||
// We either 'gracefully' recover dropping the missing messages or
|
||||
// we error out.
|
||||
if( ignoreMissingJournalfiles ) {
|
||||
// Update the index to remove the references to the missing data
|
||||
for (Long sequenceId : matches) {
|
||||
MessageKeys keys = sd.orderIndex.remove(tx, sequenceId);
|
||||
sd.locationIndex.remove(tx, keys.location);
|
||||
|
@ -467,18 +493,19 @@ public class MessageDatabase {
|
|||
undoCounter++;
|
||||
// TODO: do we need to modify the ack positions for the pub sub case?
|
||||
}
|
||||
}
|
||||
|
||||
} else {
|
||||
throw new IOException("Detected missing journal files: "+missingJournalFiles);
|
||||
throw new IOException("Detected missing/corrupt journal files. "+matches.size()+" messages affected.");
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
long end = System.currentTimeMillis();
|
||||
end = System.currentTimeMillis();
|
||||
if( undoCounter > 0 ) {
|
||||
// The rolledback operations are basically in flight journal writes. To avoid getting these the end user
|
||||
// should do sync writes to the journal.
|
||||
LOG.info("Rolled back " + undoCounter + " operations from the index in " + ((end - start) / 1000.0f) + " seconds.");
|
||||
LOG.info("Detected missing/corrupt journal files. Dropped " + undoCounter + " messages from the index in " + ((end - start) / 1000.0f) + " seconds.");
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -1339,6 +1366,8 @@ public class MessageDatabase {
|
|||
Journal manager = new Journal();
|
||||
manager.setDirectory(directory);
|
||||
manager.setMaxFileLength(getJournalMaxFileLength());
|
||||
manager.setCheckForCorruptionOnStartup(checkForCorruptJournalFiles);
|
||||
manager.setChecksum(checksumJournalFiles || checkForCorruptJournalFiles);
|
||||
manager.setWriteBatchSize(getJournalMaxWriteBatchSize());
|
||||
return manager;
|
||||
}
|
||||
|
@ -1452,4 +1481,20 @@ public class MessageDatabase {
|
|||
public void setIndexCacheSize(int indexCacheSize) {
|
||||
this.indexCacheSize = indexCacheSize;
|
||||
}
|
||||
|
||||
public boolean isCheckForCorruptJournalFiles() {
|
||||
return checkForCorruptJournalFiles;
|
||||
}
|
||||
|
||||
public void setCheckForCorruptJournalFiles(boolean checkForCorruptJournalFiles) {
|
||||
this.checkForCorruptJournalFiles = checkForCorruptJournalFiles;
|
||||
}
|
||||
|
||||
public boolean isChecksumJournalFiles() {
|
||||
return checksumJournalFiles;
|
||||
}
|
||||
|
||||
public void setChecksumJournalFiles(boolean checksumJournalFiles) {
|
||||
this.checksumJournalFiles = checksumJournalFiles;
|
||||
}
|
||||
}
|
||||
|
|
Loading…
Reference in New Issue