mirror of https://github.com/apache/activemq.git
- Journal will now check the data files for data corruption if checkForCorruptionOnStartup option is enabled.
- The corrupted journal records can be inspected via DataFile.getCorruptedBlocks() - An OR and AND BTreeVisitor is now supported to support running more complex queries against the BTree git-svn-id: https://svn.apache.org/repos/asf/activemq/trunk@805550 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
parent
0a95b7e309
commit
76d3b4665c
|
@ -43,31 +43,110 @@ public interface BTreeVisitor<Key,Value> {
|
||||||
*/
|
*/
|
||||||
void visit(List<Key> keys, List<Value> values);
|
void visit(List<Key> keys, List<Value> values);
|
||||||
|
|
||||||
|
public interface Predicate<Key> {
|
||||||
abstract class GTVisitor<Key extends Comparable<Key>, Value> implements BTreeVisitor<Key, Value>{
|
boolean isInterestedInKeysBetween(Key first, Key second);
|
||||||
final private Key value;
|
boolean isInterestedInKey(Key key);
|
||||||
|
|
||||||
public GTVisitor(Key value) {
|
|
||||||
this.value = value;
|
|
||||||
}
|
|
||||||
|
|
||||||
public boolean isInterestedInKeysBetween(Key first, Key second) {
|
|
||||||
return second==null || second.compareTo(value)>0;
|
|
||||||
}
|
}
|
||||||
|
|
||||||
|
abstract class PredicateVisitor<Key, Value> implements BTreeVisitor<Key, Value>, Predicate<Key> {
|
||||||
public void visit(List<Key> keys, List<Value> values) {
|
public void visit(List<Key> keys, List<Value> values) {
|
||||||
for( int i=0; i < keys.size(); i++) {
|
for( int i=0; i < keys.size(); i++) {
|
||||||
Key key = keys.get(i);
|
Key key = keys.get(i);
|
||||||
if( key.compareTo(value)>0 ) {
|
if( isInterestedInKey(key) ) {
|
||||||
matched(key, values.get(i));
|
matched(key, values.get(i));
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
abstract protected void matched(Key key, Value value);
|
protected void matched(Key key, Value value) {
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
abstract class BetweenVisitor<Key extends Comparable<Key>, Value> implements BTreeVisitor<Key, Value>{
|
class OrVisitor<Key, Value> extends PredicateVisitor<Key, Value> {
|
||||||
|
private final List<Predicate<Key>> conditions;
|
||||||
|
|
||||||
|
public OrVisitor(List<Predicate<Key>> conditions) {
|
||||||
|
this.conditions = conditions;
|
||||||
|
}
|
||||||
|
|
||||||
|
public boolean isInterestedInKeysBetween(Key first, Key second) {
|
||||||
|
for (Predicate<Key> condition : conditions) {
|
||||||
|
if( condition.isInterestedInKeysBetween(first, second) ) {
|
||||||
|
return true;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
return false;
|
||||||
|
}
|
||||||
|
|
||||||
|
public boolean isInterestedInKey(Key key) {
|
||||||
|
for (Predicate<Key> condition : conditions) {
|
||||||
|
if( condition.isInterestedInKey(key) ) {
|
||||||
|
return true;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
return false;
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public String toString() {
|
||||||
|
StringBuilder sb = new StringBuilder();
|
||||||
|
boolean first=true;
|
||||||
|
for (Predicate<Key> condition : conditions) {
|
||||||
|
if( !first ) {
|
||||||
|
sb.append(" OR ");
|
||||||
|
}
|
||||||
|
first=false;
|
||||||
|
sb.append("(");
|
||||||
|
sb.append(condition);
|
||||||
|
sb.append(")");
|
||||||
|
}
|
||||||
|
return sb.toString();
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
class AndVisitor<Key, Value> extends PredicateVisitor<Key, Value> {
|
||||||
|
private final List<Predicate<Key>> conditions;
|
||||||
|
|
||||||
|
public AndVisitor(List<Predicate<Key>> conditions) {
|
||||||
|
this.conditions = conditions;
|
||||||
|
}
|
||||||
|
|
||||||
|
public boolean isInterestedInKeysBetween(Key first, Key second) {
|
||||||
|
for (Predicate<Key> condition : conditions) {
|
||||||
|
if( !condition.isInterestedInKeysBetween(first, second) ) {
|
||||||
|
return false;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
return true;
|
||||||
|
}
|
||||||
|
|
||||||
|
public boolean isInterestedInKey(Key key) {
|
||||||
|
for (Predicate<Key> condition : conditions) {
|
||||||
|
if( !condition.isInterestedInKey(key) ) {
|
||||||
|
return false;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
return true;
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public String toString() {
|
||||||
|
StringBuilder sb = new StringBuilder();
|
||||||
|
boolean first=true;
|
||||||
|
for (Predicate<Key> condition : conditions) {
|
||||||
|
if( !first ) {
|
||||||
|
sb.append(" AND ");
|
||||||
|
}
|
||||||
|
first=false;
|
||||||
|
sb.append("(");
|
||||||
|
sb.append(condition);
|
||||||
|
sb.append(")");
|
||||||
|
}
|
||||||
|
return sb.toString();
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
class BetweenVisitor<Key extends Comparable<Key>, Value> extends PredicateVisitor<Key, Value> {
|
||||||
private final Key first;
|
private final Key first;
|
||||||
private final Key last;
|
private final Key last;
|
||||||
|
|
||||||
|
@ -81,19 +160,38 @@ public interface BTreeVisitor<Key,Value> {
|
||||||
&& (first==null || first.compareTo(last)<0);
|
&& (first==null || first.compareTo(last)<0);
|
||||||
}
|
}
|
||||||
|
|
||||||
public void visit(List<Key> keys, List<Value> values) {
|
public boolean isInterestedInKey(Key key) {
|
||||||
for( int i=0; i < keys.size(); i++) {
|
return key.compareTo(first) >=0 && key.compareTo(last) <0;
|
||||||
Key key = keys.get(i);
|
|
||||||
if( key.compareTo(first)>=0 && key.compareTo(last)<0 ) {
|
|
||||||
matched(key, values.get(i));
|
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public String toString() {
|
||||||
|
return first+" <= key < "+last;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
abstract protected void matched(Key key, Value value);
|
class GTVisitor<Key extends Comparable<Key>, Value> extends PredicateVisitor<Key, Value> {
|
||||||
|
final private Key value;
|
||||||
|
|
||||||
|
public GTVisitor(Key value) {
|
||||||
|
this.value = value;
|
||||||
}
|
}
|
||||||
|
|
||||||
abstract class GTEVisitor<Key extends Comparable<Key>, Value> implements BTreeVisitor<Key, Value>{
|
public boolean isInterestedInKeysBetween(Key first, Key second) {
|
||||||
|
return second==null || second.compareTo(value)>0;
|
||||||
|
}
|
||||||
|
|
||||||
|
public boolean isInterestedInKey(Key key) {
|
||||||
|
return key.compareTo(value)>0;
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public String toString() {
|
||||||
|
return "key > "+ value;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
class GTEVisitor<Key extends Comparable<Key>, Value> extends PredicateVisitor<Key, Value> {
|
||||||
final private Key value;
|
final private Key value;
|
||||||
|
|
||||||
public GTEVisitor(Key value) {
|
public GTEVisitor(Key value) {
|
||||||
|
@ -104,19 +202,17 @@ public interface BTreeVisitor<Key,Value> {
|
||||||
return second==null || second.compareTo(value)>=0;
|
return second==null || second.compareTo(value)>=0;
|
||||||
}
|
}
|
||||||
|
|
||||||
public void visit(List<Key> keys, List<Value> values) {
|
public boolean isInterestedInKey(Key key) {
|
||||||
for( int i=0; i < keys.size(); i++) {
|
return key.compareTo(value)>=0;
|
||||||
Key key = keys.get(i);
|
|
||||||
if( key.compareTo(value)>=0 ) {
|
|
||||||
matched(key, values.get(i));
|
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public String toString() {
|
||||||
|
return "key >= "+ value;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
abstract protected void matched(Key key, Value value);
|
class LTVisitor<Key extends Comparable<Key>, Value> extends PredicateVisitor<Key, Value> {
|
||||||
}
|
|
||||||
|
|
||||||
abstract class LTVisitor<Key extends Comparable<Key>, Value> implements BTreeVisitor<Key, Value>{
|
|
||||||
final private Key value;
|
final private Key value;
|
||||||
|
|
||||||
public LTVisitor(Key value) {
|
public LTVisitor(Key value) {
|
||||||
|
@ -127,19 +223,17 @@ public interface BTreeVisitor<Key,Value> {
|
||||||
return first==null || first.compareTo(value)<0;
|
return first==null || first.compareTo(value)<0;
|
||||||
}
|
}
|
||||||
|
|
||||||
public void visit(List<Key> keys, List<Value> values) {
|
public boolean isInterestedInKey(Key key) {
|
||||||
for( int i=0; i < keys.size(); i++) {
|
return key.compareTo(value)<0;
|
||||||
Key key = keys.get(i);
|
|
||||||
if( key.compareTo(value)<0 ) {
|
|
||||||
matched(key, values.get(i));
|
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public String toString() {
|
||||||
|
return "key < "+ value;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
abstract protected void matched(Key key, Value value);
|
class LTEVisitor<Key extends Comparable<Key>, Value> extends PredicateVisitor<Key, Value> {
|
||||||
}
|
|
||||||
|
|
||||||
abstract class LTEVisitor<Key extends Comparable<Key>, Value> implements BTreeVisitor<Key, Value>{
|
|
||||||
final private Key value;
|
final private Key value;
|
||||||
|
|
||||||
public LTEVisitor(Key value) {
|
public LTEVisitor(Key value) {
|
||||||
|
@ -150,15 +244,13 @@ public interface BTreeVisitor<Key,Value> {
|
||||||
return first==null || first.compareTo(value)<=0;
|
return first==null || first.compareTo(value)<=0;
|
||||||
}
|
}
|
||||||
|
|
||||||
public void visit(List<Key> keys, List<Value> values) {
|
public boolean isInterestedInKey(Key key) {
|
||||||
for( int i=0; i < keys.size(); i++) {
|
return key.compareTo(value)<=0;
|
||||||
Key key = keys.get(i);
|
|
||||||
if( key.compareTo(value)<=0 ) {
|
|
||||||
matched(key, values.get(i));
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
|
|
||||||
abstract protected void matched(Key key, Value value);
|
@Override
|
||||||
|
public String toString() {
|
||||||
|
return "key <= "+ value;
|
||||||
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
|
@ -22,6 +22,7 @@ import java.io.RandomAccessFile;
|
||||||
|
|
||||||
import org.apache.kahadb.util.IOHelper;
|
import org.apache.kahadb.util.IOHelper;
|
||||||
import org.apache.kahadb.util.LinkedNode;
|
import org.apache.kahadb.util.LinkedNode;
|
||||||
|
import org.apache.kahadb.util.SequenceSet;
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* DataFile
|
* DataFile
|
||||||
|
@ -33,6 +34,7 @@ public class DataFile extends LinkedNode<DataFile> implements Comparable<DataFil
|
||||||
protected final File file;
|
protected final File file;
|
||||||
protected final Integer dataFileId;
|
protected final Integer dataFileId;
|
||||||
protected int length;
|
protected int length;
|
||||||
|
protected final SequenceSet corruptedBlocks = new SequenceSet();
|
||||||
|
|
||||||
DataFile(File file, int number, int preferedSize) {
|
DataFile(File file, int number, int preferedSize) {
|
||||||
this.file = file;
|
this.file = file;
|
||||||
|
@ -80,6 +82,10 @@ public class DataFile extends LinkedNode<DataFile> implements Comparable<DataFil
|
||||||
IOHelper.moveFile(file,targetDirectory);
|
IOHelper.moveFile(file,targetDirectory);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
public SequenceSet getCorruptedBlocks() {
|
||||||
|
return corruptedBlocks;
|
||||||
|
}
|
||||||
|
|
||||||
public int compareTo(DataFile df) {
|
public int compareTo(DataFile df) {
|
||||||
return dataFileId - df.dataFileId;
|
return dataFileId - df.dataFileId;
|
||||||
}
|
}
|
||||||
|
|
|
@ -95,11 +95,16 @@ final class DataFileAccessor {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
public void read(long offset, byte data[]) throws IOException {
|
public void readFully(long offset, byte data[]) throws IOException {
|
||||||
file.seek(offset);
|
file.seek(offset);
|
||||||
file.readFully(data);
|
file.readFully(data);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
public int read(long offset, byte data[]) throws IOException {
|
||||||
|
file.seek(offset);
|
||||||
|
return file.read(data);
|
||||||
|
}
|
||||||
|
|
||||||
public void readLocationDetails(Location location) throws IOException {
|
public void readLocationDetails(Location location) throws IOException {
|
||||||
WriteCommand asyncWrite = (WriteCommand)inflightWrites.get(new WriteKey(location));
|
WriteCommand asyncWrite = (WriteCommand)inflightWrites.get(new WriteKey(location));
|
||||||
if (asyncWrite != null) {
|
if (asyncWrite != null) {
|
||||||
|
|
|
@ -20,15 +20,7 @@ import java.io.File;
|
||||||
import java.io.FilenameFilter;
|
import java.io.FilenameFilter;
|
||||||
import java.io.IOException;
|
import java.io.IOException;
|
||||||
import java.io.UnsupportedEncodingException;
|
import java.io.UnsupportedEncodingException;
|
||||||
import java.util.ArrayList;
|
import java.util.*;
|
||||||
import java.util.Collections;
|
|
||||||
import java.util.HashMap;
|
|
||||||
import java.util.Iterator;
|
|
||||||
import java.util.LinkedHashMap;
|
|
||||||
import java.util.List;
|
|
||||||
import java.util.Map;
|
|
||||||
import java.util.Set;
|
|
||||||
import java.util.TreeMap;
|
|
||||||
import java.util.concurrent.ConcurrentHashMap;
|
import java.util.concurrent.ConcurrentHashMap;
|
||||||
import java.util.concurrent.atomic.AtomicLong;
|
import java.util.concurrent.atomic.AtomicLong;
|
||||||
import java.util.concurrent.atomic.AtomicReference;
|
import java.util.concurrent.atomic.AtomicReference;
|
||||||
|
@ -39,10 +31,7 @@ import org.apache.commons.logging.Log;
|
||||||
import org.apache.commons.logging.LogFactory;
|
import org.apache.commons.logging.LogFactory;
|
||||||
import org.apache.kahadb.journal.DataFileAppender.WriteCommand;
|
import org.apache.kahadb.journal.DataFileAppender.WriteCommand;
|
||||||
import org.apache.kahadb.journal.DataFileAppender.WriteKey;
|
import org.apache.kahadb.journal.DataFileAppender.WriteKey;
|
||||||
import org.apache.kahadb.util.ByteSequence;
|
import org.apache.kahadb.util.*;
|
||||||
import org.apache.kahadb.util.DataByteArrayInputStream;
|
|
||||||
import org.apache.kahadb.util.LinkedNodeList;
|
|
||||||
import org.apache.kahadb.util.Scheduler;
|
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Manages DataFiles
|
* Manages DataFiles
|
||||||
|
@ -61,6 +50,21 @@ public class Journal {
|
||||||
// Batch Control Item holds a 4 byte size of the batch and a 8 byte checksum of the batch.
|
// Batch Control Item holds a 4 byte size of the batch and a 8 byte checksum of the batch.
|
||||||
public static final byte[] BATCH_CONTROL_RECORD_MAGIC = bytes("WRITE BATCH");
|
public static final byte[] BATCH_CONTROL_RECORD_MAGIC = bytes("WRITE BATCH");
|
||||||
public static final int BATCH_CONTROL_RECORD_SIZE = RECORD_HEAD_SPACE+BATCH_CONTROL_RECORD_MAGIC.length+4+8;
|
public static final int BATCH_CONTROL_RECORD_SIZE = RECORD_HEAD_SPACE+BATCH_CONTROL_RECORD_MAGIC.length+4+8;
|
||||||
|
public static final byte[] BATCH_CONTROL_RECORD_HEADER = createBatchControlRecordHeader();
|
||||||
|
|
||||||
|
private static byte[] createBatchControlRecordHeader() {
|
||||||
|
try {
|
||||||
|
DataByteArrayOutputStream os = new DataByteArrayOutputStream();
|
||||||
|
os.writeInt(BATCH_CONTROL_RECORD_SIZE);
|
||||||
|
os.writeByte(BATCH_CONTROL_RECORD_TYPE);
|
||||||
|
os.write(BATCH_CONTROL_RECORD_MAGIC);
|
||||||
|
ByteSequence sequence = os.toByteSequence();
|
||||||
|
sequence.compact();
|
||||||
|
return sequence.getData();
|
||||||
|
} catch (IOException e) {
|
||||||
|
throw new RuntimeException("Could not create batch control record header.");
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
public static final String DEFAULT_DIRECTORY = ".";
|
public static final String DEFAULT_DIRECTORY = ".";
|
||||||
public static final String DEFAULT_ARCHIVE_DIRECTORY = "data-archive";
|
public static final String DEFAULT_ARCHIVE_DIRECTORY = "data-archive";
|
||||||
|
@ -96,6 +100,7 @@ public class Journal {
|
||||||
protected boolean archiveDataLogs;
|
protected boolean archiveDataLogs;
|
||||||
private ReplicationTarget replicationTarget;
|
private ReplicationTarget replicationTarget;
|
||||||
protected boolean checksum;
|
protected boolean checksum;
|
||||||
|
protected boolean checkForCorruptionOnStartup;
|
||||||
|
|
||||||
public synchronized void start() throws IOException {
|
public synchronized void start() throws IOException {
|
||||||
if (started) {
|
if (started) {
|
||||||
|
@ -137,15 +142,18 @@ public class Journal {
|
||||||
for (DataFile df : l) {
|
for (DataFile df : l) {
|
||||||
dataFiles.addLast(df);
|
dataFiles.addLast(df);
|
||||||
fileByFileMap.put(df.getFile(), df);
|
fileByFileMap.put(df.getFile(), df);
|
||||||
|
|
||||||
|
if( isCheckForCorruptionOnStartup() ) {
|
||||||
|
lastAppendLocation.set(recoveryCheck(df));
|
||||||
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
getCurrentWriteFile();
|
getCurrentWriteFile();
|
||||||
try {
|
|
||||||
Location l = recoveryCheck(dataFiles.getTail());
|
if( lastAppendLocation.get()==null ) {
|
||||||
lastAppendLocation.set(l);
|
DataFile df = dataFiles.getTail();
|
||||||
} catch (IOException e) {
|
lastAppendLocation.set(recoveryCheck(df));
|
||||||
LOG.warn("recovery check failed", e);
|
|
||||||
}
|
}
|
||||||
|
|
||||||
cleanupTask = new Runnable() {
|
cleanupTask = new Runnable() {
|
||||||
|
@ -177,45 +185,23 @@ public class Journal {
|
||||||
DataFileAccessor reader = accessorPool.openDataFileAccessor(dataFile);
|
DataFileAccessor reader = accessorPool.openDataFileAccessor(dataFile);
|
||||||
try {
|
try {
|
||||||
while( true ) {
|
while( true ) {
|
||||||
reader.read(location.getOffset(), controlRecord);
|
int size = checkBatchRecord(reader, location.getOffset());
|
||||||
controlIs.restart();
|
if ( size>=0 ) {
|
||||||
|
|
||||||
// Assert that it's a batch record.
|
|
||||||
if( controlIs.readInt() != BATCH_CONTROL_RECORD_SIZE ) {
|
|
||||||
break;
|
|
||||||
}
|
|
||||||
if( controlIs.readByte() != BATCH_CONTROL_RECORD_TYPE ) {
|
|
||||||
break;
|
|
||||||
}
|
|
||||||
for( int i=0; i < BATCH_CONTROL_RECORD_MAGIC.length; i++ ) {
|
|
||||||
if( controlIs.readByte() != BATCH_CONTROL_RECORD_MAGIC[i] ) {
|
|
||||||
break;
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
int size = controlIs.readInt();
|
|
||||||
if( size > MAX_BATCH_SIZE ) {
|
|
||||||
break;
|
|
||||||
}
|
|
||||||
|
|
||||||
if( isChecksum() ) {
|
|
||||||
|
|
||||||
long expectedChecksum = controlIs.readLong();
|
|
||||||
|
|
||||||
byte data[] = new byte[size];
|
|
||||||
reader.read(location.getOffset()+BATCH_CONTROL_RECORD_SIZE, data);
|
|
||||||
|
|
||||||
Checksum checksum = new Adler32();
|
|
||||||
checksum.update(data, 0, data.length);
|
|
||||||
|
|
||||||
if( expectedChecksum!=checksum.getValue() ) {
|
|
||||||
break;
|
|
||||||
}
|
|
||||||
|
|
||||||
}
|
|
||||||
|
|
||||||
|
|
||||||
location.setOffset(location.getOffset()+BATCH_CONTROL_RECORD_SIZE+size);
|
location.setOffset(location.getOffset()+BATCH_CONTROL_RECORD_SIZE+size);
|
||||||
|
} else {
|
||||||
|
|
||||||
|
// Perhaps it's just some corruption... scan through the file to find the next valid batch record. We
|
||||||
|
// may have subsequent valid batch records.
|
||||||
|
int nextOffset = findNextBatchRecord(reader, location.getOffset()+1);
|
||||||
|
if( nextOffset >=0 ) {
|
||||||
|
Sequence sequence = new Sequence(location.getOffset(), nextOffset - 1);
|
||||||
|
LOG.info("Corrupt journal records found in '"+dataFile.getFile()+"' between offsets: "+sequence);
|
||||||
|
dataFile.corruptedBlocks.add(sequence);
|
||||||
|
location.setOffset(nextOffset);
|
||||||
|
} else {
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
} catch (IOException e) {
|
} catch (IOException e) {
|
||||||
|
@ -224,9 +210,83 @@ public class Journal {
|
||||||
}
|
}
|
||||||
|
|
||||||
dataFile.setLength(location.getOffset());
|
dataFile.setLength(location.getOffset());
|
||||||
|
|
||||||
|
if( !dataFile.corruptedBlocks.isEmpty() ) {
|
||||||
|
// Is the end of the data file corrupted?
|
||||||
|
if( dataFile.corruptedBlocks.getTail().getLast()+1 == location.getOffset() ) {
|
||||||
|
dataFile.setLength((int) dataFile.corruptedBlocks.removeLastSequence().getFirst());
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
return location;
|
return location;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
private int findNextBatchRecord(DataFileAccessor reader, int offset) throws IOException {
|
||||||
|
ByteSequence header = new ByteSequence(BATCH_CONTROL_RECORD_HEADER);
|
||||||
|
byte data[] = new byte[1024*4];
|
||||||
|
ByteSequence bs = new ByteSequence(data, 0, reader.read(offset, data));
|
||||||
|
|
||||||
|
int pos = 0;
|
||||||
|
while( true ) {
|
||||||
|
pos = bs.indexOf(header, pos);
|
||||||
|
if( pos >= 0 ) {
|
||||||
|
return offset+pos;
|
||||||
|
} else {
|
||||||
|
// need to load the next data chunck in..
|
||||||
|
if( bs.length != data.length ) {
|
||||||
|
// If we had a short read then we were at EOF
|
||||||
|
return -1;
|
||||||
|
}
|
||||||
|
offset += bs.length-BATCH_CONTROL_RECORD_HEADER.length;
|
||||||
|
bs = new ByteSequence(data, 0, reader.read(offset, data));
|
||||||
|
pos=0;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
|
public int checkBatchRecord(DataFileAccessor reader, int offset) throws IOException {
|
||||||
|
byte controlRecord[] = new byte[BATCH_CONTROL_RECORD_SIZE];
|
||||||
|
DataByteArrayInputStream controlIs = new DataByteArrayInputStream(controlRecord);
|
||||||
|
|
||||||
|
reader.readFully(offset, controlRecord);
|
||||||
|
|
||||||
|
// Assert that it's a batch record.
|
||||||
|
for( int i=0; i < BATCH_CONTROL_RECORD_HEADER.length; i++ ) {
|
||||||
|
if( controlIs.readByte() != BATCH_CONTROL_RECORD_HEADER[i] ) {
|
||||||
|
return -1;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
int size = controlIs.readInt();
|
||||||
|
if( size > MAX_BATCH_SIZE ) {
|
||||||
|
return -1;
|
||||||
|
}
|
||||||
|
|
||||||
|
if( isChecksum() ) {
|
||||||
|
|
||||||
|
long expectedChecksum = controlIs.readLong();
|
||||||
|
if( expectedChecksum == 0 ) {
|
||||||
|
// Checksuming was not enabled when the record was stored.
|
||||||
|
// we can't validate the record :(
|
||||||
|
return size;
|
||||||
|
}
|
||||||
|
|
||||||
|
byte data[] = new byte[size];
|
||||||
|
reader.readFully(offset+BATCH_CONTROL_RECORD_SIZE, data);
|
||||||
|
|
||||||
|
Checksum checksum = new Adler32();
|
||||||
|
checksum.update(data, 0, data.length);
|
||||||
|
|
||||||
|
if( expectedChecksum!=checksum.getValue() ) {
|
||||||
|
return -1;
|
||||||
|
}
|
||||||
|
|
||||||
|
}
|
||||||
|
return size;
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
void addToTotalLength(int size) {
|
void addToTotalLength(int size) {
|
||||||
totalLength.addAndGet(size);
|
totalLength.addAndGet(size);
|
||||||
}
|
}
|
||||||
|
@ -640,5 +700,11 @@ public class Journal {
|
||||||
this.checksum = checksumWrites;
|
this.checksum = checksumWrites;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
public boolean isCheckForCorruptionOnStartup() {
|
||||||
|
return checkForCorruptionOnStartup;
|
||||||
|
}
|
||||||
|
|
||||||
|
public void setCheckForCorruptionOnStartup(boolean checkForCorruptionOnStartup) {
|
||||||
|
this.checkForCorruptionOnStartup = checkForCorruptionOnStartup;
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -71,4 +71,35 @@ public class ByteSequence {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
public int indexOf(ByteSequence needle, int pos) {
|
||||||
|
int max = length - needle.length;
|
||||||
|
for (int i = pos; i < max; i++) {
|
||||||
|
if (matches(needle, i)) {
|
||||||
|
return i;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
return -1;
|
||||||
|
}
|
||||||
|
|
||||||
|
private boolean matches(ByteSequence needle, int pos) {
|
||||||
|
for (int i = 0; i < needle.length; i++) {
|
||||||
|
if( data[offset + pos+ i] != needle.data[needle.offset + i] ) {
|
||||||
|
return false;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
return true;
|
||||||
|
}
|
||||||
|
|
||||||
|
private byte getByte(int i) {
|
||||||
|
return data[offset+i];
|
||||||
|
}
|
||||||
|
|
||||||
|
final public int indexOf(byte value, int pos) {
|
||||||
|
for (int i = pos; i < length; i++) {
|
||||||
|
if (data[offset + i] == value) {
|
||||||
|
return i;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
return -1;
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -176,6 +176,17 @@ public class SequenceSet extends LinkedNodeList<Sequence> {
|
||||||
return rc.first;
|
return rc.first;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
|
public Sequence removeLastSequence() {
|
||||||
|
if (isEmpty()) {
|
||||||
|
return null;
|
||||||
|
}
|
||||||
|
|
||||||
|
Sequence rc = getTail();
|
||||||
|
rc.unlink();
|
||||||
|
return rc;
|
||||||
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Removes and returns the first sequence that is count range large.
|
* Removes and returns the first sequence that is count range large.
|
||||||
*
|
*
|
||||||
|
|
Loading…
Reference in New Issue