mirror of https://github.com/apache/lucene.git
LUCENE-7122: handle fixed length byte[] values more efficiently in OfflineSorter
This commit is contained in:
parent
045659533c
commit
c47a2996b5
|
@ -19,14 +19,6 @@ package org.apache.lucene.util;
|
|||
import java.util.Arrays;
|
||||
import java.util.Comparator;
|
||||
|
||||
import org.apache.lucene.util.ArrayUtil;
|
||||
import org.apache.lucene.util.ByteBlockPool;
|
||||
import org.apache.lucene.util.BytesRef;
|
||||
import org.apache.lucene.util.BytesRefIterator;
|
||||
import org.apache.lucene.util.Counter;
|
||||
import org.apache.lucene.util.IntroSorter;
|
||||
import org.apache.lucene.util.RamUsageEstimator;
|
||||
|
||||
/**
|
||||
* A simple append only random-access {@link BytesRef} array that stores full
|
||||
* copies of the appended bytes in a {@link ByteBlockPool}.
|
||||
|
@ -37,7 +29,7 @@ import org.apache.lucene.util.RamUsageEstimator;
|
|||
* @lucene.internal
|
||||
* @lucene.experimental
|
||||
*/
|
||||
public final class BytesRefArray {
|
||||
public final class BytesRefArray implements SortableBytesRefArray {
|
||||
private final ByteBlockPool pool;
|
||||
private int[] offsets = new int[1];
|
||||
private int lastElement = 0;
|
||||
|
@ -58,9 +50,11 @@ public final class BytesRefArray {
|
|||
/**
|
||||
* Clears this {@link BytesRefArray}
|
||||
*/
|
||||
@Override
|
||||
public void clear() {
|
||||
lastElement = 0;
|
||||
currentOffset = 0;
|
||||
// TODO: it's trappy that this does not return storage held by int[] offsets array!
|
||||
Arrays.fill(offsets, 0);
|
||||
pool.reset(false, true); // no need to 0 fill the buffers we control the allocator
|
||||
}
|
||||
|
@ -70,6 +64,7 @@ public final class BytesRefArray {
|
|||
* @param bytes the bytes to append
|
||||
* @return the index of the appended bytes
|
||||
*/
|
||||
@Override
|
||||
public int append(BytesRef bytes) {
|
||||
if (lastElement >= offsets.length) {
|
||||
int oldLen = offsets.length;
|
||||
|
@ -86,6 +81,7 @@ public final class BytesRefArray {
|
|||
* Returns the current size of this {@link BytesRefArray}
|
||||
* @return the current size of this {@link BytesRefArray}
|
||||
*/
|
||||
@Override
|
||||
public int size() {
|
||||
return lastElement;
|
||||
}
|
||||
|
@ -192,6 +188,7 @@ public final class BytesRefArray {
|
|||
* This is a non-destructive operation.
|
||||
* </p>
|
||||
*/
|
||||
@Override
|
||||
public BytesRefIterator iterator(final Comparator<BytesRef> comp) {
|
||||
final BytesRefBuilder spare = new BytesRefBuilder();
|
||||
final BytesRef result = new BytesRef();
|
||||
|
|
|
@ -0,0 +1,189 @@
|
|||
/*
|
||||
* Licensed to the Apache Software Foundation (ASF) under one or more
|
||||
* contributor license agreements. See the NOTICE file distributed with
|
||||
* this work for additional information regarding copyright ownership.
|
||||
* The ASF licenses this file to You under the Apache License, Version 2.0
|
||||
* (the "License"); you may not use this file except in compliance with
|
||||
* the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
package org.apache.lucene.util;
|
||||
|
||||
import java.util.Comparator;
|
||||
|
||||
/**
|
||||
* Just like {@link BytesRefArray} except all values have the same length.
|
||||
*
|
||||
* <b>Note: This class is not Thread-Safe!</b>
|
||||
*
|
||||
* @lucene.internal
|
||||
* @lucene.experimental
|
||||
*/
|
||||
final class FixedLengthBytesRefArray implements SortableBytesRefArray {
|
||||
private final int valueLength;
|
||||
private final int valuesPerBlock;
|
||||
|
||||
/** How many values have been appended */
|
||||
private int size;
|
||||
|
||||
/** How many blocks are used */
|
||||
private int currentBlock = -1;
|
||||
private int nextEntry;
|
||||
|
||||
private byte[][] blocks;
|
||||
|
||||
/**
|
||||
* Creates a new {@link BytesRefArray} with a counter to track allocated bytes
|
||||
*/
|
||||
public FixedLengthBytesRefArray(int valueLength) {
|
||||
this.valueLength = valueLength;
|
||||
|
||||
// ~32K per page, unless each value is > 32K:
|
||||
valuesPerBlock = Math.max(1, 32768/valueLength);
|
||||
nextEntry = valuesPerBlock;
|
||||
blocks = new byte[0][];
|
||||
}
|
||||
|
||||
/**
|
||||
* Clears this {@link BytesRefArray}
|
||||
*/
|
||||
@Override
|
||||
public void clear() {
|
||||
size = 0;
|
||||
blocks = new byte[0][];
|
||||
currentBlock = -1;
|
||||
nextEntry = valuesPerBlock;
|
||||
}
|
||||
|
||||
/**
|
||||
* Appends a copy of the given {@link BytesRef} to this {@link BytesRefArray}.
|
||||
* @param bytes the bytes to append
|
||||
* @return the index of the appended bytes
|
||||
*/
|
||||
@Override
|
||||
public int append(BytesRef bytes) {
|
||||
if (bytes.length != valueLength) {
|
||||
throw new IllegalArgumentException("value length is " + bytes.length + " but is supposed to always be " + valueLength);
|
||||
}
|
||||
if (nextEntry == valuesPerBlock) {
|
||||
currentBlock++;
|
||||
if (currentBlock == blocks.length) {
|
||||
int size = ArrayUtil.oversize(currentBlock+1, RamUsageEstimator.NUM_BYTES_OBJECT_REF);
|
||||
byte[][] next = new byte[size][];
|
||||
System.arraycopy(blocks, 0, next, 0, blocks.length);
|
||||
blocks = next;
|
||||
}
|
||||
blocks[currentBlock] = new byte[valuesPerBlock * valueLength];
|
||||
nextEntry = 0;
|
||||
}
|
||||
|
||||
System.arraycopy(bytes.bytes, bytes.offset, blocks[currentBlock], nextEntry*valueLength, valueLength);
|
||||
nextEntry++;
|
||||
|
||||
return size++;
|
||||
}
|
||||
|
||||
/**
|
||||
* Returns the current size of this {@link FixedLengthBytesRefArray}
|
||||
* @return the current size of this {@link FixedLengthBytesRefArray}
|
||||
*/
|
||||
@Override
|
||||
public int size() {
|
||||
return size;
|
||||
}
|
||||
|
||||
private int[] sort(final Comparator<BytesRef> comp) {
|
||||
final int[] orderedEntries = new int[size()];
|
||||
for (int i = 0; i < orderedEntries.length; i++) {
|
||||
orderedEntries[i] = i;
|
||||
}
|
||||
|
||||
final BytesRef pivot = new BytesRef();
|
||||
final BytesRef scratch1 = new BytesRef();
|
||||
final BytesRef scratch2 = new BytesRef();
|
||||
pivot.length = valueLength;
|
||||
scratch1.length = valueLength;
|
||||
scratch2.length = valueLength;
|
||||
|
||||
new IntroSorter() {
|
||||
|
||||
@Override
|
||||
protected void swap(int i, int j) {
|
||||
int o = orderedEntries[i];
|
||||
orderedEntries[i] = orderedEntries[j];
|
||||
orderedEntries[j] = o;
|
||||
}
|
||||
|
||||
@Override
|
||||
protected int compare(int i, int j) {
|
||||
int index1 = orderedEntries[i];
|
||||
scratch1.bytes = blocks[index1 / valuesPerBlock];
|
||||
scratch1.offset = (index1 % valuesPerBlock) * valueLength;
|
||||
|
||||
int index2 = orderedEntries[j];
|
||||
scratch2.bytes = blocks[index2 / valuesPerBlock];
|
||||
scratch2.offset = (index2 % valuesPerBlock) * valueLength;
|
||||
|
||||
return comp.compare(scratch1, scratch2);
|
||||
}
|
||||
|
||||
@Override
|
||||
protected void setPivot(int i) {
|
||||
int index = orderedEntries[i];
|
||||
pivot.bytes = blocks[index / valuesPerBlock];
|
||||
pivot.offset = (index % valuesPerBlock) * valueLength;
|
||||
}
|
||||
|
||||
@Override
|
||||
protected int comparePivot(int j) {
|
||||
final int index = orderedEntries[j];
|
||||
scratch2.bytes = blocks[index / valuesPerBlock];
|
||||
scratch2.offset = (index % valuesPerBlock) * valueLength;
|
||||
return comp.compare(pivot, scratch2);
|
||||
}
|
||||
}.sort(0, size());
|
||||
return orderedEntries;
|
||||
}
|
||||
|
||||
/**
|
||||
* <p>
|
||||
* Returns a {@link BytesRefIterator} with point in time semantics. The
|
||||
* iterator provides access to all so far appended {@link BytesRef} instances.
|
||||
* </p>
|
||||
* <p>
|
||||
* The iterator will iterate the byte values in the order specified by the comparator.
|
||||
* </p>
|
||||
* <p>
|
||||
* This is a non-destructive operation.
|
||||
* </p>
|
||||
*/
|
||||
@Override
|
||||
public BytesRefIterator iterator(final Comparator<BytesRef> comp) {
|
||||
final BytesRef result = new BytesRef();
|
||||
result.length = valueLength;
|
||||
final int size = size();
|
||||
final int[] indices = sort(comp);
|
||||
return new BytesRefIterator() {
|
||||
int pos = 0;
|
||||
|
||||
@Override
|
||||
public BytesRef next() {
|
||||
if (pos < size) {
|
||||
int index = indices[pos];
|
||||
pos++;
|
||||
result.bytes = blocks[index / valuesPerBlock];
|
||||
result.offset = (index % valuesPerBlock) * valueLength;
|
||||
return result;
|
||||
}
|
||||
return null;
|
||||
}
|
||||
};
|
||||
}
|
||||
}
|
|
@ -69,7 +69,7 @@ public class OfflineSorter {
|
|||
public final static int MAX_TEMPFILES = 10;
|
||||
|
||||
private final Directory dir;
|
||||
|
||||
private final int valueLength;
|
||||
private final String tempFileNamePrefix;
|
||||
|
||||
/**
|
||||
|
@ -170,7 +170,7 @@ public class OfflineSorter {
|
|||
private final BufferSize ramBufferSize;
|
||||
|
||||
private final Counter bufferBytesUsed = Counter.newCounter();
|
||||
private final BytesRefArray buffer = new BytesRefArray(bufferBytesUsed);
|
||||
private final SortableBytesRefArray buffer;
|
||||
SortInfo sortInfo;
|
||||
private int maxTempFiles;
|
||||
private final Comparator<BytesRef> comparator;
|
||||
|
@ -184,7 +184,7 @@ public class OfflineSorter {
|
|||
* @see BufferSize#automatic()
|
||||
*/
|
||||
public OfflineSorter(Directory dir, String tempFileNamePrefix) throws IOException {
|
||||
this(dir, tempFileNamePrefix, DEFAULT_COMPARATOR, BufferSize.automatic(), MAX_TEMPFILES);
|
||||
this(dir, tempFileNamePrefix, DEFAULT_COMPARATOR, BufferSize.automatic(), MAX_TEMPFILES, -1);
|
||||
}
|
||||
|
||||
/**
|
||||
|
@ -193,13 +193,14 @@ public class OfflineSorter {
|
|||
* @see BufferSize#automatic()
|
||||
*/
|
||||
public OfflineSorter(Directory dir, String tempFileNamePrefix, Comparator<BytesRef> comparator) throws IOException {
|
||||
this(dir, tempFileNamePrefix, comparator, BufferSize.automatic(), MAX_TEMPFILES);
|
||||
this(dir, tempFileNamePrefix, comparator, BufferSize.automatic(), MAX_TEMPFILES, -1);
|
||||
}
|
||||
|
||||
/**
|
||||
* All-details constructor.
|
||||
* All-details constructor. If {@code valueLength} is -1 (the default), the length of each value differs; otherwise,
|
||||
* all values have the specified length.
|
||||
*/
|
||||
public OfflineSorter(Directory dir, String tempFileNamePrefix, Comparator<BytesRef> comparator, BufferSize ramBufferSize, int maxTempfiles) {
|
||||
public OfflineSorter(Directory dir, String tempFileNamePrefix, Comparator<BytesRef> comparator, BufferSize ramBufferSize, int maxTempfiles, int valueLength) {
|
||||
if (ramBufferSize.bytes < ABSOLUTE_MIN_SORT_BUFFER_SIZE) {
|
||||
throw new IllegalArgumentException(MIN_BUFFER_SIZE_MSG + ": " + ramBufferSize.bytes);
|
||||
}
|
||||
|
@ -207,7 +208,15 @@ public class OfflineSorter {
|
|||
if (maxTempfiles < 2) {
|
||||
throw new IllegalArgumentException("maxTempFiles must be >= 2");
|
||||
}
|
||||
|
||||
if (valueLength == -1) {
|
||||
buffer = new BytesRefArray(bufferBytesUsed);
|
||||
} else {
|
||||
if (valueLength == 0 || valueLength > Short.MAX_VALUE) {
|
||||
throw new IllegalArgumentException("valueLength must be 1 .. " + Short.MAX_VALUE + "; got: " + valueLength);
|
||||
}
|
||||
buffer = new FixedLengthBytesRefArray(valueLength);
|
||||
}
|
||||
this.valueLength = valueLength;
|
||||
this.ramBufferSize = ramBufferSize;
|
||||
this.maxTempFiles = maxTempfiles;
|
||||
this.comparator = comparator;
|
||||
|
@ -283,7 +292,7 @@ public class OfflineSorter {
|
|||
// We should be explicitly removing all intermediate files ourselves unless there is an exception:
|
||||
assert trackingDir.getCreatedFiles().size() == 1 && trackingDir.getCreatedFiles().contains(result);
|
||||
|
||||
sortInfo.totalTime = (System.currentTimeMillis() - sortInfo.totalTime);
|
||||
sortInfo.totalTime = System.currentTimeMillis() - sortInfo.totalTime;
|
||||
|
||||
CodecUtil.checkFooter(is.in);
|
||||
|
||||
|
@ -308,7 +317,7 @@ public class OfflineSorter {
|
|||
|
||||
long start = System.currentTimeMillis();
|
||||
BytesRefIterator iter = buffer.iterator(comparator);
|
||||
sortInfo.sortTime += (System.currentTimeMillis() - start);
|
||||
sortInfo.sortTime += System.currentTimeMillis() - start;
|
||||
|
||||
while ((spare = iter.next()) != null) {
|
||||
assert spare.length <= Short.MAX_VALUE;
|
||||
|
@ -414,24 +423,40 @@ public class OfflineSorter {
|
|||
/** Read in a single partition of data */
|
||||
int readPartition(ByteSequencesReader reader) throws IOException {
|
||||
long start = System.currentTimeMillis();
|
||||
while (true) {
|
||||
BytesRef item = null;
|
||||
try {
|
||||
item = reader.next();
|
||||
} catch (Throwable t) {
|
||||
verifyChecksum(t, reader);
|
||||
if (valueLength != -1) {
|
||||
int limit = ramBufferSize.bytes / valueLength;
|
||||
for(int i=0;i<limit;i++) {
|
||||
BytesRef item = null;
|
||||
try {
|
||||
item = reader.next();
|
||||
} catch (Throwable t) {
|
||||
verifyChecksum(t, reader);
|
||||
}
|
||||
if (item == null) {
|
||||
break;
|
||||
}
|
||||
buffer.append(item);
|
||||
}
|
||||
if (item == null) {
|
||||
break;
|
||||
}
|
||||
buffer.append(item);
|
||||
// Account for the created objects.
|
||||
// (buffer slots do not account to buffer size.)
|
||||
if (bufferBytesUsed.get() > ramBufferSize.bytes) {
|
||||
break;
|
||||
} else {
|
||||
while (true) {
|
||||
BytesRef item = null;
|
||||
try {
|
||||
item = reader.next();
|
||||
} catch (Throwable t) {
|
||||
verifyChecksum(t, reader);
|
||||
}
|
||||
if (item == null) {
|
||||
break;
|
||||
}
|
||||
buffer.append(item);
|
||||
// Account for the created objects.
|
||||
// (buffer slots do not account to buffer size.)
|
||||
if (bufferBytesUsed.get() > ramBufferSize.bytes) {
|
||||
break;
|
||||
}
|
||||
}
|
||||
}
|
||||
sortInfo.readTime += (System.currentTimeMillis() - start);
|
||||
sortInfo.readTime += System.currentTimeMillis() - start;
|
||||
return buffer.size();
|
||||
}
|
||||
|
||||
|
@ -463,6 +488,8 @@ public class OfflineSorter {
|
|||
public static class ByteSequencesWriter implements Closeable {
|
||||
protected final IndexOutput out;
|
||||
|
||||
// TODO: this should optimize the fixed width case as well
|
||||
|
||||
/** Constructs a ByteSequencesWriter to the provided DataOutput */
|
||||
public ByteSequencesWriter(IndexOutput out) {
|
||||
this.out = out;
|
||||
|
|
|
@ -0,0 +1,31 @@
|
|||
/*
|
||||
* Licensed to the Apache Software Foundation (ASF) under one or more
|
||||
* contributor license agreements. See the NOTICE file distributed with
|
||||
* this work for additional information regarding copyright ownership.
|
||||
* The ASF licenses this file to You under the Apache License, Version 2.0
|
||||
* (the "License"); you may not use this file except in compliance with
|
||||
* the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
package org.apache.lucene.util;
|
||||
|
||||
import java.util.Comparator;
|
||||
|
||||
interface SortableBytesRefArray {
|
||||
/** Append a new value */
|
||||
int append(BytesRef bytes);
|
||||
/** Clear all previously stored values */
|
||||
void clear();
|
||||
/** Returns the number of values appended so far */
|
||||
int size();
|
||||
/** Sort all values by the provided comparator and return an iterator over the sorted values */
|
||||
BytesRefIterator iterator(final Comparator<BytesRef> comp);
|
||||
}
|
|
@ -722,6 +722,8 @@ public class BKDWriter implements Closeable {
|
|||
// Offline sort:
|
||||
assert tempInput != null;
|
||||
|
||||
final int offset = bytesPerDim * dim;
|
||||
|
||||
Comparator<BytesRef> cmp = new Comparator<BytesRef>() {
|
||||
|
||||
final ByteArrayDataInput reader = new ByteArrayDataInput();
|
||||
|
@ -729,7 +731,7 @@ public class BKDWriter implements Closeable {
|
|||
@Override
|
||||
public int compare(BytesRef a, BytesRef b) {
|
||||
// First compare by the requested dimension we are sorting by:
|
||||
int cmp = StringHelper.compare(bytesPerDim, a.bytes, a.offset + bytesPerDim*dim, b.bytes, b.offset + bytesPerDim*dim);
|
||||
int cmp = StringHelper.compare(bytesPerDim, a.bytes, a.offset + offset, b.bytes, b.offset + offset);
|
||||
|
||||
if (cmp != 0) {
|
||||
return cmp;
|
||||
|
@ -745,15 +747,11 @@ public class BKDWriter implements Closeable {
|
|||
}
|
||||
};
|
||||
|
||||
// TODO: this is sort of sneaky way to get the final OfflinePointWriter from OfflineSorter:
|
||||
IndexOutput[] lastWriter = new IndexOutput[1];
|
||||
|
||||
OfflineSorter sorter = new OfflineSorter(tempDir, tempFileNamePrefix + "_bkd" + dim, cmp, offlineSorterBufferMB, offlineSorterMaxTempFiles) {
|
||||
OfflineSorter sorter = new OfflineSorter(tempDir, tempFileNamePrefix + "_bkd" + dim, cmp, offlineSorterBufferMB, offlineSorterMaxTempFiles, bytesPerDoc) {
|
||||
|
||||
/** We write/read fixed-byte-width file that {@link OfflinePointReader} can read. */
|
||||
@Override
|
||||
protected ByteSequencesWriter getWriter(IndexOutput out) {
|
||||
lastWriter[0] = out;
|
||||
return new ByteSequencesWriter(out) {
|
||||
@Override
|
||||
public void write(byte[] bytes, int off, int len) throws IOException {
|
||||
|
@ -779,11 +777,10 @@ public class BKDWriter implements Closeable {
|
|||
};
|
||||
}
|
||||
};
|
||||
sorter.sort(tempInput.getName());
|
||||
|
||||
assert lastWriter[0] != null;
|
||||
String name = sorter.sort(tempInput.getName());
|
||||
|
||||
return new OfflinePointWriter(tempDir, lastWriter[0], packedBytesLength, pointCount, longOrds, singleValuePerDoc);
|
||||
return new OfflinePointWriter(tempDir, name, packedBytesLength, pointCount, longOrds, singleValuePerDoc);
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -994,7 +991,7 @@ public class BKDWriter implements Closeable {
|
|||
// and would mean leaving readers (IndexInputs) open for longer:
|
||||
if (writer instanceof OfflinePointWriter) {
|
||||
// We are reading from a temp file; go verify the checksum:
|
||||
String tempFileName = ((OfflinePointWriter) writer).out.getName();
|
||||
String tempFileName = ((OfflinePointWriter) writer).name;
|
||||
try (ChecksumIndexInput in = tempDir.openChecksumInput(tempFileName, IOContext.READONCE)) {
|
||||
CodecUtil.checkFooter(in, priorException);
|
||||
}
|
||||
|
|
|
@ -31,6 +31,7 @@ final class OfflinePointWriter implements PointWriter {
|
|||
|
||||
final Directory tempDir;
|
||||
final IndexOutput out;
|
||||
final String name;
|
||||
final int packedBytesLength;
|
||||
final boolean singleValuePerDoc;
|
||||
long count;
|
||||
|
@ -45,6 +46,7 @@ final class OfflinePointWriter implements PointWriter {
|
|||
public OfflinePointWriter(Directory tempDir, String tempFileNamePrefix, int packedBytesLength,
|
||||
boolean longOrds, String desc, long expectedCount, boolean singleValuePerDoc) throws IOException {
|
||||
this.out = tempDir.createTempOutput(tempFileNamePrefix, "bkd_" + desc, IOContext.DEFAULT);
|
||||
this.name = out.getName();
|
||||
this.tempDir = tempDir;
|
||||
this.packedBytesLength = packedBytesLength;
|
||||
this.longOrds = longOrds;
|
||||
|
@ -53,8 +55,9 @@ final class OfflinePointWriter implements PointWriter {
|
|||
}
|
||||
|
||||
/** Initializes on an already written/closed file, just so consumers can use {@link #getReader} to read the file. */
|
||||
public OfflinePointWriter(Directory tempDir, IndexOutput out, int packedBytesLength, long count, boolean longOrds, boolean singleValuePerDoc) {
|
||||
this.out = out;
|
||||
public OfflinePointWriter(Directory tempDir, String name, int packedBytesLength, long count, boolean longOrds, boolean singleValuePerDoc) {
|
||||
this.out = null;
|
||||
this.name = name;
|
||||
this.tempDir = tempDir;
|
||||
this.packedBytesLength = packedBytesLength;
|
||||
this.count = count;
|
||||
|
@ -86,7 +89,7 @@ final class OfflinePointWriter implements PointWriter {
|
|||
assert closed;
|
||||
assert start + length <= count: "start=" + start + " length=" + length + " count=" + count;
|
||||
assert expectedCount == 0 || count == expectedCount;
|
||||
return new OfflinePointReader(tempDir, out.getName(), packedBytesLength, start, length, longOrds, singleValuePerDoc);
|
||||
return new OfflinePointReader(tempDir, name, packedBytesLength, start, length, longOrds, singleValuePerDoc);
|
||||
}
|
||||
|
||||
@Override
|
||||
|
@ -94,7 +97,7 @@ final class OfflinePointWriter implements PointWriter {
|
|||
if (sharedReader == null) {
|
||||
assert start == 0;
|
||||
assert length <= count;
|
||||
sharedReader = new OfflinePointReader(tempDir, out.getName(), packedBytesLength, 0, count, longOrds, singleValuePerDoc);
|
||||
sharedReader = new OfflinePointReader(tempDir, name, packedBytesLength, 0, count, longOrds, singleValuePerDoc);
|
||||
toCloseHeroically.add(sharedReader);
|
||||
// Make sure the OfflinePointReader intends to verify its checksum:
|
||||
assert sharedReader.in instanceof ChecksumIndexInput;
|
||||
|
@ -126,11 +129,11 @@ final class OfflinePointWriter implements PointWriter {
|
|||
sharedReader.close();
|
||||
sharedReader = null;
|
||||
}
|
||||
tempDir.deleteFile(out.getName());
|
||||
tempDir.deleteFile(name);
|
||||
}
|
||||
|
||||
@Override
|
||||
public String toString() {
|
||||
return "OfflinePointWriter(count=" + count + " tempFileName=" + out.getName() + ")";
|
||||
return "OfflinePointWriter(count=" + count + " tempFileName=" + name + ")";
|
||||
}
|
||||
}
|
||||
|
|
|
@ -0,0 +1,84 @@
|
|||
/*
|
||||
* Licensed to the Apache Software Foundation (ASF) under one or more
|
||||
* contributor license agreements. See the NOTICE file distributed with
|
||||
* this work for additional information regarding copyright ownership.
|
||||
* The ASF licenses this file to You under the Apache License, Version 2.0
|
||||
* (the "License"); you may not use this file except in compliance with
|
||||
* the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
package org.apache.lucene.util;
|
||||
|
||||
import java.util.Arrays;
|
||||
import java.util.Comparator;
|
||||
|
||||
public class TestFixedLengthBytesRefArray extends LuceneTestCase {
|
||||
|
||||
public void testBasic() throws Exception {
|
||||
FixedLengthBytesRefArray a = new FixedLengthBytesRefArray(Integer.BYTES);
|
||||
int numValues = 100;
|
||||
for(int i=0;i<numValues;i++) {
|
||||
byte[] bytes = {0, 0, 0, (byte) (10-i)};
|
||||
a.append(new BytesRef(bytes));
|
||||
}
|
||||
|
||||
BytesRefIterator iterator = a.iterator(new Comparator<BytesRef>() {
|
||||
@Override
|
||||
public int compare(BytesRef a, BytesRef b) {
|
||||
return a.compareTo(b);
|
||||
}
|
||||
});
|
||||
|
||||
BytesRef last = null;
|
||||
|
||||
int count = 0;
|
||||
while (true) {
|
||||
BytesRef bytes = iterator.next();
|
||||
if (bytes == null) {
|
||||
break;
|
||||
}
|
||||
if (last != null) {
|
||||
assertTrue("count=" + count + " last=" + last + " bytes=" + bytes, last.compareTo(bytes) < 0);
|
||||
}
|
||||
last = BytesRef.deepCopyOf(bytes);
|
||||
count++;
|
||||
}
|
||||
|
||||
assertEquals(numValues, count);
|
||||
}
|
||||
|
||||
public void testRandom() throws Exception {
|
||||
int length = TestUtil.nextInt(random(), 4, 10);
|
||||
int count = atLeast(10000);
|
||||
BytesRef[] values = new BytesRef[count];
|
||||
|
||||
FixedLengthBytesRefArray a = new FixedLengthBytesRefArray(length);
|
||||
for(int i=0;i<count;i++) {
|
||||
BytesRef value = new BytesRef(new byte[length]);
|
||||
random().nextBytes(value.bytes);
|
||||
values[i] = value;
|
||||
a.append(value);
|
||||
}
|
||||
|
||||
Arrays.sort(values);
|
||||
BytesRefIterator iterator = a.iterator(new Comparator<BytesRef>() {
|
||||
@Override
|
||||
public int compare(BytesRef a, BytesRef b) {
|
||||
return a.compareTo(b);
|
||||
}
|
||||
});
|
||||
for(int i=0;i<count;i++) {
|
||||
BytesRef next = iterator.next();
|
||||
assertNotNull(next);
|
||||
assertEquals(values[i], next);
|
||||
}
|
||||
}
|
||||
}
|
|
@ -76,8 +76,8 @@ public class TestOfflineSorter extends LuceneTestCase {
|
|||
public void testIntermediateMerges() throws Exception {
|
||||
// Sort 20 mb worth of data with 1mb buffer, binary merging.
|
||||
try (Directory dir = newDirectory()) {
|
||||
SortInfo info = checkSort(dir, new OfflineSorter(dir, "foo", OfflineSorter.DEFAULT_COMPARATOR, BufferSize.megabytes(1), 2),
|
||||
generateRandom((int)OfflineSorter.MB * 20));
|
||||
SortInfo info = checkSort(dir, new OfflineSorter(dir, "foo", OfflineSorter.DEFAULT_COMPARATOR, BufferSize.megabytes(1), 2, -1),
|
||||
generateRandom((int)OfflineSorter.MB * 20));
|
||||
assertTrue(info.mergeRounds > 10);
|
||||
}
|
||||
}
|
||||
|
@ -85,7 +85,7 @@ public class TestOfflineSorter extends LuceneTestCase {
|
|||
public void testSmallRandom() throws Exception {
|
||||
// Sort 20 mb worth of data with 1mb buffer.
|
||||
try (Directory dir = newDirectory()) {
|
||||
SortInfo sortInfo = checkSort(dir, new OfflineSorter(dir, "foo", OfflineSorter.DEFAULT_COMPARATOR, BufferSize.megabytes(1), OfflineSorter.MAX_TEMPFILES),
|
||||
SortInfo sortInfo = checkSort(dir, new OfflineSorter(dir, "foo", OfflineSorter.DEFAULT_COMPARATOR, BufferSize.megabytes(1), OfflineSorter.MAX_TEMPFILES, -1),
|
||||
generateRandom((int)OfflineSorter.MB * 20));
|
||||
assertEquals(3, sortInfo.mergeRounds);
|
||||
}
|
||||
|
@ -95,7 +95,7 @@ public class TestOfflineSorter extends LuceneTestCase {
|
|||
public void testLargerRandom() throws Exception {
|
||||
// Sort 100MB worth of data with 15mb buffer.
|
||||
try (Directory dir = newFSDirectory(createTempDir())) {
|
||||
checkSort(dir, new OfflineSorter(dir, "foo", OfflineSorter.DEFAULT_COMPARATOR, BufferSize.megabytes(16), OfflineSorter.MAX_TEMPFILES),
|
||||
checkSort(dir, new OfflineSorter(dir, "foo", OfflineSorter.DEFAULT_COMPARATOR, BufferSize.megabytes(16), OfflineSorter.MAX_TEMPFILES, -1),
|
||||
generateRandom((int)OfflineSorter.MB * 100));
|
||||
}
|
||||
}
|
||||
|
@ -367,7 +367,7 @@ public class TestOfflineSorter extends LuceneTestCase {
|
|||
writeAll(unsorted, generateFixed((int) (OfflineSorter.MB * 3)));
|
||||
|
||||
CorruptIndexException e = expectThrows(CorruptIndexException.class, () -> {
|
||||
new OfflineSorter(dir, "foo", OfflineSorter.DEFAULT_COMPARATOR, BufferSize.megabytes(1), 10).sort(unsorted.getName());
|
||||
new OfflineSorter(dir, "foo", OfflineSorter.DEFAULT_COMPARATOR, BufferSize.megabytes(1), 10, -1).sort(unsorted.getName());
|
||||
});
|
||||
assertTrue(e.getMessage().contains("checksum failed (hardware problem?)"));
|
||||
}
|
||||
|
@ -420,11 +420,67 @@ public class TestOfflineSorter extends LuceneTestCase {
|
|||
writeAll(unsorted, generateFixed((int) (OfflineSorter.MB * 3)));
|
||||
|
||||
EOFException e = expectThrows(EOFException.class, () -> {
|
||||
new OfflineSorter(dir, "foo", OfflineSorter.DEFAULT_COMPARATOR, BufferSize.megabytes(1), 10).sort(unsorted.getName());
|
||||
new OfflineSorter(dir, "foo", OfflineSorter.DEFAULT_COMPARATOR, BufferSize.megabytes(1), 10, -1).sort(unsorted.getName());
|
||||
});
|
||||
assertEquals(1, e.getSuppressed().length);
|
||||
assertTrue(e.getSuppressed()[0] instanceof CorruptIndexException);
|
||||
assertTrue(e.getSuppressed()[0].getMessage().contains("checksum failed (hardware problem?)"));
|
||||
}
|
||||
}
|
||||
|
||||
public void testFixedLengthHeap() throws Exception {
|
||||
// Make sure the RAM accounting is correct, i.e. if we are sorting fixed width
|
||||
// ints (4 bytes) then the heap used is really only 4 bytes per value:
|
||||
Directory dir = newDirectory();
|
||||
IndexOutput out = dir.createTempOutput("unsorted", "tmp", IOContext.DEFAULT);
|
||||
try (ByteSequencesWriter w = new OfflineSorter.ByteSequencesWriter(out)) {
|
||||
byte[] bytes = new byte[Integer.BYTES];
|
||||
for (int i=0;i<1024*1024;i++) {
|
||||
random().nextBytes(bytes);
|
||||
w.write(bytes);
|
||||
}
|
||||
CodecUtil.writeFooter(out);
|
||||
}
|
||||
|
||||
OfflineSorter sorter = new OfflineSorter(dir, "foo", OfflineSorter.DEFAULT_COMPARATOR, BufferSize.megabytes(4), OfflineSorter.MAX_TEMPFILES, Integer.BYTES);
|
||||
sorter.sort(out.getName());
|
||||
// 1 MB of ints with 4 MH heap allowed should have been sorted in a single heap partition:
|
||||
assertEquals(0, sorter.sortInfo.mergeRounds);
|
||||
dir.close();
|
||||
}
|
||||
|
||||
public void testFixedLengthLiesLiesLies() throws Exception {
|
||||
// Make sure OfflineSorter catches me if I lie about the fixed value length:
|
||||
Directory dir = newDirectory();
|
||||
IndexOutput out = dir.createTempOutput("unsorted", "tmp", IOContext.DEFAULT);
|
||||
try (ByteSequencesWriter w = new OfflineSorter.ByteSequencesWriter(out)) {
|
||||
byte[] bytes = new byte[Integer.BYTES];
|
||||
random().nextBytes(bytes);
|
||||
w.write(bytes);
|
||||
CodecUtil.writeFooter(out);
|
||||
}
|
||||
|
||||
OfflineSorter sorter = new OfflineSorter(dir, "foo", OfflineSorter.DEFAULT_COMPARATOR, BufferSize.megabytes(4), OfflineSorter.MAX_TEMPFILES, Long.BYTES);
|
||||
IllegalArgumentException e = expectThrows(IllegalArgumentException.class, () -> {
|
||||
sorter.sort(out.getName());
|
||||
});
|
||||
assertEquals("value length is 4 but is supposed to always be 8", e.getMessage());
|
||||
dir.close();
|
||||
}
|
||||
|
||||
public void testInvalidFixedLength() throws Exception {
|
||||
IllegalArgumentException e;
|
||||
e = expectThrows(IllegalArgumentException.class,
|
||||
() -> {
|
||||
new OfflineSorter(null, "foo", OfflineSorter.DEFAULT_COMPARATOR,
|
||||
BufferSize.megabytes(1), OfflineSorter.MAX_TEMPFILES, 0);
|
||||
});
|
||||
assertEquals("valueLength must be 1 .. 32767; got: 0", e.getMessage());
|
||||
e = expectThrows(IllegalArgumentException.class,
|
||||
() -> {
|
||||
new OfflineSorter(null, "foo", OfflineSorter.DEFAULT_COMPARATOR,
|
||||
BufferSize.megabytes(1), OfflineSorter.MAX_TEMPFILES, Integer.MAX_VALUE);
|
||||
});
|
||||
assertEquals("valueLength must be 1 .. 32767; got: 2147483647", e.getMessage());
|
||||
}
|
||||
}
|
||||
|
|
Loading…
Reference in New Issue