LUCENE-7141: switch OfflineSorter's ByteSequencesReader to BytesRefIterator

This commit is contained in:
Mike McCandless 2016-03-26 03:47:06 -04:00
parent ec60f5c43c
commit c46d768664
9 changed files with 72 additions and 68 deletions

View File

@ -13,6 +13,11 @@ New Features
* LUCENE-7140: Add PlanetModel.bisection to spatial3d (Karl Wright via * LUCENE-7140: Add PlanetModel.bisection to spatial3d (Karl Wright via
Mike McCandless) Mike McCandless)
API Changes
* LUCENE-7141: Switch OfflineSorter's ByteSequencesReader to
BytesRefIterator (Mike McCandless)
Optimizations Optimizations
* LUCENE-7071: Reduce bytes copying in OfflineSorter, giving ~10% * LUCENE-7071: Reduce bytes copying in OfflineSorter, giving ~10%

View File

@ -888,7 +888,6 @@ public class Dictionary {
boolean success2 = false; boolean success2 = false;
try (ByteSequencesReader reader = new ByteSequencesReader(tempDir.openChecksumInput(sorted, IOContext.READONCE), sorted)) { try (ByteSequencesReader reader = new ByteSequencesReader(tempDir.openChecksumInput(sorted, IOContext.READONCE), sorted)) {
BytesRefBuilder scratchLine = new BytesRefBuilder();
// TODO: the flags themselves can be double-chars (long) or also numeric // TODO: the flags themselves can be double-chars (long) or also numeric
// either way the trick is to encode them as char... but they must be parsed differently // either way the trick is to encode them as char... but they must be parsed differently
@ -896,9 +895,13 @@ public class Dictionary {
String currentEntry = null; String currentEntry = null;
IntsRefBuilder currentOrds = new IntsRefBuilder(); IntsRefBuilder currentOrds = new IntsRefBuilder();
String line; while (true) {
while (reader.read(scratchLine)) { BytesRef scratch = reader.next();
line = scratchLine.get().utf8ToString(); if (scratch == null) {
break;
}
String line = scratch.utf8ToString();
String entry; String entry;
char wordForm[]; char wordForm[];
int end; int end;

View File

@ -346,7 +346,7 @@ public class OfflineSorter {
PriorityQueue<FileAndTop> queue = new PriorityQueue<FileAndTop>(segmentsToMerge.size()) { PriorityQueue<FileAndTop> queue = new PriorityQueue<FileAndTop>(segmentsToMerge.size()) {
@Override @Override
protected boolean lessThan(FileAndTop a, FileAndTop b) { protected boolean lessThan(FileAndTop a, FileAndTop b) {
return comparator.compare(a.current.get(), b.current.get()) < 0; return comparator.compare(a.current, b.current) < 0;
} }
}; };
@ -361,15 +361,14 @@ public class OfflineSorter {
// Open streams and read the top for each file // Open streams and read the top for each file
for (int i = 0; i < segmentsToMerge.size(); i++) { for (int i = 0; i < segmentsToMerge.size(); i++) {
streams[i] = getReader(dir.openChecksumInput(segmentsToMerge.get(i), IOContext.READONCE), segmentsToMerge.get(i)); streams[i] = getReader(dir.openChecksumInput(segmentsToMerge.get(i), IOContext.READONCE), segmentsToMerge.get(i));
BytesRefBuilder bytes = new BytesRefBuilder(); BytesRef item = null;
boolean result = false;
try { try {
result = streams[i].read(bytes); item = streams[i].next();
} catch (Throwable t) { } catch (Throwable t) {
verifyChecksum(t, streams[i]); verifyChecksum(t, streams[i]);
} }
assert result; assert item != null;
queue.insertWithOverflow(new FileAndTop(i, bytes)); queue.insertWithOverflow(new FileAndTop(i, item));
} }
// Unix utility sort() uses ordered array of files to pick the next line from, updating // Unix utility sort() uses ordered array of files to pick the next line from, updating
@ -378,15 +377,14 @@ public class OfflineSorter {
// so it shouldn't make much of a difference (didn't check). // so it shouldn't make much of a difference (didn't check).
FileAndTop top; FileAndTop top;
while ((top = queue.top()) != null) { while ((top = queue.top()) != null) {
writer.write(top.current.bytes(), 0, top.current.length()); writer.write(top.current);
boolean result = false;
try { try {
result = streams[top.fd].read(top.current); top.current = streams[top.fd].next();
} catch (Throwable t) { } catch (Throwable t) {
verifyChecksum(t, streams[top.fd]); verifyChecksum(t, streams[top.fd]);
} }
if (result) { if (top.current != null) {
queue.updateTop(); queue.updateTop();
} else { } else {
queue.pop(); queue.pop();
@ -416,18 +414,17 @@ public class OfflineSorter {
/** Read in a single partition of data */ /** Read in a single partition of data */
int readPartition(ByteSequencesReader reader) throws IOException { int readPartition(ByteSequencesReader reader) throws IOException {
long start = System.currentTimeMillis(); long start = System.currentTimeMillis();
final BytesRefBuilder scratch = new BytesRefBuilder();
while (true) { while (true) {
boolean result = false; BytesRef item = null;
try { try {
result = reader.read(scratch); item = reader.next();
} catch (Throwable t) { } catch (Throwable t) {
verifyChecksum(t, reader); verifyChecksum(t, reader);
} }
if (result == false) { if (item == null) {
break; break;
} }
buffer.append(scratch.get()); buffer.append(item);
// Account for the created objects. // Account for the created objects.
// (buffer slots do not account to buffer size.) // (buffer slots do not account to buffer size.)
if (bufferBytesUsed.get() > ramBufferSize.bytes) { if (bufferBytesUsed.get() > ramBufferSize.bytes) {
@ -440,9 +437,9 @@ public class OfflineSorter {
static class FileAndTop { static class FileAndTop {
final int fd; final int fd;
final BytesRefBuilder current; BytesRef current;
FileAndTop(int fd, BytesRefBuilder firstLine) { FileAndTop(int fd, BytesRef firstLine) {
this.fd = fd; this.fd = fd;
this.current = firstLine; this.current = firstLine;
} }
@ -518,10 +515,11 @@ public class OfflineSorter {
* Utility class to read length-prefixed byte[] entries from an input. * Utility class to read length-prefixed byte[] entries from an input.
* Complementary to {@link ByteSequencesWriter}. * Complementary to {@link ByteSequencesWriter}.
*/ */
public static class ByteSequencesReader implements Closeable { public static class ByteSequencesReader implements BytesRefIterator, Closeable {
protected final String name; protected final String name;
protected final ChecksumIndexInput in; protected final ChecksumIndexInput in;
protected final long end; protected final long end;
private final BytesRefBuilder ref = new BytesRefBuilder();
/** Constructs a ByteSequencesReader from the provided IndexInput */ /** Constructs a ByteSequencesReader from the provided IndexInput */
public ByteSequencesReader(ChecksumIndexInput in, String name) { public ByteSequencesReader(ChecksumIndexInput in, String name) {
@ -538,16 +536,16 @@ public class OfflineSorter {
* the header of the next sequence. Returns <code>true</code> otherwise. * the header of the next sequence. Returns <code>true</code> otherwise.
* @throws EOFException if the file ends before the full sequence is read. * @throws EOFException if the file ends before the full sequence is read.
*/ */
public boolean read(BytesRefBuilder ref) throws IOException { public BytesRef next() throws IOException {
if (in.getFilePointer() >= end) { if (in.getFilePointer() >= end) {
return false; return null;
} }
short length = in.readShort(); short length = in.readShort();
ref.grow(length); ref.grow(length);
ref.setLength(length); ref.setLength(length);
in.readBytes(ref.bytes(), 0, length); in.readBytes(ref.bytes(), 0, length);
return true; return ref.get();
} }
/** /**

View File

@ -33,7 +33,6 @@ import org.apache.lucene.store.IndexOutput;
import org.apache.lucene.store.TrackingDirectoryWrapper; import org.apache.lucene.store.TrackingDirectoryWrapper;
import org.apache.lucene.util.ArrayUtil; import org.apache.lucene.util.ArrayUtil;
import org.apache.lucene.util.BytesRef; import org.apache.lucene.util.BytesRef;
import org.apache.lucene.util.BytesRefBuilder;
import org.apache.lucene.util.FixedBitSet; import org.apache.lucene.util.FixedBitSet;
import org.apache.lucene.util.IOUtils; import org.apache.lucene.util.IOUtils;
import org.apache.lucene.util.IntroSorter; import org.apache.lucene.util.IntroSorter;
@ -736,6 +735,8 @@ public class BKDWriter implements Closeable {
// TODO: this is sort of sneaky way to get the final OfflinePointWriter from OfflineSorter: // TODO: this is sort of sneaky way to get the final OfflinePointWriter from OfflineSorter:
IndexOutput[] lastWriter = new IndexOutput[1]; IndexOutput[] lastWriter = new IndexOutput[1];
final BytesRef scratch = new BytesRef(new byte[bytesPerDoc]);
OfflineSorter sorter = new OfflineSorter(tempDir, tempFileNamePrefix + "_bkd" + dim, cmp, OfflineSorter.BufferSize.megabytes(Math.max(1, (long) maxMBSortInHeap)), OfflineSorter.MAX_TEMPFILES) { OfflineSorter sorter = new OfflineSorter(tempDir, tempFileNamePrefix + "_bkd" + dim, cmp, OfflineSorter.BufferSize.megabytes(Math.max(1, (long) maxMBSortInHeap)), OfflineSorter.MAX_TEMPFILES) {
/** We write/read fixed-byte-width file that {@link OfflinePointReader} can read. */ /** We write/read fixed-byte-width file that {@link OfflinePointReader} can read. */
@ -756,14 +757,12 @@ public class BKDWriter implements Closeable {
protected ByteSequencesReader getReader(ChecksumIndexInput in, String name) throws IOException { protected ByteSequencesReader getReader(ChecksumIndexInput in, String name) throws IOException {
return new ByteSequencesReader(in, name) { return new ByteSequencesReader(in, name) {
@Override @Override
public boolean read(BytesRefBuilder ref) throws IOException { public BytesRef next() throws IOException {
if (in.getFilePointer() >= end) { if (in.getFilePointer() >= end) {
return false; return null;
} }
ref.grow(bytesPerDoc); in.readBytes(scratch.bytes, 0, bytesPerDoc);
in.readBytes(ref.bytes(), 0, bytesPerDoc); return scratch;
ref.setLength(bytesPerDoc);
return true;
} }
}; };
} }

View File

@ -29,7 +29,6 @@ import org.apache.lucene.store.IOContext;
import org.apache.lucene.store.IndexOutput; import org.apache.lucene.store.IndexOutput;
import org.apache.lucene.util.ArrayUtil; import org.apache.lucene.util.ArrayUtil;
import org.apache.lucene.util.BytesRef; import org.apache.lucene.util.BytesRef;
import org.apache.lucene.util.BytesRefBuilder;
import org.apache.lucene.util.IOUtils; import org.apache.lucene.util.IOUtils;
import org.apache.lucene.util.OfflineSorter.ByteSequencesReader; import org.apache.lucene.util.OfflineSorter.ByteSequencesReader;
import org.apache.lucene.util.OfflineSorter.ByteSequencesWriter; import org.apache.lucene.util.OfflineSorter.ByteSequencesWriter;
@ -53,7 +52,6 @@ public class SortedInputIterator implements InputIterator {
private boolean done = false; private boolean done = false;
private long weight; private long weight;
private final BytesRefBuilder scratch = new BytesRefBuilder();
private BytesRef payload = new BytesRef(); private BytesRef payload = new BytesRef();
private Set<BytesRef> contexts = null; private Set<BytesRef> contexts = null;
@ -86,8 +84,8 @@ public class SortedInputIterator implements InputIterator {
} }
try { try {
ByteArrayDataInput input = new ByteArrayDataInput(); ByteArrayDataInput input = new ByteArrayDataInput();
if (reader.read(scratch)) { BytesRef bytes = reader.next();
final BytesRef bytes = scratch.get(); if (bytes != null) {
weight = decode(bytes, input); weight = decode(bytes, input);
if (hasPayloads) { if (hasPayloads) {
payload = decodePayload(bytes, input); payload = decodePayload(bytes, input);

View File

@ -509,8 +509,12 @@ public class AnalyzingSuggester extends Lookup implements Accountable {
Set<BytesRef> seenSurfaceForms = new HashSet<>(); Set<BytesRef> seenSurfaceForms = new HashSet<>();
int dedup = 0; int dedup = 0;
while (reader.read(scratch)) { while (true) {
input.reset(scratch.bytes(), 0, scratch.length()); BytesRef bytes = reader.next();
if (bytes == null) {
break;
}
input.reset(bytes.bytes, bytes.offset, bytes.length);
short analyzedLength = input.readShort(); short analyzedLength = input.readShort();
analyzed.grow(analyzedLength+2); analyzed.grow(analyzedLength+2);
input.readBytes(analyzed.bytes(), 0, analyzedLength); input.readBytes(analyzed.bytes(), 0, analyzedLength);
@ -518,13 +522,13 @@ public class AnalyzingSuggester extends Lookup implements Accountable {
long cost = input.readInt(); long cost = input.readInt();
surface.bytes = scratch.bytes(); surface.bytes = bytes.bytes;
if (hasPayloads) { if (hasPayloads) {
surface.length = input.readShort(); surface.length = input.readShort();
surface.offset = input.getPosition(); surface.offset = input.getPosition();
} else { } else {
surface.offset = input.getPosition(); surface.offset = input.getPosition();
surface.length = scratch.length() - surface.offset; surface.length = bytes.length - surface.offset;
} }
if (previousAnalyzed == null) { if (previousAnalyzed == null) {
@ -566,11 +570,11 @@ public class AnalyzingSuggester extends Lookup implements Accountable {
builder.add(scratchInts.get(), outputs.newPair(cost, BytesRef.deepCopyOf(surface))); builder.add(scratchInts.get(), outputs.newPair(cost, BytesRef.deepCopyOf(surface)));
} else { } else {
int payloadOffset = input.getPosition() + surface.length; int payloadOffset = input.getPosition() + surface.length;
int payloadLength = scratch.length() - payloadOffset; int payloadLength = bytes.length - payloadOffset;
BytesRef br = new BytesRef(surface.length + 1 + payloadLength); BytesRef br = new BytesRef(surface.length + 1 + payloadLength);
System.arraycopy(surface.bytes, surface.offset, br.bytes, 0, surface.length); System.arraycopy(surface.bytes, surface.offset, br.bytes, 0, surface.length);
br.bytes[surface.length] = PAYLOAD_SEP; br.bytes[surface.length] = PAYLOAD_SEP;
System.arraycopy(scratch.bytes(), payloadOffset, br.bytes, surface.length+1, payloadLength); System.arraycopy(bytes.bytes, payloadOffset, br.bytes, surface.length+1, payloadLength);
br.length = br.bytes.length; br.length = br.bytes.length;
builder.add(scratchInts.get(), outputs.newPair(cost, br)); builder.add(scratchInts.get(), outputs.newPair(cost, br));
} }

View File

@ -24,7 +24,6 @@ import org.apache.lucene.codecs.CodecUtil;
import org.apache.lucene.store.IOContext; import org.apache.lucene.store.IOContext;
import org.apache.lucene.store.IndexOutput; import org.apache.lucene.store.IndexOutput;
import org.apache.lucene.util.BytesRef; import org.apache.lucene.util.BytesRef;
import org.apache.lucene.util.BytesRefBuilder;
import org.apache.lucene.util.BytesRefIterator; import org.apache.lucene.util.BytesRefIterator;
import org.apache.lucene.util.IOUtils; import org.apache.lucene.util.IOUtils;
import org.apache.lucene.util.OfflineSorter; import org.apache.lucene.util.OfflineSorter;
@ -105,9 +104,10 @@ public class ExternalRefSorter implements BytesRefSorter, Closeable {
/** /**
* Iterate over byte refs in a file. * Iterate over byte refs in a file.
*/ */
// TODO: this class is a bit silly ... sole purpose is to "remove" Closeable from what #iterator returns:
class ByteSequenceIterator implements BytesRefIterator { class ByteSequenceIterator implements BytesRefIterator {
private final OfflineSorter.ByteSequencesReader reader; private final OfflineSorter.ByteSequencesReader reader;
private BytesRefBuilder scratch = new BytesRefBuilder(); private BytesRef scratch;
public ByteSequenceIterator(OfflineSorter.ByteSequencesReader reader) { public ByteSequenceIterator(OfflineSorter.ByteSequencesReader reader) {
this.reader = reader; this.reader = reader;
@ -115,20 +115,14 @@ public class ExternalRefSorter implements BytesRefSorter, Closeable {
@Override @Override
public BytesRef next() throws IOException { public BytesRef next() throws IOException {
if (scratch == null) {
return null;
}
boolean success = false; boolean success = false;
try { try {
if (reader.read(scratch) == false) { scratch = reader.next();
IOUtils.close(reader); if (scratch == null) {
scratch = null; reader.close();
} }
success = true; success = true;
if (scratch == null) { return scratch;
return null;
}
return scratch.get();
} finally { } finally {
if (!success) { if (!success) {
IOUtils.closeWhileHandlingException(reader); IOUtils.closeWhileHandlingException(reader);

View File

@ -39,7 +39,6 @@ import org.apache.lucene.util.Accountable;
import org.apache.lucene.util.Accountables; import org.apache.lucene.util.Accountables;
import org.apache.lucene.util.ArrayUtil; import org.apache.lucene.util.ArrayUtil;
import org.apache.lucene.util.BytesRef; import org.apache.lucene.util.BytesRef;
import org.apache.lucene.util.BytesRefBuilder;
import org.apache.lucene.util.CharsRefBuilder; import org.apache.lucene.util.CharsRefBuilder;
import org.apache.lucene.util.IOUtils; import org.apache.lucene.util.IOUtils;
import org.apache.lucene.util.OfflineSorter; import org.apache.lucene.util.OfflineSorter;
@ -215,10 +214,13 @@ public class FSTCompletionLookup extends Lookup implements Accountable {
int previousBucket = 0; int previousBucket = 0;
int previousScore = 0; int previousScore = 0;
ByteArrayDataInput input = new ByteArrayDataInput(); ByteArrayDataInput input = new ByteArrayDataInput();
BytesRefBuilder tmp1 = new BytesRefBuilder();
BytesRef tmp2 = new BytesRef(); BytesRef tmp2 = new BytesRef();
while (reader.read(tmp1)) { while (true) {
input.reset(tmp1.bytes()); BytesRef scratch = reader.next();
if (scratch == null) {
break;
}
input.reset(scratch.bytes, scratch.offset, scratch.length);
int currentScore = input.readInt(); int currentScore = input.readInt();
int bucket; int bucket;
@ -231,9 +233,9 @@ public class FSTCompletionLookup extends Lookup implements Accountable {
previousBucket = bucket; previousBucket = bucket;
// Only append the input, discard the weight. // Only append the input, discard the weight.
tmp2.bytes = tmp1.bytes(); tmp2.bytes = scratch.bytes;
tmp2.offset = input.getPosition(); tmp2.offset = scratch.offset + input.getPosition();
tmp2.length = tmp1.length() - input.getPosition(); tmp2.length = scratch.length - input.getPosition();
builder.add(tmp2, bucket); builder.add(tmp2, bucket);
line++; line++;
@ -293,7 +295,7 @@ public class FSTCompletionLookup extends Lookup implements Accountable {
@Override @Override
public synchronized boolean store(DataOutput output) throws IOException { public synchronized boolean store(DataOutput output) throws IOException {
output.writeVLong(count); output.writeVLong(count);
if (this.normalCompletion == null || normalCompletion.getFST() == null) { if (normalCompletion == null || normalCompletion.getFST() == null) {
return false; return false;
} }
normalCompletion.getFST().save(output); normalCompletion.getFST().save(output);

View File

@ -57,12 +57,13 @@ public class BytesRefSortersTest extends LuceneTestCase {
sorter.add(new BytesRef(new byte [1])); sorter.add(new BytesRef(new byte [1]));
}); });
BytesRef spare1; while (true) {
BytesRef spare2; BytesRef spare1 = i1.next();
while ((spare1 = i1.next()) != null && (spare2 = i2.next()) != null) { BytesRef spare2 = i2.next();
assertEquals(spare1, spare2); assertEquals(spare1, spare2);
if (spare1 == null) {
break;
}
} }
assertNull(i1.next());
assertNull(i2.next());
} }
} }