LUCENE-8338: Ensure number returned for PendingDeletes are well defined

Today a call to PendingDeletes#numPendingDeletes might return 0
if the deletes are written to disk. This doesn't mean these values are committed
or refreshed in the latest reader. Some places in IW use these numbers to make
decisions if there has been deletes added since last time checked (BufferedUpdateStream)
which can cause wrong (while not fatal) decision ie. to kick of new merges.

Now this API is made protected and not visible outside of PendingDeletes to prevent
any kind of confusion. The APIs now allow to get absolute numbers of getDelCount and numDocs
which have the same name and semantics as their relatives on IndexReader/Writer
and SegmentCommitInfo.
This commit is contained in:
Simon Willnauer 2018-05-29 16:08:12 +02:00
parent d243f35a54
commit 76263087b5
13 changed files with 83 additions and 67 deletions

View File

@ -259,7 +259,7 @@ final class BufferedUpdatesStream implements Accountable {
SegmentState(ReadersAndUpdates rld, IOUtils.IOConsumer<ReadersAndUpdates> onClose, SegmentCommitInfo info) throws IOException { SegmentState(ReadersAndUpdates rld, IOUtils.IOConsumer<ReadersAndUpdates> onClose, SegmentCommitInfo info) throws IOException {
this.rld = rld; this.rld = rld;
startDelCount = rld.getPendingDeleteCount(); startDelCount = rld.getDelCount();
delGen = info.getBufferedDeletesGen(); delGen = info.getBufferedDeletesGen();
this.onClose = onClose; this.onClose = onClose;
reader = rld.getReader(IOContext.READ); reader = rld.getReader(IOContext.READ);

View File

@ -99,8 +99,8 @@ public class FilterMergePolicy extends MergePolicy {
} }
@Override @Override
public int numDeletesToMerge(SegmentCommitInfo info, int pendingDeleteCount, public int numDeletesToMerge(SegmentCommitInfo info, int delCount,
IOSupplier<CodecReader> readerSupplier) throws IOException { IOSupplier<CodecReader> readerSupplier) throws IOException {
return in.numDeletesToMerge(info, pendingDeleteCount, readerSupplier); return in.numDeletesToMerge(info, delCount, readerSupplier);
} }
} }

View File

