mirror of https://github.com/apache/lucene.git
LUCENE-8290: Keep soft deletes in sync with on-disk DocValues
Today we pass on the doc values update to the PendingDeletes when it's applied. This might cause issues with a rentention policy merge policy that will see a deleted document but not it's value on disk. This change moves back the PendingDeletes callback to flush time in order to be consistent with what is actually updated on disk. This change also makes sure we write values to disk on flush that are in the reader pool as well as extra best effort checks to drop fully deleted segments on flush, commit and getReader.
This commit is contained in:
parent
8b9c2a3185
commit
591fc6627a
|
@ -510,16 +510,7 @@ public class IndexWriter implements Closeable, TwoPhaseCommit, Accountable {
|
|||
|
||||
// TODO: we could instead just clone SIS and pull/incref readers in sync'd block, and then do this w/o IW's lock?
|
||||
// Must do this sync'd on IW to prevent a merge from completing at the last second and failing to write its DV updates:
|
||||
if (readerPool.writeAllDocValuesUpdates()) {
|
||||
checkpoint();
|
||||
}
|
||||
|
||||
if (writeAllDeletes) {
|
||||
// Must move the deletes to disk:
|
||||
if (readerPool.commit(segmentInfos)) {
|
||||
checkpointNoSIS();
|
||||
}
|
||||
}
|
||||
writeReaderPool(writeAllDeletes);
|
||||
|
||||
// Prevent segmentInfos from changing while opening the
|
||||
// reader; in theory we could instead do similar retry logic,
|
||||
|
@ -3132,11 +3123,7 @@ public class IndexWriter implements Closeable, TwoPhaseCommit, Accountable {
|
|||
|
||||
applyAllDeletesAndUpdates();
|
||||
synchronized(this) {
|
||||
|
||||
if (readerPool.commit(segmentInfos)) {
|
||||
checkpointNoSIS();
|
||||
}
|
||||
|
||||
writeReaderPool(true);
|
||||
if (changeCount.get() != lastCommitChangeCount) {
|
||||
// There are changes to commit, so we will write a new segments_N in startCommit.
|
||||
// The act of committing is itself an NRT-visible change (an NRT reader that was
|
||||
|
@ -3218,6 +3205,39 @@ public class IndexWriter implements Closeable, TwoPhaseCommit, Accountable {
|
|||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Ensures that all changes in the reader-pool are written to disk.
|
||||
* @param writeDeletes if <code>true</code> if deletes should be written to disk too.
|
||||
*/
|
||||
private final void writeReaderPool(boolean writeDeletes) throws IOException {
|
||||
assert Thread.holdsLock(this);
|
||||
if (writeDeletes) {
|
||||
if (readerPool.commit(segmentInfos)) {
|
||||
checkpointNoSIS();
|
||||
}
|
||||
} else { // only write the docValues
|
||||
if (readerPool.writeAllDocValuesUpdates()) {
|
||||
checkpoint();
|
||||
}
|
||||
}
|
||||
// now do some best effort to check if a segment is fully deleted
|
||||
List<SegmentCommitInfo> toDrop = new ArrayList<>(); // don't modify segmentInfos in-place
|
||||
for (SegmentCommitInfo info : segmentInfos) {
|
||||
ReadersAndUpdates readersAndUpdates = readerPool.get(info, false);
|
||||
if (readersAndUpdates != null) {
|
||||
if (isFullyDeleted(readersAndUpdates)) {
|
||||
toDrop.add(info);
|
||||
}
|
||||
}
|
||||
}
|
||||
for (SegmentCommitInfo info : toDrop) {
|
||||
dropDeletedSegment(info);
|
||||
}
|
||||
if (toDrop.isEmpty() == false) {
|
||||
checkpoint();
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Sets the iterator to provide the commit user data map at commit time. Calling this method
|
||||
* is considered a committable change and will be {@link #commit() committed} even if
|
||||
|
@ -3503,6 +3523,7 @@ public class IndexWriter implements Closeable, TwoPhaseCommit, Accountable {
|
|||
anyChanges |= maybeMerge.getAndSet(false);
|
||||
|
||||
synchronized(this) {
|
||||
writeReaderPool(applyAllDeletes);
|
||||
doAfterFlush();
|
||||
success = true;
|
||||
return anyChanges;
|
||||
|
|
|
@ -235,18 +235,11 @@ class PendingDeletes {
|
|||
}
|
||||
|
||||
/**
|
||||
* Called before the given DocValuesFieldUpdates are written to disk
|
||||
* @param info the field to apply
|
||||
*/
|
||||
void onDocValuesUpdate(FieldInfo info) {
|
||||
}
|
||||
|
||||
/**
|
||||
* Called for every field update for the given field
|
||||
* @param field the field that's updated
|
||||
* Called for every field update for the given field at flush time
|
||||
* @param info the field info of the field that's updated
|
||||
* @param iterator the values to apply
|
||||
*/
|
||||
void onDocValuesUpdate(String field, DocValuesFieldUpdates.Iterator iterator) throws IOException {
|
||||
void onDocValuesUpdate(FieldInfo info, DocValuesFieldUpdates.Iterator iterator) throws IOException {
|
||||
}
|
||||
|
||||
int numDeletesToMerge(MergePolicy policy, IOSupplier<CodecReader> readerIOSupplier) throws IOException {
|
||||
|
|
|
@ -120,14 +120,9 @@ final class PendingSoftDeletes extends PendingDeletes {
|
|||
}
|
||||
|
||||
@Override
|
||||
void onDocValuesUpdate(String field, DocValuesFieldUpdates.Iterator iterator) throws IOException {
|
||||
if (this.field.equals(field)) {
|
||||
void onDocValuesUpdate(FieldInfo info, DocValuesFieldUpdates.Iterator iterator) throws IOException {
|
||||
if (this.field.equals(info.name)) {
|
||||
pendingDeleteCount += applySoftDeletes(iterator, getMutableBits());
|
||||
}
|
||||
}
|
||||
@Override
|
||||
void onDocValuesUpdate(FieldInfo info) {
|
||||
if (field.equals(info.name)) {
|
||||
assert dvGeneration < info.getDocValuesGen() : "we have seen this generation update already: " + dvGeneration + " vs. " + info.getDocValuesGen();
|
||||
assert dvGeneration != -2 : "docValues generation is still uninitialized";
|
||||
dvGeneration = info.getDocValuesGen();
|
||||
|
|
|
@ -161,7 +161,6 @@ final class ReadersAndUpdates {
|
|||
}
|
||||
fieldUpdates.add(update);
|
||||
}
|
||||
pendingDeletes.onDocValuesUpdate(update.field, update.iterator());
|
||||
}
|
||||
|
||||
public synchronized long getNumDVUpdates() {
|
||||
|
@ -334,7 +333,6 @@ final class ReadersAndUpdates {
|
|||
final TrackingDirectoryWrapper trackingDir = new TrackingDirectoryWrapper(dir);
|
||||
final SegmentWriteState state = new SegmentWriteState(null, trackingDir, info.info, fieldInfos, null, updatesContext, segmentSuffix);
|
||||
try (final DocValuesConsumer fieldsConsumer = dvFormat.fieldsConsumer(state)) {
|
||||
pendingDeletes.onDocValuesUpdate(fieldInfo);
|
||||
Function<FieldInfo, DocValuesFieldUpdates.Iterator> updateSupplier = (info) -> {
|
||||
if (info != fieldInfo) {
|
||||
throw new IllegalArgumentException("expected field info for field: " + fieldInfo.name + " but got: " + info.name);
|
||||
|
@ -345,6 +343,7 @@ final class ReadersAndUpdates {
|
|||
}
|
||||
return DocValuesFieldUpdates.mergedIterator(subs);
|
||||
};
|
||||
pendingDeletes.onDocValuesUpdate(fieldInfo, updateSupplier.apply(fieldInfo));
|
||||
if (type == DocValuesType.BINARY) {
|
||||
fieldsConsumer.addBinaryField(fieldInfo, new EmptyDocValuesProducer() {
|
||||
@Override
|
||||
|
|
|
@ -116,9 +116,8 @@ public class TestPendingSoftDeletes extends TestPendingDeletes {
|
|||
List<Integer> docsDeleted = Arrays.asList(1, 3, 7, 8, DocIdSetIterator.NO_MORE_DOCS);
|
||||
List<DocValuesFieldUpdates> updates = Arrays.asList(singleUpdate(docsDeleted, 10));
|
||||
for (DocValuesFieldUpdates update : updates) {
|
||||
deletes.onDocValuesUpdate(update.field, update.iterator());
|
||||
deletes.onDocValuesUpdate(fieldInfo, update.iterator());
|
||||
}
|
||||
deletes.onDocValuesUpdate(fieldInfo);
|
||||
assertEquals(4, deletes.numPendingDeletes());
|
||||
assertTrue(deletes.getLiveDocs().get(0));
|
||||
assertFalse(deletes.getLiveDocs().get(1));
|
||||
|
@ -135,9 +134,8 @@ public class TestPendingSoftDeletes extends TestPendingDeletes {
|
|||
updates = Arrays.asList(singleUpdate(docsDeleted, 10));
|
||||
fieldInfo = new FieldInfo("_soft_deletes", 1, false, false, false, IndexOptions.NONE, DocValuesType.NUMERIC, 1, Collections.emptyMap(), 0, 0);
|
||||
for (DocValuesFieldUpdates update : updates) {
|
||||
deletes.onDocValuesUpdate(update.field, update.iterator());
|
||||
deletes.onDocValuesUpdate(fieldInfo, update.iterator());
|
||||
}
|
||||
deletes.onDocValuesUpdate(fieldInfo);
|
||||
assertEquals(5, deletes.numPendingDeletes());
|
||||
assertTrue(deletes.getLiveDocs().get(0));
|
||||
assertFalse(deletes.getLiveDocs().get(1));
|
||||
|
@ -180,9 +178,8 @@ public class TestPendingSoftDeletes extends TestPendingDeletes {
|
|||
List<Integer> docsDeleted = Arrays.asList(1, DocIdSetIterator.NO_MORE_DOCS);
|
||||
List<DocValuesFieldUpdates> updates = Arrays.asList(singleUpdate(docsDeleted, 3));
|
||||
for (DocValuesFieldUpdates update : updates) {
|
||||
deletes.onDocValuesUpdate(update.field, update.iterator());
|
||||
deletes.onDocValuesUpdate(fieldInfo, update.iterator());
|
||||
}
|
||||
deletes.onDocValuesUpdate(fieldInfo);
|
||||
assertEquals(1, deletes.numPendingDeletes());
|
||||
assertTrue(deletes.getLiveDocs().get(0));
|
||||
assertFalse(deletes.getLiveDocs().get(1));
|
||||
|
|
|
@ -289,6 +289,9 @@ public class TestSoftDeletesRetentionMergePolicy extends LuceneTestCase {
|
|||
writer.softUpdateDocument(new Term("id", id), doc,
|
||||
new NumericDocValuesField("soft_delete", 1));
|
||||
}
|
||||
if (rarely()) {
|
||||
writer.flush();
|
||||
}
|
||||
ids.add(id);
|
||||
}
|
||||
} catch (IOException | InterruptedException e) {
|
||||
|
@ -382,13 +385,13 @@ public class TestSoftDeletesRetentionMergePolicy extends LuceneTestCase {
|
|||
Document tombstone = new Document();
|
||||
tombstone.add(new NumericDocValuesField("soft_delete", 1));
|
||||
writer.softUpdateDocument(new Term("id", "1"), tombstone, new NumericDocValuesField("soft_delete", 1));
|
||||
// Internally, forceMergeDeletes will call flush to flush pending updates
|
||||
writer.forceMergeDeletes(true); // Internally, forceMergeDeletes will call flush to flush pending updates
|
||||
// Thus, we will have two segments - both having soft-deleted documents.
|
||||
// We expect any MP to merge these segments into one segment
|
||||
// when calling forceMergeDeletes.
|
||||
writer.forceMergeDeletes(true);
|
||||
assertEquals(1, writer.maxDoc());
|
||||
assertEquals(1, writer.segmentInfos.asList().size());
|
||||
assertEquals(1, writer.numDocs());
|
||||
assertEquals(1, writer.maxDoc());
|
||||
writer.close();
|
||||
dir.close();
|
||||
}
|
||||
|
|
Loading…
Reference in New Issue