From b0e6a92ede321dc6932348978bed1a4c04a70d8c Mon Sep 17 00:00:00 2001 From: Simon Willnauer Date: Tue, 4 Dec 2018 17:36:46 +0100 Subject: [PATCH] LUCENE-8590: Optimize DocValues update datastructures Today we are using a LinkedHashMap to buffer doc-values updates in BufferedUpdates. This on the one hand uses an object-based data structure and on the other requires re-encoding the data into a more compact representation once the BufferedUpdates are frozen. This change uses a more compact representation for the updates already in the BufferedUpdates, in a parallel-array-like data structure that can be reused in FrozenBufferedUpdates. It also adds a much simpler API to consume the updates and allows for internal memory optimizations for common-case updates. --- lucene/CHANGES.txt | 4 + .../apache/lucene/index/BufferedUpdates.java | 171 +++------- .../lucene/index/BufferedUpdatesStream.java | 1 + .../index/DocumentsWriterDeleteQueue.java | 2 +- .../index/DocumentsWriterPerThread.java | 7 +- .../lucene/index/FieldUpdatesBuffer.java | 280 ++++++++++++++++ .../lucene/index/FrozenBufferedUpdates.java | 302 ++++++------------ .../lucene/index/TestFieldUpdatesBuffer.java | 231 ++++++++++++++ 8 files changed, 657 insertions(+), 341 deletions(-) create mode 100644 lucene/core/src/java/org/apache/lucene/index/FieldUpdatesBuffer.java create mode 100644 lucene/core/src/test/org/apache/lucene/index/TestFieldUpdatesBuffer.java diff --git a/lucene/CHANGES.txt b/lucene/CHANGES.txt index 3c2a4092fc8..51b831d8a6b 100644 --- a/lucene/CHANGES.txt +++ b/lucene/CHANGES.txt @@ -260,6 +260,10 @@ Optimizations * LUCENE-8552: FieldInfos.getMergedFieldInfos no longer does any merging if there is <= 1 segment. (Christophe Bismuth via David Smiley) +* LUCENE-8590: BufferedUpdates now uses optimized storage for buffering doc-values updates that + can save up to 80% of the heap used compared to the previous implementation and uses non-object + based data structures. (Simon Willnauer, Mike McCandless, Shai Erera, Adrien Grand) + Other * LUCENE-8573: BKDWriter now uses FutureArrays#mismatch to compute shared prefixes. diff --git a/lucene/core/src/java/org/apache/lucene/index/BufferedUpdates.java b/lucene/core/src/java/org/apache/lucene/index/BufferedUpdates.java index ae37f149fbd..04b19b779be 100644 --- a/lucene/core/src/java/org/apache/lucene/index/BufferedUpdates.java +++ b/lucene/core/src/java/org/apache/lucene/index/BufferedUpdates.java @@ -19,16 +19,15 @@ package org.apache.lucene.index; import java.util.ArrayList; import java.util.HashMap; -import java.util.LinkedHashMap; import java.util.List; import java.util.Map; import java.util.concurrent.atomic.AtomicInteger; -import java.util.concurrent.atomic.AtomicLong; -import java.util.function.IntFunction; import org.apache.lucene.index.DocValuesUpdate.BinaryDocValuesUpdate; import org.apache.lucene.index.DocValuesUpdate.NumericDocValuesUpdate; import org.apache.lucene.search.Query; +import org.apache.lucene.util.Accountable; +import org.apache.lucene.util.Counter; import org.apache.lucene.util.RamUsageEstimator; /** Holds buffered deletes and updates, by docID, term or query for a @@ -42,7 +41,7 @@ import org.apache.lucene.util.RamUsageEstimator; // instance on DocumentWriterPerThread, or via sync'd code by // DocumentsWriterDeleteQueue -class BufferedUpdates { +class BufferedUpdates implements Accountable { /* Rough logic: HashMap has an array[Entry] w/ varying load factor (say 2 * POINTER).
Entry is object w/ Term @@ -67,94 +66,20 @@ class BufferedUpdates { (OBJ_HEADER + 3*POINTER + INT). Query we often undercount (say 24 bytes). Integer is OBJ_HEADER + INT. */ final static int BYTES_PER_DEL_QUERY = 5*RamUsageEstimator.NUM_BYTES_OBJECT_REF + 2*RamUsageEstimator.NUM_BYTES_OBJECT_HEADER + 2*Integer.BYTES + 24; - - /* Rough logic: NumericUpdate calculates its actual size, - * including the update Term and DV field (String). The - * per-field map holds a reference to the updated field, and - * therefore we only account for the object reference and - * map space itself. This is incremented when we first see - * an updated field. - * - * HashMap has an array[Entry] w/ varying load - * factor (say 2*POINTER). Entry is an object w/ String key, - * LinkedHashMap val, int hash, Entry next (OBJ_HEADER + 3*POINTER + INT). - * - * LinkedHashMap (val) is counted as OBJ_HEADER, array[Entry] ref + header, 4*INT, 1*FLOAT, - * Set (entrySet) (2*OBJ_HEADER + ARRAY_HEADER + 2*POINTER + 4*INT + FLOAT) - */ - final static int BYTES_PER_NUMERIC_FIELD_ENTRY = - 7*RamUsageEstimator.NUM_BYTES_OBJECT_REF + 3*RamUsageEstimator.NUM_BYTES_OBJECT_HEADER + - RamUsageEstimator.NUM_BYTES_ARRAY_HEADER + 5*Integer.BYTES + Float.BYTES; - - /* Rough logic: Incremented when we see another Term for an already updated - * field. - * LinkedHashMap has an array[Entry] w/ varying load factor - * (say 2*POINTER). Entry is an object w/ Term key, NumericUpdate val, - * int hash, Entry next, Entry before, Entry after (OBJ_HEADER + 5*POINTER + INT). - * - * Term (key) is counted only as POINTER. - * NumericUpdate (val) counts its own size and isn't accounted for here. - */ - final static int BYTES_PER_NUMERIC_UPDATE_ENTRY = 7*RamUsageEstimator.NUM_BYTES_OBJECT_REF + RamUsageEstimator.NUM_BYTES_OBJECT_HEADER + Integer.BYTES; - - /* Rough logic: BinaryUpdate calculates its actual size, - * including the update Term and DV field (String). The - * per-field map holds a reference to the updated field, and - * therefore we only account for the object reference and - * map space itself. This is incremented when we first see - * an updated field. - * - * HashMap has an array[Entry] w/ varying load - * factor (say 2*POINTER). Entry is an object w/ String key, - * LinkedHashMap val, int hash, Entry next (OBJ_HEADER + 3*POINTER + INT). - * - * LinkedHashMap (val) is counted as OBJ_HEADER, array[Entry] ref + header, 4*INT, 1*FLOAT, - * Set (entrySet) (2*OBJ_HEADER + ARRAY_HEADER + 2*POINTER + 4*INT + FLOAT) - */ - final static int BYTES_PER_BINARY_FIELD_ENTRY = - 7*RamUsageEstimator.NUM_BYTES_OBJECT_REF + 3*RamUsageEstimator.NUM_BYTES_OBJECT_HEADER + - RamUsageEstimator.NUM_BYTES_ARRAY_HEADER + 5*Integer.BYTES + Float.BYTES; - - /* Rough logic: Incremented when we see another Term for an already updated - * field. - * LinkedHashMap has an array[Entry] w/ varying load factor - * (say 2*POINTER). Entry is an object w/ Term key, BinaryUpdate val, - * int hash, Entry next, Entry before, Entry after (OBJ_HEADER + 5*POINTER + INT). - * - * Term (key) is counted only as POINTER. - * BinaryUpdate (val) counts its own size and isn't accounted for here. 
- */ - final static int BYTES_PER_BINARY_UPDATE_ENTRY = 7*RamUsageEstimator.NUM_BYTES_OBJECT_REF + RamUsageEstimator.NUM_BYTES_OBJECT_HEADER + Integer.BYTES; - final AtomicInteger numTermDeletes = new AtomicInteger(); - final AtomicInteger numNumericUpdates = new AtomicInteger(); - final AtomicInteger numBinaryUpdates = new AtomicInteger(); + final AtomicInteger numFieldUpdates = new AtomicInteger(); final Map deleteTerms = new HashMap<>(); final Map deleteQueries = new HashMap<>(); final List deleteDocIDs = new ArrayList<>(); - // Map> - // For each field we keep an ordered list of NumericUpdates, key'd by the - // update Term. LinkedHashMap guarantees we will later traverse the map in - // insertion order (so that if two terms affect the same document, the last - // one that came in wins), and helps us detect faster if the same Term is - // used to update the same field multiple times (so we later traverse it - // only once). - final Map> numericUpdates = new HashMap<>(); + final Map fieldUpdates = new HashMap<>(); - // Map> - // For each field we keep an ordered list of BinaryUpdates, key'd by the - // update Term. LinkedHashMap guarantees we will later traverse the map in - // insertion order (so that if two terms affect the same document, the last - // one that came in wins), and helps us detect faster if the same Term is - // used to update the same field multiple times (so we later traverse it - // only once). - final Map> binaryUpdates = new HashMap<>(); public static final Integer MAX_INT = Integer.valueOf(Integer.MAX_VALUE); - final AtomicLong bytesUsed; + private final Counter bytesUsed = Counter.newCounter(true); + final Counter fieldUpdatesBytesUsed = Counter.newCounter(true); private final static boolean VERBOSE_DELETES = false; @@ -163,7 +88,6 @@ class BufferedUpdates { final String segmentName; public BufferedUpdates(String segmentName) { - this.bytesUsed = new AtomicLong(); this.segmentName = segmentName; } @@ -171,8 +95,8 @@ class BufferedUpdates { public String toString() { if (VERBOSE_DELETES) { return "gen=" + gen + " numTerms=" + numTermDeletes + ", deleteTerms=" + deleteTerms - + ", deleteQueries=" + deleteQueries + ", deleteDocIDs=" + deleteDocIDs + ", numericUpdates=" + numericUpdates - + ", binaryUpdates=" + binaryUpdates + ", bytesUsed=" + bytesUsed; + + ", deleteQueries=" + deleteQueries + ", deleteDocIDs=" + deleteDocIDs + ", fieldUpdates=" + fieldUpdates + + ", bytesUsed=" + bytesUsed; } else { String s = "gen=" + gen; if (numTermDeletes.get() != 0) { @@ -184,11 +108,8 @@ class BufferedUpdates { if (deleteDocIDs.size() != 0) { s += " " + deleteDocIDs.size() + " deleted docIDs"; } - if (numNumericUpdates.get() != 0) { - s += " " + numNumericUpdates.get() + " numeric updates (unique count=" + numericUpdates.size() + ")"; - } - if (numBinaryUpdates.get() != 0) { - s += " " + numBinaryUpdates.get() + " binary updates (unique count=" + binaryUpdates.size() + ")"; + if (numFieldUpdates.get() != 0) { + s += " " + numFieldUpdates.get() + " field updates (unique count=" + fieldUpdates.size() + ")"; } if (bytesUsed.get() != 0) { s += " bytesUsed=" + bytesUsed.get(); @@ -235,48 +156,23 @@ class BufferedUpdates { } void addNumericUpdate(NumericDocValuesUpdate update, int docIDUpto) { - if (addDocValuesUpdate(numericUpdates, update, docIDUpto, update::prepareForApply, BYTES_PER_NUMERIC_UPDATE_ENTRY, - BYTES_PER_NUMERIC_FIELD_ENTRY)) { - numNumericUpdates.incrementAndGet(); + FieldUpdatesBuffer buffer = fieldUpdates.computeIfAbsent(update.field, k -> new 
FieldUpdatesBuffer(fieldUpdatesBytesUsed, update, docIDUpto)); + if (update.hasValue) { + buffer.addUpdate(update.term, update.getValue(), docIDUpto); + } else { + buffer.addNoValue(update.term, docIDUpto); } + numFieldUpdates.incrementAndGet(); } void addBinaryUpdate(BinaryDocValuesUpdate update, int docIDUpto) { - if (addDocValuesUpdate(binaryUpdates, update, docIDUpto, update::prepareForApply, BYTES_PER_BINARY_UPDATE_ENTRY, - BYTES_PER_BINARY_FIELD_ENTRY)) { - numBinaryUpdates.incrementAndGet(); + FieldUpdatesBuffer buffer = fieldUpdates.computeIfAbsent(update.field, k -> new FieldUpdatesBuffer(fieldUpdatesBytesUsed, update, docIDUpto)); + if (update.hasValue) { + buffer.addUpdate(update.term, update.getValue(), docIDUpto); + } else { + buffer.addNoValue(update.term, docIDUpto); } - } - - private boolean addDocValuesUpdate(Map> updates, T update, - int docIDUpto, IntFunction prepareForApply, - long bytesPerUpdateEntry, long bytesPerFieldEntry) { - LinkedHashMap fieldUpdates = updates.get(update.field); - if (fieldUpdates == null) { - fieldUpdates = new LinkedHashMap<>(); - updates.put(update.field, fieldUpdates); - bytesUsed.addAndGet(bytesPerFieldEntry); - } - final T current = fieldUpdates.get(update.term); - if (current != null && docIDUpto < current.docIDUpto) { - // Only record the new number if it's greater than or equal to the current - // one. This is important because if multiple threads are replacing the - // same doc at nearly the same time, it's possible that one thread that - // got a higher docID is scheduled before the other threads. - return false; - } - - // since it's a LinkedHashMap, we must first remove the Term entry so that - // it's added last (we're interested in insertion-order). - if (current != null) { - fieldUpdates.remove(update.term); - } - - fieldUpdates.put(update.term, prepareForApply.apply(docIDUpto)); // only make a copy if necessary - if (current == null) { - bytesUsed.addAndGet(bytesPerUpdateEntry + update.sizeInBytes()); - } - return true; + numFieldUpdates.incrementAndGet(); } void clearDeleteTerms() { @@ -288,15 +184,24 @@ class BufferedUpdates { deleteTerms.clear(); deleteQueries.clear(); deleteDocIDs.clear(); - numericUpdates.clear(); - binaryUpdates.clear(); numTermDeletes.set(0); - numNumericUpdates.set(0); - numBinaryUpdates.set(0); - bytesUsed.set(0); + numFieldUpdates.set(0); + fieldUpdates.clear(); + bytesUsed.addAndGet(-bytesUsed.get()); + fieldUpdatesBytesUsed.addAndGet(-fieldUpdatesBytesUsed.get()); } boolean any() { - return deleteTerms.size() > 0 || deleteDocIDs.size() > 0 || deleteQueries.size() > 0 || numericUpdates.size() > 0 || binaryUpdates.size() > 0; + return deleteTerms.size() > 0 || deleteDocIDs.size() > 0 || deleteQueries.size() > 0 || numFieldUpdates.get() > 0; + } + + @Override + public long ramBytesUsed() { + return bytesUsed.get() + fieldUpdatesBytesUsed.get(); + } + + void clearDeletedDocIds() { + deleteDocIDs.clear(); + bytesUsed.addAndGet(-deleteDocIDs.size() * BufferedUpdates.BYTES_PER_DEL_DOCID); } } diff --git a/lucene/core/src/java/org/apache/lucene/index/BufferedUpdatesStream.java b/lucene/core/src/java/org/apache/lucene/index/BufferedUpdatesStream.java index 9a669e0d5b9..a76d6243546 100644 --- a/lucene/core/src/java/org/apache/lucene/index/BufferedUpdatesStream.java +++ b/lucene/core/src/java/org/apache/lucene/index/BufferedUpdatesStream.java @@ -345,4 +345,5 @@ final class BufferedUpdatesStream implements Accountable { } } } + } diff --git 
a/lucene/core/src/java/org/apache/lucene/index/DocumentsWriterDeleteQueue.java b/lucene/core/src/java/org/apache/lucene/index/DocumentsWriterDeleteQueue.java index 0db043abd03..e940d9efba2 100644 --- a/lucene/core/src/java/org/apache/lucene/index/DocumentsWriterDeleteQueue.java +++ b/lucene/core/src/java/org/apache/lucene/index/DocumentsWriterDeleteQueue.java @@ -469,7 +469,7 @@ final class DocumentsWriterDeleteQueue implements Accountable { @Override public long ramBytesUsed() { - return globalBufferedUpdates.bytesUsed.get(); + return globalBufferedUpdates.ramBytesUsed(); } @Override diff --git a/lucene/core/src/java/org/apache/lucene/index/DocumentsWriterPerThread.java b/lucene/core/src/java/org/apache/lucene/index/DocumentsWriterPerThread.java index c8ebc4db491..22b8e3a7f11 100644 --- a/lucene/core/src/java/org/apache/lucene/index/DocumentsWriterPerThread.java +++ b/lucene/core/src/java/org/apache/lucene/index/DocumentsWriterPerThread.java @@ -443,8 +443,7 @@ final class DocumentsWriterPerThread { flushState.liveDocs.clear(delDocID); } flushState.delCountOnFlush = pendingUpdates.deleteDocIDs.size(); - pendingUpdates.bytesUsed.addAndGet(-pendingUpdates.deleteDocIDs.size() * BufferedUpdates.BYTES_PER_DEL_DOCID); - pendingUpdates.deleteDocIDs.clear(); + pendingUpdates.clearDeletedDocIds(); } if (aborted) { @@ -493,7 +492,7 @@ final class DocumentsWriterPerThread { } final BufferedUpdates segmentDeletes; - if (pendingUpdates.deleteQueries.isEmpty() && pendingUpdates.numericUpdates.isEmpty() && pendingUpdates.binaryUpdates.isEmpty()) { + if (pendingUpdates.deleteQueries.isEmpty() && pendingUpdates.numFieldUpdates.get() == 0) { pendingUpdates.clear(); segmentDeletes = null; } else { @@ -636,7 +635,7 @@ final class DocumentsWriterPerThread { } long bytesUsed() { - return bytesUsed.get() + pendingUpdates.bytesUsed.get(); + return bytesUsed.get() + pendingUpdates.ramBytesUsed(); } /* Initial chunks size of the shared byte[] blocks used to diff --git a/lucene/core/src/java/org/apache/lucene/index/FieldUpdatesBuffer.java b/lucene/core/src/java/org/apache/lucene/index/FieldUpdatesBuffer.java new file mode 100644 index 00000000000..acfa88e4b1f --- /dev/null +++ b/lucene/core/src/java/org/apache/lucene/index/FieldUpdatesBuffer.java @@ -0,0 +1,280 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.lucene.index; + +import java.io.IOException; +import java.util.Arrays; + +import org.apache.lucene.util.ArrayUtil; +import org.apache.lucene.util.Bits; +import org.apache.lucene.util.BytesRef; +import org.apache.lucene.util.BytesRefArray; +import org.apache.lucene.util.BytesRefIterator; +import org.apache.lucene.util.Counter; +import org.apache.lucene.util.FixedBitSet; +import org.apache.lucene.util.RamUsageEstimator; + +/** + * This class efficiently buffers numeric and binary field updates and stores + * terms, values and metadata in a memory efficient way without creating large amounts + * of objects. Update terms are stored without de-duplicating the update term. + * In general we try to optimize for several use-cases. For instance we try to use constant + * space for update terms field since the common case always updates on the same field. Also for docUpTo + * we try to optimize for the case when updates should be applied to all docs ie. docUpTo=Integer.MAX_VALUE. + * In other cases each update will likely have a different docUpTo. + * Along the same lines this impl optimizes the case when all updates have a value. Lastly, if all updates share the + * same value for a numeric field we only store the value once. + */ +final class FieldUpdatesBuffer { + private static final long SELF_SHALLOW_SIZE = RamUsageEstimator.shallowSizeOfInstance(FieldUpdatesBuffer.class); + private static final long STRING_SHALLOW_SIZE = RamUsageEstimator.shallowSizeOfInstance(String.class); + private final Counter bytesUsed; + private int numUpdates = 1; + // we use a very simple approach and store the update term values without de-duplication + // which is also not a common case to keep updating the same value more than once... + // we might pay a higher price in terms of memory in certain cases but will gain + // on CPU for those. We also save on not needing to sort in order to apply the terms in order + // since by definition we store them in order. + private final BytesRefArray termValues; + private final BytesRefArray byteValues; // this will be null if we are buffering numerics + private int[] docsUpTo; + private long[] numericValues; // this will be null if we are buffering binaries + private FixedBitSet hasValues; + private String[] fields; + private final boolean isNumeric; + + private FieldUpdatesBuffer(Counter bytesUsed, DocValuesUpdate initialValue, int docUpTo, boolean isNumeric) { + this.bytesUsed = bytesUsed; + this.bytesUsed.addAndGet(SELF_SHALLOW_SIZE); + termValues = new BytesRefArray(bytesUsed); + termValues.append(initialValue.term.bytes); + fields = new String[] {initialValue.term.field}; + bytesUsed.addAndGet(sizeOfString(initialValue.term.field)); + docsUpTo = new int[] {docUpTo}; + if (initialValue.hasValue == false) { + hasValues = new FixedBitSet(1); + bytesUsed.addAndGet(hasValues.ramBytesUsed()); + } + this.isNumeric = isNumeric; + byteValues = isNumeric ? 
null : new BytesRefArray(bytesUsed); + } + + private static long sizeOfString(String string) { + return STRING_SHALLOW_SIZE + (string.length() * Character.BYTES); + } + + FieldUpdatesBuffer(Counter bytesUsed, DocValuesUpdate.NumericDocValuesUpdate initialValue, int docUpTo) { + this(bytesUsed, initialValue, docUpTo, true); + if (initialValue.hasValue()) { + numericValues = new long[] {initialValue.getValue()}; + } else { + numericValues = new long[] {0}; + } + bytesUsed.addAndGet(Long.BYTES); + } + + FieldUpdatesBuffer(Counter bytesUsed, DocValuesUpdate.BinaryDocValuesUpdate initialValue, int docUpTo) { + this(bytesUsed, initialValue, docUpTo, false); + if (initialValue.hasValue()) { + byteValues.append(initialValue.getValue()); + } + } + + void add(String field, int docUpTo, int ord, boolean hasValue) { + if (fields[0].equals(field) == false || fields.length != 1 ) { + if (fields.length <= ord) { + String[] array = ArrayUtil.grow(fields, ord+1); + if (fields.length == 1) { + Arrays.fill(array, 1, ord, fields[0]); + } + bytesUsed.addAndGet((array.length - fields.length) * RamUsageEstimator.NUM_BYTES_OBJECT_REF); + fields = array; + } + if (field != fields[0]) { // that's an easy win of not accounting if there is an outlier + bytesUsed.addAndGet(sizeOfString(field)); + } + fields[ord] = field; + } + + if (docsUpTo[0] != docUpTo || docsUpTo.length != 1) { + if (docsUpTo.length <= ord) { + int[] array = ArrayUtil.grow(docsUpTo, ord+1); + if (docsUpTo.length == 1) { + Arrays.fill(array, 1, ord, docsUpTo[0]); + } + bytesUsed.addAndGet((array.length-docsUpTo.length) * Integer.BYTES); + docsUpTo = array; + } + docsUpTo[ord] = docUpTo; + } + + if (hasValue == false || hasValues != null) { + if (hasValues == null) { + hasValues = new FixedBitSet(ord+1); + hasValues.set(0, ord); + bytesUsed.addAndGet(hasValues.ramBytesUsed()); + } else if (hasValues.length() <= ord) { + FixedBitSet fixedBitSet = FixedBitSet.ensureCapacity(hasValues, ArrayUtil.oversize(ord + 1, 1)); + bytesUsed.addAndGet(fixedBitSet.ramBytesUsed()-hasValues.ramBytesUsed()); + hasValues = fixedBitSet; + } + if (hasValue) { + hasValues.set(ord); + } + } + } + + void addUpdate(Term term, long value, int docUpTo) { + assert isNumeric; + final int ord = append(term); + String field = term.field; + add(field, docUpTo, ord, true); + if (numericValues[0] != value || numericValues.length != 1) { + if (numericValues.length <= ord) { + long[] array = ArrayUtil.grow(numericValues, ord+1); + if (numericValues.length == 1) { + Arrays.fill(array, 1, ord, numericValues[0]); + } + bytesUsed.addAndGet((array.length-numericValues.length) * Long.BYTES); + numericValues = array; + } + numericValues[ord] = value; + } + } + + void addNoValue(Term term, int docUpTo) { + final int ord = append(term); + add(term.field, docUpTo, ord, false); + } + + void addUpdate(Term term, BytesRef value, int docUpTo) { + assert isNumeric == false; + final int ord = append(term); + byteValues.append(value); + add(term.field, docUpTo, ord, true); + } + + private int append(Term term) { + termValues.append(term.bytes); + return numUpdates++; + } + + BufferedUpdateIterator iterator() { + return new BufferedUpdateIterator(); + } + + boolean isNumeric() { + assert isNumeric || byteValues != null; + return isNumeric; + } + + /** + * Struct like class that is used to iterate over all updates in this buffer + */ + static class BufferedUpdate { + + private BufferedUpdate() {}; + /** + * the max document ID this update should be applied to + */ + int docUpTo; + /** + * a numeric value 
or 0 if this buffer holds binary updates + */ + long numericValue; + /** + * a binary value or null if this buffer holds numeric updates + */ + BytesRef binaryValue; + /** + * true if this update has a value + */ + boolean hasValue; + /** + * The update terms field. This will never be null. + */ + String termField; + /** + * The update terms value. This will never be null. + */ + BytesRef termValue; + + @Override + public int hashCode() { + throw new UnsupportedOperationException( + "this struct should not be use in map or other data-stuctures that use hashCode / equals"); + } + + @Override + public boolean equals(Object obj) { + throw new UnsupportedOperationException( + "this struct should not be use in map or other data-stuctures that use hashCode / equals"); + } + } + + /** + * An iterator that iterates over all updates in insertion order + */ + class BufferedUpdateIterator { + private final BytesRefIterator termValuesIterator; + private final BytesRefIterator byteValuesIterator; + private final BufferedUpdate bufferedUpdate = new BufferedUpdate(); + private final Bits updatesWithValue; + private int index = 0; + + BufferedUpdateIterator() { + this.termValuesIterator = termValues.iterator(); + this.byteValuesIterator = isNumeric ? null : byteValues.iterator(); + updatesWithValue = hasValues == null ? new Bits.MatchAllBits(numUpdates) : hasValues; + } + + /** + * Moves to the next BufferedUpdate or return null if all updates are consumed. + * The returned instance is a shared instance and must be fully consumed before the next call to this method. + */ + BufferedUpdate next() throws IOException { + BytesRef next = termValuesIterator.next(); + if (next != null) { + final int idx = index++; + bufferedUpdate.termValue = next; + bufferedUpdate.hasValue = updatesWithValue.get(idx); + bufferedUpdate.termField = fields[getArrayIndex(fields.length, idx)]; + bufferedUpdate.docUpTo = docsUpTo[getArrayIndex(docsUpTo.length, idx)]; + if (bufferedUpdate.hasValue) { + if (isNumeric) { + bufferedUpdate.numericValue = numericValues[getArrayIndex(numericValues.length, idx)]; + bufferedUpdate.binaryValue = null; + } else { + bufferedUpdate.binaryValue = byteValuesIterator.next(); + } + } else { + bufferedUpdate.binaryValue = null; + bufferedUpdate.numericValue = 0; + } + return bufferedUpdate; + } else { + return null; + } + } + } + + private static int getArrayIndex(int arrayLength, int index) { + assert arrayLength == 1 || arrayLength > index : "illegal array index length: " + arrayLength + " index: " + index; + return Math.min(arrayLength-1, index); + } +} diff --git a/lucene/core/src/java/org/apache/lucene/index/FrozenBufferedUpdates.java b/lucene/core/src/java/org/apache/lucene/index/FrozenBufferedUpdates.java index 774389656ce..266db310057 100644 --- a/lucene/core/src/java/org/apache/lucene/index/FrozenBufferedUpdates.java +++ b/lucene/core/src/java/org/apache/lucene/index/FrozenBufferedUpdates.java @@ -23,7 +23,6 @@ import java.util.Arrays; import java.util.Collections; import java.util.HashMap; import java.util.HashSet; -import java.util.LinkedHashMap; import java.util.List; import java.util.Locale; import java.util.Map; @@ -33,20 +32,15 @@ import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.locks.ReentrantLock; import java.util.function.IntConsumer; -import org.apache.lucene.index.DocValuesUpdate.BinaryDocValuesUpdate; -import org.apache.lucene.index.DocValuesUpdate.NumericDocValuesUpdate; import org.apache.lucene.search.DocIdSetIterator; import 
org.apache.lucene.search.IndexSearcher; import org.apache.lucene.search.Query; import org.apache.lucene.search.ScoreMode; import org.apache.lucene.search.Scorer; import org.apache.lucene.search.Weight; -import org.apache.lucene.store.ByteArrayDataInput; -import org.apache.lucene.store.RAMOutputStream; import org.apache.lucene.util.ArrayUtil; import org.apache.lucene.util.Bits; import org.apache.lucene.util.BytesRef; -import org.apache.lucene.util.Counter; import org.apache.lucene.util.IOUtils; import org.apache.lucene.util.InfoStream; import org.apache.lucene.util.RamUsageEstimator; @@ -73,21 +67,14 @@ final class FrozenBufferedUpdates { final Query[] deleteQueries; final int[] deleteQueryLimits; - // numeric DV update term and their updates - final byte[] numericDVUpdates; - - // binary DV update term and their updates - final byte[] binaryDVUpdates; - - private final int numericDVUpdateCount; - private final int binaryDVUpdateCount; - /** Counts down once all deletes/updates have been applied */ public final CountDownLatch applied = new CountDownLatch(1); private final ReentrantLock applyLock = new ReentrantLock(); + private final Map fieldUpdates; /** How many total documents were deleted/updated. */ public long totalDelCount; + private final int fieldUpdatesCount; final int bytesUsed; final int numTermDeletes; @@ -120,80 +107,25 @@ final class FrozenBufferedUpdates { deleteQueryLimits[upto] = ent.getValue(); upto++; } - Counter counter = Counter.newCounter(); // TODO if a Term affects multiple fields, we could keep the updates key'd by Term // so that it maps to all fields it affects, sorted by their docUpto, and traverse // that Term only once, applying the update to all fields that still need to be // updated. - numericDVUpdates = freezeDVUpdates(updates.numericUpdates, counter::addAndGet); - numericDVUpdateCount = (int)counter.get(); - counter.addAndGet(-counter.get()); - assert counter.get() == 0; - // TODO if a Term affects multiple fields, we could keep the updates key'd by Term - // so that it maps to all fields it affects, sorted by their docUpto, and traverse - // that Term only once, applying the update to all fields that still need to be - // updated. - binaryDVUpdates = freezeDVUpdates(updates.binaryUpdates, counter::addAndGet); - binaryDVUpdateCount = (int)counter.get(); + this.fieldUpdates = Collections.unmodifiableMap(new HashMap<>(updates.fieldUpdates)); + this.fieldUpdatesCount = updates.numFieldUpdates.get(); - bytesUsed = (int) (deleteTerms.ramBytesUsed() + deleteQueries.length * BYTES_PER_DEL_QUERY - + numericDVUpdates.length + binaryDVUpdates.length); + bytesUsed = (int) ((deleteTerms.ramBytesUsed() + deleteQueries.length * BYTES_PER_DEL_QUERY) + + updates.fieldUpdatesBytesUsed.get()); numTermDeletes = updates.numTermDeletes.get(); if (infoStream != null && infoStream.isEnabled("BD")) { infoStream.message("BD", String.format(Locale.ROOT, "compressed %d to %d bytes (%.2f%%) for deletes/updates; private segment %s", - updates.bytesUsed.get(), bytesUsed, 100.*bytesUsed/updates.bytesUsed.get(), + updates.ramBytesUsed(), bytesUsed, 100.*bytesUsed/updates.ramBytesUsed(), privateSegment)); } } - private static byte[] freezeDVUpdates(Map> dvUpdates, - IntConsumer updateSizeConsumer) - throws IOException { - // TODO: we could do better here, e.g. 
collate the updates by field - // so if you are updating 2 fields interleaved we don't keep writing the field strings - try (RAMOutputStream out = new RAMOutputStream()) { - String lastTermField = null; - String lastUpdateField = null; - for (LinkedHashMap updates : dvUpdates.values()) { - updateSizeConsumer.accept(updates.size()); - for (T update : updates.values()) { - int code = update.term.bytes().length << 3; - - String termField = update.term.field(); - if (termField.equals(lastTermField) == false) { - code |= 1; - } - String updateField = update.field; - if (updateField.equals(lastUpdateField) == false) { - code |= 2; - } - if (update.hasValue()) { - code |= 4; - } - out.writeVInt(code); - out.writeVInt(update.docIDUpto); - if (termField.equals(lastTermField) == false) { - out.writeString(termField); - lastTermField = termField; - } - if (updateField.equals(lastUpdateField) == false) { - out.writeString(updateField); - lastUpdateField = updateField; - } - out.writeBytes(update.term.bytes().bytes, update.term.bytes().offset, update.term.bytes().length); - if (update.hasValue()) { - update.writeTo(out); - } - } - } - byte[] bytes = new byte[(int) out.getFilePointer()]; - out.writeTo(bytes, 0); - return bytes; - } - } - /** Returns the {@link SegmentCommitInfo} that this packet is supposed to apply its deletes to, or null * if the private segment was already merged away. */ private List getInfosToApply(IndexWriter writer) { @@ -491,7 +423,7 @@ final class FrozenBufferedUpdates { private long applyDocValuesUpdates(BufferedUpdatesStream.SegmentState[] segStates) throws IOException { - if (numericDVUpdates.length == 0 && binaryDVUpdates.length == 0) { + if (fieldUpdates.isEmpty()) { return 0; } @@ -513,33 +445,30 @@ final class FrozenBufferedUpdates { continue; } final boolean isSegmentPrivateDeletes = privateSegment != null; - if (numericDVUpdates.length > 0) { - updateCount += applyDocValuesUpdates(segState, numericDVUpdates, true, delGen, isSegmentPrivateDeletes); + if (fieldUpdates.isEmpty() == false) { + updateCount += applyDocValuesUpdates(segState, fieldUpdates, delGen, isSegmentPrivateDeletes); } - if (binaryDVUpdates.length > 0) { - updateCount += applyDocValuesUpdates(segState, binaryDVUpdates, false, delGen, isSegmentPrivateDeletes); - } } if (infoStream.isEnabled("BD")) { infoStream.message("BD", - String.format(Locale.ROOT, "applyDocValuesUpdates %.1f msec for %d segments, %d numeric updates and %d binary updates; %d new updates", + String.format(Locale.ROOT, "applyDocValuesUpdates %.1f msec for %d segments, %d field updates; %d new updates", (System.nanoTime()-startNS)/1000000., segStates.length, - numericDVUpdateCount, - binaryDVUpdateCount, + fieldUpdatesCount, updateCount)); } return updateCount; } - private static long applyDocValuesUpdates(BufferedUpdatesStream.SegmentState segState, byte[] updates, - boolean isNumeric, long delGen, + private static long applyDocValuesUpdates(BufferedUpdatesStream.SegmentState segState, + Map updates, + long delGen, boolean segmentPrivateDeletes) throws IOException { - TermsEnum termsEnum = null; + TermsEnum termsEnum; PostingsEnum postingsEnum = null; // TODO: we can process the updates per DV field, from last to first so that @@ -556,128 +485,98 @@ final class FrozenBufferedUpdates { // We first write all our updates private, and only in the end publish to the ReadersAndUpdates */ Map holder = new HashMap<>(); - - ByteArrayDataInput in = new ByteArrayDataInput(updates); - - String termField = null; - String updateField = null; - 
BytesRef term = new BytesRef(); - term.bytes = new byte[16]; - - BytesRef scratch = new BytesRef(); - scratch.bytes = new byte[16]; - - while (in.getPosition() != updates.length) { - int code = in.readVInt(); - int docIDUpto = in.readVInt(); - term.length = code >> 3; - - if ((code & 1) != 0) { - termField = in.readString(); - } - if ((code & 2) != 0) { - updateField = in.readString(); - } - boolean hasValue = (code & 4) != 0; - - if (term.bytes.length < term.length) { - term.bytes = ArrayUtil.grow(term.bytes, term.length); - } - in.readBytes(term.bytes, 0, term.length); - - final int limit; - if (delGen == segState.delGen) { - assert segmentPrivateDeletes; - limit = docIDUpto; - } else { - limit = Integer.MAX_VALUE; - } - - // TODO: we traverse the terms in update order (not term order) so that we - // apply the updates in the correct order, i.e. if two terms update the - // same document, the last one that came in wins, irrespective of the - // terms lexical order. - // we can apply the updates in terms order if we keep an updatesGen (and - // increment it with every update) and attach it to each NumericUpdate. Note - // that we cannot rely only on docIDUpto because an app may send two updates - // which will get same docIDUpto, yet will still need to respect the order - // those updates arrived. - - // TODO: we could at least *collate* by field? - - // This is the field used to resolve to docIDs, e.g. an "id" field, not the doc values field we are updating! - if ((code & 1) != 0) { - Terms terms = segState.reader.terms(termField); + for (Map.Entry fieldUpdate : updates.entrySet()) { + String updateField = fieldUpdate.getKey(); + FieldUpdatesBuffer value = fieldUpdate.getValue(); + boolean isNumeric = value.isNumeric(); + FieldUpdatesBuffer.BufferedUpdateIterator iterator = value.iterator(); + FieldUpdatesBuffer.BufferedUpdate bufferedUpdate; + while ((bufferedUpdate = iterator.next()) != null) { + Terms terms = segState.reader.terms(bufferedUpdate.termField); if (terms != null) { termsEnum = terms.iterator(); } else { - termsEnum = null; + // no terms in this segment for this field + continue; } - } - final BytesRef binaryValue; - final long longValue; - if (hasValue == false) { - longValue = -1; - binaryValue = null; - } else if (isNumeric) { - longValue = NumericDocValuesUpdate.readFrom(in); - binaryValue = null; - } else { - longValue = -1; - binaryValue = BinaryDocValuesUpdate.readFrom(in, scratch); - } - - if (termsEnum == null) { - // no terms in this segment for this field - continue; - } - - if (termsEnum.seekExact(term)) { - // we don't need term frequencies for this - postingsEnum = termsEnum.postings(postingsEnum, PostingsEnum.NONE); - DocValuesFieldUpdates dvUpdates = holder.get(updateField); - if (dvUpdates == null) { - if (isNumeric) { - dvUpdates = new NumericDocValuesFieldUpdates(delGen, updateField, segState.reader.maxDoc()); - } else { - dvUpdates = new BinaryDocValuesFieldUpdates(delGen, updateField, segState.reader.maxDoc()); - } - holder.put(updateField, dvUpdates); - } - final IntConsumer docIdConsumer; - final DocValuesFieldUpdates update = dvUpdates; - if (hasValue == false) { - docIdConsumer = doc -> update.reset(doc); - } else if (isNumeric) { - docIdConsumer = doc -> update.add(doc, longValue); + final int limit; + if (delGen == segState.delGen) { + assert segmentPrivateDeletes; + limit = bufferedUpdate.docUpTo; } else { - docIdConsumer = doc -> update.add(doc, binaryValue); + limit = Integer.MAX_VALUE; } - final Bits acceptDocs = segState.rld.getLiveDocs(); - 
if (segState.rld.sortMap != null && segmentPrivateDeletes) { - // This segment was sorted on flush; we must apply seg-private deletes carefully in this case: - int doc; - while ((doc = postingsEnum.nextDoc()) != DocIdSetIterator.NO_MORE_DOCS) { - if (acceptDocs == null || acceptDocs.get(doc)) { - // The limit is in the pre-sorted doc space: - if (segState.rld.sortMap.newToOld(doc) < limit) { + + // TODO: we traverse the terms in update order (not term order) so that we + // apply the updates in the correct order, i.e. if two terms update the + // same document, the last one that came in wins, irrespective of the + // terms lexical order. + // we can apply the updates in terms order if we keep an updatesGen (and + // increment it with every update) and attach it to each NumericUpdate. Note + // that we cannot rely only on docIDUpto because an app may send two updates + // which will get same docIDUpto, yet will still need to respect the order + // those updates arrived. + + // TODO: we could at least *collate* by field? + + + final BytesRef binaryValue; + final long longValue; + if (bufferedUpdate.hasValue == false) { + longValue = -1; + binaryValue = null; + } else { + longValue = bufferedUpdate.numericValue; + binaryValue = bufferedUpdate.binaryValue; + } + + if (termsEnum.seekExact(bufferedUpdate.termValue)) { + // we don't need term frequencies for this + postingsEnum = termsEnum.postings(postingsEnum, PostingsEnum.NONE); + DocValuesFieldUpdates dvUpdates = holder.get(updateField); + if (dvUpdates == null) { + if (isNumeric) { + dvUpdates = new NumericDocValuesFieldUpdates(delGen, updateField, segState.reader.maxDoc()); + } else { + dvUpdates = new BinaryDocValuesFieldUpdates(delGen, updateField, segState.reader.maxDoc()); + } + holder.put(updateField, dvUpdates); + } + final IntConsumer docIdConsumer; + final DocValuesFieldUpdates update = dvUpdates; + if (bufferedUpdate.hasValue == false) { + docIdConsumer = doc -> update.reset(doc); + } else if (isNumeric) { + docIdConsumer = doc -> update.add(doc, longValue); + } else { + docIdConsumer = doc -> update.add(doc, binaryValue); + } + final Bits acceptDocs = segState.rld.getLiveDocs(); + if (segState.rld.sortMap != null && segmentPrivateDeletes) { + // This segment was sorted on flush; we must apply seg-private deletes carefully in this case: + int doc; + while ((doc = postingsEnum.nextDoc()) != DocIdSetIterator.NO_MORE_DOCS) { + if (acceptDocs == null || acceptDocs.get(doc)) { + // The limit is in the pre-sorted doc space: + if (segState.rld.sortMap.newToOld(doc) < limit) { + docIdConsumer.accept(doc); + updateCount++; + } + } + } + } else { + int doc; + while ((doc = postingsEnum.nextDoc()) != DocIdSetIterator.NO_MORE_DOCS) { + if (doc >= limit) { + break; // no more docs that can be updated for this term + } + if (acceptDocs == null || acceptDocs.get(doc)) { docIdConsumer.accept(doc); updateCount++; } } } - } else { - int doc; - while ((doc = postingsEnum.nextDoc()) != DocIdSetIterator.NO_MORE_DOCS) { - if (doc >= limit) { - break; // no more docs that can be updated for this term - } - if (acceptDocs == null || acceptDocs.get(doc)) { - docIdConsumer.accept(doc); - updateCount++; - } - } } } } @@ -896,11 +795,8 @@ final class FrozenBufferedUpdates { if (deleteQueries.length != 0) { s += " numDeleteQueries=" + deleteQueries.length; } - if (numericDVUpdates.length > 0) { - s += " numNumericDVUpdates=" + numericDVUpdateCount; - } - if (binaryDVUpdates.length > 0) { - s += " numBinaryDVUpdates=" + binaryDVUpdateCount; + if 
(fieldUpdates.size() > 0) { + s += " fieldUpdates=" + fieldUpdatesCount; } if (bytesUsed != 0) { s += " bytesUsed=" + bytesUsed; @@ -913,6 +809,6 @@ final class FrozenBufferedUpdates { } boolean any() { - return deleteTerms.size() > 0 || deleteQueries.length > 0 || numericDVUpdates.length > 0 || binaryDVUpdates.length > 0; + return deleteTerms.size() > 0 || deleteQueries.length > 0 || fieldUpdatesCount > 0 ; } } diff --git a/lucene/core/src/test/org/apache/lucene/index/TestFieldUpdatesBuffer.java b/lucene/core/src/test/org/apache/lucene/index/TestFieldUpdatesBuffer.java new file mode 100644 index 00000000000..ae4442fccd5 --- /dev/null +++ b/lucene/core/src/test/org/apache/lucene/index/TestFieldUpdatesBuffer.java @@ -0,0 +1,231 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.lucene.index; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.List; + +import com.carrotsearch.randomizedtesting.generators.RandomPicks; +import org.apache.lucene.util.BytesRef; +import org.apache.lucene.util.Counter; +import org.apache.lucene.util.LuceneTestCase; +import org.apache.lucene.util.TestUtil; + +public class TestFieldUpdatesBuffer extends LuceneTestCase { + + public void testBascis() throws IOException { + Counter counter = Counter.newCounter(); + DocValuesUpdate.NumericDocValuesUpdate update = + new DocValuesUpdate.NumericDocValuesUpdate(new Term("id", "1"), "age", 6); + FieldUpdatesBuffer buffer = new FieldUpdatesBuffer(counter, update, 15); + buffer.addUpdate(new Term("id", "10"), 6, 15); + buffer.addUpdate(new Term("id", "8"), 12, 15); + buffer.addUpdate(new Term("some_other_field", "8"), 13, 17); + buffer.addUpdate(new Term("id", "8"), 12, 16); + assertTrue(buffer.isNumeric()); + FieldUpdatesBuffer.BufferedUpdateIterator iterator = buffer.iterator(); + FieldUpdatesBuffer.BufferedUpdate value = iterator.next(); + assertNotNull(value); + assertEquals("id", value.termField); + assertEquals("1", value.termValue.utf8ToString()); + assertEquals(6, value.numericValue); + assertEquals(15, value.docUpTo); + + value = iterator.next(); + assertNotNull(value); + assertEquals("id", value.termField); + assertEquals("10", value.termValue.utf8ToString()); + assertEquals(6, value.numericValue); + assertEquals(15, value.docUpTo); + + value = iterator.next(); + assertNotNull(value); + assertEquals("id", value.termField); + assertEquals("8", value.termValue.utf8ToString()); + assertEquals(12, value.numericValue); + assertEquals(15, value.docUpTo); + + value = iterator.next(); + assertNotNull(value); + assertEquals("some_other_field", value.termField); + assertEquals("8", value.termValue.utf8ToString()); + assertEquals(13, value.numericValue); + assertEquals(17, value.docUpTo); + + value = 
iterator.next(); + assertNotNull(value); + assertEquals("id", value.termField); + assertEquals("8", value.termValue.utf8ToString()); + assertEquals(12, value.numericValue); + assertEquals(16, value.docUpTo); + assertNull(iterator.next()); + } + + public void testUpdateShareValues() throws IOException { + Counter counter = Counter.newCounter(); + int intValue = random().nextInt(); + boolean valueForThree = random().nextBoolean(); + DocValuesUpdate.NumericDocValuesUpdate update = + new DocValuesUpdate.NumericDocValuesUpdate(new Term("id", "0"), "enabled", intValue); + FieldUpdatesBuffer buffer = new FieldUpdatesBuffer(counter, update, Integer.MAX_VALUE); + buffer.addUpdate(new Term("id", "1"), intValue, Integer.MAX_VALUE); + buffer.addUpdate(new Term("id", "2"), intValue, Integer.MAX_VALUE); + if (valueForThree) { + buffer.addUpdate(new Term("id", "3"), intValue, Integer.MAX_VALUE); + } else { + buffer.addNoValue(new Term("id", "3"), Integer.MAX_VALUE); + } + buffer.addUpdate(new Term("id", "4"), intValue, Integer.MAX_VALUE); + FieldUpdatesBuffer.BufferedUpdateIterator iterator = buffer.iterator(); + FieldUpdatesBuffer.BufferedUpdate value; + int count = 0; + while ((value = iterator.next()) != null) { + boolean hasValue = count != 3 || valueForThree; + assertEquals("" + (count++), value.termValue.utf8ToString()); + assertEquals("id", value.termField); + assertEquals(hasValue, value.hasValue); + if (hasValue) { + assertEquals(intValue, value.numericValue); + } else { + assertEquals(0, value.numericValue); + } + assertEquals(Integer.MAX_VALUE, value.docUpTo); + } + assertTrue(buffer.isNumeric()); + } + + public void testUpdateShareValuesBinary() throws IOException { + Counter counter = Counter.newCounter(); + boolean valueForThree = random().nextBoolean(); + DocValuesUpdate.BinaryDocValuesUpdate update = + new DocValuesUpdate.BinaryDocValuesUpdate(new Term("id", "0"), "enabled", new BytesRef("")); + FieldUpdatesBuffer buffer = new FieldUpdatesBuffer(counter, update, Integer.MAX_VALUE); + buffer.addUpdate(new Term("id", "1"), new BytesRef(""), Integer.MAX_VALUE); + buffer.addUpdate(new Term("id", "2"), new BytesRef(""), Integer.MAX_VALUE); + if (valueForThree) { + buffer.addUpdate(new Term("id", "3"), new BytesRef(""), Integer.MAX_VALUE); + } else { + buffer.addNoValue(new Term("id", "3"), Integer.MAX_VALUE); + } + buffer.addUpdate(new Term("id", "4"), new BytesRef(""), Integer.MAX_VALUE); + FieldUpdatesBuffer.BufferedUpdateIterator iterator = buffer.iterator(); + FieldUpdatesBuffer.BufferedUpdate value; + int count = 0; + while ((value = iterator.next()) != null) { + boolean hasValue = count != 3 || valueForThree; + assertEquals("" + (count++), value.termValue.utf8ToString()); + assertEquals("id", value.termField); + assertEquals(hasValue, value.hasValue); + if (hasValue) { + assertEquals(new BytesRef(""), value.binaryValue); + } else { + assertNull(value.binaryValue); + } + assertEquals(Integer.MAX_VALUE, value.docUpTo); + } + assertFalse(buffer.isNumeric()); + } + + public T getRandomUpdate(boolean binary) { + String termField = RandomPicks.randomFrom(random(), Arrays.asList("id", "_id", "some_other_field")); + String docId = "" + random().nextInt(10); + if (binary) { + DocValuesUpdate.BinaryDocValuesUpdate value = new DocValuesUpdate.BinaryDocValuesUpdate(new Term(termField, docId), "binary", + rarely() ? null : new BytesRef(TestUtil.randomRealisticUnicodeString(random()))); + return (T) (rarely() ? 
value.prepareForApply(random().nextInt(100)) : value); + } else { + DocValuesUpdate.NumericDocValuesUpdate value = new DocValuesUpdate.NumericDocValuesUpdate(new Term(termField, docId), "numeric", + rarely() ? null : Long.valueOf(random().nextInt(100))); + + return (T) (rarely() ? value.prepareForApply(random().nextInt(100)) : value); + } + } + + public void testBinaryRandom() throws IOException { + List updates = new ArrayList<>(); + int numUpdates = 1 + random().nextInt(1000); + Counter counter = Counter.newCounter(); + DocValuesUpdate.BinaryDocValuesUpdate randomUpdate = getRandomUpdate(true); + updates.add(randomUpdate); + FieldUpdatesBuffer buffer = new FieldUpdatesBuffer(counter, randomUpdate, randomUpdate.docIDUpto); + for (int i = 0; i < numUpdates; i++) { + randomUpdate = getRandomUpdate(true); + updates.add(randomUpdate); + if (randomUpdate.hasValue) { + buffer.addUpdate(randomUpdate.term, randomUpdate.getValue(), randomUpdate.docIDUpto); + } else { + buffer.addNoValue(randomUpdate.term, randomUpdate.docIDUpto); + } + } + FieldUpdatesBuffer.BufferedUpdateIterator iterator = buffer.iterator(); + FieldUpdatesBuffer.BufferedUpdate value; + + int count = 0; + while ((value = iterator.next()) != null) { + randomUpdate = updates.get(count++); + assertEquals(randomUpdate.term.bytes.utf8ToString(), value.termValue.utf8ToString()); + assertEquals(randomUpdate.term.field, value.termField); + assertEquals("count: " + count, randomUpdate.hasValue, value.hasValue); + if (randomUpdate.hasValue) { + assertEquals(randomUpdate.getValue(), value.binaryValue); + } else { + assertNull(value.binaryValue); + } + assertEquals(randomUpdate.docIDUpto, value.docUpTo); + } + assertEquals(count, updates.size()); + } + + public void testNumericRandom() throws IOException { + List updates = new ArrayList<>(); + int numUpdates = 1 + random().nextInt(1000); + Counter counter = Counter.newCounter(); + DocValuesUpdate.NumericDocValuesUpdate randomUpdate = getRandomUpdate(false); + updates.add(randomUpdate); + FieldUpdatesBuffer buffer = new FieldUpdatesBuffer(counter, randomUpdate, randomUpdate.docIDUpto); + for (int i = 0; i < numUpdates; i++) { + randomUpdate = getRandomUpdate(false); + updates.add(randomUpdate); + if (randomUpdate.hasValue) { + buffer.addUpdate(randomUpdate.term, randomUpdate.getValue(), randomUpdate.docIDUpto); + } else { + buffer.addNoValue(randomUpdate.term, randomUpdate.docIDUpto); + } + } + FieldUpdatesBuffer.BufferedUpdateIterator iterator = buffer.iterator(); + FieldUpdatesBuffer.BufferedUpdate value; + + int count = 0; + while ((value = iterator.next()) != null) { + randomUpdate = updates.get(count++); + assertEquals(randomUpdate.term.bytes.utf8ToString(), value.termValue.utf8ToString()); + assertEquals(randomUpdate.term.field, value.termField); + assertEquals(randomUpdate.hasValue, value.hasValue); + if (randomUpdate.hasValue) { + assertEquals(randomUpdate.getValue(), value.numericValue); + } else { + assertEquals(0, value.numericValue); + } + assertEquals(randomUpdate.docIDUpto, value.docUpTo); + } + assertEquals(count, updates.size()); + } + +}