mirror of https://github.com/apache/lucene.git
LUCENE-7113: add best-effort checksum verification to temp files written by OfflineSorter and BKDWriter
This commit is contained in:
parent
5ea86b14c3
commit
ef7a012513
|
@ -46,6 +46,7 @@ import java.util.TreeMap;
|
||||||
import java.util.regex.Matcher;
|
import java.util.regex.Matcher;
|
||||||
import java.util.regex.Pattern;
|
import java.util.regex.Pattern;
|
||||||
|
|
||||||
|
import org.apache.lucene.codecs.CodecUtil;
|
||||||
import org.apache.lucene.store.ByteArrayDataOutput;
|
import org.apache.lucene.store.ByteArrayDataOutput;
|
||||||
import org.apache.lucene.store.Directory;
|
import org.apache.lucene.store.Directory;
|
||||||
import org.apache.lucene.store.IOContext;
|
import org.apache.lucene.store.IOContext;
|
||||||
|
@ -58,9 +59,9 @@ import org.apache.lucene.util.CharsRef;
|
||||||
import org.apache.lucene.util.IOUtils;
|
import org.apache.lucene.util.IOUtils;
|
||||||
import org.apache.lucene.util.IntsRef;
|
import org.apache.lucene.util.IntsRef;
|
||||||
import org.apache.lucene.util.IntsRefBuilder;
|
import org.apache.lucene.util.IntsRefBuilder;
|
||||||
import org.apache.lucene.util.OfflineSorter;
|
|
||||||
import org.apache.lucene.util.OfflineSorter.ByteSequencesReader;
|
import org.apache.lucene.util.OfflineSorter.ByteSequencesReader;
|
||||||
import org.apache.lucene.util.OfflineSorter.ByteSequencesWriter;
|
import org.apache.lucene.util.OfflineSorter.ByteSequencesWriter;
|
||||||
|
import org.apache.lucene.util.OfflineSorter;
|
||||||
import org.apache.lucene.util.RamUsageEstimator;
|
import org.apache.lucene.util.RamUsageEstimator;
|
||||||
import org.apache.lucene.util.automaton.CharacterRunAutomaton;
|
import org.apache.lucene.util.automaton.CharacterRunAutomaton;
|
||||||
import org.apache.lucene.util.automaton.RegExp;
|
import org.apache.lucene.util.automaton.RegExp;
|
||||||
|
@ -830,6 +831,7 @@ public class Dictionary {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
CodecUtil.writeFooter(unsorted);
|
||||||
}
|
}
|
||||||
|
|
||||||
OfflineSorter sorter = new OfflineSorter(tempDir, tempFileNamePrefix, new Comparator<BytesRef>() {
|
OfflineSorter sorter = new OfflineSorter(tempDir, tempFileNamePrefix, new Comparator<BytesRef>() {
|
||||||
|
@ -885,7 +887,7 @@ public class Dictionary {
|
||||||
|
|
||||||
boolean success2 = false;
|
boolean success2 = false;
|
||||||
|
|
||||||
try (ByteSequencesReader reader = new ByteSequencesReader(tempDir.openInput(sorted, IOContext.READONCE))) {
|
try (ByteSequencesReader reader = new ByteSequencesReader(tempDir.openChecksumInput(sorted, IOContext.READONCE), sorted)) {
|
||||||
BytesRefBuilder scratchLine = new BytesRefBuilder();
|
BytesRefBuilder scratchLine = new BytesRefBuilder();
|
||||||
|
|
||||||
// TODO: the flags themselves can be double-chars (long) or also numeric
|
// TODO: the flags themselves can be double-chars (long) or also numeric
|
||||||
|
|
|
@ -438,9 +438,9 @@ public final class CodecUtil {
|
||||||
long remaining = in.length() - in.getFilePointer();
|
long remaining = in.length() - in.getFilePointer();
|
||||||
long expected = footerLength();
|
long expected = footerLength();
|
||||||
if (remaining < expected) {
|
if (remaining < expected) {
|
||||||
throw new CorruptIndexException("misplaced codec footer (file truncated?): remaining=" + remaining + ", expected=" + expected, in);
|
throw new CorruptIndexException("misplaced codec footer (file truncated?): remaining=" + remaining + ", expected=" + expected + ", fp=" + in.getFilePointer(), in);
|
||||||
} else if (remaining > expected) {
|
} else if (remaining > expected) {
|
||||||
throw new CorruptIndexException("misplaced codec footer (file extended?): remaining=" + remaining + ", expected=" + expected, in);
|
throw new CorruptIndexException("misplaced codec footer (file extended?): remaining=" + remaining + ", expected=" + expected + ", fp=" + in.getFilePointer(), in);
|
||||||
}
|
}
|
||||||
|
|
||||||
final int magic = in.readInt();
|
final int magic = in.readInt();
|
||||||
|
|
|
@ -25,6 +25,8 @@ import java.util.Comparator;
|
||||||
import java.util.List;
|
import java.util.List;
|
||||||
import java.util.Locale;
|
import java.util.Locale;
|
||||||
|
|
||||||
|
import org.apache.lucene.codecs.CodecUtil;
|
||||||
|
import org.apache.lucene.store.ChecksumIndexInput;
|
||||||
import org.apache.lucene.store.Directory;
|
import org.apache.lucene.store.Directory;
|
||||||
import org.apache.lucene.store.IOContext;
|
import org.apache.lucene.store.IOContext;
|
||||||
import org.apache.lucene.store.IndexInput;
|
import org.apache.lucene.store.IndexInput;
|
||||||
|
@ -238,8 +240,7 @@ public class OfflineSorter {
|
||||||
TrackingDirectoryWrapper trackingDir = new TrackingDirectoryWrapper(dir);
|
TrackingDirectoryWrapper trackingDir = new TrackingDirectoryWrapper(dir);
|
||||||
|
|
||||||
boolean success = false;
|
boolean success = false;
|
||||||
try (ByteSequencesReader is = getReader(dir.openInput(inputFileName, IOContext.READONCE))) {
|
try (ByteSequencesReader is = getReader(dir.openChecksumInput(inputFileName, IOContext.READONCE), inputFileName)) {
|
||||||
|
|
||||||
int lineCount;
|
int lineCount;
|
||||||
while ((lineCount = readPartition(is)) > 0) {
|
while ((lineCount = readPartition(is)) > 0) {
|
||||||
segments.add(sortPartition(trackingDir));
|
segments.add(sortPartition(trackingDir));
|
||||||
|
@ -271,6 +272,8 @@ public class OfflineSorter {
|
||||||
String result;
|
String result;
|
||||||
if (segments.isEmpty()) {
|
if (segments.isEmpty()) {
|
||||||
try (IndexOutput out = trackingDir.createTempOutput(tempFileNamePrefix, "sort", IOContext.DEFAULT)) {
|
try (IndexOutput out = trackingDir.createTempOutput(tempFileNamePrefix, "sort", IOContext.DEFAULT)) {
|
||||||
|
// Write empty file footer
|
||||||
|
CodecUtil.writeFooter(out);
|
||||||
result = out.getName();
|
result = out.getName();
|
||||||
}
|
}
|
||||||
} else {
|
} else {
|
||||||
|
@ -281,6 +284,9 @@ public class OfflineSorter {
|
||||||
assert trackingDir.getCreatedFiles().size() == 1 && trackingDir.getCreatedFiles().contains(result);
|
assert trackingDir.getCreatedFiles().size() == 1 && trackingDir.getCreatedFiles().contains(result);
|
||||||
|
|
||||||
sortInfo.totalTime = (System.currentTimeMillis() - sortInfo.totalTime);
|
sortInfo.totalTime = (System.currentTimeMillis() - sortInfo.totalTime);
|
||||||
|
|
||||||
|
CodecUtil.checkFooter(is.in);
|
||||||
|
|
||||||
success = true;
|
success = true;
|
||||||
|
|
||||||
return result;
|
return result;
|
||||||
|
@ -312,10 +318,20 @@ public class OfflineSorter {
|
||||||
// Clean up the buffer for the next partition.
|
// Clean up the buffer for the next partition.
|
||||||
buffer.clear();
|
buffer.clear();
|
||||||
|
|
||||||
|
CodecUtil.writeFooter(out.out);
|
||||||
|
|
||||||
return tempFile.getName();
|
return tempFile.getName();
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/** Called on exception, to check whether the checksum is also corrupt in this source, and add that
|
||||||
|
* information (checksum matched or didn't) as a suppressed exception. */
|
||||||
|
private void verifyChecksum(Throwable priorException, ByteSequencesReader reader) throws IOException {
|
||||||
|
try (ChecksumIndexInput in = dir.openChecksumInput(reader.name, IOContext.READONCE)) {
|
||||||
|
CodecUtil.checkFooter(in, priorException);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
/** Merge the most recent {@code maxTempFile} partitions into a new partition. */
|
/** Merge the most recent {@code maxTempFile} partitions into a new partition. */
|
||||||
void mergePartitions(Directory trackingDir, List<String> segments) throws IOException {
|
void mergePartitions(Directory trackingDir, List<String> segments) throws IOException {
|
||||||
long start = System.currentTimeMillis();
|
long start = System.currentTimeMillis();
|
||||||
|
@ -338,16 +354,20 @@ public class OfflineSorter {
|
||||||
|
|
||||||
String newSegmentName = null;
|
String newSegmentName = null;
|
||||||
|
|
||||||
try (IndexOutput out = trackingDir.createTempOutput(tempFileNamePrefix, "sort", IOContext.DEFAULT);
|
try (ByteSequencesWriter writer = getWriter(trackingDir.createTempOutput(tempFileNamePrefix, "sort", IOContext.DEFAULT))) {
|
||||||
ByteSequencesWriter writer = getWriter(out);) {
|
|
||||||
|
|
||||||
newSegmentName = out.getName();
|
newSegmentName = writer.out.getName();
|
||||||
|
|
||||||
// Open streams and read the top for each file
|
// Open streams and read the top for each file
|
||||||
for (int i = 0; i < segmentsToMerge.size(); i++) {
|
for (int i = 0; i < segmentsToMerge.size(); i++) {
|
||||||
streams[i] = getReader(dir.openInput(segmentsToMerge.get(i), IOContext.READONCE));
|
streams[i] = getReader(dir.openChecksumInput(segmentsToMerge.get(i), IOContext.READONCE), segmentsToMerge.get(i));
|
||||||
BytesRefBuilder bytes = new BytesRefBuilder();
|
BytesRefBuilder bytes = new BytesRefBuilder();
|
||||||
boolean result = streams[i].read(bytes);
|
boolean result = false;
|
||||||
|
try {
|
||||||
|
result = streams[i].read(bytes);
|
||||||
|
} catch (Throwable t) {
|
||||||
|
verifyChecksum(t, streams[i]);
|
||||||
|
}
|
||||||
assert result;
|
assert result;
|
||||||
queue.insertWithOverflow(new FileAndTop(i, bytes));
|
queue.insertWithOverflow(new FileAndTop(i, bytes));
|
||||||
}
|
}
|
||||||
|
@ -359,11 +379,24 @@ public class OfflineSorter {
|
||||||
FileAndTop top;
|
FileAndTop top;
|
||||||
while ((top = queue.top()) != null) {
|
while ((top = queue.top()) != null) {
|
||||||
writer.write(top.current.bytes(), 0, top.current.length());
|
writer.write(top.current.bytes(), 0, top.current.length());
|
||||||
if (!streams[top.fd].read(top.current)) {
|
boolean result = false;
|
||||||
queue.pop();
|
try {
|
||||||
} else {
|
result = streams[top.fd].read(top.current);
|
||||||
queue.updateTop();
|
} catch (Throwable t) {
|
||||||
|
verifyChecksum(t, streams[top.fd]);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
if (result) {
|
||||||
|
queue.updateTop();
|
||||||
|
} else {
|
||||||
|
queue.pop();
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
CodecUtil.writeFooter(writer.out);
|
||||||
|
|
||||||
|
for(ByteSequencesReader reader : streams) {
|
||||||
|
CodecUtil.checkFooter(reader.in);
|
||||||
}
|
}
|
||||||
|
|
||||||
sortInfo.mergeTime += System.currentTimeMillis() - start;
|
sortInfo.mergeTime += System.currentTimeMillis() - start;
|
||||||
|
@ -384,7 +417,16 @@ public class OfflineSorter {
|
||||||
int readPartition(ByteSequencesReader reader) throws IOException {
|
int readPartition(ByteSequencesReader reader) throws IOException {
|
||||||
long start = System.currentTimeMillis();
|
long start = System.currentTimeMillis();
|
||||||
final BytesRefBuilder scratch = new BytesRefBuilder();
|
final BytesRefBuilder scratch = new BytesRefBuilder();
|
||||||
while (reader.read(scratch)) {
|
while (true) {
|
||||||
|
boolean result = false;
|
||||||
|
try {
|
||||||
|
result = reader.read(scratch);
|
||||||
|
} catch (Throwable t) {
|
||||||
|
verifyChecksum(t, reader);
|
||||||
|
}
|
||||||
|
if (result == false) {
|
||||||
|
break;
|
||||||
|
}
|
||||||
buffer.append(scratch.get());
|
buffer.append(scratch.get());
|
||||||
// Account for the created objects.
|
// Account for the created objects.
|
||||||
// (buffer slots do not account to buffer size.)
|
// (buffer slots do not account to buffer size.)
|
||||||
|
@ -412,13 +454,14 @@ public class OfflineSorter {
|
||||||
}
|
}
|
||||||
|
|
||||||
/** Subclasses can override to change how byte sequences are read from disk. */
|
/** Subclasses can override to change how byte sequences are read from disk. */
|
||||||
protected ByteSequencesReader getReader(IndexInput in) throws IOException {
|
protected ByteSequencesReader getReader(ChecksumIndexInput in, String name) throws IOException {
|
||||||
return new ByteSequencesReader(in);
|
return new ByteSequencesReader(in, name);
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Utility class to emit length-prefixed byte[] entries to an output stream for sorting.
|
* Utility class to emit length-prefixed byte[] entries to an output stream for sorting.
|
||||||
* Complementary to {@link ByteSequencesReader}.
|
* Complementary to {@link ByteSequencesReader}. You must use {@link CodecUtil#writeFooter}
|
||||||
|
* to write a footer at the end of the input file.
|
||||||
*/
|
*/
|
||||||
public static class ByteSequencesWriter implements Closeable {
|
public static class ByteSequencesWriter implements Closeable {
|
||||||
protected final IndexOutput out;
|
protected final IndexOutput out;
|
||||||
|
@ -476,11 +519,15 @@ public class OfflineSorter {
|
||||||
* Complementary to {@link ByteSequencesWriter}.
|
* Complementary to {@link ByteSequencesWriter}.
|
||||||
*/
|
*/
|
||||||
public static class ByteSequencesReader implements Closeable {
|
public static class ByteSequencesReader implements Closeable {
|
||||||
protected final IndexInput in;
|
protected final String name;
|
||||||
|
protected final ChecksumIndexInput in;
|
||||||
|
protected final long end;
|
||||||
|
|
||||||
/** Constructs a ByteSequencesReader from the provided IndexInput */
|
/** Constructs a ByteSequencesReader from the provided IndexInput */
|
||||||
public ByteSequencesReader(IndexInput in) {
|
public ByteSequencesReader(ChecksumIndexInput in, String name) {
|
||||||
this.in = in;
|
this.in = in;
|
||||||
|
this.name = name;
|
||||||
|
end = in.length() - CodecUtil.footerLength();
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
|
@ -492,13 +539,11 @@ public class OfflineSorter {
|
||||||
* @throws EOFException if the file ends before the full sequence is read.
|
* @throws EOFException if the file ends before the full sequence is read.
|
||||||
*/
|
*/
|
||||||
public boolean read(BytesRefBuilder ref) throws IOException {
|
public boolean read(BytesRefBuilder ref) throws IOException {
|
||||||
short length;
|
if (in.getFilePointer() >= end) {
|
||||||
try {
|
|
||||||
length = in.readShort();
|
|
||||||
} catch (EOFException e) {
|
|
||||||
return false;
|
return false;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
short length = in.readShort();
|
||||||
ref.grow(length);
|
ref.grow(length);
|
||||||
ref.setLength(length);
|
ref.setLength(length);
|
||||||
in.readBytes(ref.bytes(), 0, length);
|
in.readBytes(ref.bytes(), 0, length);
|
||||||
|
|
|
@ -17,7 +17,6 @@
|
||||||
package org.apache.lucene.util.bkd;
|
package org.apache.lucene.util.bkd;
|
||||||
|
|
||||||
import java.io.Closeable;
|
import java.io.Closeable;
|
||||||
import java.io.EOFException;
|
|
||||||
import java.io.IOException;
|
import java.io.IOException;
|
||||||
import java.util.ArrayList;
|
import java.util.ArrayList;
|
||||||
import java.util.Arrays;
|
import java.util.Arrays;
|
||||||
|
@ -27,8 +26,9 @@ import java.util.List;
|
||||||
import org.apache.lucene.codecs.CodecUtil;
|
import org.apache.lucene.codecs.CodecUtil;
|
||||||
import org.apache.lucene.index.MergeState;
|
import org.apache.lucene.index.MergeState;
|
||||||
import org.apache.lucene.store.ByteArrayDataInput;
|
import org.apache.lucene.store.ByteArrayDataInput;
|
||||||
|
import org.apache.lucene.store.ChecksumIndexInput;
|
||||||
import org.apache.lucene.store.Directory;
|
import org.apache.lucene.store.Directory;
|
||||||
import org.apache.lucene.store.IndexInput;
|
import org.apache.lucene.store.IOContext;
|
||||||
import org.apache.lucene.store.IndexOutput;
|
import org.apache.lucene.store.IndexOutput;
|
||||||
import org.apache.lucene.store.TrackingDirectoryWrapper;
|
import org.apache.lucene.store.TrackingDirectoryWrapper;
|
||||||
import org.apache.lucene.util.ArrayUtil;
|
import org.apache.lucene.util.ArrayUtil;
|
||||||
|
@ -218,7 +218,7 @@ public class BKDWriter implements Closeable {
|
||||||
// For each .add we just append to this input file, then in .finish we sort this input and resursively build the tree:
|
// For each .add we just append to this input file, then in .finish we sort this input and resursively build the tree:
|
||||||
offlinePointWriter = new OfflinePointWriter(tempDir, tempFileNamePrefix, packedBytesLength, longOrds, "spill");
|
offlinePointWriter = new OfflinePointWriter(tempDir, tempFileNamePrefix, packedBytesLength, longOrds, "spill");
|
||||||
tempInput = offlinePointWriter.out;
|
tempInput = offlinePointWriter.out;
|
||||||
PointReader reader = heapPointWriter.getReader(0);
|
PointReader reader = heapPointWriter.getReader(0, pointCount);
|
||||||
for(int i=0;i<pointCount;i++) {
|
for(int i=0;i<pointCount;i++) {
|
||||||
boolean hasNext = reader.next();
|
boolean hasNext = reader.next();
|
||||||
assert hasNext;
|
assert hasNext;
|
||||||
|
@ -750,23 +750,21 @@ public class BKDWriter implements Closeable {
|
||||||
|
|
||||||
/** We write/read fixed-byte-width file that {@link OfflinePointReader} can read. */
|
/** We write/read fixed-byte-width file that {@link OfflinePointReader} can read. */
|
||||||
@Override
|
@Override
|
||||||
protected ByteSequencesReader getReader(IndexInput in) throws IOException {
|
protected ByteSequencesReader getReader(ChecksumIndexInput in, String name) throws IOException {
|
||||||
return new ByteSequencesReader(in) {
|
return new ByteSequencesReader(in, name) {
|
||||||
@Override
|
@Override
|
||||||
public boolean read(BytesRefBuilder ref) throws IOException {
|
public boolean read(BytesRefBuilder ref) throws IOException {
|
||||||
ref.grow(bytesPerDoc);
|
if (in.getFilePointer() >= end) {
|
||||||
try {
|
|
||||||
in.readBytes(ref.bytes(), 0, bytesPerDoc);
|
|
||||||
} catch (EOFException eofe) {
|
|
||||||
return false;
|
return false;
|
||||||
}
|
}
|
||||||
|
ref.grow(bytesPerDoc);
|
||||||
|
in.readBytes(ref.bytes(), 0, bytesPerDoc);
|
||||||
ref.setLength(bytesPerDoc);
|
ref.setLength(bytesPerDoc);
|
||||||
return true;
|
return true;
|
||||||
}
|
}
|
||||||
};
|
};
|
||||||
}
|
}
|
||||||
};
|
};
|
||||||
|
|
||||||
sorter.sort(tempInput.getName());
|
sorter.sort(tempInput.getName());
|
||||||
|
|
||||||
assert lastWriter[0] != null;
|
assert lastWriter[0] != null;
|
||||||
|
@ -785,7 +783,7 @@ public class BKDWriter implements Closeable {
|
||||||
public long finish(IndexOutput out) throws IOException {
|
public long finish(IndexOutput out) throws IOException {
|
||||||
// System.out.println("\nBKDTreeWriter.finish pointCount=" + pointCount + " out=" + out + " heapWriter=" + heapPointWriter);
|
// System.out.println("\nBKDTreeWriter.finish pointCount=" + pointCount + " out=" + out + " heapWriter=" + heapPointWriter);
|
||||||
|
|
||||||
// TODO: specialize the 1D case? it's much faster at indexing time (no partitioning on recruse...)
|
// TODO: specialize the 1D case? it's much faster at indexing time (no partitioning on recurse...)
|
||||||
|
|
||||||
// Catch user silliness:
|
// Catch user silliness:
|
||||||
if (heapPointWriter == null && tempInput == null) {
|
if (heapPointWriter == null && tempInput == null) {
|
||||||
|
@ -964,13 +962,32 @@ public class BKDWriter implements Closeable {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/** Called on exception, to check whether the checksum is also corrupt in this source, and add that
|
||||||
|
* information (checksum matched or didn't) as a suppressed exception. */
|
||||||
|
private void verifyChecksum(Throwable priorException, PointWriter writer) throws IOException {
|
||||||
|
// TODO: we could improve this, to always validate checksum as we recurse, if we shared left and
|
||||||
|
// right reader after recursing to children, and possibly within recursed children,
|
||||||
|
// since all together they make a single pass through the file. But this is a sizable re-org,
|
||||||
|
// and would mean leaving readers (IndexInputs) open for longer:
|
||||||
|
if (writer instanceof OfflinePointWriter) {
|
||||||
|
// We are reading from a temp file; go verify the checksum:
|
||||||
|
String tempFileName = ((OfflinePointWriter) writer).out.getName();
|
||||||
|
try (ChecksumIndexInput in = tempDir.openChecksumInput(tempFileName, IOContext.READONCE)) {
|
||||||
|
CodecUtil.checkFooter(in, priorException);
|
||||||
|
}
|
||||||
|
} else {
|
||||||
|
// We are reading from heap; nothing to add:
|
||||||
|
IOUtils.reThrow(priorException);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
/** Marks bits for the ords (points) that belong in the right sub tree (those docs that have values >= the splitValue). */
|
/** Marks bits for the ords (points) that belong in the right sub tree (those docs that have values >= the splitValue). */
|
||||||
private byte[] markRightTree(long rightCount, int splitDim, PathSlice source, LongBitSet ordBitSet) throws IOException {
|
private byte[] markRightTree(long rightCount, int splitDim, PathSlice source, LongBitSet ordBitSet) throws IOException {
|
||||||
|
|
||||||
// Now we mark ords that fall into the right half, so we can partition on all other dims that are not the split dim:
|
// Now we mark ords that fall into the right half, so we can partition on all other dims that are not the split dim:
|
||||||
|
|
||||||
// Read the split value, then mark all ords in the right tree (larger than the split value):
|
// Read the split value, then mark all ords in the right tree (larger than the split value):
|
||||||
try (PointReader reader = source.writer.getReader(source.start + source.count - rightCount)) {
|
try (PointReader reader = source.writer.getReader(source.start + source.count - rightCount, rightCount)) {
|
||||||
boolean result = reader.next();
|
boolean result = reader.next();
|
||||||
assert result;
|
assert result;
|
||||||
|
|
||||||
|
@ -983,11 +1000,15 @@ public class BKDWriter implements Closeable {
|
||||||
// Start at 1 because we already did the first value above (so we could keep the split value):
|
// Start at 1 because we already did the first value above (so we could keep the split value):
|
||||||
for(int i=1;i<rightCount;i++) {
|
for(int i=1;i<rightCount;i++) {
|
||||||
result = reader.next();
|
result = reader.next();
|
||||||
assert result;
|
if (result == false) {
|
||||||
assert ordBitSet.get(reader.ord()) == false;
|
throw new IllegalStateException("did not see enough points from reader=" + reader);
|
||||||
|
}
|
||||||
|
assert ordBitSet.get(reader.ord()) == false: "ord=" + reader.ord() + " was seen twice from " + source.writer;
|
||||||
ordBitSet.set(reader.ord());
|
ordBitSet.set(reader.ord());
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
} catch (Throwable t) {
|
||||||
|
verifyChecksum(t, source.writer);
|
||||||
}
|
}
|
||||||
|
|
||||||
return scratch1;
|
return scratch1;
|
||||||
|
@ -1024,13 +1045,12 @@ public class BKDWriter implements Closeable {
|
||||||
return splitDim;
|
return splitDim;
|
||||||
}
|
}
|
||||||
|
|
||||||
/** Only called in the 1D case, to pull a partition back into heap once
|
/** Pull a partition back into heap once the point count is low enough while recursing. */
|
||||||
* the point count is low enough while recursing. */
|
|
||||||
private PathSlice switchToHeap(PathSlice source) throws IOException {
|
private PathSlice switchToHeap(PathSlice source) throws IOException {
|
||||||
int count = Math.toIntExact(source.count);
|
int count = Math.toIntExact(source.count);
|
||||||
try (
|
try (
|
||||||
PointWriter writer = new HeapPointWriter(count, count, packedBytesLength, longOrds);
|
PointWriter writer = new HeapPointWriter(count, count, packedBytesLength, longOrds);
|
||||||
PointReader reader = source.writer.getReader(source.start);
|
PointReader reader = source.writer.getReader(source.start, source.count);
|
||||||
) {
|
) {
|
||||||
for(int i=0;i<count;i++) {
|
for(int i=0;i<count;i++) {
|
||||||
boolean hasNext = reader.next();
|
boolean hasNext = reader.next();
|
||||||
|
@ -1038,6 +1058,11 @@ public class BKDWriter implements Closeable {
|
||||||
writer.append(reader.packedValue(), reader.ord(), reader.docID());
|
writer.append(reader.packedValue(), reader.ord(), reader.docID());
|
||||||
}
|
}
|
||||||
return new PathSlice(writer, 0, count);
|
return new PathSlice(writer, 0, count);
|
||||||
|
} catch (Throwable t) {
|
||||||
|
verifyChecksum(t, source.writer);
|
||||||
|
|
||||||
|
// Dead code but javac disagrees:
|
||||||
|
return null;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -1174,14 +1199,17 @@ public class BKDWriter implements Closeable {
|
||||||
|
|
||||||
try (PointWriter leftPointWriter = getPointWriter(leftCount, "left" + dim);
|
try (PointWriter leftPointWriter = getPointWriter(leftCount, "left" + dim);
|
||||||
PointWriter rightPointWriter = getPointWriter(source.count - leftCount, "right" + dim);
|
PointWriter rightPointWriter = getPointWriter(source.count - leftCount, "right" + dim);
|
||||||
PointReader reader = slices[dim].writer.getReader(slices[dim].start);) {
|
PointReader reader = slices[dim].writer.getReader(slices[dim].start, slices[dim].count);) {
|
||||||
|
|
||||||
long nextRightCount = reader.split(source.count, ordBitSet, leftPointWriter, rightPointWriter, dim == dimToClear);
|
long nextRightCount = reader.split(source.count, ordBitSet, leftPointWriter, rightPointWriter, dim == dimToClear);
|
||||||
|
if (rightCount != nextRightCount) {
|
||||||
|
throw new IllegalStateException("wrong number of points in split: expected=" + rightCount + " but actual=" + nextRightCount);
|
||||||
|
}
|
||||||
|
|
||||||
leftSlices[dim] = new PathSlice(leftPointWriter, 0, leftCount);
|
leftSlices[dim] = new PathSlice(leftPointWriter, 0, leftCount);
|
||||||
rightSlices[dim] = new PathSlice(rightPointWriter, 0, rightCount);
|
rightSlices[dim] = new PathSlice(rightPointWriter, 0, rightCount);
|
||||||
|
} catch (Throwable t) {
|
||||||
assert rightCount == nextRightCount: "rightCount=" + rightCount + " nextRightCount=" + nextRightCount;
|
verifyChecksum(t, slices[dim].writer);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -135,7 +135,8 @@ final class HeapPointWriter implements PointWriter {
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public PointReader getReader(long start) {
|
public PointReader getReader(long start, long length) {
|
||||||
|
assert start + length <= docIDs.length: "start=" + start + " length=" + length + " docIDs.length=" + docIDs.length;
|
||||||
return new HeapPointReader(blocks, valuesPerBlock, packedBytesLength, ords, ordsLong, docIDs, (int) start, nextWrite);
|
return new HeapPointReader(blocks, valuesPerBlock, packedBytesLength, ords, ordsLong, docIDs, (int) start, nextWrite);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -150,6 +151,6 @@ final class HeapPointWriter implements PointWriter {
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public String toString() {
|
public String toString() {
|
||||||
return "HeapPointWriter(count=" + nextWrite + " alloc=" + ords.length + ")";
|
return "HeapPointWriter(count=" + nextWrite + " alloc=" + docIDs.length + ")";
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -19,6 +19,8 @@ package org.apache.lucene.util.bkd;
|
||||||
import java.io.EOFException;
|
import java.io.EOFException;
|
||||||
import java.io.IOException;
|
import java.io.IOException;
|
||||||
|
|
||||||
|
import org.apache.lucene.codecs.CodecUtil;
|
||||||
|
import org.apache.lucene.store.ChecksumIndexInput;
|
||||||
import org.apache.lucene.store.Directory;
|
import org.apache.lucene.store.Directory;
|
||||||
import org.apache.lucene.store.IOContext;
|
import org.apache.lucene.store.IOContext;
|
||||||
import org.apache.lucene.store.IndexInput;
|
import org.apache.lucene.store.IndexInput;
|
||||||
|
@ -28,24 +30,42 @@ import org.apache.lucene.util.LongBitSet;
|
||||||
/** Reads points from disk in a fixed-with format, previously written with {@link OfflinePointWriter}. */
|
/** Reads points from disk in a fixed-with format, previously written with {@link OfflinePointWriter}. */
|
||||||
final class OfflinePointReader extends PointReader {
|
final class OfflinePointReader extends PointReader {
|
||||||
long countLeft;
|
long countLeft;
|
||||||
private final IndexInput in;
|
final IndexInput in;
|
||||||
private final byte[] packedValue;
|
private final byte[] packedValue;
|
||||||
private long ord;
|
private long ord;
|
||||||
private int docID;
|
private int docID;
|
||||||
// true if ords are written as long (8 bytes), else 4 bytes
|
// true if ords are written as long (8 bytes), else 4 bytes
|
||||||
private boolean longOrds;
|
private boolean longOrds;
|
||||||
|
private boolean checked;
|
||||||
|
|
||||||
OfflinePointReader(Directory tempDir, String tempFileName, int packedBytesLength, long start, long length, boolean longOrds) throws IOException {
|
OfflinePointReader(Directory tempDir, String tempFileName, int packedBytesLength, long start, long length, boolean longOrds) throws IOException {
|
||||||
in = tempDir.openInput(tempFileName, IOContext.READONCE);
|
|
||||||
int bytesPerDoc = packedBytesLength + Integer.BYTES;
|
int bytesPerDoc = packedBytesLength + Integer.BYTES;
|
||||||
if (longOrds) {
|
if (longOrds) {
|
||||||
bytesPerDoc += Long.BYTES;
|
bytesPerDoc += Long.BYTES;
|
||||||
} else {
|
} else {
|
||||||
bytesPerDoc += Integer.BYTES;
|
bytesPerDoc += Integer.BYTES;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
if ((start + length) * bytesPerDoc + CodecUtil.footerLength() > tempDir.fileLength(tempFileName)) {
|
||||||
|
throw new IllegalArgumentException("requested slice is beyond the length of this file: start=" + start + " length=" + length + " bytesPerDoc=" + bytesPerDoc + " fileLength=" + tempDir.fileLength(tempFileName) + " tempFileName=" + tempFileName);
|
||||||
|
}
|
||||||
|
|
||||||
|
// Best-effort checksumming:
|
||||||
|
if (start == 0 && length*bytesPerDoc == tempDir.fileLength(tempFileName) - CodecUtil.footerLength()) {
|
||||||
|
// If we are going to read the entire file, e.g. because BKDWriter is now
|
||||||
|
// partitioning it, we open with checksums:
|
||||||
|
in = tempDir.openChecksumInput(tempFileName, IOContext.READONCE);
|
||||||
|
} else {
|
||||||
|
// Since we are going to seek somewhere in the middle of a possibly huge
|
||||||
|
// file, and not read all bytes from there, don't use ChecksumIndexInput here.
|
||||||
|
// This is typically fine, because this same file will later be read fully,
|
||||||
|
// at another level of the BKDWriter recursion
|
||||||
|
in = tempDir.openInput(tempFileName, IOContext.READONCE);
|
||||||
|
}
|
||||||
|
|
||||||
long seekFP = start * bytesPerDoc;
|
long seekFP = start * bytesPerDoc;
|
||||||
in.seek(seekFP);
|
in.seek(seekFP);
|
||||||
this.countLeft = length;
|
countLeft = length;
|
||||||
packedValue = new byte[packedBytesLength];
|
packedValue = new byte[packedBytesLength];
|
||||||
this.longOrds = longOrds;
|
this.longOrds = longOrds;
|
||||||
}
|
}
|
||||||
|
@ -90,7 +110,14 @@ final class OfflinePointReader extends PointReader {
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public void close() throws IOException {
|
public void close() throws IOException {
|
||||||
in.close();
|
try {
|
||||||
|
if (countLeft == 0 && in instanceof ChecksumIndexInput && checked == false) {
|
||||||
|
checked = true;
|
||||||
|
CodecUtil.checkFooter((ChecksumIndexInput) in);
|
||||||
|
}
|
||||||
|
} finally {
|
||||||
|
in.close();
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
|
@ -119,13 +146,12 @@ final class OfflinePointReader extends PointReader {
|
||||||
IndexOutput rightOut = ((OfflinePointWriter) right).out;
|
IndexOutput rightOut = ((OfflinePointWriter) right).out;
|
||||||
IndexOutput leftOut = ((OfflinePointWriter) left).out;
|
IndexOutput leftOut = ((OfflinePointWriter) left).out;
|
||||||
|
|
||||||
((OfflinePointWriter) right).count = count;
|
|
||||||
((OfflinePointWriter) left).count = count;
|
|
||||||
|
|
||||||
assert count <= countLeft: "count=" + count + " countLeft=" + countLeft;
|
assert count <= countLeft: "count=" + count + " countLeft=" + countLeft;
|
||||||
|
|
||||||
countLeft -= count;
|
countLeft -= count;
|
||||||
|
|
||||||
|
long countStart = count;
|
||||||
|
|
||||||
byte[] buffer = new byte[bytesPerDoc];
|
byte[] buffer = new byte[bytesPerDoc];
|
||||||
while (count > 0) {
|
while (count > 0) {
|
||||||
in.readBytes(buffer, 0, buffer.length);
|
in.readBytes(buffer, 0, buffer.length);
|
||||||
|
@ -148,6 +174,9 @@ final class OfflinePointReader extends PointReader {
|
||||||
count--;
|
count--;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
((OfflinePointWriter) right).count = rightCount;
|
||||||
|
((OfflinePointWriter) left).count = countStart-rightCount;
|
||||||
|
|
||||||
return rightCount;
|
return rightCount;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -18,6 +18,8 @@ package org.apache.lucene.util.bkd;
|
||||||
|
|
||||||
import java.io.IOException;
|
import java.io.IOException;
|
||||||
|
|
||||||
|
import org.apache.lucene.codecs.CodecUtil;
|
||||||
|
import org.apache.lucene.index.CorruptIndexException;
|
||||||
import org.apache.lucene.store.Directory;
|
import org.apache.lucene.store.Directory;
|
||||||
import org.apache.lucene.store.IOContext;
|
import org.apache.lucene.store.IOContext;
|
||||||
import org.apache.lucene.store.IndexOutput;
|
import org.apache.lucene.store.IndexOutput;
|
||||||
|
@ -65,15 +67,22 @@ final class OfflinePointWriter implements PointWriter {
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public PointReader getReader(long start) throws IOException {
|
public PointReader getReader(long start, long length) throws IOException {
|
||||||
assert closed;
|
assert closed;
|
||||||
return new OfflinePointReader(tempDir, out.getName(), packedBytesLength, start, count-start, longOrds);
|
assert start + length <= count: "start=" + start + " length=" + length + " count=" + count;
|
||||||
|
return new OfflinePointReader(tempDir, out.getName(), packedBytesLength, start, length, longOrds);
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public void close() throws IOException {
|
public void close() throws IOException {
|
||||||
out.close();
|
if (closed == false) {
|
||||||
closed = true;
|
try {
|
||||||
|
CodecUtil.writeFooter(out);
|
||||||
|
} finally {
|
||||||
|
out.close();
|
||||||
|
closed = true;
|
||||||
|
}
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
|
@ -86,4 +95,3 @@ final class OfflinePointWriter implements PointWriter {
|
||||||
return "OfflinePointWriter(count=" + count + " tempFileName=" + out.getName() + ")";
|
return "OfflinePointWriter(count=" + count + " tempFileName=" + out.getName() + ")";
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -28,7 +28,7 @@ interface PointWriter extends Closeable {
|
||||||
void append(byte[] packedValue, long ord, int docID) throws IOException;
|
void append(byte[] packedValue, long ord, int docID) throws IOException;
|
||||||
|
|
||||||
/** Returns a {@link PointReader} iterator to step through all previously added points */
|
/** Returns a {@link PointReader} iterator to step through all previously added points */
|
||||||
PointReader getReader(long startPoint) throws IOException;
|
PointReader getReader(long startPoint, long length) throws IOException;
|
||||||
|
|
||||||
/** Removes any temp files behind this writer */
|
/** Removes any temp files behind this writer */
|
||||||
void destroy() throws IOException;
|
void destroy() throws IOException;
|
||||||
|
|
|
@ -17,6 +17,7 @@
|
||||||
package org.apache.lucene.util;
|
package org.apache.lucene.util;
|
||||||
|
|
||||||
|
|
||||||
|
import java.io.EOFException;
|
||||||
import java.io.IOException;
|
import java.io.IOException;
|
||||||
import java.nio.charset.StandardCharsets;
|
import java.nio.charset.StandardCharsets;
|
||||||
import java.nio.file.Path;
|
import java.nio.file.Path;
|
||||||
|
@ -25,7 +26,11 @@ import java.util.Arrays;
|
||||||
import java.util.Comparator;
|
import java.util.Comparator;
|
||||||
import java.util.concurrent.atomic.AtomicBoolean;
|
import java.util.concurrent.atomic.AtomicBoolean;
|
||||||
|
|
||||||
|
import org.apache.lucene.codecs.CodecUtil;
|
||||||
|
import org.apache.lucene.index.CorruptIndexException;
|
||||||
|
import org.apache.lucene.store.CorruptingIndexOutput;
|
||||||
import org.apache.lucene.store.Directory;
|
import org.apache.lucene.store.Directory;
|
||||||
|
import org.apache.lucene.store.FilterDirectory;
|
||||||
import org.apache.lucene.store.IOContext;
|
import org.apache.lucene.store.IOContext;
|
||||||
import org.apache.lucene.store.IndexInput;
|
import org.apache.lucene.store.IndexInput;
|
||||||
import org.apache.lucene.store.IndexOutput;
|
import org.apache.lucene.store.IndexOutput;
|
||||||
|
@ -106,6 +111,29 @@ public class TestOfflineSorter extends LuceneTestCase {
|
||||||
byte [][] bytes = data.toArray(new byte[data.size()][]);
|
byte [][] bytes = data.toArray(new byte[data.size()][]);
|
||||||
return bytes;
|
return bytes;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// Generates same data every time:
|
||||||
|
private byte[][] generateFixed(int howMuchDataInBytes) {
|
||||||
|
ArrayList<byte[]> data = new ArrayList<>();
|
||||||
|
int length = 256;
|
||||||
|
byte counter = 0;
|
||||||
|
while (howMuchDataInBytes > 0) {
|
||||||
|
byte[] current = new byte[length];
|
||||||
|
for(int i=0;i<current.length;i++) {
|
||||||
|
current[i] = counter;
|
||||||
|
counter++;
|
||||||
|
}
|
||||||
|
data.add(current);
|
||||||
|
howMuchDataInBytes -= current.length;
|
||||||
|
|
||||||
|
length--;
|
||||||
|
if (length <= 128) {
|
||||||
|
length = 256;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
byte [][] bytes = data.toArray(new byte[data.size()][]);
|
||||||
|
return bytes;
|
||||||
|
}
|
||||||
|
|
||||||
static final Comparator<byte[]> unsignedByteOrderComparator = new Comparator<byte[]>() {
|
static final Comparator<byte[]> unsignedByteOrderComparator = new Comparator<byte[]>() {
|
||||||
@Override
|
@Override
|
||||||
|
@ -173,6 +201,7 @@ public class TestOfflineSorter extends LuceneTestCase {
|
||||||
for (byte [] datum : data) {
|
for (byte [] datum : data) {
|
||||||
w.write(datum);
|
w.write(datum);
|
||||||
}
|
}
|
||||||
|
CodecUtil.writeFooter(out);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -226,4 +255,176 @@ public class TestOfflineSorter extends LuceneTestCase {
|
||||||
|
|
||||||
assertFalse(failed.get());
|
assertFalse(failed.get());
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/** Make sure corruption on the incoming (unsorted) file is caught, even if the corruption didn't confuse OfflineSorter! */
|
||||||
|
public void testBitFlippedOnInput1() throws Exception {
|
||||||
|
|
||||||
|
try (Directory dir0 = newMockDirectory()) {
|
||||||
|
if (dir0 instanceof MockDirectoryWrapper) {
|
||||||
|
((MockDirectoryWrapper) dir0).setPreventDoubleWrite(false);
|
||||||
|
}
|
||||||
|
|
||||||
|
Directory dir = new FilterDirectory(dir0) {
|
||||||
|
@Override
|
||||||
|
public IndexOutput createTempOutput(String prefix, String suffix, IOContext context) throws IOException {
|
||||||
|
IndexOutput out = in.createTempOutput(prefix, suffix, context);
|
||||||
|
if (prefix.equals("unsorted")) {
|
||||||
|
return new CorruptingIndexOutput(dir0, 22, out);
|
||||||
|
} else {
|
||||||
|
return out;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
};
|
||||||
|
|
||||||
|
IndexOutput unsorted = dir.createTempOutput("unsorted", "tmp", IOContext.DEFAULT);
|
||||||
|
writeAll(unsorted, generateFixed(10*1024));
|
||||||
|
|
||||||
|
CorruptIndexException e = expectThrows(CorruptIndexException.class, () -> {
|
||||||
|
new OfflineSorter(dir, "foo").sort(unsorted.getName());
|
||||||
|
});
|
||||||
|
assertTrue(e.getMessage().contains("checksum failed (hardware problem?)"));
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
/** Make sure corruption on the incoming (unsorted) file is caught, if the corruption did confuse OfflineSorter! */
|
||||||
|
public void testBitFlippedOnInput2() throws Exception {
|
||||||
|
|
||||||
|
try (Directory dir0 = newMockDirectory()) {
|
||||||
|
if (dir0 instanceof MockDirectoryWrapper) {
|
||||||
|
((MockDirectoryWrapper) dir0).setPreventDoubleWrite(false);
|
||||||
|
}
|
||||||
|
|
||||||
|
Directory dir = new FilterDirectory(dir0) {
|
||||||
|
@Override
|
||||||
|
public IndexOutput createTempOutput(String prefix, String suffix, IOContext context) throws IOException {
|
||||||
|
IndexOutput out = in.createTempOutput(prefix, suffix, context);
|
||||||
|
if (prefix.equals("unsorted")) {
|
||||||
|
return new CorruptingIndexOutput(dir0, 22, out) {
|
||||||
|
@Override
|
||||||
|
protected void corruptFile() throws IOException {
|
||||||
|
String newTempName;
|
||||||
|
try(IndexOutput tmpOut = dir0.createTempOutput("tmp", "tmp", IOContext.DEFAULT);
|
||||||
|
IndexInput in = dir0.openInput(out.getName(), IOContext.DEFAULT)) {
|
||||||
|
newTempName = tmpOut.getName();
|
||||||
|
// Replace length at the end with a too-long value:
|
||||||
|
short v = in.readShort();
|
||||||
|
assertEquals(256, v);
|
||||||
|
tmpOut.writeShort(Short.MAX_VALUE);
|
||||||
|
tmpOut.copyBytes(in, in.length()-Short.BYTES);
|
||||||
|
}
|
||||||
|
|
||||||
|
// Delete original and copy corrupt version back:
|
||||||
|
dir0.deleteFile(out.getName());
|
||||||
|
dir0.copyFrom(dir0, newTempName, out.getName(), IOContext.DEFAULT);
|
||||||
|
dir0.deleteFile(newTempName);
|
||||||
|
}
|
||||||
|
};
|
||||||
|
} else {
|
||||||
|
return out;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
};
|
||||||
|
|
||||||
|
IndexOutput unsorted = dir.createTempOutput("unsorted", "tmp", IOContext.DEFAULT);
|
||||||
|
writeAll(unsorted, generateFixed(5*1024));
|
||||||
|
|
||||||
|
// This corruption made OfflineSorter fail with its own exception, but we verify it also went and added (as suppressed) that the
|
||||||
|
// checksum was wrong:
|
||||||
|
EOFException e = expectThrows(EOFException.class, () -> {
|
||||||
|
new OfflineSorter(dir, "foo").sort(unsorted.getName());
|
||||||
|
});
|
||||||
|
assertEquals(1, e.getSuppressed().length);
|
||||||
|
assertTrue(e.getSuppressed()[0] instanceof CorruptIndexException);
|
||||||
|
assertTrue(e.getSuppressed()[0].getMessage().contains("checksum failed (hardware problem?)"));
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
/** Make sure corruption on a temp file (partition) is caught, even if the corruption didn't confuse OfflineSorter! */
|
||||||
|
public void testBitFlippedOnPartition1() throws Exception {
|
||||||
|
|
||||||
|
try (Directory dir0 = newMockDirectory()) {
|
||||||
|
if (dir0 instanceof MockDirectoryWrapper) {
|
||||||
|
((MockDirectoryWrapper) dir0).setPreventDoubleWrite(false);
|
||||||
|
}
|
||||||
|
|
||||||
|
Directory dir = new FilterDirectory(dir0) {
|
||||||
|
|
||||||
|
boolean corrupted;
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public IndexOutput createTempOutput(String prefix, String suffix, IOContext context) throws IOException {
|
||||||
|
IndexOutput out = in.createTempOutput(prefix, suffix, context);
|
||||||
|
if (corrupted == false && suffix.equals("sort")) {
|
||||||
|
corrupted = true;
|
||||||
|
return new CorruptingIndexOutput(dir0, 544677, out);
|
||||||
|
} else {
|
||||||
|
return out;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
};
|
||||||
|
|
||||||
|
IndexOutput unsorted = dir.createTempOutput("unsorted", "tmp", IOContext.DEFAULT);
|
||||||
|
writeAll(unsorted, generateFixed((int) (OfflineSorter.MB * 3)));
|
||||||
|
|
||||||
|
CorruptIndexException e = expectThrows(CorruptIndexException.class, () -> {
|
||||||
|
new OfflineSorter(dir, "foo", OfflineSorter.DEFAULT_COMPARATOR, BufferSize.megabytes(1), 10).sort(unsorted.getName());
|
||||||
|
});
|
||||||
|
assertTrue(e.getMessage().contains("checksum failed (hardware problem?)"));
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
/** Make sure corruption on a temp file (partition) is caught, if the corruption did confuse OfflineSorter! */
|
||||||
|
public void testBitFlippedOnPartition2() throws Exception {
|
||||||
|
|
||||||
|
try (Directory dir0 = newMockDirectory()) {
|
||||||
|
if (dir0 instanceof MockDirectoryWrapper) {
|
||||||
|
((MockDirectoryWrapper) dir0).setPreventDoubleWrite(false);
|
||||||
|
}
|
||||||
|
|
||||||
|
Directory dir = new FilterDirectory(dir0) {
|
||||||
|
|
||||||
|
boolean corrupted;
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public IndexOutput createTempOutput(String prefix, String suffix, IOContext context) throws IOException {
|
||||||
|
IndexOutput out = in.createTempOutput(prefix, suffix, context);
|
||||||
|
if (corrupted == false && suffix.equals("sort")) {
|
||||||
|
corrupted = true;
|
||||||
|
return new CorruptingIndexOutput(dir0, 544677, out) {
|
||||||
|
@Override
|
||||||
|
protected void corruptFile() throws IOException {
|
||||||
|
String newTempName;
|
||||||
|
try(IndexOutput tmpOut = dir0.createTempOutput("tmp", "tmp", IOContext.DEFAULT);
|
||||||
|
IndexInput in = dir0.openInput(out.getName(), IOContext.DEFAULT)) {
|
||||||
|
newTempName = tmpOut.getName();
|
||||||
|
tmpOut.copyBytes(in, 1025905);
|
||||||
|
short v = in.readShort();
|
||||||
|
assertEquals(254, v);
|
||||||
|
tmpOut.writeShort(Short.MAX_VALUE);
|
||||||
|
tmpOut.copyBytes(in, in.length()-1025905-Short.BYTES);
|
||||||
|
}
|
||||||
|
|
||||||
|
// Delete original and copy corrupt version back:
|
||||||
|
dir0.deleteFile(out.getName());
|
||||||
|
dir0.copyFrom(dir0, newTempName, out.getName(), IOContext.DEFAULT);
|
||||||
|
dir0.deleteFile(newTempName);
|
||||||
|
}
|
||||||
|
};
|
||||||
|
} else {
|
||||||
|
return out;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
};
|
||||||
|
|
||||||
|
IndexOutput unsorted = dir.createTempOutput("unsorted", "tmp", IOContext.DEFAULT);
|
||||||
|
writeAll(unsorted, generateFixed((int) (OfflineSorter.MB * 3)));
|
||||||
|
|
||||||
|
EOFException e = expectThrows(EOFException.class, () -> {
|
||||||
|
new OfflineSorter(dir, "foo", OfflineSorter.DEFAULT_COMPARATOR, BufferSize.megabytes(1), 10).sort(unsorted.getName());
|
||||||
|
});
|
||||||
|
assertEquals(1, e.getSuppressed().length);
|
||||||
|
assertTrue(e.getSuppressed()[0] instanceof CorruptIndexException);
|
||||||
|
assertTrue(e.getSuppressed()[0].getMessage().contains("checksum failed (hardware problem?)"));
|
||||||
|
}
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -24,9 +24,12 @@ import java.util.Arrays;
|
||||||
import java.util.BitSet;
|
import java.util.BitSet;
|
||||||
import java.util.List;
|
import java.util.List;
|
||||||
|
|
||||||
|
import org.apache.lucene.index.CorruptIndexException;
|
||||||
import org.apache.lucene.index.PointValues.IntersectVisitor;
|
import org.apache.lucene.index.PointValues.IntersectVisitor;
|
||||||
import org.apache.lucene.index.PointValues.Relation;
|
import org.apache.lucene.index.PointValues.Relation;
|
||||||
|
import org.apache.lucene.store.CorruptingIndexOutput;
|
||||||
import org.apache.lucene.store.Directory;
|
import org.apache.lucene.store.Directory;
|
||||||
|
import org.apache.lucene.store.FilterDirectory;
|
||||||
import org.apache.lucene.store.IOContext;
|
import org.apache.lucene.store.IOContext;
|
||||||
import org.apache.lucene.store.IndexInput;
|
import org.apache.lucene.store.IndexInput;
|
||||||
import org.apache.lucene.store.IndexOutput;
|
import org.apache.lucene.store.IndexOutput;
|
||||||
|
@ -399,27 +402,27 @@ public class TestBKD extends LuceneTestCase {
|
||||||
// Keep retrying until we 1) we allow a big enough heap, and 2) we hit a random IOExc from MDW:
|
// Keep retrying until we 1) we allow a big enough heap, and 2) we hit a random IOExc from MDW:
|
||||||
boolean done = false;
|
boolean done = false;
|
||||||
while (done == false) {
|
while (done == false) {
|
||||||
try (MockDirectoryWrapper dir = newMockFSDirectory(createTempDir())) {
|
MockDirectoryWrapper dir = newMockFSDirectory(createTempDir());
|
||||||
try {
|
try {
|
||||||
dir.setRandomIOExceptionRate(0.05);
|
dir.setRandomIOExceptionRate(0.05);
|
||||||
dir.setRandomIOExceptionRateOnOpen(0.05);
|
dir.setRandomIOExceptionRateOnOpen(0.05);
|
||||||
verify(dir, docValues, null, numDims, numBytesPerDim, 50, maxMBHeap);
|
verify(dir, docValues, null, numDims, numBytesPerDim, 50, maxMBHeap);
|
||||||
} catch (IllegalArgumentException iae) {
|
} catch (IllegalArgumentException iae) {
|
||||||
// This just means we got a too-small maxMB for the maxPointsInLeafNode; just retry w/ more heap
|
// This just means we got a too-small maxMB for the maxPointsInLeafNode; just retry w/ more heap
|
||||||
assertTrue(iae.getMessage().contains("either increase maxMBSortInHeap or decrease maxPointsInLeafNode"));
|
assertTrue(iae.getMessage().contains("either increase maxMBSortInHeap or decrease maxPointsInLeafNode"));
|
||||||
maxMBHeap *= 1.25;
|
maxMBHeap *= 1.25;
|
||||||
} catch (IOException ioe) {
|
} catch (IOException ioe) {
|
||||||
if (ioe.getMessage().contains("a random IOException")) {
|
if (ioe.getMessage().contains("a random IOException")) {
|
||||||
// BKDWriter should fully clean up after itself:
|
// BKDWriter should fully clean up after itself:
|
||||||
done = true;
|
done = true;
|
||||||
} else {
|
} else {
|
||||||
throw ioe;
|
throw ioe;
|
||||||
}
|
|
||||||
}
|
}
|
||||||
|
|
||||||
String[] files = dir.listAll();
|
|
||||||
assertTrue("files=" + Arrays.toString(files), files.length == 0 || Arrays.equals(files, new String[] {"extra0"}));
|
|
||||||
}
|
}
|
||||||
|
|
||||||
|
String[] files = dir.listAll();
|
||||||
|
assertTrue("files=" + Arrays.toString(files), files.length == 0 || Arrays.equals(files, new String[] {"extra0"}));
|
||||||
|
dir.close();
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -775,4 +778,123 @@ public class TestBKD extends LuceneTestCase {
|
||||||
}
|
}
|
||||||
return dir;
|
return dir;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/** Make sure corruption on an input sort file is caught, even if BKDWriter doesn't get angry */
|
||||||
|
public void testBitFlippedOnPartition1() throws Exception {
|
||||||
|
|
||||||
|
// Generate fixed data set:
|
||||||
|
int numDocs = atLeast(10000);
|
||||||
|
int numBytesPerDim = 4;
|
||||||
|
int numDims = 3;
|
||||||
|
|
||||||
|
byte[][][] docValues = new byte[numDocs][][];
|
||||||
|
byte counter = 0;
|
||||||
|
|
||||||
|
for(int docID=0;docID<numDocs;docID++) {
|
||||||
|
byte[][] values = new byte[numDims][];
|
||||||
|
for(int dim=0;dim<numDims;dim++) {
|
||||||
|
values[dim] = new byte[numBytesPerDim];
|
||||||
|
for(int i=0;i<values[dim].length;i++) {
|
||||||
|
values[dim][i] = counter;
|
||||||
|
counter++;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
docValues[docID] = values;
|
||||||
|
}
|
||||||
|
|
||||||
|
try (Directory dir0 = newMockDirectory()) {
|
||||||
|
if (dir0 instanceof MockDirectoryWrapper) {
|
||||||
|
((MockDirectoryWrapper) dir0).setPreventDoubleWrite(false);
|
||||||
|
}
|
||||||
|
|
||||||
|
Directory dir = new FilterDirectory(dir0) {
|
||||||
|
boolean corrupted;
|
||||||
|
@Override
|
||||||
|
public IndexOutput createTempOutput(String prefix, String suffix, IOContext context) throws IOException {
|
||||||
|
IndexOutput out = in.createTempOutput(prefix, suffix, context);
|
||||||
|
if (corrupted == false && prefix.equals("_0_bkd1") && suffix.equals("sort")) {
|
||||||
|
corrupted = true;
|
||||||
|
return new CorruptingIndexOutput(dir0, 22, out);
|
||||||
|
} else {
|
||||||
|
return out;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
};
|
||||||
|
|
||||||
|
CorruptIndexException e = expectThrows(CorruptIndexException.class, () -> {
|
||||||
|
verify(dir, docValues, null, numDims, numBytesPerDim, 50, 0.1);
|
||||||
|
});
|
||||||
|
assertTrue(e.getMessage().contains("checksum failed (hardware problem?)"));
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
/** Make sure corruption on a recursed partition is caught, when BKDWriter does get angry */
|
||||||
|
public void testBitFlippedOnPartition2() throws Exception {
|
||||||
|
|
||||||
|
// Generate fixed data set:
|
||||||
|
int numDocs = atLeast(10000);
|
||||||
|
int numBytesPerDim = 4;
|
||||||
|
int numDims = 3;
|
||||||
|
|
||||||
|
byte[][][] docValues = new byte[numDocs][][];
|
||||||
|
byte counter = 0;
|
||||||
|
|
||||||
|
for(int docID=0;docID<numDocs;docID++) {
|
||||||
|
byte[][] values = new byte[numDims][];
|
||||||
|
for(int dim=0;dim<numDims;dim++) {
|
||||||
|
values[dim] = new byte[numBytesPerDim];
|
||||||
|
for(int i=0;i<values[dim].length;i++) {
|
||||||
|
values[dim][i] = counter;
|
||||||
|
counter++;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
docValues[docID] = values;
|
||||||
|
}
|
||||||
|
|
||||||
|
try (Directory dir0 = newMockDirectory()) {
|
||||||
|
if (dir0 instanceof MockDirectoryWrapper) {
|
||||||
|
((MockDirectoryWrapper) dir0).setPreventDoubleWrite(false);
|
||||||
|
}
|
||||||
|
|
||||||
|
Directory dir = new FilterDirectory(dir0) {
|
||||||
|
boolean corrupted;
|
||||||
|
@Override
|
||||||
|
public IndexOutput createTempOutput(String prefix, String suffix, IOContext context) throws IOException {
|
||||||
|
IndexOutput out = in.createTempOutput(prefix, suffix, context);
|
||||||
|
//System.out.println("prefix=" + prefix + " suffix=" + suffix);
|
||||||
|
if (corrupted == false && suffix.equals("bkd_left1")) {
|
||||||
|
//System.out.println("now corrupt byte=" + x + " prefix=" + prefix + " suffix=" + suffix);
|
||||||
|
corrupted = true;
|
||||||
|
return new CorruptingIndexOutput(dir0, 22072, out);
|
||||||
|
} else {
|
||||||
|
return out;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
};
|
||||||
|
|
||||||
|
Throwable t;
|
||||||
|
|
||||||
|
if (TEST_ASSERTS_ENABLED) {
|
||||||
|
t = expectThrows(AssertionError.class, () -> {
|
||||||
|
verify(dir, docValues, null, numDims, numBytesPerDim, 50, 0.1);
|
||||||
|
});
|
||||||
|
} else {
|
||||||
|
t = expectThrows(ArrayIndexOutOfBoundsException.class, () -> {
|
||||||
|
verify(dir, docValues, null, numDims, numBytesPerDim, 50, 0.1);
|
||||||
|
});
|
||||||
|
}
|
||||||
|
assertCorruptionDetected(t);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
private void assertCorruptionDetected(Throwable t) {
|
||||||
|
for(Throwable suppressed : t.getSuppressed()) {
|
||||||
|
if (suppressed instanceof CorruptIndexException) {
|
||||||
|
if (suppressed.getMessage().contains("checksum failed (hardware problem?)")) {
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
fail("did not see a supporessed CorruptIndexException");
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -21,6 +21,7 @@ import java.util.Comparator;
|
||||||
import java.util.HashSet;
|
import java.util.HashSet;
|
||||||
import java.util.Set;
|
import java.util.Set;
|
||||||
|
|
||||||
|
import org.apache.lucene.codecs.CodecUtil;
|
||||||
import org.apache.lucene.store.ByteArrayDataInput;
|
import org.apache.lucene.store.ByteArrayDataInput;
|
||||||
import org.apache.lucene.store.ByteArrayDataOutput;
|
import org.apache.lucene.store.ByteArrayDataOutput;
|
||||||
import org.apache.lucene.store.Directory;
|
import org.apache.lucene.store.Directory;
|
||||||
|
@ -30,9 +31,9 @@ import org.apache.lucene.util.ArrayUtil;
|
||||||
import org.apache.lucene.util.BytesRef;
|
import org.apache.lucene.util.BytesRef;
|
||||||
import org.apache.lucene.util.BytesRefBuilder;
|
import org.apache.lucene.util.BytesRefBuilder;
|
||||||
import org.apache.lucene.util.IOUtils;
|
import org.apache.lucene.util.IOUtils;
|
||||||
import org.apache.lucene.util.OfflineSorter;
|
|
||||||
import org.apache.lucene.util.OfflineSorter.ByteSequencesReader;
|
import org.apache.lucene.util.OfflineSorter.ByteSequencesReader;
|
||||||
import org.apache.lucene.util.OfflineSorter.ByteSequencesWriter;
|
import org.apache.lucene.util.OfflineSorter.ByteSequencesWriter;
|
||||||
|
import org.apache.lucene.util.OfflineSorter;
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* This wrapper buffers incoming elements and makes sure they are sorted based on given comparator.
|
* This wrapper buffers incoming elements and makes sure they are sorted based on given comparator.
|
||||||
|
@ -176,9 +177,7 @@ public class SortedInputIterator implements InputIterator {
|
||||||
OfflineSorter sorter = new OfflineSorter(tempDir, tempFileNamePrefix, tieBreakByCostComparator);
|
OfflineSorter sorter = new OfflineSorter(tempDir, tempFileNamePrefix, tieBreakByCostComparator);
|
||||||
tempInput = tempDir.createTempOutput(tempFileNamePrefix, "input", IOContext.DEFAULT);
|
tempInput = tempDir.createTempOutput(tempFileNamePrefix, "input", IOContext.DEFAULT);
|
||||||
|
|
||||||
final OfflineSorter.ByteSequencesWriter writer = new OfflineSorter.ByteSequencesWriter(tempInput);
|
try (OfflineSorter.ByteSequencesWriter writer = new OfflineSorter.ByteSequencesWriter(tempInput)) {
|
||||||
boolean success = false;
|
|
||||||
try {
|
|
||||||
BytesRef spare;
|
BytesRef spare;
|
||||||
byte[] buffer = new byte[0];
|
byte[] buffer = new byte[0];
|
||||||
ByteArrayDataOutput output = new ByteArrayDataOutput(buffer);
|
ByteArrayDataOutput output = new ByteArrayDataOutput(buffer);
|
||||||
|
@ -186,23 +185,11 @@ public class SortedInputIterator implements InputIterator {
|
||||||
while ((spare = source.next()) != null) {
|
while ((spare = source.next()) != null) {
|
||||||
encode(writer, output, buffer, spare, source.payload(), source.contexts(), source.weight());
|
encode(writer, output, buffer, spare, source.payload(), source.contexts(), source.weight());
|
||||||
}
|
}
|
||||||
writer.close();
|
CodecUtil.writeFooter(tempInput);
|
||||||
tempSortedFileName = sorter.sort(tempInput.getName());
|
|
||||||
ByteSequencesReader reader = new OfflineSorter.ByteSequencesReader(tempDir.openInput(tempSortedFileName, IOContext.READONCE));
|
|
||||||
success = true;
|
|
||||||
return reader;
|
|
||||||
|
|
||||||
} finally {
|
|
||||||
if (success) {
|
|
||||||
IOUtils.close(writer);
|
|
||||||
} else {
|
|
||||||
try {
|
|
||||||
IOUtils.closeWhileHandlingException(writer);
|
|
||||||
} finally {
|
|
||||||
close();
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
|
|
||||||
|
tempSortedFileName = sorter.sort(tempInput.getName());
|
||||||
|
return new OfflineSorter.ByteSequencesReader(tempDir.openChecksumInput(tempSortedFileName, IOContext.READONCE), tempSortedFileName);
|
||||||
}
|
}
|
||||||
|
|
||||||
private void close() throws IOException {
|
private void close() throws IOException {
|
||||||
|
|
|
@ -28,6 +28,7 @@ import java.util.Set;
|
||||||
import org.apache.lucene.analysis.Analyzer;
|
import org.apache.lucene.analysis.Analyzer;
|
||||||
import org.apache.lucene.analysis.TokenStream;
|
import org.apache.lucene.analysis.TokenStream;
|
||||||
import org.apache.lucene.analysis.TokenStreamToAutomaton;
|
import org.apache.lucene.analysis.TokenStreamToAutomaton;
|
||||||
|
import org.apache.lucene.codecs.CodecUtil;
|
||||||
import org.apache.lucene.search.suggest.InputIterator;
|
import org.apache.lucene.search.suggest.InputIterator;
|
||||||
import org.apache.lucene.search.suggest.Lookup;
|
import org.apache.lucene.search.suggest.Lookup;
|
||||||
import org.apache.lucene.store.ByteArrayDataInput;
|
import org.apache.lucene.store.ByteArrayDataInput;
|
||||||
|
@ -53,14 +54,14 @@ import org.apache.lucene.util.automaton.Operations;
|
||||||
import org.apache.lucene.util.automaton.Transition;
|
import org.apache.lucene.util.automaton.Transition;
|
||||||
import org.apache.lucene.util.fst.Builder;
|
import org.apache.lucene.util.fst.Builder;
|
||||||
import org.apache.lucene.util.fst.ByteSequenceOutputs;
|
import org.apache.lucene.util.fst.ByteSequenceOutputs;
|
||||||
import org.apache.lucene.util.fst.FST;
|
|
||||||
import org.apache.lucene.util.fst.FST.BytesReader;
|
import org.apache.lucene.util.fst.FST.BytesReader;
|
||||||
import org.apache.lucene.util.fst.PairOutputs;
|
import org.apache.lucene.util.fst.FST;
|
||||||
import org.apache.lucene.util.fst.PairOutputs.Pair;
|
import org.apache.lucene.util.fst.PairOutputs.Pair;
|
||||||
|
import org.apache.lucene.util.fst.PairOutputs;
|
||||||
import org.apache.lucene.util.fst.PositiveIntOutputs;
|
import org.apache.lucene.util.fst.PositiveIntOutputs;
|
||||||
import org.apache.lucene.util.fst.Util;
|
|
||||||
import org.apache.lucene.util.fst.Util.Result;
|
import org.apache.lucene.util.fst.Util.Result;
|
||||||
import org.apache.lucene.util.fst.Util.TopResults;
|
import org.apache.lucene.util.fst.Util.TopResults;
|
||||||
|
import org.apache.lucene.util.fst.Util;
|
||||||
|
|
||||||
import static org.apache.lucene.util.automaton.Operations.DEFAULT_MAX_DETERMINIZED_STATES;
|
import static org.apache.lucene.util.automaton.Operations.DEFAULT_MAX_DETERMINIZED_STATES;
|
||||||
|
|
||||||
|
@ -480,6 +481,7 @@ public class AnalyzingSuggester extends Lookup implements Accountable {
|
||||||
|
|
||||||
maxAnalyzedPathsForOneInput = Math.max(maxAnalyzedPathsForOneInput, finiteStrings.size());
|
maxAnalyzedPathsForOneInput = Math.max(maxAnalyzedPathsForOneInput, finiteStrings.size());
|
||||||
}
|
}
|
||||||
|
CodecUtil.writeFooter(tempInput);
|
||||||
writer.close();
|
writer.close();
|
||||||
|
|
||||||
// Sort all input/output pairs (required by FST.Builder):
|
// Sort all input/output pairs (required by FST.Builder):
|
||||||
|
@ -488,7 +490,7 @@ public class AnalyzingSuggester extends Lookup implements Accountable {
|
||||||
// Free disk space:
|
// Free disk space:
|
||||||
tempDir.deleteFile(tempInput.getName());
|
tempDir.deleteFile(tempInput.getName());
|
||||||
|
|
||||||
reader = new OfflineSorter.ByteSequencesReader(tempDir.openInput(tempSortedFileName, IOContext.READONCE));
|
reader = new OfflineSorter.ByteSequencesReader(tempDir.openChecksumInput(tempSortedFileName, IOContext.READONCE), tempSortedFileName);
|
||||||
|
|
||||||
PairOutputs<Long,BytesRef> outputs = new PairOutputs<>(PositiveIntOutputs.getSingleton(), ByteSequenceOutputs.getSingleton());
|
PairOutputs<Long,BytesRef> outputs = new PairOutputs<>(PositiveIntOutputs.getSingleton(), ByteSequenceOutputs.getSingleton());
|
||||||
Builder<Pair<Long,BytesRef>> builder = new Builder<>(FST.INPUT_TYPE.BYTE1, outputs);
|
Builder<Pair<Long,BytesRef>> builder = new Builder<>(FST.INPUT_TYPE.BYTE1, outputs);
|
||||||
|
|
|
@ -18,9 +18,9 @@ package org.apache.lucene.search.suggest.fst;
|
||||||
|
|
||||||
import java.io.Closeable;
|
import java.io.Closeable;
|
||||||
import java.io.IOException;
|
import java.io.IOException;
|
||||||
import java.util.Collections;
|
|
||||||
import java.util.Comparator;
|
import java.util.Comparator;
|
||||||
|
|
||||||
|
import org.apache.lucene.codecs.CodecUtil;
|
||||||
import org.apache.lucene.store.IOContext;
|
import org.apache.lucene.store.IOContext;
|
||||||
import org.apache.lucene.store.IndexOutput;
|
import org.apache.lucene.store.IndexOutput;
|
||||||
import org.apache.lucene.util.BytesRef;
|
import org.apache.lucene.util.BytesRef;
|
||||||
|
@ -77,11 +77,12 @@ public class ExternalRefSorter implements BytesRefSorter, Closeable {
|
||||||
input = null;
|
input = null;
|
||||||
}
|
}
|
||||||
|
|
||||||
return new ByteSequenceIterator(new OfflineSorter.ByteSequencesReader(sorter.getDirectory().openInput(sortedFileName, IOContext.READONCE)));
|
return new ByteSequenceIterator(new OfflineSorter.ByteSequencesReader(sorter.getDirectory().openChecksumInput(sortedFileName, IOContext.READONCE), sortedFileName));
|
||||||
}
|
}
|
||||||
|
|
||||||
private void closeWriter() throws IOException {
|
private void closeWriter() throws IOException {
|
||||||
if (writer != null) {
|
if (writer != null) {
|
||||||
|
CodecUtil.writeFooter(input);
|
||||||
writer.close();
|
writer.close();
|
||||||
writer = null;
|
writer = null;
|
||||||
}
|
}
|
||||||
|
|
|
@ -23,6 +23,7 @@ import java.util.Collections;
|
||||||
import java.util.List;
|
import java.util.List;
|
||||||
import java.util.Set;
|
import java.util.Set;
|
||||||
|
|
||||||
|
import org.apache.lucene.codecs.CodecUtil;
|
||||||
import org.apache.lucene.search.suggest.InputIterator;
|
import org.apache.lucene.search.suggest.InputIterator;
|
||||||
import org.apache.lucene.search.suggest.Lookup;
|
import org.apache.lucene.search.suggest.Lookup;
|
||||||
import org.apache.lucene.search.suggest.fst.FSTCompletion.Completion;
|
import org.apache.lucene.search.suggest.fst.FSTCompletion.Completion;
|
||||||
|
@ -198,6 +199,7 @@ public class FSTCompletionLookup extends Lookup implements Accountable {
|
||||||
writer.write(buffer, 0, output.getPosition());
|
writer.write(buffer, 0, output.getPosition());
|
||||||
inputLineCount++;
|
inputLineCount++;
|
||||||
}
|
}
|
||||||
|
CodecUtil.writeFooter(tempInput);
|
||||||
writer.close();
|
writer.close();
|
||||||
|
|
||||||
// We don't know the distribution of scores and we need to bucket them, so we'll sort
|
// We don't know the distribution of scores and we need to bucket them, so we'll sort
|
||||||
|
@ -208,7 +210,7 @@ public class FSTCompletionLookup extends Lookup implements Accountable {
|
||||||
FSTCompletionBuilder builder = new FSTCompletionBuilder(
|
FSTCompletionBuilder builder = new FSTCompletionBuilder(
|
||||||
buckets, externalSorter, sharedTailLength);
|
buckets, externalSorter, sharedTailLength);
|
||||||
|
|
||||||
reader = new OfflineSorter.ByteSequencesReader(tempDir.openInput(tempSortedFileName, IOContext.READONCE));
|
reader = new OfflineSorter.ByteSequencesReader(tempDir.openChecksumInput(tempSortedFileName, IOContext.READONCE), tempSortedFileName);
|
||||||
long line = 0;
|
long line = 0;
|
||||||
int previousBucket = 0;
|
int previousBucket = 0;
|
||||||
int previousScore = 0;
|
int previousScore = 0;
|
||||||
|
|
|
@ -0,0 +1,101 @@
|
||||||
|
/*
|
||||||
|
* Licensed to the Apache Software Foundation (ASF) under one or more
|
||||||
|
* contributor license agreements. See the NOTICE file distributed with
|
||||||
|
* this work for additional information regarding copyright ownership.
|
||||||
|
* The ASF licenses this file to You under the Apache License, Version 2.0
|
||||||
|
* (the "License"); you may not use this file except in compliance with
|
||||||
|
* the License. You may obtain a copy of the License at
|
||||||
|
*
|
||||||
|
* http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
*
|
||||||
|
* Unless required by applicable law or agreed to in writing, software
|
||||||
|
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||||
|
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||||
|
* See the License for the specific language governing permissions and
|
||||||
|
* limitations under the License.
|
||||||
|
*/
|
||||||
|
|
||||||
|
package org.apache.lucene.store;
|
||||||
|
|
||||||
|
import java.io.IOException;
|
||||||
|
|
||||||
|
/** Corrupts on bit of a file after close */
|
||||||
|
public class CorruptingIndexOutput extends IndexOutput {
|
||||||
|
protected final IndexOutput out;
|
||||||
|
final Directory dir;
|
||||||
|
final long byteToCorrupt;
|
||||||
|
private boolean closed;
|
||||||
|
|
||||||
|
public CorruptingIndexOutput(Directory dir, long byteToCorrupt, IndexOutput out) {
|
||||||
|
super("CorruptingIndexOutput(" + out + ")", out.getName());
|
||||||
|
this.dir = dir;
|
||||||
|
this.byteToCorrupt = byteToCorrupt;
|
||||||
|
this.out = out;
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public String getName() {
|
||||||
|
return out.getName();
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public void close() throws IOException {
|
||||||
|
if (closed == false) {
|
||||||
|
out.close();
|
||||||
|
// NOTE: must corrupt after file is closed, because if we corrupt "inlined" (as bytes are being written) the checksum sees the wrong
|
||||||
|
// bytes and is "correct"!!
|
||||||
|
corruptFile();
|
||||||
|
closed = true;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
protected void corruptFile() throws IOException {
|
||||||
|
// Now corrupt the specfied byte:
|
||||||
|
String newTempName;
|
||||||
|
try(IndexOutput tmpOut = dir.createTempOutput("tmp", "tmp", IOContext.DEFAULT);
|
||||||
|
IndexInput in = dir.openInput(out.getName(), IOContext.DEFAULT)) {
|
||||||
|
newTempName = tmpOut.getName();
|
||||||
|
|
||||||
|
if (byteToCorrupt >= in.length()) {
|
||||||
|
throw new IllegalArgumentException("byteToCorrupt=" + byteToCorrupt + " but file \"" + out.getName() + "\" is only length=" + in.length());
|
||||||
|
}
|
||||||
|
|
||||||
|
tmpOut.copyBytes(in, byteToCorrupt);
|
||||||
|
// Flip the 0th bit:
|
||||||
|
tmpOut.writeByte((byte) (in.readByte() ^ 1));
|
||||||
|
tmpOut.copyBytes(in, in.length()-byteToCorrupt-1);
|
||||||
|
}
|
||||||
|
|
||||||
|
// Delete original and copy corrupt version back:
|
||||||
|
dir.deleteFile(out.getName());
|
||||||
|
dir.copyFrom(dir, newTempName, out.getName(), IOContext.DEFAULT);
|
||||||
|
dir.deleteFile(newTempName);
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public long getFilePointer() {
|
||||||
|
return out.getFilePointer();
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public long getChecksum() throws IOException {
|
||||||
|
return out.getChecksum() ^ 1;
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public String toString() {
|
||||||
|
return "CorruptingIndexOutput(" + out + ")";
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public void writeByte(byte b) throws IOException {
|
||||||
|
out.writeByte(b);
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public void writeBytes(byte[] b, int offset, int length) throws IOException {
|
||||||
|
for(int i=0;i<length;i++) {
|
||||||
|
writeByte(b[offset+i]);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
Loading…
Reference in New Issue