LUCENE-8608: Extract utility class to iterate over terms docs

Today we re-implement the same algorithm in various places
when we want to consume all docs for a set/list of terms. This
caused serious slowdowns, for instance in the case of applying
updates, as fixed in LUCENE-8602. This change extracts the common
usage and shares the iteration code, including logic to reuse
TermsEnum and PostingsEnum instances as much as possible, and adds
tests for it.
Simon Willnauer 2018-12-07 22:17:26 +01:00
parent d7ad2f46c3
commit 4272c631ca
6 changed files with 250 additions and 130 deletions
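For orientation before the per-file diffs, here is a minimal usage sketch of the extracted helper, based on the TermDocsIterator API added below. The wrapper class and method names are made up for illustration, and the sketch assumes it sits in org.apache.lucene.index since TermDocsIterator is package-private.

package org.apache.lucene.index;

import java.io.IOException;
import java.util.List;

import org.apache.lucene.search.DocIdSetIterator;

// Sketch only: visit every doc of every term in a field-then-term sorted list,
// letting the helper reuse its TermsEnum/PostingsEnum across calls.
final class TermDocsIteratorUsageSketch {
  static void visitAllDocs(LeafReader reader, List<Term> sortedTerms) throws IOException {
    // 'true' promises that terms arrive in sorted order per field, which lets the
    // helper seek forward (seekCeil) instead of doing a fresh seekExact per term.
    FrozenBufferedUpdates.TermDocsIterator iterator =
        new FrozenBufferedUpdates.TermDocsIterator(reader, true);
    for (Term term : sortedTerms) {
      DocIdSetIterator docs = iterator.nextTerm(term.field(), term.bytes());
      if (docs == null) {
        continue; // field missing in this segment, or term not present
      }
      int doc;
      while ((doc = docs.nextDoc()) != DocIdSetIterator.NO_MORE_DOCS) {
        // consume doc here, e.g. mark it deleted or buffer a doc-values update
      }
    }
  }
}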


@@ -69,11 +69,11 @@ class BufferedUpdates implements Accountable {
final AtomicInteger numTermDeletes = new AtomicInteger();
final AtomicInteger numFieldUpdates = new AtomicInteger();
final Map<Term,Integer> deleteTerms = new HashMap<>();
final Map<Term,Integer> deleteTerms = new HashMap<>(); // TODO cut this over to FieldUpdatesBuffer
final Map<Query,Integer> deleteQueries = new HashMap<>();
final List<Integer> deleteDocIDs = new ArrayList<>();
final Map<String,FieldUpdatesBuffer> fieldUpdates = new HashMap<>();
final Map<String, FieldUpdatesBuffer> fieldUpdates = new HashMap<>();
public static final Integer MAX_INT = Integer.valueOf(Integer.MAX_VALUE);
@@ -109,7 +109,7 @@ class BufferedUpdates implements Accountable {
s += " " + deleteDocIDs.size() + " deleted docIDs";
}
if (numFieldUpdates.get() != 0) {
s += " " + numFieldUpdates.get() + " field updates (unique count=" + fieldUpdates.size() + ")";
s += " " + numFieldUpdates.get() + " field updates";
}
if (bytesUsed.get() != 0) {
s += " bytesUsed=" + bytesUsed.get();


@@ -25,6 +25,7 @@ import java.util.Map;
import org.apache.lucene.codecs.FieldsConsumer;
import org.apache.lucene.codecs.NormsProducer;
import org.apache.lucene.search.DocIdSetIterator;
import org.apache.lucene.util.CollectionUtil;
import org.apache.lucene.util.FixedBitSet;
import org.apache.lucene.util.IOUtils;
@@ -42,37 +43,21 @@ final class FreqProxTermsWriter extends TermsHash {
Map<Term,Integer> segDeletes = state.segUpdates.deleteTerms;
List<Term> deleteTerms = new ArrayList<>(segDeletes.keySet());
Collections.sort(deleteTerms);
String lastField = null;
TermsEnum termsEnum = null;
PostingsEnum postingsEnum = null;
FrozenBufferedUpdates.TermDocsIterator iterator = new FrozenBufferedUpdates.TermDocsIterator(fields, true);
for(Term deleteTerm : deleteTerms) {
if (deleteTerm.field().equals(lastField) == false) {
lastField = deleteTerm.field();
Terms terms = fields.terms(lastField);
if (terms != null) {
termsEnum = terms.iterator();
} else {
termsEnum = null;
}
}
if (termsEnum != null && termsEnum.seekExact(deleteTerm.bytes())) {
postingsEnum = termsEnum.postings(postingsEnum, 0);
DocIdSetIterator postings = iterator.nextTerm(deleteTerm.field(), deleteTerm.bytes());
if (postings != null ) {
int delDocLimit = segDeletes.get(deleteTerm);
assert delDocLimit < PostingsEnum.NO_MORE_DOCS;
while (true) {
int doc = postingsEnum.nextDoc();
if (doc < delDocLimit) {
if (state.liveDocs == null) {
state.liveDocs = new FixedBitSet(state.segmentInfo.maxDoc());
state.liveDocs.set(0, state.segmentInfo.maxDoc());
}
if (state.liveDocs.get(doc)) {
state.delCountOnFlush++;
state.liveDocs.clear(doc);
}
} else {
break;
int doc;
while ((doc = postings.nextDoc()) < delDocLimit) {
if (state.liveDocs == null) {
state.liveDocs = new FixedBitSet(state.segmentInfo.maxDoc());
state.liveDocs.set(0, state.segmentInfo.maxDoc());
}
if (state.liveDocs.get(doc)) {
state.delCountOnFlush++;
state.liveDocs.clear(doc);
}
}
}


@@ -86,7 +86,7 @@ final class FrozenBufferedUpdates {
// only have Queries and doc values updates
private final InfoStream infoStream;
public FrozenBufferedUpdates(InfoStream infoStream, BufferedUpdates updates, SegmentCommitInfo privateSegment) throws IOException {
public FrozenBufferedUpdates(InfoStream infoStream, BufferedUpdates updates, SegmentCommitInfo privateSegment) {
this.infoStream = infoStream;
this.privateSegment = privateSegment;
assert updates.deleteDocIDs.isEmpty();
@@ -468,9 +468,6 @@ final class FrozenBufferedUpdates {
long delGen,
boolean segmentPrivateDeletes) throws IOException {
TermsEnum termsEnum = null;
PostingsEnum postingsEnum = null;
// TODO: we can process the updates per DV field, from last to first so that
// if multiple terms affect same document for the same field, we add an update
// only once (that of the last term). To do that, we can keep a bitset which
@@ -492,26 +489,8 @@ final class FrozenBufferedUpdates {
boolean isNumeric = value.isNumeric();
FieldUpdatesBuffer.BufferedUpdateIterator iterator = value.iterator();
FieldUpdatesBuffer.BufferedUpdate bufferedUpdate;
String previousField = null;
TermDocsIterator termDocsIterator = new TermDocsIterator(segState.reader, false);
while ((bufferedUpdate = iterator.next()) != null) {
if (previousField == null || previousField.equals(bufferedUpdate.termField) == false) {
previousField = bufferedUpdate.termField;
Terms terms = segState.reader.terms(previousField);
termsEnum = terms == null ? null : terms.iterator();
}
if (termsEnum == null) {
// no terms in this segment for this field
continue;
}
final int limit;
if (delGen == segState.delGen) {
assert segmentPrivateDeletes;
limit = bufferedUpdate.docUpTo;
} else {
limit = Integer.MAX_VALUE;
}
// TODO: we traverse the terms in update order (not term order) so that we
// apply the updates in the correct order, i.e. if two terms update the
// same document, the last one that came in wins, irrespective of the
@@ -521,23 +500,26 @@ final class FrozenBufferedUpdates {
// that we cannot rely only on docIDUpto because an app may send two updates
// which will get same docIDUpto, yet will still need to respect the order
// those updates arrived.
// TODO: we could at least *collate* by field?
final BytesRef binaryValue;
final long longValue;
if (bufferedUpdate.hasValue == false) {
longValue = -1;
binaryValue = null;
} else {
longValue = bufferedUpdate.numericValue;
binaryValue = bufferedUpdate.binaryValue;
}
if (termsEnum.seekExact(bufferedUpdate.termValue)) {
// we don't need term frequencies for this
postingsEnum = termsEnum.postings(postingsEnum, PostingsEnum.NONE);
final DocIdSetIterator docIdSetIterator = termDocsIterator.nextTerm(bufferedUpdate.termField, bufferedUpdate.termValue);
if (docIdSetIterator != null) {
final int limit;
if (delGen == segState.delGen) {
assert segmentPrivateDeletes;
limit = bufferedUpdate.docUpTo;
} else {
limit = Integer.MAX_VALUE;
}
final BytesRef binaryValue;
final long longValue;
if (bufferedUpdate.hasValue == false) {
longValue = -1;
binaryValue = null;
} else {
longValue = bufferedUpdate.numericValue;
binaryValue = bufferedUpdate.binaryValue;
}
termDocsIterator.getDocs();
if (dvUpdates == null) {
if (isNumeric) {
if (value.hasSingleValue()) {
@@ -566,7 +548,7 @@ final class FrozenBufferedUpdates {
if (segState.rld.sortMap != null && segmentPrivateDeletes) {
// This segment was sorted on flush; we must apply seg-private deletes carefully in this case:
int doc;
while ((doc = postingsEnum.nextDoc()) != DocIdSetIterator.NO_MORE_DOCS) {
while ((doc = docIdSetIterator.nextDoc()) != DocIdSetIterator.NO_MORE_DOCS) {
if (acceptDocs == null || acceptDocs.get(doc)) {
// The limit is in the pre-sorted doc space:
if (segState.rld.sortMap.newToOld(doc) < limit) {
@@ -577,7 +559,7 @@ final class FrozenBufferedUpdates {
}
} else {
int doc;
while ((doc = postingsEnum.nextDoc()) != DocIdSetIterator.NO_MORE_DOCS) {
while ((doc = docIdSetIterator.nextDoc()) != DocIdSetIterator.NO_MORE_DOCS) {
if (doc >= limit) {
break; // no more docs that can be updated for this term
}
@@ -706,57 +688,13 @@ final class FrozenBufferedUpdates {
}
FieldTermIterator iter = deleteTerms.iterator();
BytesRef delTerm;
String field = null;
TermsEnum termsEnum = null;
BytesRef readerTerm = null;
PostingsEnum postingsEnum = null;
TermDocsIterator termDocsIterator = new TermDocsIterator(segState.reader, true);
while ((delTerm = iter.next()) != null) {
if (iter.field() != field) {
// field changed
field = iter.field();
Terms terms = segState.reader.terms(field);
if (terms != null) {
termsEnum = terms.iterator();
readerTerm = termsEnum.next();
} else {
termsEnum = null;
}
}
if (termsEnum != null) {
int cmp = delTerm.compareTo(readerTerm);
if (cmp < 0) {
// TODO: can we advance across del terms here?
// move to next del term
continue;
} else if (cmp == 0) {
// fall through
} else if (cmp > 0) {
TermsEnum.SeekStatus status = termsEnum.seekCeil(delTerm);
if (status == TermsEnum.SeekStatus.FOUND) {
// fall through
} else if (status == TermsEnum.SeekStatus.NOT_FOUND) {
readerTerm = termsEnum.term();
continue;
} else {
// TODO: can we advance to next field in deleted terms?
// no more terms in this segment
termsEnum = null;
continue;
}
}
// we don't need term frequencies for this
postingsEnum = termsEnum.postings(postingsEnum, PostingsEnum.NONE);
assert postingsEnum != null;
final DocIdSetIterator iterator = termDocsIterator.nextTerm(iter.field(), delTerm);
if (iterator != null) {
int docID;
while ((docID = postingsEnum.nextDoc()) != DocIdSetIterator.NO_MORE_DOCS) {
while ((docID = iterator.nextDoc()) != DocIdSetIterator.NO_MORE_DOCS) {
// NOTE: there is no limit check on the docID
// when deleting by Term (unlike by Query)
// because on flush we apply all Term deletes to
@@ -821,4 +759,104 @@ final class FrozenBufferedUpdates {
boolean any() {
return deleteTerms.size() > 0 || deleteQueries.length > 0 || fieldUpdatesCount > 0 ;
}
/**
* This class helps iterate a term dictionary and consume all the docs for each term.
* It accepts a field/value tuple and returns a {@link DocIdSetIterator} if the field has an entry
* for the given value. It iterates the term dictionary in an optimized way if the terms are
* passed in sorted order, and makes sure TermsEnum and PostingsEnum instances are reused as much as possible.
*/
static final class TermDocsIterator {
private final TermsProvider provider;
private String field;
private TermsEnum termsEnum;
private PostingsEnum postingsEnum;
private final boolean sortedTerms;
private BytesRef readerTerm;
private BytesRef lastTerm; // only set with asserts
@FunctionalInterface
interface TermsProvider {
Terms terms(String field) throws IOException;
}
TermDocsIterator(Fields fields, boolean sortedTerms) {
this(fields::terms, sortedTerms);
}
TermDocsIterator(LeafReader reader, boolean sortedTerms) {
this(reader::terms, sortedTerms);
}
private TermDocsIterator(TermsProvider provider, boolean sortedTerms) {
this.sortedTerms = sortedTerms;
this.provider = provider;
}
private void setField(String field) throws IOException {
if (this.field == null || this.field.equals(field) == false) {
this.field = field;
Terms terms = provider.terms(field);
if (terms != null) {
termsEnum = terms.iterator();
if (sortedTerms) {
assert (lastTerm = null) == null; // need to reset otherwise we fail the assertSorted below since we sort per field
readerTerm = termsEnum.next();
}
} else {
termsEnum = null;
}
}
}
DocIdSetIterator nextTerm(String field, BytesRef term) throws IOException {
setField(field);
if (termsEnum != null) {
if (sortedTerms) {
assert assertSorted(term);
// in the sorted case we can take advantage of the "seeking forward" property
// this allows us, depending on the term dict impl, to reuse data-structures internally
// which speeds up iteration over terms and docs significantly.
int cmp = term.compareTo(readerTerm);
if (cmp < 0) {
return null; // requested term does not exist in this segment
} else if (cmp == 0) {
return getDocs();
} else if (cmp > 0) {
TermsEnum.SeekStatus status = termsEnum.seekCeil(term);
switch (status) {
case FOUND:
return getDocs();
case NOT_FOUND:
readerTerm = termsEnum.term();
return null;
case END:
// no more terms in this segment
termsEnum = null;
return null;
default:
throw new AssertionError("unknown status");
}
}
} else if (termsEnum.seekExact(term)) {
return getDocs();
}
}
return null;
}
private boolean assertSorted(BytesRef term) {
assert sortedTerms;
assert lastTerm == null || term.compareTo(lastTerm) >= 0 : "boom: " + term.utf8ToString() + " last: " + lastTerm.utf8ToString();
lastTerm = BytesRef.deepCopyOf(term);
return true;
}
private DocIdSetIterator getDocs() throws IOException {
assert termsEnum != null;
return postingsEnum = termsEnum.postings(postingsEnum, PostingsEnum.NONE);
}
}
}
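A subtlety of the helper above worth calling out: with sortedTerms=true the ordering contract applies per field, because setField resets the lastTerm used by assertSorted whenever the field changes. A hedged illustration follows; the field names, term values and wrapper class are made up, and it again assumes org.apache.lucene.index since the helper is package-private.

package org.apache.lucene.index;

import java.io.IOException;

import org.apache.lucene.util.BytesRef;

// Illustration only, not part of the patch: the sorted-order contract applies per field.
final class SortedContractSketch {
  static void example(LeafReader reader) throws IOException {
    FrozenBufferedUpdates.TermDocsIterator it =
        new FrozenBufferedUpdates.TermDocsIterator(reader, true);
    it.nextTerm("id", new BytesRef("a"));
    it.nextTerm("id", new BytesRef("b"));    // ok: "a" <= "b" within field "id"
    it.nextTerm("title", new BytesRef("a")); // ok: field changed, so the ordering check restarts
    // it.nextTerm("title", new BytesRef("A")); // would trip assertSorted (with assertions enabled): "A" sorts before "a"
  }
}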


@@ -35,7 +35,7 @@ public final class BytesRefArray implements SortableBytesRefArray {
private int lastElement = 0;
private int currentOffset = 0;
private final Counter bytesUsed;
/**
* Creates a new {@link BytesRefArray} with a counter to track allocated bytes
*/
@@ -190,7 +190,6 @@ public final class BytesRefArray implements SortableBytesRefArray {
final int[] indices = comp == null ? null : sort(comp);
return new BytesRefIterator() {
int pos = 0;
@Override
public BytesRef next() {
if (pos < size) {


@@ -35,14 +35,8 @@ public interface BytesRefIterator {
* the end of the iterator is reached.
* @throws IOException If there is a low-level I/O error.
*/
public BytesRef next() throws IOException;
BytesRef next() throws IOException;
/** Singleton BytesRefIterator that iterates over 0 BytesRefs. */
public static final BytesRefIterator EMPTY = new BytesRefIterator() {
@Override
public BytesRef next() {
return null;
}
};
BytesRefIterator EMPTY = () -> null;
}
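Side note on the BytesRefIterator cleanup above: with the redundant modifiers gone and a single abstract method, the interface can be implemented with a lambda, exactly as the new EMPTY constant does. A small sketch of my own (not from the patch), adapting a plain List:

import java.util.Iterator;
import java.util.List;

import org.apache.lucene.util.BytesRef;
import org.apache.lucene.util.BytesRefIterator;

// Sketch only: expose a java.util.List of BytesRef as a BytesRefIterator via a lambda.
final class BytesRefIteratorLambdaSketch {
  static BytesRefIterator fromList(List<BytesRef> refs) {
    Iterator<BytesRef> it = refs.iterator();
    // Contract of next(): return the next value, or null once exhausted.
    return () -> it.hasNext() ? it.next() : null;
  }
}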


@@ -0,0 +1,104 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.lucene.index;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Comparator;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
import com.carrotsearch.randomizedtesting.generators.RandomPicks;
import org.apache.lucene.document.Document;
import org.apache.lucene.document.Field;
import org.apache.lucene.document.StringField;
import org.apache.lucene.search.DocIdSetIterator;
import org.apache.lucene.store.Directory;
import org.apache.lucene.util.BytesRef;
import org.apache.lucene.util.BytesRefArray;
import org.apache.lucene.util.BytesRefIterator;
import org.apache.lucene.util.Counter;
import org.apache.lucene.util.FixedBitSet;
import org.apache.lucene.util.LuceneTestCase;
import org.apache.lucene.index.FrozenBufferedUpdates.TermDocsIterator;
import org.apache.lucene.util.TestUtil;
public class TestFrozenBufferedUpdates extends LuceneTestCase {
public void testTermDocsIterator() throws IOException {
for (int j = 0; j < 5; j++) {
try (Directory dir = newDirectory(); IndexWriter writer = new IndexWriter(dir, newIndexWriterConfig())) {
boolean duplicates = random().nextBoolean();
boolean nonMatches = random().nextBoolean();
BytesRefArray array = new BytesRefArray(Counter.newCounter());
int numDocs = 10 + random().nextInt(1000);
Set<BytesRef> randomIds = new HashSet<>();
for (int i = 0; i < numDocs; i++) {
BytesRef id;
do {
id = new BytesRef(TestUtil.randomRealisticUnicodeString(random()));
} while (randomIds.add(id) == false);
}
List<BytesRef> asList = new ArrayList<>(randomIds);
for (BytesRef ref : randomIds) {
Document doc = new Document();
doc.add(new StringField("field", ref, Field.Store.NO));
array.append(ref);
if (duplicates && rarely()) {
array.append(RandomPicks.randomFrom(random(), asList));
}
if (nonMatches && rarely()) {
BytesRef id;
do {
id = new BytesRef(TestUtil.randomRealisticUnicodeString(random()));
} while (randomIds.contains(id));
array.append(id);
}
writer.addDocument(doc);
}
writer.forceMerge(1);
writer.commit();
try (DirectoryReader reader = DirectoryReader.open(dir)) {
boolean sorted = random().nextBoolean();
BytesRefIterator values = sorted ? array.iterator(Comparator.naturalOrder()) : array.iterator();
assertEquals(1, reader.leaves().size());
TermDocsIterator iterator = new TermDocsIterator(reader.leaves().get(0).reader(), sorted);
FixedBitSet bitSet = new FixedBitSet(reader.maxDoc());
BytesRef ref;
while ((ref = values.next()) != null) {
DocIdSetIterator docIdSetIterator = iterator.nextTerm("field", ref);
if (nonMatches == false) {
assertNotNull(docIdSetIterator);
}
if (docIdSetIterator != null) {
int doc;
while ((doc = docIdSetIterator.nextDoc()) != DocIdSetIterator.NO_MORE_DOCS) {
if (duplicates == false) {
assertFalse(bitSet.get(doc));
}
bitSet.set(doc);
}
}
}
assertEquals(reader.maxDoc(), bitSet.cardinality());
}
}
}
}
}