LUCENE-8590: Optimize DocValues update datastructures

Today we are using a LinkedHashMap to buffer doc-values updates in
BufferedUpdates. That is an object-based data structure, and it also requires
re-encoding the data into a more compact representation once the
BufferedUpdates are frozen. This change stores the updates in a more compact,
parallel-array-like data structure directly in BufferedUpdates, which can be
reused in FrozenBufferedUpdates. It also adds a much simpler API to consume
the updates and allows for internal memory optimizations for common-case
updates.
Simon Willnauer 2018-12-04 17:36:46 +01:00
parent c81822e157
commit b0e6a92ede
8 changed files with 657 additions and 341 deletions
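At a glance, the change replaces the per-field LinkedHashMap<Term, DocValuesUpdate> buffers with per-field parallel arrays indexed by an insertion ordinal. The following is a simplified, hypothetical sketch of the layout used by the FieldUpdatesBuffer class added below; the real class additionally grows the arrays lazily and keeps single-element arrays while all buffered updates share the same field, docUpTo, or numeric value.

import org.apache.lucene.util.BytesRefArray;
import org.apache.lucene.util.FixedBitSet;

// Simplified sketch of the parallel-array layout, not the actual class.
// Each buffered update gets an ordinal; the arrays below are indexed by it.
final class ParallelUpdateBufferSketch {
  BytesRefArray termValues;   // update terms, appended in insertion order
  String[] fields;            // DV field per update (length 1 while all updates hit one field)
  int[] docsUpTo;             // docIDUpto per update (length 1 while all values are equal)
  long[] numericValues;       // numeric values, null when buffering binary updates
  BytesRefArray byteValues;   // binary values, null when buffering numeric updates
  FixedBitSet hasValues;      // null while every update carries a value
}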

CHANGES.txt

@ -260,6 +260,10 @@ Optimizations
* LUCENE-8552: FieldInfos.getMergedFieldInfos no longer does any merging if there is <= 1 segment.
(Christophe Bismuth via David Smiley)
* LUCENE-8590: BufferedUpdates now uses an optimized storage for buffering docvalues updates that
can save up to 80% of the heap used compared to the previous implementation and uses non-object-based
datastructures. (Simon Willnauer, Mike McCandless, Shai Erera, Adrien Grand)
Other
* LUCENE-8573: BKDWriter now uses FutureArrays#mismatch to compute shared prefixes.

BufferedUpdates.java

@ -19,16 +19,15 @@ package org.apache.lucene.index;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
import java.util.function.IntFunction;
import org.apache.lucene.index.DocValuesUpdate.BinaryDocValuesUpdate;
import org.apache.lucene.index.DocValuesUpdate.NumericDocValuesUpdate;
import org.apache.lucene.search.Query;
import org.apache.lucene.util.Accountable;
import org.apache.lucene.util.Counter;
import org.apache.lucene.util.RamUsageEstimator;
/** Holds buffered deletes and updates, by docID, term or query for a
@ -42,7 +41,7 @@ import org.apache.lucene.util.RamUsageEstimator;
// instance on DocumentWriterPerThread, or via sync'd code by
// DocumentsWriterDeleteQueue
class BufferedUpdates {
class BufferedUpdates implements Accountable {
/* Rough logic: HashMap has an array[Entry] w/ varying
load factor (say 2 * POINTER). Entry is object w/ Term
@ -67,94 +66,20 @@ class BufferedUpdates {
(OBJ_HEADER + 3*POINTER + INT). Query we often
undercount (say 24 bytes). Integer is OBJ_HEADER + INT. */
final static int BYTES_PER_DEL_QUERY = 5*RamUsageEstimator.NUM_BYTES_OBJECT_REF + 2*RamUsageEstimator.NUM_BYTES_OBJECT_HEADER + 2*Integer.BYTES + 24;
/* Rough logic: NumericUpdate calculates its actual size,
* including the update Term and DV field (String). The
* per-field map holds a reference to the updated field, and
* therefore we only account for the object reference and
* map space itself. This is incremented when we first see
* an updated field.
*
* HashMap has an array[Entry] w/ varying load
* factor (say 2*POINTER). Entry is an object w/ String key,
* LinkedHashMap val, int hash, Entry next (OBJ_HEADER + 3*POINTER + INT).
*
* LinkedHashMap (val) is counted as OBJ_HEADER, array[Entry] ref + header, 4*INT, 1*FLOAT,
* Set (entrySet) (2*OBJ_HEADER + ARRAY_HEADER + 2*POINTER + 4*INT + FLOAT)
*/
final static int BYTES_PER_NUMERIC_FIELD_ENTRY =
7*RamUsageEstimator.NUM_BYTES_OBJECT_REF + 3*RamUsageEstimator.NUM_BYTES_OBJECT_HEADER +
RamUsageEstimator.NUM_BYTES_ARRAY_HEADER + 5*Integer.BYTES + Float.BYTES;
/* Rough logic: Incremented when we see another Term for an already updated
* field.
* LinkedHashMap has an array[Entry] w/ varying load factor
* (say 2*POINTER). Entry is an object w/ Term key, NumericUpdate val,
* int hash, Entry next, Entry before, Entry after (OBJ_HEADER + 5*POINTER + INT).
*
* Term (key) is counted only as POINTER.
* NumericUpdate (val) counts its own size and isn't accounted for here.
*/
final static int BYTES_PER_NUMERIC_UPDATE_ENTRY = 7*RamUsageEstimator.NUM_BYTES_OBJECT_REF + RamUsageEstimator.NUM_BYTES_OBJECT_HEADER + Integer.BYTES;
/* Rough logic: BinaryUpdate calculates its actual size,
* including the update Term and DV field (String). The
* per-field map holds a reference to the updated field, and
* therefore we only account for the object reference and
* map space itself. This is incremented when we first see
* an updated field.
*
* HashMap has an array[Entry] w/ varying load
* factor (say 2*POINTER). Entry is an object w/ String key,
* LinkedHashMap val, int hash, Entry next (OBJ_HEADER + 3*POINTER + INT).
*
* LinkedHashMap (val) is counted as OBJ_HEADER, array[Entry] ref + header, 4*INT, 1*FLOAT,
* Set (entrySet) (2*OBJ_HEADER + ARRAY_HEADER + 2*POINTER + 4*INT + FLOAT)
*/
final static int BYTES_PER_BINARY_FIELD_ENTRY =
7*RamUsageEstimator.NUM_BYTES_OBJECT_REF + 3*RamUsageEstimator.NUM_BYTES_OBJECT_HEADER +
RamUsageEstimator.NUM_BYTES_ARRAY_HEADER + 5*Integer.BYTES + Float.BYTES;
/* Rough logic: Incremented when we see another Term for an already updated
* field.
* LinkedHashMap has an array[Entry] w/ varying load factor
* (say 2*POINTER). Entry is an object w/ Term key, BinaryUpdate val,
* int hash, Entry next, Entry before, Entry after (OBJ_HEADER + 5*POINTER + INT).
*
* Term (key) is counted only as POINTER.
* BinaryUpdate (val) counts its own size and isn't accounted for here.
*/
final static int BYTES_PER_BINARY_UPDATE_ENTRY = 7*RamUsageEstimator.NUM_BYTES_OBJECT_REF + RamUsageEstimator.NUM_BYTES_OBJECT_HEADER + Integer.BYTES;
final AtomicInteger numTermDeletes = new AtomicInteger();
final AtomicInteger numNumericUpdates = new AtomicInteger();
final AtomicInteger numBinaryUpdates = new AtomicInteger();
final AtomicInteger numFieldUpdates = new AtomicInteger();
final Map<Term,Integer> deleteTerms = new HashMap<>();
final Map<Query,Integer> deleteQueries = new HashMap<>();
final List<Integer> deleteDocIDs = new ArrayList<>();
// Map<dvField,Map<updateTerm,NumericUpdate>>
// For each field we keep an ordered list of NumericUpdates, key'd by the
// update Term. LinkedHashMap guarantees we will later traverse the map in
// insertion order (so that if two terms affect the same document, the last
// one that came in wins), and helps us detect faster if the same Term is
// used to update the same field multiple times (so we later traverse it
// only once).
final Map<String,LinkedHashMap<Term,NumericDocValuesUpdate>> numericUpdates = new HashMap<>();
final Map<String,FieldUpdatesBuffer> fieldUpdates = new HashMap<>();
// Map<dvField,Map<updateTerm,BinaryUpdate>>
// For each field we keep an ordered list of BinaryUpdates, key'd by the
// update Term. LinkedHashMap guarantees we will later traverse the map in
// insertion order (so that if two terms affect the same document, the last
// one that came in wins), and helps us detect faster if the same Term is
// used to update the same field multiple times (so we later traverse it
// only once).
final Map<String,LinkedHashMap<Term,BinaryDocValuesUpdate>> binaryUpdates = new HashMap<>();
public static final Integer MAX_INT = Integer.valueOf(Integer.MAX_VALUE);
final AtomicLong bytesUsed;
private final Counter bytesUsed = Counter.newCounter(true);
final Counter fieldUpdatesBytesUsed = Counter.newCounter(true);
private final static boolean VERBOSE_DELETES = false;
@ -163,7 +88,6 @@ class BufferedUpdates {
final String segmentName;
public BufferedUpdates(String segmentName) {
this.bytesUsed = new AtomicLong();
this.segmentName = segmentName;
}
@ -171,8 +95,8 @@ class BufferedUpdates {
public String toString() {
if (VERBOSE_DELETES) {
return "gen=" + gen + " numTerms=" + numTermDeletes + ", deleteTerms=" + deleteTerms
+ ", deleteQueries=" + deleteQueries + ", deleteDocIDs=" + deleteDocIDs + ", numericUpdates=" + numericUpdates
+ ", binaryUpdates=" + binaryUpdates + ", bytesUsed=" + bytesUsed;
+ ", deleteQueries=" + deleteQueries + ", deleteDocIDs=" + deleteDocIDs + ", fieldUpdates=" + fieldUpdates
+ ", bytesUsed=" + bytesUsed;
} else {
String s = "gen=" + gen;
if (numTermDeletes.get() != 0) {
@ -184,11 +108,8 @@ class BufferedUpdates {
if (deleteDocIDs.size() != 0) {
s += " " + deleteDocIDs.size() + " deleted docIDs";
}
if (numNumericUpdates.get() != 0) {
s += " " + numNumericUpdates.get() + " numeric updates (unique count=" + numericUpdates.size() + ")";
}
if (numBinaryUpdates.get() != 0) {
s += " " + numBinaryUpdates.get() + " binary updates (unique count=" + binaryUpdates.size() + ")";
if (numFieldUpdates.get() != 0) {
s += " " + numFieldUpdates.get() + " field updates (unique count=" + fieldUpdates.size() + ")";
}
if (bytesUsed.get() != 0) {
s += " bytesUsed=" + bytesUsed.get();
@ -235,48 +156,23 @@ class BufferedUpdates {
}
void addNumericUpdate(NumericDocValuesUpdate update, int docIDUpto) {
if (addDocValuesUpdate(numericUpdates, update, docIDUpto, update::prepareForApply, BYTES_PER_NUMERIC_UPDATE_ENTRY,
BYTES_PER_NUMERIC_FIELD_ENTRY)) {
numNumericUpdates.incrementAndGet();
FieldUpdatesBuffer buffer = fieldUpdates.computeIfAbsent(update.field, k -> new FieldUpdatesBuffer(fieldUpdatesBytesUsed, update, docIDUpto));
if (update.hasValue) {
buffer.addUpdate(update.term, update.getValue(), docIDUpto);
} else {
buffer.addNoValue(update.term, docIDUpto);
}
numFieldUpdates.incrementAndGet();
}
void addBinaryUpdate(BinaryDocValuesUpdate update, int docIDUpto) {
if (addDocValuesUpdate(binaryUpdates, update, docIDUpto, update::prepareForApply, BYTES_PER_BINARY_UPDATE_ENTRY,
BYTES_PER_BINARY_FIELD_ENTRY)) {
numBinaryUpdates.incrementAndGet();
FieldUpdatesBuffer buffer = fieldUpdates.computeIfAbsent(update.field, k -> new FieldUpdatesBuffer(fieldUpdatesBytesUsed, update, docIDUpto));
if (update.hasValue) {
buffer.addUpdate(update.term, update.getValue(), docIDUpto);
} else {
buffer.addNoValue(update.term, docIDUpto);
}
}
private <T extends DocValuesUpdate> boolean addDocValuesUpdate(Map<String,LinkedHashMap<Term,T>> updates, T update,
int docIDUpto, IntFunction<T> prepareForApply,
long bytesPerUpdateEntry, long bytesPerFieldEntry) {
LinkedHashMap<Term,T> fieldUpdates = updates.get(update.field);
if (fieldUpdates == null) {
fieldUpdates = new LinkedHashMap<>();
updates.put(update.field, fieldUpdates);
bytesUsed.addAndGet(bytesPerFieldEntry);
}
final T current = fieldUpdates.get(update.term);
if (current != null && docIDUpto < current.docIDUpto) {
// Only record the new number if it's greater than or equal to the current
// one. This is important because if multiple threads are replacing the
// same doc at nearly the same time, it's possible that one thread that
// got a higher docID is scheduled before the other threads.
return false;
}
// since it's a LinkedHashMap, we must first remove the Term entry so that
// it's added last (we're interested in insertion-order).
if (current != null) {
fieldUpdates.remove(update.term);
}
fieldUpdates.put(update.term, prepareForApply.apply(docIDUpto)); // only make a copy if necessary
if (current == null) {
bytesUsed.addAndGet(bytesPerUpdateEntry + update.sizeInBytes());
}
return true;
numFieldUpdates.incrementAndGet();
}
void clearDeleteTerms() {
@ -288,15 +184,24 @@ class BufferedUpdates {
deleteTerms.clear();
deleteQueries.clear();
deleteDocIDs.clear();
numericUpdates.clear();
binaryUpdates.clear();
numTermDeletes.set(0);
numNumericUpdates.set(0);
numBinaryUpdates.set(0);
bytesUsed.set(0);
numFieldUpdates.set(0);
fieldUpdates.clear();
bytesUsed.addAndGet(-bytesUsed.get());
fieldUpdatesBytesUsed.addAndGet(-fieldUpdatesBytesUsed.get());
}
boolean any() {
return deleteTerms.size() > 0 || deleteDocIDs.size() > 0 || deleteQueries.size() > 0 || numericUpdates.size() > 0 || binaryUpdates.size() > 0;
return deleteTerms.size() > 0 || deleteDocIDs.size() > 0 || deleteQueries.size() > 0 || numFieldUpdates.get() > 0;
}
@Override
public long ramBytesUsed() {
return bytesUsed.get() + fieldUpdatesBytesUsed.get();
}
void clearDeletedDocIds() {
deleteDocIDs.clear();
bytesUsed.addAndGet(-deleteDocIDs.size() * BufferedUpdates.BYTES_PER_DEL_DOCID);
}
}

BufferedUpdatesStream.java

@ -345,4 +345,5 @@ final class BufferedUpdatesStream implements Accountable {
}
}
}
}

DocumentsWriterDeleteQueue.java

@ -469,7 +469,7 @@ final class DocumentsWriterDeleteQueue implements Accountable {
@Override
public long ramBytesUsed() {
return globalBufferedUpdates.bytesUsed.get();
return globalBufferedUpdates.ramBytesUsed();
}
@Override

DocumentsWriterPerThread.java

@ -443,8 +443,7 @@ final class DocumentsWriterPerThread {
flushState.liveDocs.clear(delDocID);
}
flushState.delCountOnFlush = pendingUpdates.deleteDocIDs.size();
pendingUpdates.bytesUsed.addAndGet(-pendingUpdates.deleteDocIDs.size() * BufferedUpdates.BYTES_PER_DEL_DOCID);
pendingUpdates.deleteDocIDs.clear();
pendingUpdates.clearDeletedDocIds();
}
if (aborted) {
@ -493,7 +492,7 @@ final class DocumentsWriterPerThread {
}
final BufferedUpdates segmentDeletes;
if (pendingUpdates.deleteQueries.isEmpty() && pendingUpdates.numericUpdates.isEmpty() && pendingUpdates.binaryUpdates.isEmpty()) {
if (pendingUpdates.deleteQueries.isEmpty() && pendingUpdates.numFieldUpdates.get() == 0) {
pendingUpdates.clear();
segmentDeletes = null;
} else {
@ -636,7 +635,7 @@ final class DocumentsWriterPerThread {
}
long bytesUsed() {
return bytesUsed.get() + pendingUpdates.bytesUsed.get();
return bytesUsed.get() + pendingUpdates.ramBytesUsed();
}
/* Initial chunks size of the shared byte[] blocks used to

FieldUpdatesBuffer.java (new file)

@ -0,0 +1,280 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.lucene.index;
import java.io.IOException;
import java.util.Arrays;
import org.apache.lucene.util.ArrayUtil;
import org.apache.lucene.util.Bits;
import org.apache.lucene.util.BytesRef;
import org.apache.lucene.util.BytesRefArray;
import org.apache.lucene.util.BytesRefIterator;
import org.apache.lucene.util.Counter;
import org.apache.lucene.util.FixedBitSet;
import org.apache.lucene.util.RamUsageEstimator;
/**
* This class efficiently buffers numeric and binary field updates and stores
* terms, values and metadata in a memory efficient way without creating large amounts
* of objects. Update terms are stored without de-duplicating the update term.
* In general we try to optimize for several use-cases. For instance we try to use constant
* space for update terms field since the common case always updates on the same field. Also for docUpTo
* we try to optimize for the case when updates should be applied to all docs ie. docUpTo=Integer.MAX_VALUE.
* In other cases each update will likely have a different docUpTo.
* Along the same lines this impl optimizes the case when all updates have a value. Lastly, if all updates share the
* same value for a numeric field we only store the value once.
*/
final class FieldUpdatesBuffer {
private static final long SELF_SHALLOW_SIZE = RamUsageEstimator.shallowSizeOfInstance(FieldUpdatesBuffer.class);
private static final long STRING_SHALLOW_SIZE = RamUsageEstimator.shallowSizeOfInstance(String.class);
private final Counter bytesUsed;
private int numUpdates = 1;
// we use a very simple approach and store the update term values without de-duplication
// which is also not a common case to keep updating the same value more than once...
// we might pay a higher price in terms of memory in certain cases but will gain
// on CPU for those. We also save on not needing to sort in order to apply the terms in order
// since by definition we store them in order.
private final BytesRefArray termValues;
private final BytesRefArray byteValues; // this will be null if we are buffering numerics
private int[] docsUpTo;
private long[] numericValues; // this will be null if we are buffering binaries
private FixedBitSet hasValues;
private String[] fields;
private final boolean isNumeric;
private FieldUpdatesBuffer(Counter bytesUsed, DocValuesUpdate initialValue, int docUpTo, boolean isNumeric) {
this.bytesUsed = bytesUsed;
this.bytesUsed.addAndGet(SELF_SHALLOW_SIZE);
termValues = new BytesRefArray(bytesUsed);
termValues.append(initialValue.term.bytes);
fields = new String[] {initialValue.term.field};
bytesUsed.addAndGet(sizeOfString(initialValue.term.field));
docsUpTo = new int[] {docUpTo};
if (initialValue.hasValue == false) {
hasValues = new FixedBitSet(1);
bytesUsed.addAndGet(hasValues.ramBytesUsed());
}
this.isNumeric = isNumeric;
byteValues = isNumeric ? null : new BytesRefArray(bytesUsed);
}
private static long sizeOfString(String string) {
return STRING_SHALLOW_SIZE + (string.length() * Character.BYTES);
}
FieldUpdatesBuffer(Counter bytesUsed, DocValuesUpdate.NumericDocValuesUpdate initialValue, int docUpTo) {
this(bytesUsed, initialValue, docUpTo, true);
if (initialValue.hasValue()) {
numericValues = new long[] {initialValue.getValue()};
} else {
numericValues = new long[] {0};
}
bytesUsed.addAndGet(Long.BYTES);
}
FieldUpdatesBuffer(Counter bytesUsed, DocValuesUpdate.BinaryDocValuesUpdate initialValue, int docUpTo) {
this(bytesUsed, initialValue, docUpTo, false);
if (initialValue.hasValue()) {
byteValues.append(initialValue.getValue());
}
}
void add(String field, int docUpTo, int ord, boolean hasValue) {
if (fields[0].equals(field) == false || fields.length != 1 ) {
if (fields.length <= ord) {
String[] array = ArrayUtil.grow(fields, ord+1);
if (fields.length == 1) {
Arrays.fill(array, 1, ord, fields[0]);
}
bytesUsed.addAndGet((array.length - fields.length) * RamUsageEstimator.NUM_BYTES_OBJECT_REF);
fields = array;
}
if (field != fields[0]) { // that's an easy win of not accounting if there is an outlier
bytesUsed.addAndGet(sizeOfString(field));
}
fields[ord] = field;
}
if (docsUpTo[0] != docUpTo || docsUpTo.length != 1) {
if (docsUpTo.length <= ord) {
int[] array = ArrayUtil.grow(docsUpTo, ord+1);
if (docsUpTo.length == 1) {
Arrays.fill(array, 1, ord, docsUpTo[0]);
}
bytesUsed.addAndGet((array.length-docsUpTo.length) * Integer.BYTES);
docsUpTo = array;
}
docsUpTo[ord] = docUpTo;
}
if (hasValue == false || hasValues != null) {
if (hasValues == null) {
hasValues = new FixedBitSet(ord+1);
hasValues.set(0, ord);
bytesUsed.addAndGet(hasValues.ramBytesUsed());
} else if (hasValues.length() <= ord) {
FixedBitSet fixedBitSet = FixedBitSet.ensureCapacity(hasValues, ArrayUtil.oversize(ord + 1, 1));
bytesUsed.addAndGet(fixedBitSet.ramBytesUsed()-hasValues.ramBytesUsed());
hasValues = fixedBitSet;
}
if (hasValue) {
hasValues.set(ord);
}
}
}
void addUpdate(Term term, long value, int docUpTo) {
assert isNumeric;
final int ord = append(term);
String field = term.field;
add(field, docUpTo, ord, true);
if (numericValues[0] != value || numericValues.length != 1) {
if (numericValues.length <= ord) {
long[] array = ArrayUtil.grow(numericValues, ord+1);
if (numericValues.length == 1) {
Arrays.fill(array, 1, ord, numericValues[0]);
}
bytesUsed.addAndGet((array.length-numericValues.length) * Long.BYTES);
numericValues = array;
}
numericValues[ord] = value;
}
}
void addNoValue(Term term, int docUpTo) {
final int ord = append(term);
add(term.field, docUpTo, ord, false);
}
void addUpdate(Term term, BytesRef value, int docUpTo) {
assert isNumeric == false;
final int ord = append(term);
byteValues.append(value);
add(term.field, docUpTo, ord, true);
}
private int append(Term term) {
termValues.append(term.bytes);
return numUpdates++;
}
BufferedUpdateIterator iterator() {
return new BufferedUpdateIterator();
}
boolean isNumeric() {
assert isNumeric || byteValues != null;
return isNumeric;
}
/**
* Struct like class that is used to iterate over all updates in this buffer
*/
static class BufferedUpdate {
private BufferedUpdate() {};
/**
* the max document ID this update should be applied to
*/
int docUpTo;
/**
* a numeric value or 0 if this buffer holds binary updates
*/
long numericValue;
/**
* a binary value or null if this buffer holds numeric updates
*/
BytesRef binaryValue;
/**
* <code>true</code> if this update has a value
*/
boolean hasValue;
/**
* The update terms field. This will never be null.
*/
String termField;
/**
* The update terms value. This will never be null.
*/
BytesRef termValue;
@Override
public int hashCode() {
throw new UnsupportedOperationException(
"this struct should not be use in map or other data-stuctures that use hashCode / equals");
}
@Override
public boolean equals(Object obj) {
throw new UnsupportedOperationException(
"this struct should not be use in map or other data-stuctures that use hashCode / equals");
}
}
/**
* An iterator that iterates over all updates in insertion order
*/
class BufferedUpdateIterator {
private final BytesRefIterator termValuesIterator;
private final BytesRefIterator byteValuesIterator;
private final BufferedUpdate bufferedUpdate = new BufferedUpdate();
private final Bits updatesWithValue;
private int index = 0;
BufferedUpdateIterator() {
this.termValuesIterator = termValues.iterator();
this.byteValuesIterator = isNumeric ? null : byteValues.iterator();
updatesWithValue = hasValues == null ? new Bits.MatchAllBits(numUpdates) : hasValues;
}
/**
Moves to the next BufferedUpdate or returns null if all updates are consumed.
* The returned instance is a shared instance and must be fully consumed before the next call to this method.
*/
BufferedUpdate next() throws IOException {
BytesRef next = termValuesIterator.next();
if (next != null) {
final int idx = index++;
bufferedUpdate.termValue = next;
bufferedUpdate.hasValue = updatesWithValue.get(idx);
bufferedUpdate.termField = fields[getArrayIndex(fields.length, idx)];
bufferedUpdate.docUpTo = docsUpTo[getArrayIndex(docsUpTo.length, idx)];
if (bufferedUpdate.hasValue) {
if (isNumeric) {
bufferedUpdate.numericValue = numericValues[getArrayIndex(numericValues.length, idx)];
bufferedUpdate.binaryValue = null;
} else {
bufferedUpdate.binaryValue = byteValuesIterator.next();
}
} else {
bufferedUpdate.binaryValue = null;
bufferedUpdate.numericValue = 0;
}
return bufferedUpdate;
} else {
return null;
}
}
}
private static int getArrayIndex(int arrayLength, int index) {
assert arrayLength == 1 || arrayLength > index : "illegal array index length: " + arrayLength + " index: " + index;
return Math.min(arrayLength-1, index);
}
}
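For context, here is a minimal usage sketch of the new API, modeled on the TestFieldUpdatesBuffer test added at the end of this commit. FieldUpdatesBuffer is package-private, so code like this only compiles inside org.apache.lucene.index, and the field and term names are made up for illustration.

package org.apache.lucene.index;

import java.io.IOException;
import org.apache.lucene.util.Counter;

// Hypothetical sketch of consuming a FieldUpdatesBuffer; not part of the commit.
class FieldUpdatesBufferUsageSketch {
  static void consume() throws IOException {
    Counter bytesUsed = Counter.newCounter();
    DocValuesUpdate.NumericDocValuesUpdate first =
        new DocValuesUpdate.NumericDocValuesUpdate(new Term("id", "1"), "price", 42);
    FieldUpdatesBuffer buffer = new FieldUpdatesBuffer(bytesUsed, first, Integer.MAX_VALUE);
    buffer.addUpdate(new Term("id", "2"), 7, Integer.MAX_VALUE); // numeric update with a value
    buffer.addNoValue(new Term("id", "3"), Integer.MAX_VALUE);   // buffers a value reset for that term

    FieldUpdatesBuffer.BufferedUpdateIterator iterator = buffer.iterator();
    FieldUpdatesBuffer.BufferedUpdate update;
    while ((update = iterator.next()) != null) {
      // the returned instance is shared, so consume it fully before calling next() again
      System.out.println(update.termField + ":" + update.termValue.utf8ToString()
          + " -> " + (update.hasValue ? update.numericValue : "no value")
          + " docUpTo=" + update.docUpTo);
    }
  }
}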

FrozenBufferedUpdates.java

@ -23,7 +23,6 @@ import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Locale;
import java.util.Map;
@ -33,20 +32,15 @@ import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.locks.ReentrantLock;
import java.util.function.IntConsumer;
import org.apache.lucene.index.DocValuesUpdate.BinaryDocValuesUpdate;
import org.apache.lucene.index.DocValuesUpdate.NumericDocValuesUpdate;
import org.apache.lucene.search.DocIdSetIterator;
import org.apache.lucene.search.IndexSearcher;
import org.apache.lucene.search.Query;
import org.apache.lucene.search.ScoreMode;
import org.apache.lucene.search.Scorer;
import org.apache.lucene.search.Weight;
import org.apache.lucene.store.ByteArrayDataInput;
import org.apache.lucene.store.RAMOutputStream;
import org.apache.lucene.util.ArrayUtil;
import org.apache.lucene.util.Bits;
import org.apache.lucene.util.BytesRef;
import org.apache.lucene.util.Counter;
import org.apache.lucene.util.IOUtils;
import org.apache.lucene.util.InfoStream;
import org.apache.lucene.util.RamUsageEstimator;
@ -73,21 +67,14 @@ final class FrozenBufferedUpdates {
final Query[] deleteQueries;
final int[] deleteQueryLimits;
// numeric DV update term and their updates
final byte[] numericDVUpdates;
// binary DV update term and their updates
final byte[] binaryDVUpdates;
private final int numericDVUpdateCount;
private final int binaryDVUpdateCount;
/** Counts down once all deletes/updates have been applied */
public final CountDownLatch applied = new CountDownLatch(1);
private final ReentrantLock applyLock = new ReentrantLock();
private final Map<String, FieldUpdatesBuffer> fieldUpdates;
/** How many total documents were deleted/updated. */
public long totalDelCount;
private final int fieldUpdatesCount;
final int bytesUsed;
final int numTermDeletes;
@ -120,80 +107,25 @@ final class FrozenBufferedUpdates {
deleteQueryLimits[upto] = ent.getValue();
upto++;
}
Counter counter = Counter.newCounter();
// TODO if a Term affects multiple fields, we could keep the updates key'd by Term
// so that it maps to all fields it affects, sorted by their docUpto, and traverse
// that Term only once, applying the update to all fields that still need to be
// updated.
numericDVUpdates = freezeDVUpdates(updates.numericUpdates, counter::addAndGet);
numericDVUpdateCount = (int)counter.get();
counter.addAndGet(-counter.get());
assert counter.get() == 0;
// TODO if a Term affects multiple fields, we could keep the updates key'd by Term
// so that it maps to all fields it affects, sorted by their docUpto, and traverse
// that Term only once, applying the update to all fields that still need to be
// updated.
binaryDVUpdates = freezeDVUpdates(updates.binaryUpdates, counter::addAndGet);
binaryDVUpdateCount = (int)counter.get();
this.fieldUpdates = Collections.unmodifiableMap(new HashMap<>(updates.fieldUpdates));
this.fieldUpdatesCount = updates.numFieldUpdates.get();
bytesUsed = (int) (deleteTerms.ramBytesUsed() + deleteQueries.length * BYTES_PER_DEL_QUERY
+ numericDVUpdates.length + binaryDVUpdates.length);
bytesUsed = (int) ((deleteTerms.ramBytesUsed() + deleteQueries.length * BYTES_PER_DEL_QUERY)
+ updates.fieldUpdatesBytesUsed.get());
numTermDeletes = updates.numTermDeletes.get();
if (infoStream != null && infoStream.isEnabled("BD")) {
infoStream.message("BD", String.format(Locale.ROOT,
"compressed %d to %d bytes (%.2f%%) for deletes/updates; private segment %s",
updates.bytesUsed.get(), bytesUsed, 100.*bytesUsed/updates.bytesUsed.get(),
updates.ramBytesUsed(), bytesUsed, 100.*bytesUsed/updates.ramBytesUsed(),
privateSegment));
}
}
private static <T extends DocValuesUpdate> byte[] freezeDVUpdates(Map<String,LinkedHashMap<Term, T>> dvUpdates,
IntConsumer updateSizeConsumer)
throws IOException {
// TODO: we could do better here, e.g. collate the updates by field
// so if you are updating 2 fields interleaved we don't keep writing the field strings
try (RAMOutputStream out = new RAMOutputStream()) {
String lastTermField = null;
String lastUpdateField = null;
for (LinkedHashMap<Term, T> updates : dvUpdates.values()) {
updateSizeConsumer.accept(updates.size());
for (T update : updates.values()) {
int code = update.term.bytes().length << 3;
String termField = update.term.field();
if (termField.equals(lastTermField) == false) {
code |= 1;
}
String updateField = update.field;
if (updateField.equals(lastUpdateField) == false) {
code |= 2;
}
if (update.hasValue()) {
code |= 4;
}
out.writeVInt(code);
out.writeVInt(update.docIDUpto);
if (termField.equals(lastTermField) == false) {
out.writeString(termField);
lastTermField = termField;
}
if (updateField.equals(lastUpdateField) == false) {
out.writeString(updateField);
lastUpdateField = updateField;
}
out.writeBytes(update.term.bytes().bytes, update.term.bytes().offset, update.term.bytes().length);
if (update.hasValue()) {
update.writeTo(out);
}
}
}
byte[] bytes = new byte[(int) out.getFilePointer()];
out.writeTo(bytes, 0);
return bytes;
}
}
/** Returns the {@link SegmentCommitInfo} that this packet is supposed to apply its deletes to, or null
* if the private segment was already merged away. */
private List<SegmentCommitInfo> getInfosToApply(IndexWriter writer) {
@ -491,7 +423,7 @@ final class FrozenBufferedUpdates {
private long applyDocValuesUpdates(BufferedUpdatesStream.SegmentState[] segStates) throws IOException {
if (numericDVUpdates.length == 0 && binaryDVUpdates.length == 0) {
if (fieldUpdates.isEmpty()) {
return 0;
}
@ -513,33 +445,30 @@ final class FrozenBufferedUpdates {
continue;
}
final boolean isSegmentPrivateDeletes = privateSegment != null;
if (numericDVUpdates.length > 0) {
updateCount += applyDocValuesUpdates(segState, numericDVUpdates, true, delGen, isSegmentPrivateDeletes);
if (fieldUpdates.isEmpty() == false) {
updateCount += applyDocValuesUpdates(segState, fieldUpdates, delGen, isSegmentPrivateDeletes);
}
if (binaryDVUpdates.length > 0) {
updateCount += applyDocValuesUpdates(segState, binaryDVUpdates, false, delGen, isSegmentPrivateDeletes);
}
}
if (infoStream.isEnabled("BD")) {
infoStream.message("BD",
String.format(Locale.ROOT, "applyDocValuesUpdates %.1f msec for %d segments, %d numeric updates and %d binary updates; %d new updates",
String.format(Locale.ROOT, "applyDocValuesUpdates %.1f msec for %d segments, %d field updates; %d new updates",
(System.nanoTime()-startNS)/1000000.,
segStates.length,
numericDVUpdateCount,
binaryDVUpdateCount,
fieldUpdatesCount,
updateCount));
}
return updateCount;
}
private static long applyDocValuesUpdates(BufferedUpdatesStream.SegmentState segState, byte[] updates,
boolean isNumeric, long delGen,
private static long applyDocValuesUpdates(BufferedUpdatesStream.SegmentState segState,
Map<String, FieldUpdatesBuffer> updates,
long delGen,
boolean segmentPrivateDeletes) throws IOException {
TermsEnum termsEnum = null;
TermsEnum termsEnum;
PostingsEnum postingsEnum = null;
// TODO: we can process the updates per DV field, from last to first so that
@ -556,39 +485,25 @@ final class FrozenBufferedUpdates {
// We first write all our updates private, and only in the end publish to the ReadersAndUpdates */
Map<String, DocValuesFieldUpdates> holder = new HashMap<>();
ByteArrayDataInput in = new ByteArrayDataInput(updates);
String termField = null;
String updateField = null;
BytesRef term = new BytesRef();
term.bytes = new byte[16];
BytesRef scratch = new BytesRef();
scratch.bytes = new byte[16];
while (in.getPosition() != updates.length) {
int code = in.readVInt();
int docIDUpto = in.readVInt();
term.length = code >> 3;
if ((code & 1) != 0) {
termField = in.readString();
for (Map.Entry<String, FieldUpdatesBuffer> fieldUpdate : updates.entrySet()) {
String updateField = fieldUpdate.getKey();
FieldUpdatesBuffer value = fieldUpdate.getValue();
boolean isNumeric = value.isNumeric();
FieldUpdatesBuffer.BufferedUpdateIterator iterator = value.iterator();
FieldUpdatesBuffer.BufferedUpdate bufferedUpdate;
while ((bufferedUpdate = iterator.next()) != null) {
Terms terms = segState.reader.terms(bufferedUpdate.termField);
if (terms != null) {
termsEnum = terms.iterator();
} else {
// no terms in this segment for this field
continue;
}
if ((code & 2) != 0) {
updateField = in.readString();
}
boolean hasValue = (code & 4) != 0;
if (term.bytes.length < term.length) {
term.bytes = ArrayUtil.grow(term.bytes, term.length);
}
in.readBytes(term.bytes, 0, term.length);
final int limit;
if (delGen == segState.delGen) {
assert segmentPrivateDeletes;
limit = docIDUpto;
limit = bufferedUpdate.docUpTo;
} else {
limit = Integer.MAX_VALUE;
}
@ -605,35 +520,18 @@ final class FrozenBufferedUpdates {
// TODO: we could at least *collate* by field?
// This is the field used to resolve to docIDs, e.g. an "id" field, not the doc values field we are updating!
if ((code & 1) != 0) {
Terms terms = segState.reader.terms(termField);
if (terms != null) {
termsEnum = terms.iterator();
} else {
termsEnum = null;
}
}
final BytesRef binaryValue;
final long longValue;
if (hasValue == false) {
if (bufferedUpdate.hasValue == false) {
longValue = -1;
binaryValue = null;
} else if (isNumeric) {
longValue = NumericDocValuesUpdate.readFrom(in);
binaryValue = null;
} else {
longValue = -1;
binaryValue = BinaryDocValuesUpdate.readFrom(in, scratch);
longValue = bufferedUpdate.numericValue;
binaryValue = bufferedUpdate.binaryValue;
}
if (termsEnum == null) {
// no terms in this segment for this field
continue;
}
if (termsEnum.seekExact(term)) {
if (termsEnum.seekExact(bufferedUpdate.termValue)) {
// we don't need term frequencies for this
postingsEnum = termsEnum.postings(postingsEnum, PostingsEnum.NONE);
DocValuesFieldUpdates dvUpdates = holder.get(updateField);
@ -647,7 +545,7 @@ final class FrozenBufferedUpdates {
}
final IntConsumer docIdConsumer;
final DocValuesFieldUpdates update = dvUpdates;
if (hasValue == false) {
if (bufferedUpdate.hasValue == false) {
docIdConsumer = doc -> update.reset(doc);
} else if (isNumeric) {
docIdConsumer = doc -> update.add(doc, longValue);
@ -681,6 +579,7 @@ final class FrozenBufferedUpdates {
}
}
}
}
// now freeze & publish:
for (DocValuesFieldUpdates update : holder.values()) {
@ -896,11 +795,8 @@ final class FrozenBufferedUpdates {
if (deleteQueries.length != 0) {
s += " numDeleteQueries=" + deleteQueries.length;
}
if (numericDVUpdates.length > 0) {
s += " numNumericDVUpdates=" + numericDVUpdateCount;
}
if (binaryDVUpdates.length > 0) {
s += " numBinaryDVUpdates=" + binaryDVUpdateCount;
if (fieldUpdates.size() > 0) {
s += " fieldUpdates=" + fieldUpdatesCount;
}
if (bytesUsed != 0) {
s += " bytesUsed=" + bytesUsed;
@ -913,6 +809,6 @@ final class FrozenBufferedUpdates {
}
boolean any() {
return deleteTerms.size() > 0 || deleteQueries.length > 0 || numericDVUpdates.length > 0 || binaryDVUpdates.length > 0;
return deleteTerms.size() > 0 || deleteQueries.length > 0 || fieldUpdatesCount > 0 ;
}
}

TestFieldUpdatesBuffer.java (new file)

@ -0,0 +1,231 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.lucene.index;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import com.carrotsearch.randomizedtesting.generators.RandomPicks;
import org.apache.lucene.util.BytesRef;
import org.apache.lucene.util.Counter;
import org.apache.lucene.util.LuceneTestCase;
import org.apache.lucene.util.TestUtil;
public class TestFieldUpdatesBuffer extends LuceneTestCase {
public void testBascis() throws IOException {
Counter counter = Counter.newCounter();
DocValuesUpdate.NumericDocValuesUpdate update =
new DocValuesUpdate.NumericDocValuesUpdate(new Term("id", "1"), "age", 6);
FieldUpdatesBuffer buffer = new FieldUpdatesBuffer(counter, update, 15);
buffer.addUpdate(new Term("id", "10"), 6, 15);
buffer.addUpdate(new Term("id", "8"), 12, 15);
buffer.addUpdate(new Term("some_other_field", "8"), 13, 17);
buffer.addUpdate(new Term("id", "8"), 12, 16);
assertTrue(buffer.isNumeric());
FieldUpdatesBuffer.BufferedUpdateIterator iterator = buffer.iterator();
FieldUpdatesBuffer.BufferedUpdate value = iterator.next();
assertNotNull(value);
assertEquals("id", value.termField);
assertEquals("1", value.termValue.utf8ToString());
assertEquals(6, value.numericValue);
assertEquals(15, value.docUpTo);
value = iterator.next();
assertNotNull(value);
assertEquals("id", value.termField);
assertEquals("10", value.termValue.utf8ToString());
assertEquals(6, value.numericValue);
assertEquals(15, value.docUpTo);
value = iterator.next();
assertNotNull(value);
assertEquals("id", value.termField);
assertEquals("8", value.termValue.utf8ToString());
assertEquals(12, value.numericValue);
assertEquals(15, value.docUpTo);
value = iterator.next();
assertNotNull(value);
assertEquals("some_other_field", value.termField);
assertEquals("8", value.termValue.utf8ToString());
assertEquals(13, value.numericValue);
assertEquals(17, value.docUpTo);
value = iterator.next();
assertNotNull(value);
assertEquals("id", value.termField);
assertEquals("8", value.termValue.utf8ToString());
assertEquals(12, value.numericValue);
assertEquals(16, value.docUpTo);
assertNull(iterator.next());
}
public void testUpdateShareValues() throws IOException {
Counter counter = Counter.newCounter();
int intValue = random().nextInt();
boolean valueForThree = random().nextBoolean();
DocValuesUpdate.NumericDocValuesUpdate update =
new DocValuesUpdate.NumericDocValuesUpdate(new Term("id", "0"), "enabled", intValue);
FieldUpdatesBuffer buffer = new FieldUpdatesBuffer(counter, update, Integer.MAX_VALUE);
buffer.addUpdate(new Term("id", "1"), intValue, Integer.MAX_VALUE);
buffer.addUpdate(new Term("id", "2"), intValue, Integer.MAX_VALUE);
if (valueForThree) {
buffer.addUpdate(new Term("id", "3"), intValue, Integer.MAX_VALUE);
} else {
buffer.addNoValue(new Term("id", "3"), Integer.MAX_VALUE);
}
buffer.addUpdate(new Term("id", "4"), intValue, Integer.MAX_VALUE);
FieldUpdatesBuffer.BufferedUpdateIterator iterator = buffer.iterator();
FieldUpdatesBuffer.BufferedUpdate value;
int count = 0;
while ((value = iterator.next()) != null) {
boolean hasValue = count != 3 || valueForThree;
assertEquals("" + (count++), value.termValue.utf8ToString());
assertEquals("id", value.termField);
assertEquals(hasValue, value.hasValue);
if (hasValue) {
assertEquals(intValue, value.numericValue);
} else {
assertEquals(0, value.numericValue);
}
assertEquals(Integer.MAX_VALUE, value.docUpTo);
}
assertTrue(buffer.isNumeric());
}
public void testUpdateShareValuesBinary() throws IOException {
Counter counter = Counter.newCounter();
boolean valueForThree = random().nextBoolean();
DocValuesUpdate.BinaryDocValuesUpdate update =
new DocValuesUpdate.BinaryDocValuesUpdate(new Term("id", "0"), "enabled", new BytesRef(""));
FieldUpdatesBuffer buffer = new FieldUpdatesBuffer(counter, update, Integer.MAX_VALUE);
buffer.addUpdate(new Term("id", "1"), new BytesRef(""), Integer.MAX_VALUE);
buffer.addUpdate(new Term("id", "2"), new BytesRef(""), Integer.MAX_VALUE);
if (valueForThree) {
buffer.addUpdate(new Term("id", "3"), new BytesRef(""), Integer.MAX_VALUE);
} else {
buffer.addNoValue(new Term("id", "3"), Integer.MAX_VALUE);
}
buffer.addUpdate(new Term("id", "4"), new BytesRef(""), Integer.MAX_VALUE);
FieldUpdatesBuffer.BufferedUpdateIterator iterator = buffer.iterator();
FieldUpdatesBuffer.BufferedUpdate value;
int count = 0;
while ((value = iterator.next()) != null) {
boolean hasValue = count != 3 || valueForThree;
assertEquals("" + (count++), value.termValue.utf8ToString());
assertEquals("id", value.termField);
assertEquals(hasValue, value.hasValue);
if (hasValue) {
assertEquals(new BytesRef(""), value.binaryValue);
} else {
assertNull(value.binaryValue);
}
assertEquals(Integer.MAX_VALUE, value.docUpTo);
}
assertFalse(buffer.isNumeric());
}
public <T extends DocValuesUpdate> T getRandomUpdate(boolean binary) {
String termField = RandomPicks.randomFrom(random(), Arrays.asList("id", "_id", "some_other_field"));
String docId = "" + random().nextInt(10);
if (binary) {
DocValuesUpdate.BinaryDocValuesUpdate value = new DocValuesUpdate.BinaryDocValuesUpdate(new Term(termField, docId), "binary",
rarely() ? null : new BytesRef(TestUtil.randomRealisticUnicodeString(random())));
return (T) (rarely() ? value.prepareForApply(random().nextInt(100)) : value);
} else {
DocValuesUpdate.NumericDocValuesUpdate value = new DocValuesUpdate.NumericDocValuesUpdate(new Term(termField, docId), "numeric",
rarely() ? null : Long.valueOf(random().nextInt(100)));
return (T) (rarely() ? value.prepareForApply(random().nextInt(100)) : value);
}
}
public void testBinaryRandom() throws IOException {
List<DocValuesUpdate.BinaryDocValuesUpdate> updates = new ArrayList<>();
int numUpdates = 1 + random().nextInt(1000);
Counter counter = Counter.newCounter();
DocValuesUpdate.BinaryDocValuesUpdate randomUpdate = getRandomUpdate(true);
updates.add(randomUpdate);
FieldUpdatesBuffer buffer = new FieldUpdatesBuffer(counter, randomUpdate, randomUpdate.docIDUpto);
for (int i = 0; i < numUpdates; i++) {
randomUpdate = getRandomUpdate(true);
updates.add(randomUpdate);
if (randomUpdate.hasValue) {
buffer.addUpdate(randomUpdate.term, randomUpdate.getValue(), randomUpdate.docIDUpto);
} else {
buffer.addNoValue(randomUpdate.term, randomUpdate.docIDUpto);
}
}
FieldUpdatesBuffer.BufferedUpdateIterator iterator = buffer.iterator();
FieldUpdatesBuffer.BufferedUpdate value;
int count = 0;
while ((value = iterator.next()) != null) {
randomUpdate = updates.get(count++);
assertEquals(randomUpdate.term.bytes.utf8ToString(), value.termValue.utf8ToString());
assertEquals(randomUpdate.term.field, value.termField);
assertEquals("count: " + count, randomUpdate.hasValue, value.hasValue);
if (randomUpdate.hasValue) {
assertEquals(randomUpdate.getValue(), value.binaryValue);
} else {
assertNull(value.binaryValue);
}
assertEquals(randomUpdate.docIDUpto, value.docUpTo);
}
assertEquals(count, updates.size());
}
public void testNumericRandom() throws IOException {
List<DocValuesUpdate.NumericDocValuesUpdate> updates = new ArrayList<>();
int numUpdates = 1 + random().nextInt(1000);
Counter counter = Counter.newCounter();
DocValuesUpdate.NumericDocValuesUpdate randomUpdate = getRandomUpdate(false);
updates.add(randomUpdate);
FieldUpdatesBuffer buffer = new FieldUpdatesBuffer(counter, randomUpdate, randomUpdate.docIDUpto);
for (int i = 0; i < numUpdates; i++) {
randomUpdate = getRandomUpdate(false);
updates.add(randomUpdate);
if (randomUpdate.hasValue) {
buffer.addUpdate(randomUpdate.term, randomUpdate.getValue(), randomUpdate.docIDUpto);
} else {
buffer.addNoValue(randomUpdate.term, randomUpdate.docIDUpto);
}
}
FieldUpdatesBuffer.BufferedUpdateIterator iterator = buffer.iterator();
FieldUpdatesBuffer.BufferedUpdate value;
int count = 0;
while ((value = iterator.next()) != null) {
randomUpdate = updates.get(count++);
assertEquals(randomUpdate.term.bytes.utf8ToString(), value.termValue.utf8ToString());
assertEquals(randomUpdate.term.field, value.termField);
assertEquals(randomUpdate.hasValue, value.hasValue);
if (randomUpdate.hasValue) {
assertEquals(randomUpdate.getValue(), value.numericValue);
} else {
assertEquals(0, value.numericValue);
}
assertEquals(randomUpdate.docIDUpto, value.docUpTo);
}
assertEquals(count, updates.size());
}
}