From 83ccb8d2a2204c78de0cf119d63ef452ef95fcc6 Mon Sep 17 00:00:00 2001 From: Nhat Nguyen Date: Thu, 20 Feb 2020 11:47:35 -0500 Subject: [PATCH] LUCENE-9228: Sort dvUpdates by terms before apply With this change, we sort dvUpdates in the term order before applying if they all update a single field to the same value. This optimization can reduce the flush time by around 20% for the docValues update user cases. --- lucene/CHANGES.txt | 4 + .../lucene/index/FieldUpdatesBuffer.java | 57 +++++++++++- .../lucene/index/FrozenBufferedUpdates.java | 6 +- .../org/apache/lucene/util/BytesRefArray.java | 83 ++++++++++++++--- .../lucene/index/TestFieldUpdatesBuffer.java | 93 +++++++++++++++---- 5 files changed, 204 insertions(+), 39 deletions(-) diff --git a/lucene/CHANGES.txt b/lucene/CHANGES.txt index b5bcb21e6ed..b499cd96d1a 100644 --- a/lucene/CHANGES.txt +++ b/lucene/CHANGES.txt @@ -72,6 +72,10 @@ Improvements * LUCENE-9194: Simplify XYShapeXQuery API by adding a new abstract class called XYGeometry. Queries are executed with input objects that extend such interface. (Ignacio Vera) +* LUCENE-9228: Sort dvUpdates in the term order before applying if they all update a + single field to the same value. This optimization can reduce the flush time by around + 20% for the docValues update user cases. (Nhat Nguyen, Adrien Grand, Simon Willnauer) + Optimizations --------------------- diff --git a/lucene/core/src/java/org/apache/lucene/index/FieldUpdatesBuffer.java b/lucene/core/src/java/org/apache/lucene/index/FieldUpdatesBuffer.java index c91f6502244..45bb729f18c 100644 --- a/lucene/core/src/java/org/apache/lucene/index/FieldUpdatesBuffer.java +++ b/lucene/core/src/java/org/apache/lucene/index/FieldUpdatesBuffer.java @@ -19,6 +19,7 @@ package org.apache.lucene.index; import java.io.IOException; import java.util.Arrays; +import java.util.Comparator; import org.apache.lucene.util.ArrayUtil; import org.apache.lucene.util.Bits; @@ -51,6 +52,7 @@ final class FieldUpdatesBuffer { // on CPU for those. We also save on not needing to sort in order to apply the terms in order // since by definition we store them in order. private final BytesRefArray termValues; + private BytesRefArray.SortState termSortState; private final BytesRefArray byteValues; // this will be null if we are buffering numerics private int[] docsUpTo; private long[] numericValues; // this will be null if we are buffering binaries @@ -59,6 +61,7 @@ final class FieldUpdatesBuffer { private long minNumeric = Long.MAX_VALUE; private String[] fields; private final boolean isNumeric; + private boolean finished = false; private FieldUpdatesBuffer(Counter bytesUsed, DocValuesUpdate initialValue, int docUpTo, boolean isNumeric) { this.bytesUsed = bytesUsed; @@ -115,6 +118,7 @@ final class FieldUpdatesBuffer { } void add(String field, int docUpTo, int ord, boolean hasValue) { + assert finished == false : "buffer was finished already"; if (fields[0].equals(field) == false || fields.length != 1 ) { if (fields.length <= ord) { String[] array = ArrayUtil.grow(fields, ord+1); @@ -195,7 +199,26 @@ final class FieldUpdatesBuffer { return numUpdates++; } + void finish() { + if (finished) { + throw new IllegalStateException("buffer was finished already"); + } + finished = true; + final boolean sortedTerms = hasSingleValue() && hasValues == null && fields.length == 1; + if (sortedTerms) { + // sort by ascending by term, then sort descending by docsUpTo so that we can skip updates with lower docUpTo. + termSortState = termValues.sort(Comparator.naturalOrder(), + (i1, i2) -> Integer.compare( + docsUpTo[getArrayIndex(docsUpTo.length, i2)], + docsUpTo[getArrayIndex(docsUpTo.length, i1)])); + bytesUsed.addAndGet(termSortState.ramBytesUsed()); + } + } + BufferedUpdateIterator iterator() { + if (finished == false) { + throw new IllegalStateException("buffer is not finished yet"); + } return new BufferedUpdateIterator(); } @@ -264,26 +287,36 @@ final class FieldUpdatesBuffer { * An iterator that iterates over all updates in insertion order */ class BufferedUpdateIterator { - private final BytesRefIterator termValuesIterator; + private final BytesRefArray.IndexedBytesRefIterator termValuesIterator; + private final BytesRefArray.IndexedBytesRefIterator lookAheadTermIterator; private final BytesRefIterator byteValuesIterator; private final BufferedUpdate bufferedUpdate = new BufferedUpdate(); private final Bits updatesWithValue; - private int index = 0; BufferedUpdateIterator() { - this.termValuesIterator = termValues.iterator(); + this.termValuesIterator = termValues.iterator(termSortState); + this.lookAheadTermIterator = termSortState != null ? termValues.iterator(termSortState) : null; this.byteValuesIterator = isNumeric ? null : byteValues.iterator(); updatesWithValue = hasValues == null ? new Bits.MatchAllBits(numUpdates) : hasValues; } + /** + * If all updates update a single field to the same value, then we can apply these + * updates in the term order instead of the request order as both will yield the same result. + * This optimization allows us to iterate the term dictionary faster and de-duplicate updates. + */ + boolean isSortedTerms() { + return termSortState != null; + } + /** * Moves to the next BufferedUpdate or return null if all updates are consumed. * The returned instance is a shared instance and must be fully consumed before the next call to this method. */ BufferedUpdate next() throws IOException { - BytesRef next = termValuesIterator.next(); + BytesRef next = nextTerm(); if (next != null) { - final int idx = index++; + final int idx = termValuesIterator.ord(); bufferedUpdate.termValue = next; bufferedUpdate.hasValue = updatesWithValue.get(idx); bufferedUpdate.termField = fields[getArrayIndex(fields.length, idx)]; @@ -304,6 +337,20 @@ final class FieldUpdatesBuffer { return null; } } + + BytesRef nextTerm() throws IOException { + if (lookAheadTermIterator != null) { + final BytesRef lastTerm = bufferedUpdate.termValue; + BytesRef lookAheadTerm; + while ((lookAheadTerm = lookAheadTermIterator.next()) != null && lookAheadTerm.equals(lastTerm)) { + BytesRef discardedTerm = termValuesIterator.next(); // discard as the docUpTo of the previous update is higher + assert discardedTerm.equals(lookAheadTerm) : "[" + discardedTerm + "] != [" + lookAheadTerm + "]"; + assert docsUpTo[getArrayIndex(docsUpTo.length, termValuesIterator.ord())] <= bufferedUpdate.docUpTo : + docsUpTo[getArrayIndex(docsUpTo.length, termValuesIterator.ord())] + ">" + bufferedUpdate.docUpTo; + } + } + return termValuesIterator.next(); + } } private static int getArrayIndex(int arrayLength, int index) { diff --git a/lucene/core/src/java/org/apache/lucene/index/FrozenBufferedUpdates.java b/lucene/core/src/java/org/apache/lucene/index/FrozenBufferedUpdates.java index 14156a7e8a1..9ecb08ddbf5 100644 --- a/lucene/core/src/java/org/apache/lucene/index/FrozenBufferedUpdates.java +++ b/lucene/core/src/java/org/apache/lucene/index/FrozenBufferedUpdates.java @@ -110,6 +110,7 @@ final class FrozenBufferedUpdates { // so that it maps to all fields it affects, sorted by their docUpto, and traverse // that Term only once, applying the update to all fields that still need to be // updated. + updates.fieldUpdates.values().forEach(FieldUpdatesBuffer::finish); this.fieldUpdates = Collections.unmodifiableMap(new HashMap<>(updates.fieldUpdates)); this.fieldUpdatesCount = updates.numFieldUpdates.get(); @@ -491,7 +492,7 @@ final class FrozenBufferedUpdates { boolean isNumeric = value.isNumeric(); FieldUpdatesBuffer.BufferedUpdateIterator iterator = value.iterator(); FieldUpdatesBuffer.BufferedUpdate bufferedUpdate; - TermDocsIterator termDocsIterator = new TermDocsIterator(segState.reader, false); + TermDocsIterator termDocsIterator = new TermDocsIterator(segState.reader, iterator.isSortedTerms()); while ((bufferedUpdate = iterator.next()) != null) { // TODO: we traverse the terms in update order (not term order) so that we // apply the updates in the correct order, i.e. if two terms update the @@ -521,7 +522,6 @@ final class FrozenBufferedUpdates { longValue = bufferedUpdate.numericValue; binaryValue = bufferedUpdate.binaryValue; } - termDocsIterator.getDocs(); if (dvUpdates == null) { if (isNumeric) { if (value.hasSingleValue()) { @@ -825,7 +825,7 @@ final class FrozenBufferedUpdates { return null; // requested term does not exist in this segment } else if (cmp == 0) { return getDocs(); - } else if (cmp > 0) { + } else { TermsEnum.SeekStatus status = termsEnum.seekCeil(term); switch (status) { case FOUND: diff --git a/lucene/core/src/java/org/apache/lucene/util/BytesRefArray.java b/lucene/core/src/java/org/apache/lucene/util/BytesRefArray.java index f169d1a99d6..eb168232a01 100644 --- a/lucene/core/src/java/org/apache/lucene/util/BytesRefArray.java +++ b/lucene/core/src/java/org/apache/lucene/util/BytesRefArray.java @@ -18,6 +18,7 @@ package org.apache.lucene.util; import java.util.Arrays; import java.util.Comparator; +import java.util.function.IntBinaryOperator; /** * A simple append only random-access {@link BytesRef} array that stores full @@ -116,8 +117,12 @@ public final class BytesRefArray implements SortableBytesRefArray { } pool.setBytesRef(spare, result, offset, length); } - - private int[] sort(final Comparator comp) { + + + /** + * Returns a {@link SortState} representing the order of elements in this array. This is a non-destructive operation. + */ + public SortState sort(final Comparator comp, final IntBinaryOperator tieComparator) { final int[] orderedEntries = new int[size()]; for (int i = 0; i < orderedEntries.length; i++) { orderedEntries[i] = i; @@ -135,22 +140,28 @@ public final class BytesRefArray implements SortableBytesRefArray { final int idx1 = orderedEntries[i], idx2 = orderedEntries[j]; setBytesRef(scratch1, scratchBytes1, idx1); setBytesRef(scratch2, scratchBytes2, idx2); - return comp.compare(scratchBytes1, scratchBytes2); + return compare(idx1, scratchBytes1, idx2, scratchBytes2); } @Override protected void setPivot(int i) { - final int index = orderedEntries[i]; - setBytesRef(pivotBuilder, pivot, index); + pivotIndex = orderedEntries[i]; + setBytesRef(pivotBuilder, pivot, pivotIndex); } @Override protected int comparePivot(int j) { final int index = orderedEntries[j]; setBytesRef(scratch2, scratchBytes2, index); - return comp.compare(pivot, scratchBytes2); + return compare(pivotIndex, pivot, index, scratchBytes2); } + private int compare(int i1, BytesRef b1, int i2, BytesRef b2) { + int res = comp.compare(b1, b2); + return res == 0 ? tieComparator.applyAsInt(i1, i2) : res; + } + + private int pivotIndex; private final BytesRef pivot = new BytesRef(); private final BytesRef scratchBytes1 = new BytesRef(); private final BytesRef scratchBytes2 = new BytesRef(); @@ -158,14 +169,14 @@ public final class BytesRefArray implements SortableBytesRefArray { private final BytesRefBuilder scratch1 = new BytesRefBuilder(); private final BytesRefBuilder scratch2 = new BytesRefBuilder(); }.sort(0, size()); - return orderedEntries; + return new SortState(orderedEntries); } /** * sugar for {@link #iterator(Comparator)} with a null comparator */ public BytesRefIterator iterator() { - return iterator(null); + return iterator((SortState) null); } /** @@ -184,20 +195,66 @@ public final class BytesRefArray implements SortableBytesRefArray { */ @Override public BytesRefIterator iterator(final Comparator comp) { + return iterator(sort(comp, (i, j) -> 0)); + } + + /** + * Returns an {@link IndexedBytesRefIterator} with point in time semantics. The iterator provides access to all + * so far appended {@link BytesRef} instances. If a non-null sortState is specified then the iterator will iterate + * the byte values in the order of the sortState; otherwise, the order is the same as the values were appended. + */ + public IndexedBytesRefIterator iterator(final SortState sortState) { + final int size = size(); + final int[] indices = sortState == null ? null : sortState.indices; + assert indices == null || indices.length == size : indices.length + " != " + size; final BytesRefBuilder spare = new BytesRefBuilder(); final BytesRef result = new BytesRef(); - final int size = size(); - final int[] indices = comp == null ? null : sort(comp); - return new BytesRefIterator() { - int pos = 0; + + return new IndexedBytesRefIterator() { + int pos = -1; + int ord = 0; @Override public BytesRef next() { + ++pos; if (pos < size) { - setBytesRef(spare, result, indices == null ? pos++ : indices[pos++]); + ord = indices == null ? pos : indices[pos]; + setBytesRef(spare, result, ord); return result; } return null; } + + @Override + public int ord() { + return ord; + } }; } + + /** + * Used to iterate the elements of an array in a given order. + */ + public final static class SortState implements Accountable { + private final int[] indices; + + private SortState(int[] indices) { + this.indices = indices; + } + + @Override + public long ramBytesUsed() { + return RamUsageEstimator.NUM_BYTES_ARRAY_HEADER + indices.length * Integer.BYTES; + } + } + + /** + * An extension of {@link BytesRefIterator} that allows retrieving the index of the current element + */ + public interface IndexedBytesRefIterator extends BytesRefIterator { + /** + * Returns the ordinal position of the element that was returned in the latest call of {@link #next()}. + * Do not call this method if {@link #next()} is not called yet or the last call returned a null value. + */ + int ord(); + } } diff --git a/lucene/core/src/test/org/apache/lucene/index/TestFieldUpdatesBuffer.java b/lucene/core/src/test/org/apache/lucene/index/TestFieldUpdatesBuffer.java index a63c107ce70..2d74b7338ac 100644 --- a/lucene/core/src/test/org/apache/lucene/index/TestFieldUpdatesBuffer.java +++ b/lucene/core/src/test/org/apache/lucene/index/TestFieldUpdatesBuffer.java @@ -20,7 +20,10 @@ package org.apache.lucene.index; import java.io.IOException; import java.util.ArrayList; import java.util.Arrays; +import java.util.Comparator; import java.util.List; +import java.util.SortedMap; +import java.util.TreeMap; import com.carrotsearch.randomizedtesting.generators.RandomPicks; import org.apache.lucene.util.BytesRef; @@ -46,6 +49,7 @@ public class TestFieldUpdatesBuffer extends LuceneTestCase { assertTrue(buffer.isNumeric()); assertEquals(13, buffer.getMaxNumeric()); assertEquals(6, buffer.getMinNumeric()); + buffer.finish(); FieldUpdatesBuffer.BufferedUpdateIterator iterator = buffer.iterator(); FieldUpdatesBuffer.BufferedUpdate value = iterator.next(); assertNotNull(value); @@ -99,6 +103,7 @@ public class TestFieldUpdatesBuffer extends LuceneTestCase { buffer.addNoValue(new Term("id", "3"), Integer.MAX_VALUE); } buffer.addUpdate(new Term("id", "4"), intValue, Integer.MAX_VALUE); + buffer.finish(); FieldUpdatesBuffer.BufferedUpdateIterator iterator = buffer.iterator(); FieldUpdatesBuffer.BufferedUpdate value; int count = 0; @@ -131,6 +136,7 @@ public class TestFieldUpdatesBuffer extends LuceneTestCase { buffer.addNoValue(new Term("id", "3"), Integer.MAX_VALUE); } buffer.addUpdate(new Term("id", "4"), new BytesRef(""), Integer.MAX_VALUE); + buffer.finish(); FieldUpdatesBuffer.BufferedUpdateIterator iterator = buffer.iterator(); FieldUpdatesBuffer.BufferedUpdate value; int count = 0; @@ -149,12 +155,20 @@ public class TestFieldUpdatesBuffer extends LuceneTestCase { assertFalse(buffer.isNumeric()); } + int randomDocUpTo() { + if (random().nextInt(5) == 0) { + return Integer.MAX_VALUE; + } else { + return random().nextInt(10000); + } + } + DocValuesUpdate.BinaryDocValuesUpdate getRandomBinaryUpdate() { String termField = RandomPicks.randomFrom(random(), Arrays.asList("id", "_id", "some_other_field")); String docId = "" + random().nextInt(10); - DocValuesUpdate.BinaryDocValuesUpdate value = new DocValuesUpdate.BinaryDocValuesUpdate(new Term(termField, docId), "binary", + DocValuesUpdate.BinaryDocValuesUpdate value = new DocValuesUpdate.BinaryDocValuesUpdate(new Term(termField, docId), "binary", rarely() ? null : new BytesRef(TestUtil.randomRealisticUnicodeString(random()))); - return rarely() ? value.prepareForApply(random().nextInt(100)) : value; + return rarely() ? value.prepareForApply(randomDocUpTo()) : value; } DocValuesUpdate.NumericDocValuesUpdate getRandomNumericUpdate() { @@ -162,7 +176,7 @@ public class TestFieldUpdatesBuffer extends LuceneTestCase { String docId = "" + random().nextInt(10); DocValuesUpdate.NumericDocValuesUpdate value = new DocValuesUpdate.NumericDocValuesUpdate(new Term(termField, docId), "numeric", rarely() ? null : Long.valueOf(random().nextInt(100))); - return rarely() ? value.prepareForApply(random().nextInt(100)) : value; + return rarely() ? value.prepareForApply(randomDocUpTo()) : value; } public void testBinaryRandom() throws IOException { @@ -181,6 +195,7 @@ public class TestFieldUpdatesBuffer extends LuceneTestCase { buffer.addNoValue(randomUpdate.term, randomUpdate.docIDUpto); } } + buffer.finish(); FieldUpdatesBuffer.BufferedUpdateIterator iterator = buffer.iterator(); FieldUpdatesBuffer.BufferedUpdate value; @@ -216,6 +231,55 @@ public class TestFieldUpdatesBuffer extends LuceneTestCase { buffer.addNoValue(randomUpdate.term, randomUpdate.docIDUpto); } } + buffer.finish(); + DocValuesUpdate.NumericDocValuesUpdate lastUpdate = randomUpdate; + boolean termsSorted = lastUpdate.hasValue && updates.stream() + .allMatch(update -> update.field.equals(lastUpdate.field) && + update.hasValue && update.getValue() == lastUpdate.getValue()); + assertBufferUpdates(buffer, updates, termsSorted); + } + + public void testNoNumericValue() { + DocValuesUpdate.NumericDocValuesUpdate update = + new DocValuesUpdate.NumericDocValuesUpdate(new Term("id", "1"), "age", null); + FieldUpdatesBuffer buffer = new FieldUpdatesBuffer(Counter.newCounter(), update, update.docIDUpto); + assertEquals(0, buffer.getMinNumeric()); + assertEquals(0, buffer.getMaxNumeric()); + } + + public void testSortAndDedupNumericUpdatesByTerms() throws IOException { + List updates = new ArrayList<>(); + int numUpdates = 1 + random().nextInt(1000); + Counter counter = Counter.newCounter(); + String termField = RandomPicks.randomFrom(random(), Arrays.asList("id", "_id", "some_other_field")); + long docValue = 1 + random().nextInt(1000); + DocValuesUpdate.NumericDocValuesUpdate randomUpdate = new DocValuesUpdate.NumericDocValuesUpdate( + new Term(termField, Integer.toString(random().nextInt(1000))), "numeric", docValue); + randomUpdate = randomUpdate.prepareForApply(randomDocUpTo()); + updates.add(randomUpdate); + FieldUpdatesBuffer buffer = new FieldUpdatesBuffer(counter, randomUpdate, randomUpdate.docIDUpto); + for (int i = 0; i < numUpdates; i++) { + randomUpdate = new DocValuesUpdate.NumericDocValuesUpdate( + new Term(termField, Integer.toString(random().nextInt(1000))), "numeric", docValue); + randomUpdate = randomUpdate.prepareForApply(randomDocUpTo()); + updates.add(randomUpdate); + buffer.addUpdate(randomUpdate.term, randomUpdate.getValue(), randomUpdate.docIDUpto); + } + buffer.finish(); + assertBufferUpdates(buffer, updates, true); + } + + void assertBufferUpdates(FieldUpdatesBuffer buffer, + List updates, + boolean termSorted) throws IOException { + if (termSorted) { + updates.sort(Comparator.comparing(u -> u.term.bytes)); + SortedMap byTerms = new TreeMap<>(); + for (DocValuesUpdate.NumericDocValuesUpdate update : updates) { + byTerms.compute(update.term.bytes, (k, v) -> v != null && v.docIDUpto >= update.docIDUpto ? v : update); + } + updates = new ArrayList<>(byTerms.values()); + } FieldUpdatesBuffer.BufferedUpdateIterator iterator = buffer.iterator(); FieldUpdatesBuffer.BufferedUpdate value; @@ -223,14 +287,15 @@ public class TestFieldUpdatesBuffer extends LuceneTestCase { long min = Long.MAX_VALUE; long max = Long.MIN_VALUE; boolean hasAtLeastOneValue = false; + DocValuesUpdate.NumericDocValuesUpdate expectedUpdate; while ((value = iterator.next()) != null) { long v = buffer.getNumericValue(count); - randomUpdate = updates.get(count++); - assertEquals(randomUpdate.term.bytes.utf8ToString(), value.termValue.utf8ToString()); - assertEquals(randomUpdate.term.field, value.termField); - assertEquals(randomUpdate.hasValue, value.hasValue); - if (randomUpdate.hasValue) { - assertEquals(randomUpdate.getValue(), value.numericValue); + expectedUpdate = updates.get(count++); + assertEquals(expectedUpdate.term.bytes.utf8ToString(), value.termValue.utf8ToString()); + assertEquals(expectedUpdate.term.field, value.termField); + assertEquals(expectedUpdate.hasValue, value.hasValue); + if (expectedUpdate.hasValue) { + assertEquals(expectedUpdate.getValue(), value.numericValue); assertEquals(v, value.numericValue); min = Math.min(min, v); max = Math.max(max, v); @@ -239,7 +304,7 @@ public class TestFieldUpdatesBuffer extends LuceneTestCase { assertEquals(0, value.numericValue); assertEquals(0, v); } - assertEquals(randomUpdate.docIDUpto, value.docUpTo); + assertEquals(expectedUpdate.docIDUpto, value.docUpTo); } if (hasAtLeastOneValue) { assertEquals(max, buffer.getMaxNumeric()); @@ -250,12 +315,4 @@ public class TestFieldUpdatesBuffer extends LuceneTestCase { } assertEquals(count, updates.size()); } - - public void testNoNumericValue() { - DocValuesUpdate.NumericDocValuesUpdate update = - new DocValuesUpdate.NumericDocValuesUpdate(new Term("id", "1"), "age", null); - FieldUpdatesBuffer buffer = new FieldUpdatesBuffer(Counter.newCounter(), update, update.docIDUpto); - assertEquals(0, buffer.getMinNumeric()); - assertEquals(0, buffer.getMaxNumeric()); - } }