LUCENE-8272: Share internal DV update code between binary and numeric

Today we duplicate a fair portion of the internal logic to
apply updates of binary and numeric doc values. This change refactors
this non-trivial code to share the same code path and only differ in
if we provide a binary or numeric instance. This also allows us to
iterator over the updates only once rather than twice once for numeric
and once for binary fields.

This change also subclass DocValuesIterator from DocValuesFieldUpdates.Iterator
which allows easier consumption down the road since it now shares most of it's
interface with DocIdSetIterator which is the main interface for this in Lucene.
This commit is contained in:
Simon Willnauer 2018-04-25 15:17:34 +02:00 committed by GitHub
parent 34170272e1
commit fbeef2f726
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
7 changed files with 246 additions and 266 deletions

View File

@ -64,7 +64,7 @@ class BinaryDocValuesFieldUpdates extends DocValuesFieldUpdates {
}
@Override
int nextDoc() {
public int nextDoc() {
if (idx >= size) {
offset = -1;
return doc = DocIdSetIterator.NO_MORE_DOCS;
@ -86,7 +86,7 @@ class BinaryDocValuesFieldUpdates extends DocValuesFieldUpdates {
}
@Override
int doc() {
public int docID() {
return doc;
}
@ -205,7 +205,7 @@ class BinaryDocValuesFieldUpdates extends DocValuesFieldUpdates {
+ lengths.ramBytesUsed()
+ docs.ramBytesUsed()
+ RamUsageEstimator.NUM_BYTES_OBJECT_HEADER
+ 4 * RamUsageEstimator.NUM_BYTES_INT
+ 4 * Integer.BYTES
+ 5 * RamUsageEstimator.NUM_BYTES_OBJECT_REF
+ values.bytes().length;
}

View File

@ -16,7 +16,7 @@
*/
package org.apache.lucene.index;
import org.apache.lucene.search.DocIdSetIterator;
import org.apache.lucene.util.BytesRef;
import org.apache.lucene.util.PriorityQueue;
import static org.apache.lucene.search.DocIdSetIterator.NO_MORE_DOCS;
@ -35,17 +35,23 @@ abstract class DocValuesFieldUpdates {
* updates are returned by this iterator, and the documents are returned in
* increasing order.
*/
static abstract class Iterator {
static abstract class Iterator extends DocValuesIterator {
/**
* Returns the next document which has an update, or
* {@link DocIdSetIterator#NO_MORE_DOCS} if there are no more documents to
* return.
*/
abstract int nextDoc();
@Override
public final boolean advanceExact(int target) {
throw new UnsupportedOperationException();
}
/** Returns the current document this iterator is on. */
abstract int doc();
@Override
public final int advance(int target) {
throw new UnsupportedOperationException();
}
@Override
public final long cost() {
throw new UnsupportedOperationException();
}
public abstract int nextDoc(); // no IOException
/**
* Returns the value of the document returned from {@link #nextDoc()}. A
@ -55,7 +61,72 @@ abstract class DocValuesFieldUpdates {
/** Returns delGen for this packet. */
abstract long delGen();
/**
* Wraps the given iterator as a BinaryDocValues instance.
*/
static BinaryDocValues asBinaryDocValues(Iterator iterator) {
return new BinaryDocValues() {
@Override
public int docID() {
return iterator.docID();
}
@Override
public BytesRef binaryValue() {
return (BytesRef) iterator.value();
}
@Override
public boolean advanceExact(int target) {
return iterator.advanceExact(target);
}
@Override
public int nextDoc() {
return iterator.nextDoc();
}
@Override
public int advance(int target) {
return iterator.advance(target);
}
@Override
public long cost() {
return iterator.cost();
}
};
}
/**
* Wraps the given iterator as a NumericDocValues instance.
*/
static NumericDocValues asNumericDocValues(Iterator iterator) {
return new NumericDocValues() {
@Override
public long longValue() {
return ((Long)iterator.value()).longValue();
}
@Override
public boolean advanceExact(int target) {
throw new UnsupportedOperationException();
}
@Override
public int docID() {
return iterator.docID();
}
@Override
public int nextDoc() {
return iterator.nextDoc();
}
@Override
public int advance(int target) {
return iterator.advance(target);
}
@Override
public long cost() {
return iterator.cost();
}
};
}
}
/** Merge-sorts multiple iterators, one per delGen, favoring the largest delGen that has updates for a given docID. */
public static Iterator mergedIterator(Iterator[] subs) {
@ -68,7 +139,7 @@ abstract class DocValuesFieldUpdates {
@Override
protected boolean lessThan(Iterator a, Iterator b) {
// sort by smaller docID
int cmp = Integer.compare(a.doc(), b.doc());
int cmp = Integer.compare(a.docID(), b.docID());
if (cmp == 0) {
// then by larger delGen
cmp = Long.compare(b.delGen(), a.delGen());
@ -106,7 +177,7 @@ abstract class DocValuesFieldUpdates {
doc = NO_MORE_DOCS;
break;
}
int newDoc = queue.top().doc();
int newDoc = queue.top().docID();
if (newDoc != doc) {
assert newDoc > doc: "doc=" + doc + " newDoc=" + newDoc;
doc = newDoc;
@ -119,14 +190,14 @@ abstract class DocValuesFieldUpdates {
}
}
} else {
doc = queue.top().doc();
doc = queue.top().docID();
first = false;
}
return doc;
}
@Override
public int doc() {
public int docID() {
return doc;
}

View File

@ -3552,13 +3552,13 @@ public class IndexWriter implements Closeable, TwoPhaseCommit, Accountable {
private void skipDeletedDoc(DocValuesFieldUpdates.Iterator[] updatesIters, int deletedDoc) {
for (DocValuesFieldUpdates.Iterator iter : updatesIters) {
if (iter.doc() == deletedDoc) {
if (iter.docID() == deletedDoc) {
iter.nextDoc();
}
// when entering the method, all iterators must already be beyond the
// deleted document, or right on it, in which case we advance them over
// and they must be beyond it now.
assert iter.doc() > deletedDoc : "updateDoc=" + iter.doc() + " deletedDoc=" + deletedDoc;
assert iter.docID() > deletedDoc : "updateDoc=" + iter.docID() + " deletedDoc=" + deletedDoc;
}
}

View File

@ -56,7 +56,7 @@ class NumericDocValuesFieldUpdates extends DocValuesFieldUpdates {
}
@Override
int nextDoc() {
public int nextDoc() {
if (idx >= size) {
value = null;
return doc = DocIdSetIterator.NO_MORE_DOCS;
@ -73,7 +73,7 @@ class NumericDocValuesFieldUpdates extends DocValuesFieldUpdates {
}
@Override
int doc() {
public int docID() {
return doc;
}
@ -179,7 +179,7 @@ class NumericDocValuesFieldUpdates extends DocValuesFieldUpdates {
return values.ramBytesUsed()
+ docs.ramBytesUsed()
+ RamUsageEstimator.NUM_BYTES_OBJECT_HEADER
+ 2 * RamUsageEstimator.NUM_BYTES_INT
+ 2 * Integer.BYTES
+ 2 * RamUsageEstimator.NUM_BYTES_OBJECT_REF;
}
}

View File

@ -122,28 +122,7 @@ final class PendingSoftDeletes extends PendingDeletes {
@Override
void onDocValuesUpdate(String field, DocValuesFieldUpdates.Iterator iterator) throws IOException {
if (this.field.equals(field)) {
pendingDeleteCount += applySoftDeletes(new DocIdSetIterator() {
int docID = -1;
@Override
public int docID() {
return docID;
}
@Override
public int nextDoc() {
return docID = iterator.nextDoc();
}
@Override
public int advance(int target) {
throw new UnsupportedOperationException();
}
@Override
public long cost() {
throw new UnsupportedOperationException();
}
}, getMutableBits());
pendingDeleteCount += applySoftDeletes(iterator, getMutableBits());
}
}
@Override

View File

@ -30,6 +30,7 @@ import java.util.Map;
import java.util.Set;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
import java.util.function.Function;
import org.apache.lucene.codecs.Codec;
import org.apache.lucene.codecs.DocValuesConsumer;
@ -298,18 +299,14 @@ final class ReadersAndUpdates {
return pendingDeletes.writeLiveDocs(dir);
}
@SuppressWarnings("synthetic-access")
private synchronized void handleNumericDVUpdates(FieldInfos infos,
private synchronized void handleDVUpdates(FieldInfos infos,
Directory dir, DocValuesFormat dvFormat, final SegmentReader reader,
Map<Integer,Set<String>> fieldFiles, long maxDelGen, InfoStream infoStream) throws IOException {
for (Entry<String,List<DocValuesFieldUpdates>> ent : pendingDVUpdates.entrySet()) {
final String field = ent.getKey();
final List<DocValuesFieldUpdates> updates = ent.getValue();
if (updates.get(0).type != DocValuesType.NUMERIC) {
continue;
}
DocValuesType type = updates.get(0).type;
assert type == DocValuesType.NUMERIC || type == DocValuesType.BINARY : "unsupported type: " + type;
final List<DocValuesFieldUpdates> updatesToApply = new ArrayList<>();
long bytes = 0;
for(DocValuesFieldUpdates update : updates) {
@ -323,7 +320,6 @@ final class ReadersAndUpdates {
// nothing to apply yet
continue;
}
if (infoStream.isEnabled("BD")) {
infoStream.message("BD", String.format(Locale.ROOT,
"now write %d pending numeric DV updates for field=%s, seg=%s, bytes=%.3f MB",
@ -332,7 +328,6 @@ final class ReadersAndUpdates {
info,
bytes/1024./1024.));
}
final long nextDocValuesGen = info.getNextDocValuesGen();
final String segmentSuffix = Long.toString(nextDocValuesGen, Character.MAX_RADIX);
final IOContext updatesContext = new IOContext(new FlushInfo(info.info.maxDoc(), bytes));
@ -345,34 +340,118 @@ final class ReadersAndUpdates {
final SegmentWriteState state = new SegmentWriteState(null, trackingDir, info.info, fieldInfos, null, updatesContext, segmentSuffix);
try (final DocValuesConsumer fieldsConsumer = dvFormat.fieldsConsumer(state)) {
pendingDeletes.onDocValuesUpdate(fieldInfo);
Function<FieldInfo, DocValuesFieldUpdates.Iterator> updateSupplier = (info) -> {
if (info != fieldInfo) {
throw new IllegalArgumentException("expected field info for field: " + fieldInfo.name + " but got: " + info.name);
}
DocValuesFieldUpdates.Iterator[] subs = new DocValuesFieldUpdates.Iterator[updatesToApply.size()];
for(int i=0;i<subs.length;i++) {
subs[i] = updatesToApply.get(i).iterator();
}
return DocValuesFieldUpdates.mergedIterator(subs);
};
if (type == DocValuesType.BINARY) {
fieldsConsumer.addBinaryField(fieldInfo, new EmptyDocValuesProducer() {
@Override
public BinaryDocValues getBinary(FieldInfo fieldInfoIn) throws IOException {
final MergedDocValues<BinaryDocValues> mergedDocValues = new MergedDocValues<>(
reader.getBinaryDocValues(field),
DocValuesFieldUpdates.Iterator.asBinaryDocValues(updateSupplier.apply(fieldInfo)));
// Merge sort of the original doc values with updated doc values:
return new BinaryDocValues() {
@Override
public BytesRef binaryValue() throws IOException {
return mergedDocValues.currentValuesSupplier.binaryValue();
}
@Override
public boolean advanceExact(int target) {
return mergedDocValues.advanceExact(target);
}
@Override
public int docID() {
return mergedDocValues.docID();
}
@Override
public int nextDoc() throws IOException {
return mergedDocValues.nextDoc();
}
@Override
public int advance(int target) {
return mergedDocValues.advance(target);
}
@Override
public long cost() {
return mergedDocValues.cost();
}
};
}
});
} else {
// write the numeric updates to a new gen'd docvalues file
fieldsConsumer.addNumericField(fieldInfo, new EmptyDocValuesProducer() {
@Override
public NumericDocValues getNumeric(FieldInfo fieldInfoIn) throws IOException {
if (fieldInfoIn != fieldInfo) {
throw new IllegalArgumentException("wrong fieldInfo");
}
DocValuesFieldUpdates.Iterator[] subs = new DocValuesFieldUpdates.Iterator[updatesToApply.size()];
for(int i=0;i<subs.length;i++) {
subs[i] = updatesToApply.get(i).iterator();
}
final DocValuesFieldUpdates.Iterator updatesIter = DocValuesFieldUpdates.mergedIterator(subs);
final NumericDocValues currentValues = reader.getNumericDocValues(field);
final MergedDocValues<NumericDocValues> mergedDocValues = new MergedDocValues<>(
reader.getNumericDocValues(field),
DocValuesFieldUpdates.Iterator.asNumericDocValues(updateSupplier.apply(fieldInfo)));
// Merge sort of the original doc values with updated doc values:
return new NumericDocValues() {
@Override
public long longValue() throws IOException {
return mergedDocValues.currentValuesSupplier.longValue();
}
@Override
public boolean advanceExact(int target) {
return mergedDocValues.advanceExact(target);
}
@Override
public int docID() {
return mergedDocValues.docID();
}
@Override
public int nextDoc() throws IOException {
return mergedDocValues.nextDoc();
}
@Override
public int advance(int target) {
return mergedDocValues.advance(target);
}
@Override
public long cost() {
return mergedDocValues.cost();
}
};
}
});
}
}
info.advanceDocValuesGen();
assert !fieldFiles.containsKey(fieldInfo.number);
fieldFiles.put(fieldInfo.number, trackingDir.getCreatedFiles());
}
}
/**
* This class merges the current on-disk DV with an incoming update DV instance and merges the two instances
* giving the incoming update precedence in terms of values, in other words the values of the update always
* wins over the on-disk version.
*/
static final class MergedDocValues<DocValuesInstance extends DocValuesIterator> extends DocValuesIterator {
// merged docID
private int docIDOut = -1;
// docID from our original doc values
private int docIDIn = -1;
private int docIDOnDisk = -1;
// docID from our updates
private int updateDocID = -1;
private long value;
private final DocValuesInstance onDiskDocValues;
private final DocValuesInstance updateDocValues;
DocValuesInstance currentValuesSupplier;
protected MergedDocValues(DocValuesInstance onDiskDocValues, DocValuesInstance updateDocValues) {
this.onDiskDocValues = onDiskDocValues;
this.updateDocValues = updateDocValues;
}
@Override
public int docID() {
@ -385,188 +464,40 @@ final class ReadersAndUpdates {
}
@Override
public boolean advanceExact(int target) throws IOException {
public boolean advanceExact(int target) {
throw new UnsupportedOperationException();
}
@Override
public long cost() {
// TODO
return 0;
}
@Override
public long longValue() {
return value;
return onDiskDocValues.cost();
}
@Override
public int nextDoc() throws IOException {
if (docIDIn == docIDOut) {
if (currentValues == null) {
docIDIn = NO_MORE_DOCS;
if (docIDOnDisk == docIDOut) {
if (onDiskDocValues == null) {
docIDOnDisk = NO_MORE_DOCS;
} else {
docIDIn = currentValues.nextDoc();
docIDOnDisk = onDiskDocValues.nextDoc();
}
}
if (updateDocID == docIDOut) {
updateDocID = updatesIter.nextDoc();
updateDocID = updateDocValues.nextDoc();
}
if (docIDIn < updateDocID) {
// no update to this doc
docIDOut = docIDIn;
value = currentValues.longValue();
if (docIDOnDisk < updateDocID) {
// no update to this doc - we use the on-disk values
docIDOut = docIDOnDisk;
currentValuesSupplier = onDiskDocValues;
} else {
docIDOut = updateDocID;
if (docIDOut != NO_MORE_DOCS) {
value = (Long) updatesIter.value();
currentValuesSupplier = updateDocValues;
}
}
return docIDOut;
}
};
}
});
}
info.advanceDocValuesGen();
assert !fieldFiles.containsKey(fieldInfo.number);
fieldFiles.put(fieldInfo.number, trackingDir.getCreatedFiles());
}
}
@SuppressWarnings("synthetic-access")
private synchronized void handleBinaryDVUpdates(FieldInfos infos,
TrackingDirectoryWrapper dir, DocValuesFormat dvFormat, final SegmentReader reader,
Map<Integer,Set<String>> fieldFiles, long maxDelGen, InfoStream infoStream) throws IOException {
for (Entry<String,List<DocValuesFieldUpdates>> ent : pendingDVUpdates.entrySet()) {
final String field = ent.getKey();
final List<DocValuesFieldUpdates> updates = ent.getValue();
if (updates.get(0).type != DocValuesType.BINARY) {
continue;
}
final List<DocValuesFieldUpdates> updatesToApply = new ArrayList<>();
long bytes = 0;
for(DocValuesFieldUpdates update : updates) {
if (update.delGen <= maxDelGen) {
// safe to apply this one
bytes += update.ramBytesUsed();
updatesToApply.add(update);
}
}
if (updatesToApply.isEmpty()) {
// nothing to apply yet
continue;
}
if (infoStream.isEnabled("BD")) {
infoStream.message("BD", String.format(Locale.ROOT,
"now write %d pending binary DV updates for field=%s, seg=%s, bytes=%.3fMB",
updatesToApply.size(),
field,
info,
bytes/1024./1024.));
}
final long nextDocValuesGen = info.getNextDocValuesGen();
final String segmentSuffix = Long.toString(nextDocValuesGen, Character.MAX_RADIX);
final IOContext updatesContext = new IOContext(new FlushInfo(info.info.maxDoc(), bytes));
final FieldInfo fieldInfo = infos.fieldInfo(field);
assert fieldInfo != null;
fieldInfo.setDocValuesGen(nextDocValuesGen);
final FieldInfos fieldInfos = new FieldInfos(new FieldInfo[] { fieldInfo });
// separately also track which files were created for this gen
final TrackingDirectoryWrapper trackingDir = new TrackingDirectoryWrapper(dir);
final SegmentWriteState state = new SegmentWriteState(null, trackingDir, info.info, fieldInfos, null, updatesContext, segmentSuffix);
try (final DocValuesConsumer fieldsConsumer = dvFormat.fieldsConsumer(state)) {
// write the binary updates to a new gen'd docvalues file
pendingDeletes.onDocValuesUpdate(fieldInfo);
fieldsConsumer.addBinaryField(fieldInfo, new EmptyDocValuesProducer() {
@Override
public BinaryDocValues getBinary(FieldInfo fieldInfoIn) throws IOException {
if (fieldInfoIn != fieldInfo) {
throw new IllegalArgumentException("wrong fieldInfo");
}
DocValuesFieldUpdates.Iterator[] subs = new DocValuesFieldUpdates.Iterator[updatesToApply.size()];
for(int i=0;i<subs.length;i++) {
subs[i] = updatesToApply.get(i).iterator();
}
final DocValuesFieldUpdates.Iterator updatesIter = DocValuesFieldUpdates.mergedIterator(subs);
final BinaryDocValues currentValues = reader.getBinaryDocValues(field);
// Merge sort of the original doc values with updated doc values:
return new BinaryDocValues() {
// merged docID
private int docIDOut = -1;
// docID from our original doc values
private int docIDIn = -1;
// docID from our updates
private int updateDocID = -1;
private BytesRef value;
@Override
public int docID() {
return docIDOut;
}
@Override
public int advance(int target) {
throw new UnsupportedOperationException();
}
@Override
public boolean advanceExact(int target) throws IOException {
throw new UnsupportedOperationException();
}
@Override
public long cost() {
return currentValues.cost();
}
@Override
public BytesRef binaryValue() {
return value;
}
@Override
public int nextDoc() throws IOException {
if (docIDIn == docIDOut) {
if (currentValues == null) {
docIDIn = NO_MORE_DOCS;
} else {
docIDIn = currentValues.nextDoc();
}
}
if (updateDocID == docIDOut) {
updateDocID = updatesIter.nextDoc();
}
if (docIDIn < updateDocID) {
// no update to this doc
docIDOut = docIDIn;
value = currentValues.binaryValue();
} else {
docIDOut = updateDocID;
if (docIDOut != NO_MORE_DOCS) {
value = (BytesRef) updatesIter.value();
}
}
return docIDOut;
}
};
}
});
}
info.advanceDocValuesGen();
assert !fieldFiles.containsKey(fieldInfo.number);
fieldFiles.put(fieldInfo.number, trackingDir.getCreatedFiles());
}
}
private synchronized Set<String> writeFieldInfosGen(FieldInfos fieldInfos, Directory dir,
FieldInfosFormat infosFormat) throws IOException {
@ -649,8 +580,7 @@ final class ReadersAndUpdates {
fieldInfos = builder.finish();
final DocValuesFormat docValuesFormat = codec.docValuesFormat();
handleNumericDVUpdates(fieldInfos, trackingDir, docValuesFormat, reader, newDVFiles, maxDelGen, infoStream);
handleBinaryDVUpdates(fieldInfos, trackingDir, docValuesFormat, reader, newDVFiles, maxDelGen, infoStream);
handleDVUpdates(fieldInfos, trackingDir, docValuesFormat, reader, newDVFiles, maxDelGen, infoStream);
fieldInfosFiles = writeFieldInfosGen(fieldInfos, trackingDir, codec.fieldInfosFormat());
} finally {

View File

@ -211,12 +211,12 @@ public class TestPendingSoftDeletes extends TestPendingDeletes {
int doc = -1;
@Override
int nextDoc() {
public int nextDoc() {
return doc = iter.next();
}
@Override
int doc() {
public int docID() {
return doc;
}