@ -390,8 +390,8 @@ final class FrozenBufferedUpdates {
final List<BufferedUpdatesStream.SegmentState> segmentStates = Arrays.asList(segStates); final List<BufferedUpdatesStream.SegmentState> segmentStates = Arrays.asList(segStates);
for (BufferedUpdatesStream.SegmentState segState : segmentStates) { for (BufferedUpdatesStream.SegmentState segState : segmentStates) {
if (success) { if (success) {
totDelCount += segState.rld.getPendingDeleteCount() - segState.startDelCount; totDelCount += segState.rld.getDelCount() - segState.startDelCount;
int fullDelCount = segState.rld.info.getDelCount() + segState.rld.getPendingDeleteCount(); int fullDelCount = segState.rld.getDelCount();
assert fullDelCount <= segState.rld.info.info.maxDoc() : fullDelCount + " > " + segState.rld.info.info.maxDoc(); assert fullDelCount <= segState.rld.info.info.maxDoc() : fullDelCount + " > " + segState.rld.info.info.maxDoc();
if (segState.rld.isFullyDeleted() && writer.getConfig().getMergePolicy().keepFullyDeletedSegment(() -> segState.reader) == false) { if (segState.rld.isFullyDeleted() && writer.getConfig().getMergePolicy().keepFullyDeletedSegment(() -> segState.reader) == false) {
if (allDeleted == null) { if (allDeleted == null) {

View File

@ -635,14 +635,14 @@ public class IndexWriter implements Closeable, TwoPhaseCommit, Accountable,
public int numDeletedDocs(SegmentCommitInfo info) { public int numDeletedDocs(SegmentCommitInfo info) {
ensureOpen(false); ensureOpen(false);
validate(info); validate(info);
int delCount = info.getDelCount();
final ReadersAndUpdates rld = getPooledInstance(info, false); final ReadersAndUpdates rld = getPooledInstance(info, false);
if (rld != null) { if (rld != null) {
delCount += rld.getPendingDeleteCount(); return rld.getDelCount(); // get the full count from here since SCI might change concurrently
} else {
int delCount = info.getDelCount();
assert delCount <= info.info.maxDoc(): "delCount: " + delCount + " maxDoc: " + info.info.maxDoc();
return delCount;
} }
assert delCount <= info.info.maxDoc(): "delCount: " + delCount + " maxDoc: " + info.info.maxDoc();
return delCount;
} }
/** /**
@ -3695,7 +3695,7 @@ public class IndexWriter implements Closeable, TwoPhaseCommit, Accountable,
// Lazy init (only when we find a delete or update to carry over): // Lazy init (only when we find a delete or update to carry over):
final ReadersAndUpdates mergedDeletesAndUpdates = getPooledInstance(merge.info, true); final ReadersAndUpdates mergedDeletesAndUpdates = getPooledInstance(merge.info, true);
int numDeletesBefore = mergedDeletesAndUpdates.getDelCount();
// field -> delGen -> dv field updates // field -> delGen -> dv field updates
Map<String,Map<Long,DocValuesFieldUpdates>> mappedDVUpdates = new HashMap<>(); Map<String,Map<Long,DocValuesFieldUpdates>> mappedDVUpdates = new HashMap<>();
@ -3786,7 +3786,7 @@ public class IndexWriter implements Closeable, TwoPhaseCommit, Accountable,
if (mergedDeletesAndUpdates == null) { if (mergedDeletesAndUpdates == null) {
infoStream.message("IW", "no new deletes or field updates since merge started"); infoStream.message("IW", "no new deletes or field updates since merge started");
} else { } else {
String msg = mergedDeletesAndUpdates.getPendingDeleteCount() + " new deletes"; String msg = mergedDeletesAndUpdates.getDelCount() - numDeletesBefore + " new deletes";
if (anyDVUpdates) { if (anyDVUpdates) {
msg += " and " + mergedDeletesAndUpdates.getNumDVUpdates() + " new field updates"; msg += " and " + mergedDeletesAndUpdates.getNumDVUpdates() + " new field updates";
msg += " (" + mergedDeletesAndUpdates.ramBytesUsed.get() + ") bytes"; msg += " (" + mergedDeletesAndUpdates.ramBytesUsed.get() + ") bytes";
@ -4361,7 +4361,6 @@ public class IndexWriter implements Closeable, TwoPhaseCommit, Accountable,
ReadersAndUpdates.MergeReader mr = rld.getReaderForMerge(context); ReadersAndUpdates.MergeReader mr = rld.getReaderForMerge(context);
SegmentReader reader = mr.reader; SegmentReader reader = mr.reader;
int delCount = reader.numDeletedDocs();
if (infoStream.isEnabled("IW")) { if (infoStream.isEnabled("IW")) {
infoStream.message("IW", "seg=" + segString(info) + " reader=" + reader); infoStream.message("IW", "seg=" + segString(info) + " reader=" + reader);
@ -4369,7 +4368,6 @@ public class IndexWriter implements Closeable, TwoPhaseCommit, Accountable,
merge.hardLiveDocs.add(mr.hardLiveDocs); merge.hardLiveDocs.add(mr.hardLiveDocs);
merge.readers.add(reader); merge.readers.add(reader);
assert delCount <= info.info.maxDoc(): "delCount=" + delCount + " info.maxDoc=" + info.info.maxDoc() + " rld.pendingDeleteCount=" + rld.getPendingDeleteCount() + " info.getDelCount()=" + info.getDelCount();
segUpto++; segUpto++;
} }

View File

@ -638,12 +638,12 @@ public abstract class MergePolicy {
* @see IndexWriter#softUpdateDocument(Term, Iterable, Field...) * @see IndexWriter#softUpdateDocument(Term, Iterable, Field...)
* @see IndexWriterConfig#setSoftDeletesField(String) * @see IndexWriterConfig#setSoftDeletesField(String)
* @param info the segment info that identifies the segment * @param info the segment info that identifies the segment
* @param pendingDeleteCount the number of pending deletes for this segment * @param delCount the number deleted documents for this segment
* @param readerSupplier a supplier that allows to obtain a {@link CodecReader} for this segment * @param readerSupplier a supplier that allows to obtain a {@link CodecReader} for this segment
*/ */
public int numDeletesToMerge(SegmentCommitInfo info, int pendingDeleteCount, public int numDeletesToMerge(SegmentCommitInfo info, int delCount,
IOSupplier<CodecReader> readerSupplier) throws IOException { IOSupplier<CodecReader> readerSupplier) throws IOException {
return info.getDelCount() + pendingDeleteCount; return delCount;
} }
/** /**

View File

@ -81,8 +81,8 @@ public final class NoMergePolicy extends MergePolicy {
} }
@Override @Override
public int numDeletesToMerge(SegmentCommitInfo info, int pendingDeleteCount, IOSupplier<CodecReader> readerSupplier) throws IOException { public int numDeletesToMerge(SegmentCommitInfo info, int delCount, IOSupplier<CodecReader> readerSupplier) throws IOException {
return super.numDeletesToMerge(info, pendingDeleteCount, readerSupplier); return super.numDeletesToMerge(info, delCount, readerSupplier);
} }
@Override @Override

View File

@ -118,7 +118,7 @@ class PendingDeletes {
/** /**
* Returns the number of pending deletes that are not written to disk. * Returns the number of pending deletes that are not written to disk.
*/ */
int numPendingDeletes() { protected int numPendingDeletes() {
return pendingDeleteCount; return pendingDeleteCount;
} }
@ -232,7 +232,49 @@ class PendingDeletes {
} }
int numDeletesToMerge(MergePolicy policy, IOSupplier<CodecReader> readerIOSupplier) throws IOException { int numDeletesToMerge(MergePolicy policy, IOSupplier<CodecReader> readerIOSupplier) throws IOException {
return policy.numDeletesToMerge(info, numPendingDeletes(), readerIOSupplier); return policy.numDeletesToMerge(info, getDelCount(), readerIOSupplier);
} }
/**
* Returns true if the given reader needs to be refreshed in order to see the latest deletes
*/
final boolean needsRefresh(CodecReader reader) {
return reader.getLiveDocs() != getLiveDocs() || reader.numDeletedDocs() != getDelCount();
}
/**
* Returns the number of deleted docs in the segment.
*/
final int getDelCount() {
return info.getDelCount() + numPendingDeletes();
}
/**
* Returns the number of live documents in this segment
*/
final int numDocs() {
return info.info.maxDoc() - getDelCount();
}
// Call only from assert!
boolean verifyDocCounts(CodecReader reader) {
int count = 0;
Bits liveDocs = getLiveDocs();
if (liveDocs != null) {
for(int docID = 0; docID < info.info.maxDoc(); docID++) {
if (liveDocs.get(docID)) {
count++;
}
}
} else {
count = info.info.maxDoc();
}
assert numDocs() == count: "info.maxDoc=" + info.info.maxDoc() + " info.getDelCount()=" + info.getDelCount() +
" pendingDeletes=" + toString() + " count=" + count;
assert reader.numDocs() == numDocs() : "reader.numDocs() = " + reader.numDocs() + " numDocs() " + numDocs();
assert reader.numDeletedDocs() <= info.info.maxDoc(): "delCount=" + reader.numDeletedDocs() + " info.maxDoc=" +
info.info.maxDoc() + " rld.pendingDeleteCount=" + numPendingDeletes() +
" info.getDelCount()=" + info.getDelCount();
return true;
}
} }

View File

@ -58,6 +58,7 @@ final class PendingSoftDeletes extends PendingDeletes {
} else { } else {
// if it was deleted subtract the delCount // if it was deleted subtract the delCount
pendingDeleteCount--; pendingDeleteCount--;
assert pendingDeleteCount >= 0 : " illegal pending delete count: " + pendingDeleteCount;
} }
return true; return true;
} }
@ -65,7 +66,7 @@ final class PendingSoftDeletes extends PendingDeletes {
} }
@Override @Override
int numPendingDeletes() { protected int numPendingDeletes() {
return super.numPendingDeletes() + hardDeletes.numPendingDeletes(); return super.numPendingDeletes() + hardDeletes.numPendingDeletes();
} }
@ -78,11 +79,11 @@ final class PendingSoftDeletes extends PendingDeletes {
if (iterator != null) { // nothing is deleted we don't have a soft deletes field in this segment if (iterator != null) { // nothing is deleted we don't have a soft deletes field in this segment
assert info.info.maxDoc() > 0 : "maxDoc is 0"; assert info.info.maxDoc() > 0 : "maxDoc is 0";
pendingDeleteCount += applySoftDeletes(iterator, getMutableBits()); pendingDeleteCount += applySoftDeletes(iterator, getMutableBits());
assert pendingDeleteCount >= 0 : " illegal pending delete count: " + pendingDeleteCount;
} }
dvGeneration = info.getDocValuesGen(); dvGeneration = info.getDocValuesGen();
} }
assert numPendingDeletes() + info.getDelCount() <= info.info.maxDoc() : assert getDelCount() <= info.info.maxDoc() : getDelCount() + " > " + info.info.maxDoc();
numPendingDeletes() + " + " + info.getDelCount() + " > " + info.info.maxDoc();
} }
@Override @Override
@ -133,6 +134,7 @@ final class PendingSoftDeletes extends PendingDeletes {
void onDocValuesUpdate(FieldInfo info, DocValuesFieldUpdates.Iterator iterator) throws IOException { void onDocValuesUpdate(FieldInfo info, DocValuesFieldUpdates.Iterator iterator) throws IOException {
if (this.field.equals(info.name)) { if (this.field.equals(info.name)) {
pendingDeleteCount += applySoftDeletes(iterator, getMutableBits()); pendingDeleteCount += applySoftDeletes(iterator, getMutableBits());
assert pendingDeleteCount >= 0 : " illegal pending delete count: " + pendingDeleteCount;
assert dvGeneration < info.getDocValuesGen() : "we have seen this generation update already: " + dvGeneration + " vs. " + info.getDocValuesGen(); assert dvGeneration < info.getDocValuesGen() : "we have seen this generation update already: " + dvGeneration + " vs. " + info.getDocValuesGen();
assert dvGeneration != -2 : "docValues generation is still uninitialized"; assert dvGeneration != -2 : "docValues generation is still uninitialized";
dvGeneration = info.getDocValuesGen(); dvGeneration = info.getDocValuesGen();
@ -208,5 +210,4 @@ final class PendingSoftDeletes extends PendingDeletes {
Bits getHardLiveDocs() { Bits getHardLiveDocs() {
return hardDeletes.getLiveDocs(); return hardDeletes.getLiveDocs();
} }
} }

View File

@ -132,7 +132,7 @@ final class ReaderPool implements Closeable {
*/ */
synchronized boolean anyPendingDeletes() { synchronized boolean anyPendingDeletes() {
for(ReadersAndUpdates rld : readerMap.values()) { for(ReadersAndUpdates rld : readerMap.values()) {
if (rld.getPendingDeleteCount() != 0) { if (rld.anyPendingDeletes()) {
return true; return true;
} }
} }
@ -321,7 +321,6 @@ final class ReaderPool implements Closeable {
/** /**
* Returns <code>true</code> iff there are any buffered doc values updates. Otherwise <code>false</code>. * Returns <code>true</code> iff there are any buffered doc values updates. Otherwise <code>false</code>.
* @see #anyPendingDeletes()
*/ */
synchronized boolean anyDocValuesChanges() { synchronized boolean anyDocValuesChanges() {
for (ReadersAndUpdates rld : readerMap.values()) { for (ReadersAndUpdates rld : readerMap.values()) {

View File

@ -100,8 +100,6 @@ final class ReadersAndUpdates {
* <p>NOTE: steals incoming ref from reader. */ * <p>NOTE: steals incoming ref from reader. */
ReadersAndUpdates(int indexCreatedVersionMajor, SegmentReader reader, PendingDeletes pendingDeletes) throws IOException { ReadersAndUpdates(int indexCreatedVersionMajor, SegmentReader reader, PendingDeletes pendingDeletes) throws IOException {
this(indexCreatedVersionMajor, reader.getOriginalSegmentInfo(), pendingDeletes); this(indexCreatedVersionMajor, reader.getOriginalSegmentInfo(), pendingDeletes);
assert pendingDeletes.numPendingDeletes() >= 0
: "got " + pendingDeletes.numPendingDeletes() + " reader.numDeletedDocs()=" + reader.numDeletedDocs() + " info.getDelCount()=" + info.getDelCount() + " maxDoc=" + reader.maxDoc() + " numDocs=" + reader.numDocs();
this.reader = reader; this.reader = reader;
pendingDeletes.onNewReader(reader, info); pendingDeletes.onNewReader(reader, info);
} }
@ -122,10 +120,9 @@ final class ReadersAndUpdates {
return rc; return rc;
} }
public synchronized int getPendingDeleteCount() { public synchronized int getDelCount() {
return pendingDeletes.numPendingDeletes(); return pendingDeletes.getDelCount();
} }
private synchronized boolean assertNoDupGen(List<DocValuesFieldUpdates> fieldUpdates, DocValuesFieldUpdates update) { private synchronized boolean assertNoDupGen(List<DocValuesFieldUpdates> fieldUpdates, DocValuesFieldUpdates update) {
for (int i=0;i<fieldUpdates.size();i++) { for (int i=0;i<fieldUpdates.size();i++) {
DocValuesFieldUpdates oldUpdate = fieldUpdates.get(i); DocValuesFieldUpdates oldUpdate = fieldUpdates.get(i);
@ -167,24 +164,6 @@ final class ReadersAndUpdates {
return count; return count;
} }
// Call only from assert!
public synchronized boolean verifyDocCounts() {
int count;
Bits liveDocs = pendingDeletes.getLiveDocs();
if (liveDocs != null) {
count = 0;
for(int docID=0;docID<info.info.maxDoc();docID++) {
if (liveDocs.get(docID)) {
count++;
}
}
} else {
count = info.info.maxDoc();
}
assert info.info.maxDoc() - info.getDelCount() - pendingDeletes.numPendingDeletes() == count: "info.maxDoc=" + info.info.maxDoc() + " info.getDelCount()=" + info.getDelCount() + " pendingDeletes=" + pendingDeletes.numPendingDeletes() + " count=" + count;
return true;
}
/** Returns a {@link SegmentReader}. */ /** Returns a {@link SegmentReader}. */
public synchronized SegmentReader getReader(IOContext context) throws IOException { public synchronized SegmentReader getReader(IOContext context) throws IOException {
@ -235,8 +214,7 @@ final class ReadersAndUpdates {
// force new liveDocs // force new liveDocs
Bits liveDocs = pendingDeletes.getLiveDocs(); Bits liveDocs = pendingDeletes.getLiveDocs();
if (liveDocs != null) { if (liveDocs != null) {
return new SegmentReader(info, reader, liveDocs, return new SegmentReader(info, reader, liveDocs, pendingDeletes.numDocs());
info.info.maxDoc() - info.getDelCount() - pendingDeletes.numPendingDeletes());
} else { } else {
// liveDocs == null and reader != null. That can only be if there are no deletes // liveDocs == null and reader != null. That can only be if there are no deletes
assert reader.getLiveDocs() == null; assert reader.getLiveDocs() == null;
@ -254,8 +232,7 @@ final class ReadersAndUpdates {
// get a reader and dec the ref right away we just make sure we have a reader // get a reader and dec the ref right away we just make sure we have a reader
getReader(IOContext.READ).decRef(); getReader(IOContext.READ).decRef();
} }
if (reader.getLiveDocs() != pendingDeletes.getLiveDocs() if (pendingDeletes.needsRefresh(reader)) {
|| reader.numDeletedDocs() != info.getDelCount() - pendingDeletes.numPendingDeletes()) {
// we have a reader but its live-docs are out of sync. let's create a temporary one that we never share // we have a reader but its live-docs are out of sync. let's create a temporary one that we never share
swapNewReaderWithLatestLiveDocs(); swapNewReaderWithLatestLiveDocs();
} }
@ -429,6 +406,10 @@ final class ReadersAndUpdates {
} }
} }
synchronized boolean anyPendingDeletes() {
return pendingDeletes.numPendingDeletes() != 0;
}
/** /**
* This class merges the current on-disk DV with an incoming update DV instance and merges the two instances * This class merges the current on-disk DV with an incoming update DV instance and merges the two instances
* giving the incoming update precedence in terms of values, in other words the values of the update always * giving the incoming update precedence in terms of values, in other words the values of the update always
@ -668,8 +649,7 @@ final class ReadersAndUpdates {
private SegmentReader createNewReaderWithLatestLiveDocs(SegmentReader reader) throws IOException { private SegmentReader createNewReaderWithLatestLiveDocs(SegmentReader reader) throws IOException {
assert reader != null; assert reader != null;
assert Thread.holdsLock(this) : Thread.currentThread().getName(); assert Thread.holdsLock(this) : Thread.currentThread().getName();
SegmentReader newReader = new SegmentReader(info, reader, pendingDeletes.getLiveDocs(), SegmentReader newReader = new SegmentReader(info, reader, pendingDeletes.getLiveDocs(), pendingDeletes.numDocs());
info.info.maxDoc() - info.getDelCount() - pendingDeletes.numPendingDeletes());
boolean success2 = false; boolean success2 = false;
try { try {
pendingDeletes.onNewReader(newReader, info); pendingDeletes.onNewReader(newReader, info);
@ -727,14 +707,13 @@ final class ReadersAndUpdates {
} }
SegmentReader reader = getReader(context); SegmentReader reader = getReader(context);
int delCount = pendingDeletes.numPendingDeletes() + info.getDelCount(); if (pendingDeletes.needsRefresh(reader)) {
if (delCount != reader.numDeletedDocs()) {
// beware of zombies: // beware of zombies:
assert delCount > reader.numDeletedDocs(): "delCount=" + delCount + " reader.numDeletedDocs()=" + reader.numDeletedDocs();
assert pendingDeletes.getLiveDocs() != null; assert pendingDeletes.getLiveDocs() != null;
reader = createNewReaderWithLatestLiveDocs(reader); reader = createNewReaderWithLatestLiveDocs(reader);
} }
assert verifyDocCounts(); assert pendingDeletes.verifyDocCounts(reader);
return new MergeReader(reader, pendingDeletes.getHardLiveDocs()); return new MergeReader(reader, pendingDeletes.getHardLiveDocs());
} }

View File

@ -173,8 +173,8 @@ public final class SoftDeletesRetentionMergePolicy extends OneMergeWrappingMerge
} }
@Override @Override
public int numDeletesToMerge(SegmentCommitInfo info, int pendingDeleteCount, IOSupplier<CodecReader> readerSupplier) throws IOException { public int numDeletesToMerge(SegmentCommitInfo info, int delCount, IOSupplier<CodecReader> readerSupplier) throws IOException {
final int numDeletesToMerge = super.numDeletesToMerge(info, pendingDeleteCount, readerSupplier); final int numDeletesToMerge = super.numDeletesToMerge(info, delCount, readerSupplier);
if (numDeletesToMerge != 0) { if (numDeletesToMerge != 0) {
final CodecReader reader = readerSupplier.get(); final CodecReader reader = readerSupplier.get();
if (reader.getLiveDocs() != null) { if (reader.getLiveDocs() != null) {

View File

@ -178,12 +178,9 @@ public class TestReaderPool extends LuceneTestCase {
boolean expectUpdate = false; boolean expectUpdate = false;
int doc = -1; int doc = -1;
if (postings != null && postings.nextDoc() != DocIdSetIterator.NO_MORE_DOCS) { if (postings != null && postings.nextDoc() != DocIdSetIterator.NO_MORE_DOCS) {
readersAndUpdates.delete(doc = postings.docID()); assertTrue(readersAndUpdates.delete(doc = postings.docID()));
expectUpdate = true; expectUpdate = true;
assertEquals(DocIdSetIterator.NO_MORE_DOCS, postings.nextDoc()); assertEquals(DocIdSetIterator.NO_MORE_DOCS, postings.nextDoc());
assertTrue(pool.anyPendingDeletes());
} else {
assertFalse(pool.anyPendingDeletes());
} }
assertFalse(pool.anyDocValuesChanges()); // deletes are not accounted here assertFalse(pool.anyDocValuesChanges()); // deletes are not accounted here
readOnlyClone.close(); readOnlyClone.close();

View File

@ -104,7 +104,7 @@ public class TestTryDelete extends LuceneTestCase
} }
assertTrue(writer.hasDeletions()); assertTrue(writer.hasDeletions());
mgr.maybeRefresh(); mgr.maybeRefresh();
searcher = mgr.acquire(); searcher = mgr.acquire();