LUCENE-9228: Sort dvUpdates by terms before apply

With this change, we sort dvUpdates in the term order before applying if
they all update a single field to the same value. This optimization can
reduce the flush time by around 20% for the docValues update user cases.
This commit is contained in:
Nhat Nguyen 2020-02-20 11:47:35 -05:00
parent d5e51bf994
commit 83ccb8d2a2
5 changed files with 204 additions and 39 deletions

View File

@ -72,6 +72,10 @@ Improvements
* LUCENE-9194: Simplify XYShapeXQuery API by adding a new abstract class called XYGeometry. Queries are
executed with input objects that extend such interface. (Ignacio Vera)
* LUCENE-9228: Sort dvUpdates in the term order before applying if they all update a
single field to the same value. This optimization can reduce the flush time by around
20% for the docValues update user cases. (Nhat Nguyen, Adrien Grand, Simon Willnauer)
Optimizations
---------------------

View File

@ -19,6 +19,7 @@ package org.apache.lucene.index;
import java.io.IOException;
import java.util.Arrays;
import java.util.Comparator;
import org.apache.lucene.util.ArrayUtil;
import org.apache.lucene.util.Bits;
@ -51,6 +52,7 @@ final class FieldUpdatesBuffer {
// on CPU for those. We also save on not needing to sort in order to apply the terms in order
// since by definition we store them in order.
private final BytesRefArray termValues;
private BytesRefArray.SortState termSortState;
private final BytesRefArray byteValues; // this will be null if we are buffering numerics
private int[] docsUpTo;
private long[] numericValues; // this will be null if we are buffering binaries
@ -59,6 +61,7 @@ final class FieldUpdatesBuffer {
private long minNumeric = Long.MAX_VALUE;
private String[] fields;
private final boolean isNumeric;
private boolean finished = false;
private FieldUpdatesBuffer(Counter bytesUsed, DocValuesUpdate initialValue, int docUpTo, boolean isNumeric) {
this.bytesUsed = bytesUsed;
@ -115,6 +118,7 @@ final class FieldUpdatesBuffer {
}
void add(String field, int docUpTo, int ord, boolean hasValue) {
assert finished == false : "buffer was finished already";
if (fields[0].equals(field) == false || fields.length != 1 ) {
if (fields.length <= ord) {
String[] array = ArrayUtil.grow(fields, ord+1);
@ -195,7 +199,26 @@ final class FieldUpdatesBuffer {
return numUpdates++;
}
void finish() {
if (finished) {
throw new IllegalStateException("buffer was finished already");
}
finished = true;
final boolean sortedTerms = hasSingleValue() && hasValues == null && fields.length == 1;
if (sortedTerms) {
// sort by ascending by term, then sort descending by docsUpTo so that we can skip updates with lower docUpTo.
termSortState = termValues.sort(Comparator.naturalOrder(),
(i1, i2) -> Integer.compare(
docsUpTo[getArrayIndex(docsUpTo.length, i2)],
docsUpTo[getArrayIndex(docsUpTo.length, i1)]));
bytesUsed.addAndGet(termSortState.ramBytesUsed());
}
}
BufferedUpdateIterator iterator() {
if (finished == false) {
throw new IllegalStateException("buffer is not finished yet");
}
return new BufferedUpdateIterator();
}
@ -264,26 +287,36 @@ final class FieldUpdatesBuffer {
* An iterator that iterates over all updates in insertion order
*/
class BufferedUpdateIterator {
private final BytesRefIterator termValuesIterator;
private final BytesRefArray.IndexedBytesRefIterator termValuesIterator;
private final BytesRefArray.IndexedBytesRefIterator lookAheadTermIterator;
private final BytesRefIterator byteValuesIterator;
private final BufferedUpdate bufferedUpdate = new BufferedUpdate();
private final Bits updatesWithValue;
private int index = 0;
BufferedUpdateIterator() {
this.termValuesIterator = termValues.iterator();
this.termValuesIterator = termValues.iterator(termSortState);
this.lookAheadTermIterator = termSortState != null ? termValues.iterator(termSortState) : null;
this.byteValuesIterator = isNumeric ? null : byteValues.iterator();
updatesWithValue = hasValues == null ? new Bits.MatchAllBits(numUpdates) : hasValues;
}
/**
* If all updates update a single field to the same value, then we can apply these
* updates in the term order instead of the request order as both will yield the same result.
* This optimization allows us to iterate the term dictionary faster and de-duplicate updates.
*/
boolean isSortedTerms() {
return termSortState != null;
}
/**
* Moves to the next BufferedUpdate or return null if all updates are consumed.
* The returned instance is a shared instance and must be fully consumed before the next call to this method.
*/
BufferedUpdate next() throws IOException {
BytesRef next = termValuesIterator.next();
BytesRef next = nextTerm();
if (next != null) {
final int idx = index++;
final int idx = termValuesIterator.ord();
bufferedUpdate.termValue = next;
bufferedUpdate.hasValue = updatesWithValue.get(idx);
bufferedUpdate.termField = fields[getArrayIndex(fields.length, idx)];
@ -304,6 +337,20 @@ final class FieldUpdatesBuffer {
return null;
}
}
BytesRef nextTerm() throws IOException {
if (lookAheadTermIterator != null) {
final BytesRef lastTerm = bufferedUpdate.termValue;
BytesRef lookAheadTerm;
while ((lookAheadTerm = lookAheadTermIterator.next()) != null && lookAheadTerm.equals(lastTerm)) {
BytesRef discardedTerm = termValuesIterator.next(); // discard as the docUpTo of the previous update is higher
assert discardedTerm.equals(lookAheadTerm) : "[" + discardedTerm + "] != [" + lookAheadTerm + "]";
assert docsUpTo[getArrayIndex(docsUpTo.length, termValuesIterator.ord())] <= bufferedUpdate.docUpTo :
docsUpTo[getArrayIndex(docsUpTo.length, termValuesIterator.ord())] + ">" + bufferedUpdate.docUpTo;
}
}
return termValuesIterator.next();
}
}
private static int getArrayIndex(int arrayLength, int index) {

View File

@ -110,6 +110,7 @@ final class FrozenBufferedUpdates {
// so that it maps to all fields it affects, sorted by their docUpto, and traverse
// that Term only once, applying the update to all fields that still need to be
// updated.
updates.fieldUpdates.values().forEach(FieldUpdatesBuffer::finish);
this.fieldUpdates = Collections.unmodifiableMap(new HashMap<>(updates.fieldUpdates));
this.fieldUpdatesCount = updates.numFieldUpdates.get();
@ -491,7 +492,7 @@ final class FrozenBufferedUpdates {
boolean isNumeric = value.isNumeric();
FieldUpdatesBuffer.BufferedUpdateIterator iterator = value.iterator();
FieldUpdatesBuffer.BufferedUpdate bufferedUpdate;
TermDocsIterator termDocsIterator = new TermDocsIterator(segState.reader, false);
TermDocsIterator termDocsIterator = new TermDocsIterator(segState.reader, iterator.isSortedTerms());
while ((bufferedUpdate = iterator.next()) != null) {
// TODO: we traverse the terms in update order (not term order) so that we
// apply the updates in the correct order, i.e. if two terms update the
@ -521,7 +522,6 @@ final class FrozenBufferedUpdates {
longValue = bufferedUpdate.numericValue;
binaryValue = bufferedUpdate.binaryValue;
}
termDocsIterator.getDocs();
if (dvUpdates == null) {
if (isNumeric) {
if (value.hasSingleValue()) {
@ -825,7 +825,7 @@ final class FrozenBufferedUpdates {
return null; // requested term does not exist in this segment
} else if (cmp == 0) {
return getDocs();
} else if (cmp > 0) {
} else {
TermsEnum.SeekStatus status = termsEnum.seekCeil(term);
switch (status) {
case FOUND:

View File

@ -18,6 +18,7 @@ package org.apache.lucene.util;
import java.util.Arrays;
import java.util.Comparator;
import java.util.function.IntBinaryOperator;
/**
* A simple append only random-access {@link BytesRef} array that stores full
@ -116,8 +117,12 @@ public final class BytesRefArray implements SortableBytesRefArray {
}
pool.setBytesRef(spare, result, offset, length);
}
private int[] sort(final Comparator<BytesRef> comp) {
/**
* Returns a {@link SortState} representing the order of elements in this array. This is a non-destructive operation.
*/
public SortState sort(final Comparator<BytesRef> comp, final IntBinaryOperator tieComparator) {
final int[] orderedEntries = new int[size()];
for (int i = 0; i < orderedEntries.length; i++) {
orderedEntries[i] = i;
@ -135,22 +140,28 @@ public final class BytesRefArray implements SortableBytesRefArray {
final int idx1 = orderedEntries[i], idx2 = orderedEntries[j];
setBytesRef(scratch1, scratchBytes1, idx1);
setBytesRef(scratch2, scratchBytes2, idx2);
return comp.compare(scratchBytes1, scratchBytes2);
return compare(idx1, scratchBytes1, idx2, scratchBytes2);
}
@Override
protected void setPivot(int i) {
final int index = orderedEntries[i];
setBytesRef(pivotBuilder, pivot, index);
pivotIndex = orderedEntries[i];
setBytesRef(pivotBuilder, pivot, pivotIndex);
}
@Override
protected int comparePivot(int j) {
final int index = orderedEntries[j];
setBytesRef(scratch2, scratchBytes2, index);
return comp.compare(pivot, scratchBytes2);
return compare(pivotIndex, pivot, index, scratchBytes2);
}
private int compare(int i1, BytesRef b1, int i2, BytesRef b2) {
int res = comp.compare(b1, b2);
return res == 0 ? tieComparator.applyAsInt(i1, i2) : res;
}
private int pivotIndex;
private final BytesRef pivot = new BytesRef();
private final BytesRef scratchBytes1 = new BytesRef();
private final BytesRef scratchBytes2 = new BytesRef();
@ -158,14 +169,14 @@ public final class BytesRefArray implements SortableBytesRefArray {
private final BytesRefBuilder scratch1 = new BytesRefBuilder();
private final BytesRefBuilder scratch2 = new BytesRefBuilder();
}.sort(0, size());
return orderedEntries;
return new SortState(orderedEntries);
}
/**
* sugar for {@link #iterator(Comparator)} with a <code>null</code> comparator
*/
public BytesRefIterator iterator() {
return iterator(null);
return iterator((SortState) null);
}
/**
@ -184,20 +195,66 @@ public final class BytesRefArray implements SortableBytesRefArray {
*/
@Override
public BytesRefIterator iterator(final Comparator<BytesRef> comp) {
return iterator(sort(comp, (i, j) -> 0));
}
/**
* Returns an {@link IndexedBytesRefIterator} with point in time semantics. The iterator provides access to all
* so far appended {@link BytesRef} instances. If a non-null sortState is specified then the iterator will iterate
* the byte values in the order of the sortState; otherwise, the order is the same as the values were appended.
*/
public IndexedBytesRefIterator iterator(final SortState sortState) {
final int size = size();
final int[] indices = sortState == null ? null : sortState.indices;
assert indices == null || indices.length == size : indices.length + " != " + size;
final BytesRefBuilder spare = new BytesRefBuilder();
final BytesRef result = new BytesRef();
final int size = size();
final int[] indices = comp == null ? null : sort(comp);
return new BytesRefIterator() {
int pos = 0;
return new IndexedBytesRefIterator() {
int pos = -1;
int ord = 0;
@Override
public BytesRef next() {
++pos;
if (pos < size) {
setBytesRef(spare, result, indices == null ? pos++ : indices[pos++]);
ord = indices == null ? pos : indices[pos];
setBytesRef(spare, result, ord);
return result;
}
return null;
}
@Override
public int ord() {
return ord;
}
};
}
/**
* Used to iterate the elements of an array in a given order.
*/
public final static class SortState implements Accountable {
private final int[] indices;
private SortState(int[] indices) {
this.indices = indices;
}
@Override
public long ramBytesUsed() {
return RamUsageEstimator.NUM_BYTES_ARRAY_HEADER + indices.length * Integer.BYTES;
}
}
/**
* An extension of {@link BytesRefIterator} that allows retrieving the index of the current element
*/
public interface IndexedBytesRefIterator extends BytesRefIterator {
/**
* Returns the ordinal position of the element that was returned in the latest call of {@link #next()}.
* Do not call this method if {@link #next()} is not called yet or the last call returned a null value.
*/
int ord();
}
}

View File

@ -20,7 +20,10 @@ package org.apache.lucene.index;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Comparator;
import java.util.List;
import java.util.SortedMap;
import java.util.TreeMap;
import com.carrotsearch.randomizedtesting.generators.RandomPicks;
import org.apache.lucene.util.BytesRef;
@ -46,6 +49,7 @@ public class TestFieldUpdatesBuffer extends LuceneTestCase {
assertTrue(buffer.isNumeric());
assertEquals(13, buffer.getMaxNumeric());
assertEquals(6, buffer.getMinNumeric());
buffer.finish();
FieldUpdatesBuffer.BufferedUpdateIterator iterator = buffer.iterator();
FieldUpdatesBuffer.BufferedUpdate value = iterator.next();
assertNotNull(value);
@ -99,6 +103,7 @@ public class TestFieldUpdatesBuffer extends LuceneTestCase {
buffer.addNoValue(new Term("id", "3"), Integer.MAX_VALUE);
}
buffer.addUpdate(new Term("id", "4"), intValue, Integer.MAX_VALUE);
buffer.finish();
FieldUpdatesBuffer.BufferedUpdateIterator iterator = buffer.iterator();
FieldUpdatesBuffer.BufferedUpdate value;
int count = 0;
@ -131,6 +136,7 @@ public class TestFieldUpdatesBuffer extends LuceneTestCase {
buffer.addNoValue(new Term("id", "3"), Integer.MAX_VALUE);
}
buffer.addUpdate(new Term("id", "4"), new BytesRef(""), Integer.MAX_VALUE);
buffer.finish();
FieldUpdatesBuffer.BufferedUpdateIterator iterator = buffer.iterator();
FieldUpdatesBuffer.BufferedUpdate value;
int count = 0;
@ -149,12 +155,20 @@ public class TestFieldUpdatesBuffer extends LuceneTestCase {
assertFalse(buffer.isNumeric());
}
int randomDocUpTo() {
if (random().nextInt(5) == 0) {
return Integer.MAX_VALUE;
} else {
return random().nextInt(10000);
}
}
DocValuesUpdate.BinaryDocValuesUpdate getRandomBinaryUpdate() {
String termField = RandomPicks.randomFrom(random(), Arrays.asList("id", "_id", "some_other_field"));
String docId = "" + random().nextInt(10);
DocValuesUpdate.BinaryDocValuesUpdate value = new DocValuesUpdate.BinaryDocValuesUpdate(new Term(termField, docId), "binary",
DocValuesUpdate.BinaryDocValuesUpdate value = new DocValuesUpdate.BinaryDocValuesUpdate(new Term(termField, docId), "binary",
rarely() ? null : new BytesRef(TestUtil.randomRealisticUnicodeString(random())));
return rarely() ? value.prepareForApply(random().nextInt(100)) : value;
return rarely() ? value.prepareForApply(randomDocUpTo()) : value;
}
DocValuesUpdate.NumericDocValuesUpdate getRandomNumericUpdate() {
@ -162,7 +176,7 @@ public class TestFieldUpdatesBuffer extends LuceneTestCase {
String docId = "" + random().nextInt(10);
DocValuesUpdate.NumericDocValuesUpdate value = new DocValuesUpdate.NumericDocValuesUpdate(new Term(termField, docId), "numeric",
rarely() ? null : Long.valueOf(random().nextInt(100)));
return rarely() ? value.prepareForApply(random().nextInt(100)) : value;
return rarely() ? value.prepareForApply(randomDocUpTo()) : value;
}
public void testBinaryRandom() throws IOException {
@ -181,6 +195,7 @@ public class TestFieldUpdatesBuffer extends LuceneTestCase {
buffer.addNoValue(randomUpdate.term, randomUpdate.docIDUpto);
}
}
buffer.finish();
FieldUpdatesBuffer.BufferedUpdateIterator iterator = buffer.iterator();
FieldUpdatesBuffer.BufferedUpdate value;
@ -216,6 +231,55 @@ public class TestFieldUpdatesBuffer extends LuceneTestCase {
buffer.addNoValue(randomUpdate.term, randomUpdate.docIDUpto);
}
}
buffer.finish();
DocValuesUpdate.NumericDocValuesUpdate lastUpdate = randomUpdate;
boolean termsSorted = lastUpdate.hasValue && updates.stream()
.allMatch(update -> update.field.equals(lastUpdate.field) &&
update.hasValue && update.getValue() == lastUpdate.getValue());
assertBufferUpdates(buffer, updates, termsSorted);
}
public void testNoNumericValue() {
DocValuesUpdate.NumericDocValuesUpdate update =
new DocValuesUpdate.NumericDocValuesUpdate(new Term("id", "1"), "age", null);
FieldUpdatesBuffer buffer = new FieldUpdatesBuffer(Counter.newCounter(), update, update.docIDUpto);
assertEquals(0, buffer.getMinNumeric());
assertEquals(0, buffer.getMaxNumeric());
}
public void testSortAndDedupNumericUpdatesByTerms() throws IOException {
List<DocValuesUpdate.NumericDocValuesUpdate> updates = new ArrayList<>();
int numUpdates = 1 + random().nextInt(1000);
Counter counter = Counter.newCounter();
String termField = RandomPicks.randomFrom(random(), Arrays.asList("id", "_id", "some_other_field"));
long docValue = 1 + random().nextInt(1000);
DocValuesUpdate.NumericDocValuesUpdate randomUpdate = new DocValuesUpdate.NumericDocValuesUpdate(
new Term(termField, Integer.toString(random().nextInt(1000))), "numeric", docValue);
randomUpdate = randomUpdate.prepareForApply(randomDocUpTo());
updates.add(randomUpdate);
FieldUpdatesBuffer buffer = new FieldUpdatesBuffer(counter, randomUpdate, randomUpdate.docIDUpto);
for (int i = 0; i < numUpdates; i++) {
randomUpdate = new DocValuesUpdate.NumericDocValuesUpdate(
new Term(termField, Integer.toString(random().nextInt(1000))), "numeric", docValue);
randomUpdate = randomUpdate.prepareForApply(randomDocUpTo());
updates.add(randomUpdate);
buffer.addUpdate(randomUpdate.term, randomUpdate.getValue(), randomUpdate.docIDUpto);
}
buffer.finish();
assertBufferUpdates(buffer, updates, true);
}
void assertBufferUpdates(FieldUpdatesBuffer buffer,
List<DocValuesUpdate.NumericDocValuesUpdate> updates,
boolean termSorted) throws IOException {
if (termSorted) {
updates.sort(Comparator.comparing(u -> u.term.bytes));
SortedMap<BytesRef, DocValuesUpdate.NumericDocValuesUpdate> byTerms = new TreeMap<>();
for (DocValuesUpdate.NumericDocValuesUpdate update : updates) {
byTerms.compute(update.term.bytes, (k, v) -> v != null && v.docIDUpto >= update.docIDUpto ? v : update);
}
updates = new ArrayList<>(byTerms.values());
}
FieldUpdatesBuffer.BufferedUpdateIterator iterator = buffer.iterator();
FieldUpdatesBuffer.BufferedUpdate value;
@ -223,14 +287,15 @@ public class TestFieldUpdatesBuffer extends LuceneTestCase {
long min = Long.MAX_VALUE;
long max = Long.MIN_VALUE;
boolean hasAtLeastOneValue = false;
DocValuesUpdate.NumericDocValuesUpdate expectedUpdate;
while ((value = iterator.next()) != null) {
long v = buffer.getNumericValue(count);
randomUpdate = updates.get(count++);
assertEquals(randomUpdate.term.bytes.utf8ToString(), value.termValue.utf8ToString());
assertEquals(randomUpdate.term.field, value.termField);
assertEquals(randomUpdate.hasValue, value.hasValue);
if (randomUpdate.hasValue) {
assertEquals(randomUpdate.getValue(), value.numericValue);
expectedUpdate = updates.get(count++);
assertEquals(expectedUpdate.term.bytes.utf8ToString(), value.termValue.utf8ToString());
assertEquals(expectedUpdate.term.field, value.termField);
assertEquals(expectedUpdate.hasValue, value.hasValue);
if (expectedUpdate.hasValue) {
assertEquals(expectedUpdate.getValue(), value.numericValue);
assertEquals(v, value.numericValue);
min = Math.min(min, v);
max = Math.max(max, v);
@ -239,7 +304,7 @@ public class TestFieldUpdatesBuffer extends LuceneTestCase {
assertEquals(0, value.numericValue);
assertEquals(0, v);
}
assertEquals(randomUpdate.docIDUpto, value.docUpTo);
assertEquals(expectedUpdate.docIDUpto, value.docUpTo);
}
if (hasAtLeastOneValue) {
assertEquals(max, buffer.getMaxNumeric());
@ -250,12 +315,4 @@ public class TestFieldUpdatesBuffer extends LuceneTestCase {
}
assertEquals(count, updates.size());
}
public void testNoNumericValue() {
DocValuesUpdate.NumericDocValuesUpdate update =
new DocValuesUpdate.NumericDocValuesUpdate(new Term("id", "1"), "age", null);
FieldUpdatesBuffer buffer = new FieldUpdatesBuffer(Counter.newCounter(), update, update.docIDUpto);
assertEquals(0, buffer.getMinNumeric());
assertEquals(0, buffer.getMaxNumeric());
}
}