mirror of https://github.com/apache/lucene.git
LUCENE-8623: Decrease I/O pressure when merging high dimensional points
This commit is contained in:
parent
570d573c00
commit
8762b071bb
|
@ -321,6 +321,8 @@ Optimizations
|
||||||
rather than an in-place mergesort, which needs to perform fewer swaps.
|
rather than an in-place mergesort, which needs to perform fewer swaps.
|
||||||
(Adrien Grand)
|
(Adrien Grand)
|
||||||
|
|
||||||
|
* LUCENE-8623: Decrease I/O pressure when merging high dimensional points. (Ignacio Vera)
|
||||||
|
|
||||||
Test Framework
|
Test Framework
|
||||||
|
|
||||||
* LUCENE-8604: TestRuleLimitSysouts now has an optional "hard limit" of bytes that can be written
|
* LUCENE-8604: TestRuleLimitSysouts now has an optional "hard limit" of bytes that can be written
|
||||||
|
|
|
@ -761,11 +761,6 @@ public class BKDWriter implements Closeable {
|
||||||
// TODO: if we fixed each partition step to just record the file offset at the "split point", we could probably handle variable length
|
// TODO: if we fixed each partition step to just record the file offset at the "split point", we could probably handle variable length
|
||||||
// encoding and not have our own ByteSequencesReader/Writer
|
// encoding and not have our own ByteSequencesReader/Writer
|
||||||
|
|
||||||
/** Sort the heap writer by the specified dim */
|
|
||||||
private void sortHeapPointWriter(final HeapPointWriter writer, int dim) {
|
|
||||||
final int pointCount = Math.toIntExact(this.pointCount);
|
|
||||||
sortHeapPointWriter(writer, pointCount, dim);
|
|
||||||
}
|
|
||||||
/** Sort the heap writer by the specified dim */
|
/** Sort the heap writer by the specified dim */
|
||||||
private void sortHeapPointWriter(final HeapPointWriter writer, int pointCount, int dim) {
|
private void sortHeapPointWriter(final HeapPointWriter writer, int pointCount, int dim) {
|
||||||
// Tie-break by docID:
|
// Tie-break by docID:
|
||||||
|
@ -841,36 +836,47 @@ public class BKDWriter implements Closeable {
|
||||||
}
|
}
|
||||||
*/
|
*/
|
||||||
|
|
||||||
|
//return a new point writer sort by the provided dimension from input data
|
||||||
private PointWriter sort(int dim) throws IOException {
|
private PointWriter sort(int dim) throws IOException {
|
||||||
assert dim >= 0 && dim < numDataDims;
|
assert dim >= 0 && dim < numDataDims;
|
||||||
|
|
||||||
if (heapPointWriter != null) {
|
if (heapPointWriter != null) {
|
||||||
|
|
||||||
assert tempInput == null;
|
assert tempInput == null;
|
||||||
|
|
||||||
// We never spilled the incoming points to disk, so now we sort in heap:
|
// We never spilled the incoming points to disk, so now we sort in heap:
|
||||||
HeapPointWriter sorted;
|
HeapPointWriter sorted = heapPointWriter;
|
||||||
|
|
||||||
if (dim == 0) {
|
|
||||||
// First dim can re-use the current heap writer
|
|
||||||
sorted = heapPointWriter;
|
|
||||||
} else {
|
|
||||||
// Subsequent dims need a private copy
|
|
||||||
sorted = new HeapPointWriter((int) pointCount, (int) pointCount, packedBytesLength, longOrds, singleValuePerDoc);
|
|
||||||
sorted.copyFrom(heapPointWriter);
|
|
||||||
}
|
|
||||||
|
|
||||||
//long t0 = System.nanoTime();
|
//long t0 = System.nanoTime();
|
||||||
sortHeapPointWriter(sorted, dim);
|
sortHeapPointWriter(sorted, Math.toIntExact(this.pointCount), dim);
|
||||||
//long t1 = System.nanoTime();
|
//long t1 = System.nanoTime();
|
||||||
//System.out.println("BKD: sort took " + ((t1-t0)/1000000.0) + " msec");
|
//System.out.println("BKD: sort took " + ((t1-t0)/1000000.0) + " msec");
|
||||||
|
|
||||||
sorted.close();
|
sorted.close();
|
||||||
|
heapPointWriter = null;
|
||||||
return sorted;
|
return sorted;
|
||||||
} else {
|
} else {
|
||||||
|
|
||||||
// Offline sort:
|
// Offline sort:
|
||||||
assert tempInput != null;
|
assert tempInput != null;
|
||||||
|
OfflinePointWriter sorted = sortOffLine(dim, tempInput.getName(), 0, pointCount);
|
||||||
|
tempDir.deleteFile(tempInput.getName());
|
||||||
|
tempInput = null;
|
||||||
|
return sorted;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
//return a new point writer sort by the provided dimension from start to start + pointCount
|
||||||
|
private PointWriter sort(int dim, PointWriter writer, final long start, final long pointCount) throws IOException {
|
||||||
|
assert dim >= 0 && dim < numDataDims;
|
||||||
|
|
||||||
|
if (writer instanceof HeapPointWriter) {
|
||||||
|
HeapPointWriter heapPointWriter = createHeapPointWriterCopy((HeapPointWriter) writer, start, pointCount);
|
||||||
|
sortHeapPointWriter(heapPointWriter, Math.toIntExact(pointCount), dim);
|
||||||
|
return heapPointWriter;
|
||||||
|
} else {
|
||||||
|
OfflinePointWriter offlinePointWriter = (OfflinePointWriter) writer;
|
||||||
|
return sortOffLine(dim, offlinePointWriter.name, start, pointCount);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// sort a given file on a given dimension for start to start + point count
|
||||||
|
private OfflinePointWriter sortOffLine(int dim, String inputName, final long start, final long pointCount) throws IOException {
|
||||||
|
|
||||||
final int offset = bytesPerDim * dim;
|
final int offset = bytesPerDim * dim;
|
||||||
|
|
||||||
|
@ -898,28 +904,39 @@ public class BKDWriter implements Closeable {
|
||||||
}
|
}
|
||||||
|
|
||||||
OfflineSorter sorter = new OfflineSorter(tempDir, tempFileNamePrefix + "_bkd" + dim, cmp, offlineSorterBufferMB, offlineSorterMaxTempFiles, bytesPerDoc, null, 0) {
|
OfflineSorter sorter = new OfflineSorter(tempDir, tempFileNamePrefix + "_bkd" + dim, cmp, offlineSorterBufferMB, offlineSorterMaxTempFiles, bytesPerDoc, null, 0) {
|
||||||
|
/**
|
||||||
/** We write/read fixed-byte-width file that {@link OfflinePointReader} can read. */
|
* We write/read fixed-byte-width file that {@link OfflinePointReader} can read.
|
||||||
|
*/
|
||||||
@Override
|
@Override
|
||||||
protected ByteSequencesWriter getWriter(IndexOutput out, long count) {
|
protected ByteSequencesWriter getWriter(IndexOutput out, long count) {
|
||||||
return new ByteSequencesWriter(out) {
|
return new ByteSequencesWriter(out) {
|
||||||
@Override
|
@Override
|
||||||
public void write(byte[] bytes, int off, int len) throws IOException {
|
public void write(byte[] bytes, int off, int len) throws IOException {
|
||||||
assert len == bytesPerDoc: "len=" + len + " bytesPerDoc=" + bytesPerDoc;
|
assert len == bytesPerDoc : "len=" + len + " bytesPerDoc=" + bytesPerDoc;
|
||||||
out.writeBytes(bytes, off, len);
|
out.writeBytes(bytes, off, len);
|
||||||
}
|
}
|
||||||
};
|
};
|
||||||
}
|
}
|
||||||
|
|
||||||
/** We write/read fixed-byte-width file that {@link OfflinePointReader} can read. */
|
/**
|
||||||
|
* We write/read fixed-byte-width file that {@link OfflinePointReader} can read.
|
||||||
|
*/
|
||||||
@Override
|
@Override
|
||||||
protected ByteSequencesReader getReader(ChecksumIndexInput in, String name) throws IOException {
|
protected ByteSequencesReader getReader(ChecksumIndexInput in, String name) throws IOException {
|
||||||
|
//This allows to read only a subset of the original file
|
||||||
|
long startPointer = (name.equals(inputName)) ? bytesPerDoc * start : in.getFilePointer();
|
||||||
|
long endPointer = (name.equals(inputName)) ? startPointer + bytesPerDoc * pointCount : Long.MAX_VALUE;
|
||||||
|
in.seek(startPointer);
|
||||||
return new ByteSequencesReader(in, name) {
|
return new ByteSequencesReader(in, name) {
|
||||||
final BytesRef scratch = new BytesRef(new byte[bytesPerDoc]);
|
final BytesRef scratch = new BytesRef(new byte[bytesPerDoc]);
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public BytesRef next() throws IOException {
|
public BytesRef next() throws IOException {
|
||||||
if (in.getFilePointer() >= end) {
|
if (in.getFilePointer() >= end) {
|
||||||
return null;
|
return null;
|
||||||
|
} else if (in.getFilePointer() >= endPointer) {
|
||||||
|
in.seek(end);
|
||||||
|
return null;
|
||||||
}
|
}
|
||||||
in.readBytes(scratch.bytes, 0, bytesPerDoc);
|
in.readBytes(scratch.bytes, 0, bytesPerDoc);
|
||||||
return scratch;
|
return scratch;
|
||||||
|
@ -928,10 +945,23 @@ public class BKDWriter implements Closeable {
|
||||||
}
|
}
|
||||||
};
|
};
|
||||||
|
|
||||||
String name = sorter.sort(tempInput.getName());
|
String name = sorter.sort(inputName);
|
||||||
|
|
||||||
return new OfflinePointWriter(tempDir, name, packedBytesLength, pointCount, longOrds, singleValuePerDoc);
|
return new OfflinePointWriter(tempDir, name, packedBytesLength, pointCount, longOrds, singleValuePerDoc);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
private HeapPointWriter createHeapPointWriterCopy(HeapPointWriter writer, long start, long count) throws IOException {
|
||||||
|
//TODO: Can we do this faster?
|
||||||
|
int size = Math.toIntExact(count);
|
||||||
|
try (HeapPointWriter pointWriter = new HeapPointWriter(size, size, packedBytesLength, longOrds, singleValuePerDoc);
|
||||||
|
PointReader reader = writer.getReader(start, count)) {
|
||||||
|
for (long i =0; i < count; i++) {
|
||||||
|
reader.next();
|
||||||
|
pointWriter.append(reader.packedValue(), reader.ord(), reader.docID());
|
||||||
|
}
|
||||||
|
return pointWriter;
|
||||||
|
} catch (Throwable t) {
|
||||||
|
throw verifyChecksum(t, writer);
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
private void checkMaxLeafNodeCount(int numLeaves) {
|
private void checkMaxLeafNodeCount(int numLeaves) {
|
||||||
|
@ -994,7 +1024,7 @@ public class BKDWriter implements Closeable {
|
||||||
// Make sure the math above "worked":
|
// Make sure the math above "worked":
|
||||||
assert pointCount / numLeaves <= maxPointsInLeafNode: "pointCount=" + pointCount + " numLeaves=" + numLeaves + " maxPointsInLeafNode=" + maxPointsInLeafNode;
|
assert pointCount / numLeaves <= maxPointsInLeafNode: "pointCount=" + pointCount + " numLeaves=" + numLeaves + " maxPointsInLeafNode=" + maxPointsInLeafNode;
|
||||||
|
|
||||||
// Sort all docs once by each dimension:
|
// Slices are created as they are needed
|
||||||
PathSlice[] sortedPointWriters = new PathSlice[numIndexDims];
|
PathSlice[] sortedPointWriters = new PathSlice[numIndexDims];
|
||||||
|
|
||||||
// This is only used on exception; on normal code paths we close all files we opened:
|
// This is only used on exception; on normal code paths we close all files we opened:
|
||||||
|
@ -1002,20 +1032,6 @@ public class BKDWriter implements Closeable {
|
||||||
|
|
||||||
boolean success = false;
|
boolean success = false;
|
||||||
try {
|
try {
|
||||||
//long t0 = System.nanoTime();
|
|
||||||
for(int dim=0;dim<numIndexDims;dim++) {
|
|
||||||
sortedPointWriters[dim] = new PathSlice(sort(dim), 0, pointCount);
|
|
||||||
}
|
|
||||||
//long t1 = System.nanoTime();
|
|
||||||
//System.out.println("sort time: " + ((t1-t0)/1000000.0) + " msec");
|
|
||||||
|
|
||||||
if (tempInput != null) {
|
|
||||||
tempDir.deleteFile(tempInput.getName());
|
|
||||||
tempInput = null;
|
|
||||||
} else {
|
|
||||||
assert heapPointWriter != null;
|
|
||||||
heapPointWriter = null;
|
|
||||||
}
|
|
||||||
|
|
||||||
final int[] parentSplits = new int[numIndexDims];
|
final int[] parentSplits = new int[numIndexDims];
|
||||||
build(1, numLeaves, sortedPointWriters,
|
build(1, numLeaves, sortedPointWriters,
|
||||||
|
@ -1027,10 +1043,6 @@ public class BKDWriter implements Closeable {
|
||||||
toCloseHeroically);
|
toCloseHeroically);
|
||||||
assert Arrays.equals(parentSplits, new int[numIndexDims]);
|
assert Arrays.equals(parentSplits, new int[numIndexDims]);
|
||||||
|
|
||||||
for(PathSlice slice : sortedPointWriters) {
|
|
||||||
slice.writer.destroy();
|
|
||||||
}
|
|
||||||
|
|
||||||
// If no exception, we should have cleaned everything up:
|
// If no exception, we should have cleaned everything up:
|
||||||
assert tempDir.getCreatedFiles().isEmpty();
|
assert tempDir.getCreatedFiles().isEmpty();
|
||||||
//long t2 = System.nanoTime();
|
//long t2 = System.nanoTime();
|
||||||
|
@ -1443,7 +1455,7 @@ public class BKDWriter implements Closeable {
|
||||||
boolean result = reader.next();
|
boolean result = reader.next();
|
||||||
assert result: "rightCount=" + rightCount + " source.count=" + source.count + " source.writer=" + source.writer;
|
assert result: "rightCount=" + rightCount + " source.count=" + source.count + " source.writer=" + source.writer;
|
||||||
System.arraycopy(reader.packedValue(), splitDim*bytesPerDim, scratch1, 0, bytesPerDim);
|
System.arraycopy(reader.packedValue(), splitDim*bytesPerDim, scratch1, 0, bytesPerDim);
|
||||||
if (numIndexDims > 1) {
|
if (numIndexDims > 1 && ordBitSet != null) {
|
||||||
assert ordBitSet.get(reader.ord()) == false;
|
assert ordBitSet.get(reader.ord()) == false;
|
||||||
ordBitSet.set(reader.ord());
|
ordBitSet.set(reader.ord());
|
||||||
// Subtract 1 from rightCount because we already did the first value above (so we could record the split value):
|
// Subtract 1 from rightCount because we already did the first value above (so we could record the split value):
|
||||||
|
@ -1677,15 +1689,6 @@ public class BKDWriter implements Closeable {
|
||||||
long[] leafBlockFPs,
|
long[] leafBlockFPs,
|
||||||
List<Closeable> toCloseHeroically) throws IOException {
|
List<Closeable> toCloseHeroically) throws IOException {
|
||||||
|
|
||||||
for (PathSlice slice : slices) {
|
|
||||||
assert slice.count == slices[0].count;
|
|
||||||
}
|
|
||||||
|
|
||||||
if (numDataDims == 1 && slices[0].writer instanceof OfflinePointWriter && slices[0].count <= maxPointsSortInHeap) {
|
|
||||||
// Special case for 1D, to cutover to heap once we recurse deeply enough:
|
|
||||||
slices[0] = switchToHeap(slices[0], toCloseHeroically);
|
|
||||||
}
|
|
||||||
|
|
||||||
if (nodeID >= leafNodeOffset) {
|
if (nodeID >= leafNodeOffset) {
|
||||||
|
|
||||||
// Leaf node: write block
|
// Leaf node: write block
|
||||||
|
@ -1695,10 +1698,20 @@ public class BKDWriter implements Closeable {
|
||||||
int sortedDimCardinality = Integer.MAX_VALUE;
|
int sortedDimCardinality = Integer.MAX_VALUE;
|
||||||
|
|
||||||
for (int dim=0;dim<numIndexDims;dim++) {
|
for (int dim=0;dim<numIndexDims;dim++) {
|
||||||
|
//create a slice if it does not exist
|
||||||
|
boolean created = false;
|
||||||
|
if (slices[dim] == null) {
|
||||||
|
createPathSlice(slices, dim);
|
||||||
|
created = true;
|
||||||
|
}
|
||||||
if (slices[dim].writer instanceof HeapPointWriter == false) {
|
if (slices[dim].writer instanceof HeapPointWriter == false) {
|
||||||
// Adversarial cases can cause this, e.g. very lopsided data, all equal points, such that we started
|
// Adversarial cases can cause this, e.g. very lopsided data, all equal points, such that we started
|
||||||
// offline, but then kept splitting only in one dimension, and so never had to rewrite into heap writer
|
// offline, but then kept splitting only in one dimension, and so never had to rewrite into heap writer
|
||||||
|
PathSlice slice = slices[dim];
|
||||||
slices[dim] = switchToHeap(slices[dim], toCloseHeroically);
|
slices[dim] = switchToHeap(slices[dim], toCloseHeroically);
|
||||||
|
if (created) {
|
||||||
|
slice.writer.destroy();
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
PathSlice source = slices[dim];
|
PathSlice source = slices[dim];
|
||||||
|
@ -1819,6 +1832,12 @@ public class BKDWriter implements Closeable {
|
||||||
splitDim = 0;
|
splitDim = 0;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
//We delete the created path slices at the same level
|
||||||
|
boolean deleteSplitDim = false;
|
||||||
|
if (slices[splitDim] == null) {
|
||||||
|
createPathSlice(slices, splitDim);
|
||||||
|
deleteSplitDim = true;
|
||||||
|
}
|
||||||
PathSlice source = slices[splitDim];
|
PathSlice source = slices[splitDim];
|
||||||
|
|
||||||
assert nodeID < splitPackedValues.length: "nodeID=" + nodeID + " splitValues.length=" + splitPackedValues.length;
|
assert nodeID < splitPackedValues.length: "nodeID=" + nodeID + " splitValues.length=" + splitPackedValues.length;
|
||||||
|
@ -1827,7 +1846,16 @@ public class BKDWriter implements Closeable {
|
||||||
long rightCount = source.count / 2;
|
long rightCount = source.count / 2;
|
||||||
long leftCount = source.count - rightCount;
|
long leftCount = source.count - rightCount;
|
||||||
|
|
||||||
byte[] splitValue = markRightTree(rightCount, splitDim, source, ordBitSet);
|
// When we are on this dim, below, we clear the ordBitSet:
|
||||||
|
int dimToClear = numIndexDims - 1;
|
||||||
|
while (dimToClear >= 0) {
|
||||||
|
if (slices[dimToClear] != null && splitDim != dimToClear) {
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
dimToClear--;
|
||||||
|
}
|
||||||
|
|
||||||
|
byte[] splitValue = (dimToClear == -1) ? markRightTree(rightCount, splitDim, source, null) : markRightTree(rightCount, splitDim, source, ordBitSet);
|
||||||
int address = nodeID * (1+bytesPerDim);
|
int address = nodeID * (1+bytesPerDim);
|
||||||
splitPackedValues[address] = (byte) splitDim;
|
splitPackedValues[address] = (byte) splitDim;
|
||||||
System.arraycopy(splitValue, 0, splitPackedValues, address + 1, bytesPerDim);
|
System.arraycopy(splitValue, 0, splitPackedValues, address + 1, bytesPerDim);
|
||||||
|
@ -1843,16 +1871,11 @@ public class BKDWriter implements Closeable {
|
||||||
byte[] maxSplitPackedValue = new byte[packedIndexBytesLength];
|
byte[] maxSplitPackedValue = new byte[packedIndexBytesLength];
|
||||||
System.arraycopy(maxPackedValue, 0, maxSplitPackedValue, 0, packedIndexBytesLength);
|
System.arraycopy(maxPackedValue, 0, maxSplitPackedValue, 0, packedIndexBytesLength);
|
||||||
|
|
||||||
// When we are on this dim, below, we clear the ordBitSet:
|
|
||||||
int dimToClear;
|
|
||||||
if (numIndexDims - 1 == splitDim) {
|
|
||||||
dimToClear = numIndexDims - 2;
|
|
||||||
} else {
|
|
||||||
dimToClear = numIndexDims - 1;
|
|
||||||
}
|
|
||||||
|
|
||||||
for(int dim=0;dim<numIndexDims;dim++) {
|
for(int dim=0;dim<numIndexDims;dim++) {
|
||||||
|
if (slices[dim] == null) {
|
||||||
|
continue;
|
||||||
|
}
|
||||||
if (dim == splitDim) {
|
if (dim == splitDim) {
|
||||||
// No need to partition on this dim since it's a simple slice of the incoming already sorted slice, and we
|
// No need to partition on this dim since it's a simple slice of the incoming already sorted slice, and we
|
||||||
// will re-use its shared reader when visiting it as we recurse:
|
// will re-use its shared reader when visiting it as we recurse:
|
||||||
|
@ -1890,7 +1913,7 @@ public class BKDWriter implements Closeable {
|
||||||
splitPackedValues, leafBlockFPs, toCloseHeroically);
|
splitPackedValues, leafBlockFPs, toCloseHeroically);
|
||||||
for(int dim=0;dim<numIndexDims;dim++) {
|
for(int dim=0;dim<numIndexDims;dim++) {
|
||||||
// Don't destroy the dim we split on because we just re-used what our caller above gave us for that dim:
|
// Don't destroy the dim we split on because we just re-used what our caller above gave us for that dim:
|
||||||
if (dim != splitDim) {
|
if (dim != splitDim && slices[dim] != null) {
|
||||||
leftSlices[dim].writer.destroy();
|
leftSlices[dim].writer.destroy();
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -1903,11 +1926,30 @@ public class BKDWriter implements Closeable {
|
||||||
splitPackedValues, leafBlockFPs, toCloseHeroically);
|
splitPackedValues, leafBlockFPs, toCloseHeroically);
|
||||||
for(int dim=0;dim<numIndexDims;dim++) {
|
for(int dim=0;dim<numIndexDims;dim++) {
|
||||||
// Don't destroy the dim we split on because we just re-used what our caller above gave us for that dim:
|
// Don't destroy the dim we split on because we just re-used what our caller above gave us for that dim:
|
||||||
if (dim != splitDim) {
|
if (dim != splitDim && slices[dim] != null) {
|
||||||
rightSlices[dim].writer.destroy();
|
rightSlices[dim].writer.destroy();
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
parentSplits[splitDim]--;
|
parentSplits[splitDim]--;
|
||||||
|
if (deleteSplitDim) {
|
||||||
|
slices[splitDim].writer.destroy();
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
private void createPathSlice(PathSlice[] slices, int dim) throws IOException{
|
||||||
|
assert slices[dim] == null;
|
||||||
|
PathSlice current = null;
|
||||||
|
for(PathSlice slice : slices) {
|
||||||
|
if (slice != null) {
|
||||||
|
current = slice;
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
if (current == null) {
|
||||||
|
slices[dim] = new PathSlice(sort(dim), 0, pointCount);
|
||||||
|
} else {
|
||||||
|
slices[dim] = new PathSlice(sort(dim, current.writer, current.start, current.count), 0, current.count);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
Loading…
Reference in New Issue