From 48a5b45d28a0740231128c00a04586b5a2c1cb12 Mon Sep 17 00:00:00 2001
From: Simon Willnauer
Date: Mon, 17 Sep 2018 18:18:58 +0200
Subject: [PATCH] Ensure fully deleted segments are accounted for correctly
 (#33757)

We can't rely on the leaf reader ordinal in a wrapped reader since it
might not correspond to the ordinal in the SegmentInfos for its
SegmentCommitInfo.

Relates to #32844
Closes #33689
Closes #33755
---
 .../snapshots/SourceOnlySnapshot.java         | 71 +++++++++++++++++--
 .../SourceOnlySnapshotRepository.java         |  2 +-
 .../snapshots/SourceOnlySnapshotIT.java       | 38 +++++-----
 .../SourceOnlySnapshotShardTests.java         |  1 -
 .../snapshots/SourceOnlySnapshotTests.java    | 54 ++++++++++++++
 5 files changed, 141 insertions(+), 25 deletions(-)

diff --git a/x-pack/plugin/core/src/main/java/org/elasticsearch/snapshots/SourceOnlySnapshot.java b/x-pack/plugin/core/src/main/java/org/elasticsearch/snapshots/SourceOnlySnapshot.java
index b7d6a51f45a..6c38a25f69a 100644
--- a/x-pack/plugin/core/src/main/java/org/elasticsearch/snapshots/SourceOnlySnapshot.java
+++ b/x-pack/plugin/core/src/main/java/org/elasticsearch/snapshots/SourceOnlySnapshot.java
@@ -82,15 +82,25 @@ public class SourceOnlySnapshot {
         try (Lock writeLock = targetDirectory.obtainLock(IndexWriter.WRITE_LOCK_NAME);
              StandardDirectoryReader reader = (StandardDirectoryReader) DirectoryReader.open(commit)) {
             SegmentInfos segmentInfos = reader.getSegmentInfos();
-            DirectoryReader wrapper = wrapReader(reader);
             List<SegmentCommitInfo> newInfos = new ArrayList<>();
-            for (LeafReaderContext ctx : wrapper.leaves()) {
-                SegmentCommitInfo info = segmentInfos.info(ctx.ord);
+            for (LeafReaderContext ctx : reader.leaves()) {
                 LeafReader leafReader = ctx.reader();
-                LiveDocs liveDocs = getLiveDocs(leafReader);
-                if (leafReader.numDocs() != 0) { // fully deleted segments don't need to be processed
-                    SegmentCommitInfo newInfo = syncSegment(info, liveDocs, leafReader.getFieldInfos(), existingSegments, createdFiles);
-                    newInfos.add(newInfo);
+                SegmentCommitInfo info = reader.getSegmentInfos().info(ctx.ord);
+                assert info.info.equals(Lucene.segmentReader(ctx.reader()).getSegmentInfo().info);
+                /* We could do this totally differently without wrapping this dummy directory reader if FilterCodecReader had a
+                 * getDelegate method. This is fixed in LUCENE-8502 but we need to wait for it to come in 7.5.1 or 7.6.
+                 * The reason here is that the ctx.ord is not guaranteed to be equivalent to the SegmentCommitInfo ord in the
+                 * SegmentInfos object since we might drop fully deleted segments. If that happens we would use the wrong reader
+                 * for the SI and would almost certainly expose deleted documents.
+                 */
+                DirectoryReader wrappedReader = wrapReader(new DummyDirectoryReader(reader.directory(), leafReader));
+                if (wrappedReader.leaves().isEmpty() == false) {
+                    leafReader = wrappedReader.leaves().get(0).reader();
+                    LiveDocs liveDocs = getLiveDocs(leafReader);
+                    if (leafReader.numDocs() != 0) { // fully deleted segments don't need to be processed
+                        SegmentCommitInfo newInfo = syncSegment(info, liveDocs, leafReader.getFieldInfos(), existingSegments, createdFiles);
+                        newInfos.add(newInfo);
+                    }
                 }
             }
             segmentInfos.clear();
@@ -258,4 +268,51 @@ public class SourceOnlySnapshot {
             this.bits = bits;
         }
     }
+
+    private static class DummyDirectoryReader extends DirectoryReader {
+
+        protected DummyDirectoryReader(Directory directory, LeafReader... segmentReaders) throws IOException {
+            super(directory, segmentReaders);
+        }
+
+        @Override
+        protected DirectoryReader doOpenIfChanged() throws IOException {
+            return null;
+        }
+
+        @Override
+        protected DirectoryReader doOpenIfChanged(IndexCommit commit) throws IOException {
+            return null;
+        }
+
+        @Override
+        protected DirectoryReader doOpenIfChanged(IndexWriter writer, boolean applyAllDeletes) throws IOException {
+            return null;
+        }
+
+        @Override
+        public long getVersion() {
+            return 0;
+        }
+
+        @Override
+        public boolean isCurrent() throws IOException {
+            return false;
+        }
+
+        @Override
+        public IndexCommit getIndexCommit() throws IOException {
+            return null;
+        }
+
+        @Override
+        protected void doClose() throws IOException {
+
+        }
+
+        @Override
+        public CacheHelper getReaderCacheHelper() {
+            return null;
+        }
+    }
 }
diff --git a/x-pack/plugin/core/src/main/java/org/elasticsearch/snapshots/SourceOnlySnapshotRepository.java b/x-pack/plugin/core/src/main/java/org/elasticsearch/snapshots/SourceOnlySnapshotRepository.java
index a75d5f488ee..704f4d90344 100644
--- a/x-pack/plugin/core/src/main/java/org/elasticsearch/snapshots/SourceOnlySnapshotRepository.java
+++ b/x-pack/plugin/core/src/main/java/org/elasticsearch/snapshots/SourceOnlySnapshotRepository.java
@@ -126,7 +126,7 @@ public final class SourceOnlySnapshotRepository extends FilterRepository {
             SourceOnlySnapshot snapshot = new SourceOnlySnapshot(tempStore.directory(), querySupplier);
             snapshot.syncSnapshot(snapshotIndexCommit);
             // we will use the lucene doc ID as the seq ID so we set the local checkpoint to maxDoc with a new index UUID
-            SegmentInfos segmentInfos = store.readLastCommittedSegmentsInfo();
+            SegmentInfos segmentInfos = tempStore.readLastCommittedSegmentsInfo();
             tempStore.bootstrapNewHistory(segmentInfos.totalMaxDoc());
             store.incRef();
             try (DirectoryReader reader = DirectoryReader.open(tempStore.directory())) {
diff --git a/x-pack/plugin/core/src/test/java/org/elasticsearch/snapshots/SourceOnlySnapshotIT.java b/x-pack/plugin/core/src/test/java/org/elasticsearch/snapshots/SourceOnlySnapshotIT.java
index 6d3a17e3ebf..737e2e26970 100644
--- a/x-pack/plugin/core/src/test/java/org/elasticsearch/snapshots/SourceOnlySnapshotIT.java
+++ b/x-pack/plugin/core/src/test/java/org/elasticsearch/snapshots/SourceOnlySnapshotIT.java
@@ -49,7 +49,7 @@ import java.util.List;
 import java.util.Map;
 import java.util.Optional;
 import java.util.concurrent.ExecutionException;
-import java.util.function.Consumer;
+import java.util.function.BiConsumer;
 
 import static org.elasticsearch.common.xcontent.XContentFactory.jsonBuilder;
 import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertAcked;
@@ -97,7 +97,10 @@ public class SourceOnlySnapshotIT extends ESIntegTestCase {
         boolean requireRouting = randomBoolean();
         boolean useNested = randomBoolean();
         IndexRequestBuilder[] builders = snashotAndRestore(sourceIdx, 1, true, requireRouting, useNested);
-        assertHits(sourceIdx, builders.length);
+        IndicesStatsResponse indicesStatsResponse = client().admin().indices().prepareStats(sourceIdx).clear().setDocs(true).get();
+        long deleted = indicesStatsResponse.getTotal().docs.getDeleted();
+        boolean sourceHadDeletions = deleted > 0; // we use indexRandom which might create holes, i.e. deleted docs
+        assertHits(sourceIdx, builders.length, sourceHadDeletions);
         assertMappings(sourceIdx, requireRouting, useNested);
         SearchPhaseExecutionException e = expectThrows(SearchPhaseExecutionException.class, () -> {
             client().prepareSearch(sourceIdx).setQuery(QueryBuilders.idsQuery()
@@ -116,7 +119,7 @@ public class SourceOnlySnapshotIT extends ESIntegTestCase {
         client().admin().indices().prepareUpdateSettings(sourceIdx)
             .setSettings(Settings.builder().put("index.number_of_replicas", 1)).get();
         ensureGreen(sourceIdx);
-        assertHits(sourceIdx, builders.length);
+        assertHits(sourceIdx, builders.length, sourceHadDeletions);
     }
 
     public void testSnapshotAndRestoreWithNested() throws Exception {
@@ -125,7 +128,7 @@ public class SourceOnlySnapshotIT extends ESIntegTestCase {
         IndexRequestBuilder[] builders = snashotAndRestore(sourceIdx, 1, true, requireRouting, true);
         IndicesStatsResponse indicesStatsResponse = client().admin().indices().prepareStats().clear().setDocs(true).get();
         assertThat(indicesStatsResponse.getTotal().docs.getDeleted(), Matchers.greaterThan(0L));
-        assertHits(sourceIdx, builders.length);
+        assertHits(sourceIdx, builders.length, true);
         assertMappings(sourceIdx, requireRouting, true);
         SearchPhaseExecutionException e = expectThrows(SearchPhaseExecutionException.class, () ->
             client().prepareSearch(sourceIdx).setQuery(QueryBuilders.idsQuery().addIds("" + randomIntBetween(0, builders.length))).get());
@@ -141,7 +144,7 @@ public class SourceOnlySnapshotIT extends ESIntegTestCase {
         client().admin().indices().prepareUpdateSettings(sourceIdx).setSettings(Settings.builder().put("index.number_of_replicas", 1))
             .get();
         ensureGreen(sourceIdx);
-        assertHits(sourceIdx, builders.length);
+        assertHits(sourceIdx, builders.length, true);
     }
 
     private void assertMappings(String sourceIdx, boolean requireRouting, boolean useNested) throws IOException {
@@ -165,15 +168,12 @@ public class SourceOnlySnapshotIT extends ESIntegTestCase {
         }
     }
 
-    private void assertHits(String index, int numDocsExpected) {
+    private void assertHits(String index, int numDocsExpected, boolean sourceHadDeletions) {
         SearchResponse searchResponse = client().prepareSearch(index)
             .addSort(SeqNoFieldMapper.NAME, SortOrder.ASC)
            .setSize(numDocsExpected).get();
-        Consumer<SearchResponse> assertConsumer = res -> {
+        BiConsumer<SearchResponse, Boolean> assertConsumer = (res, allowHoles) -> {
             SearchHits hits = res.getHits();
-            IndicesStatsResponse indicesStatsResponse = client().admin().indices().prepareStats().clear().setDocs(true).get();
-            long deleted = indicesStatsResponse.getTotal().docs.getDeleted();
-            boolean allowHoles = deleted > 0; // we use indexRandom which might create holes ie. deleted docs
             long i = 0;
             for (SearchHit hit : hits) {
                 String id = hit.getId();
@@ -190,18 +190,24 @@ public class SourceOnlySnapshotIT extends ESIntegTestCase {
                 assertEquals("r" + id, hit.field("_routing").getValue());
             }
         };
-        assertConsumer.accept(searchResponse);
+        assertConsumer.accept(searchResponse, sourceHadDeletions);
         assertEquals(numDocsExpected, searchResponse.getHits().totalHits);
         searchResponse = client().prepareSearch(index)
             .addSort(SeqNoFieldMapper.NAME, SortOrder.ASC)
             .setScroll("1m")
             .slice(new SliceBuilder(SeqNoFieldMapper.NAME, randomIntBetween(0,1), 2))
             .setSize(randomIntBetween(1, 10)).get();
-        do {
-            // now do a scroll with a slice
-            assertConsumer.accept(searchResponse);
-            searchResponse = client().prepareSearchScroll(searchResponse.getScrollId()).setScroll(TimeValue.timeValueMinutes(1)).get();
-        } while (searchResponse.getHits().getHits().length > 0);
+        try {
+            do {
+                // now do a scroll with a slice
+                assertConsumer.accept(searchResponse, true);
+                searchResponse = client().prepareSearchScroll(searchResponse.getScrollId()).setScroll(TimeValue.timeValueMinutes(1)).get();
+            } while (searchResponse.getHits().getHits().length > 0);
+        } finally {
+            if (searchResponse.getScrollId() != null) {
+                client().prepareClearScroll().addScrollId(searchResponse.getScrollId()).get();
+            }
+        }
     }
diff --git a/x-pack/plugin/core/src/test/java/org/elasticsearch/snapshots/SourceOnlySnapshotShardTests.java b/x-pack/plugin/core/src/test/java/org/elasticsearch/snapshots/SourceOnlySnapshotShardTests.java
index 261133b8907..7058724ecf0 100644
--- a/x-pack/plugin/core/src/test/java/org/elasticsearch/snapshots/SourceOnlySnapshotShardTests.java
+++ b/x-pack/plugin/core/src/test/java/org/elasticsearch/snapshots/SourceOnlySnapshotShardTests.java
@@ -162,7 +162,6 @@ public class SourceOnlySnapshotShardTests extends IndexShardTestCase {
         return "{ \"value\" : \"" + randomAlphaOfLength(10) + "\"}";
     }
 
-    @AwaitsFix(bugUrl = "https://github.com/elastic/elasticsearch/issues/33689")
     public void testRestoreMinmal() throws IOException {
         IndexShard shard = newStartedShard(true);
         int numInitialDocs = randomIntBetween(10, 100);
diff --git a/x-pack/plugin/core/src/test/java/org/elasticsearch/snapshots/SourceOnlySnapshotTests.java b/x-pack/plugin/core/src/test/java/org/elasticsearch/snapshots/SourceOnlySnapshotTests.java
index e7d731739de..f3b3aed0bf3 100644
--- a/x-pack/plugin/core/src/test/java/org/elasticsearch/snapshots/SourceOnlySnapshotTests.java
+++ b/x-pack/plugin/core/src/test/java/org/elasticsearch/snapshots/SourceOnlySnapshotTests.java
@@ -12,10 +12,12 @@ import org.apache.lucene.document.NumericDocValuesField;
 import org.apache.lucene.document.StoredField;
 import org.apache.lucene.document.StringField;
 import org.apache.lucene.document.TextField;
+import org.apache.lucene.index.CodecReader;
 import org.apache.lucene.index.DirectoryReader;
 import org.apache.lucene.index.FilterMergePolicy;
 import org.apache.lucene.index.IndexCommit;
 import org.apache.lucene.index.IndexFileNames;
+import org.apache.lucene.index.IndexReader;
 import org.apache.lucene.index.IndexWriter;
 import org.apache.lucene.index.IndexWriterConfig;
 import org.apache.lucene.index.KeepOnlyLastCommitDeletionPolicy;
@@ -34,6 +36,7 @@ import org.apache.lucene.search.IndexSearcher;
 import org.apache.lucene.search.TermQuery;
 import org.apache.lucene.search.TopDocs;
 import org.apache.lucene.store.Directory;
+import org.apache.lucene.util.IOSupplier;
 import org.elasticsearch.common.lucene.Lucene;
 import org.elasticsearch.test.ESTestCase;
 
@@ -242,4 +245,55 @@ public class SourceOnlySnapshotTests extends ESTestCase {
             reader.close();
         }
     }
+
+    public void testFullyDeletedSegments() throws IOException {
+        try (Directory dir = newDirectory()) {
+            SnapshotDeletionPolicy deletionPolicy = new SnapshotDeletionPolicy(new KeepOnlyLastCommitDeletionPolicy());
+            IndexWriter writer = new IndexWriter(dir, newIndexWriterConfig()
+                .setSoftDeletesField(Lucene.SOFT_DELETES_FIELD)
+                .setIndexDeletionPolicy(deletionPolicy).setMergePolicy(new FilterMergePolicy(NoMergePolicy.INSTANCE) {
+                    @Override
+                    public boolean useCompoundFile(SegmentInfos infos, SegmentCommitInfo mergedInfo, MergeContext mergeContext) {
+                        return randomBoolean();
+                    }
+
+                    @Override
+                    public boolean keepFullyDeletedSegment(IOSupplier<CodecReader> readerIOSupplier) throws IOException {
+                        return true;
+                    }
+                }));
+            Document doc = new Document();
+            doc.add(new StringField("id", "1", Field.Store.YES));
+            doc.add(new TextField("text", "the quick brown fox", Field.Store.NO));
+            doc.add(new NumericDocValuesField("rank", 1));
+            doc.add(new StoredField("rank", 1));
+            doc.add(new StoredField("src", "the quick brown fox"));
+            writer.addDocument(doc);
+            writer.commit();
+            doc = new Document();
+            doc.add(new StringField("id", "1", Field.Store.YES));
+            doc.add(new TextField("text", "the quick brown fox", Field.Store.NO));
+            doc.add(new NumericDocValuesField("rank", 3));
+            doc.add(new StoredField("rank", 3));
+            doc.add(new StoredField("src", "the quick brown fox"));
+            writer.softUpdateDocument(new Term("id", "1"), doc, new NumericDocValuesField(Lucene.SOFT_DELETES_FIELD, 1));
+            writer.commit();
+            try (Directory targetDir = newDirectory()) {
+                IndexCommit snapshot = deletionPolicy.snapshot();
+                SourceOnlySnapshot snapshoter = new SourceOnlySnapshot(targetDir);
+                snapshoter.syncSnapshot(snapshot);
+
+                try (DirectoryReader snapReader = DirectoryReader.open(targetDir)) {
+                    assertEquals(snapReader.maxDoc(), 1);
+                    assertEquals(snapReader.numDocs(), 1);
+                    assertEquals("3", snapReader.document(0).getField("rank").stringValue());
+                }
+                try (IndexReader writerReader = DirectoryReader.open(writer)) {
+                    assertEquals(writerReader.maxDoc(), 2);
+                    assertEquals(writerReader.numDocs(), 1);
+                }
+            }
+            writer.close();
+        }
+    }
 }
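
To make the ordinal mismatch described in the commit message concrete, here is a minimal, self-contained toy model. It is illustrative only and not part of the commit: the Segment class and all names are invented stand-ins with no Lucene dependency, but the pairing drift it prints is exactly the bug the first hunk removes, where a reader that drops fully deleted segments re-numbers its leaves while the commit's segment list keeps the original ordinals.

    import java.util.ArrayList;
    import java.util.Arrays;
    import java.util.List;

    public class OrdinalMismatchDemo {
        // Stand-in for a SegmentCommitInfo: a segment name plus its live doc count.
        static final class Segment {
            final String name;
            final int liveDocs;
            Segment(String name, int liveDocs) {
                this.name = name;
                this.liveDocs = liveDocs;
            }
        }

        public static void main(String[] args) {
            // The commit's segment list still contains _1 even though it is fully deleted.
            List<Segment> segmentInfos = Arrays.asList(
                new Segment("_0", 10),
                new Segment("_1", 0),  // fully deleted, but still listed in the commit
                new Segment("_2", 7));

            // A wrapped reader that drops fully deleted segments re-numbers its leaves.
            List<Segment> wrappedLeaves = new ArrayList<>();
            for (Segment segment : segmentInfos) {
                if (segment.liveDocs > 0) {
                    wrappedLeaves.add(segment);
                }
            }

            // Pairing by leaf ordinal, as the removed code did, drifts after the gap.
            for (int ord = 0; ord < wrappedLeaves.size(); ord++) {
                System.out.println("leaf " + wrappedLeaves.get(ord).name
                    + " paired with segment " + segmentInfos.get(ord).name);
            }
            // Prints:
            //   leaf _0 paired with segment _0
            //   leaf _2 paired with segment _1   <-- wrong segment for the _2 leaf
        }
    }

This is why the patched loop iterates over the unfiltered reader's leaves, pairs each leaf with its SegmentCommitInfo first, and only then wraps that single leaf in a DummyDirectoryReader before filtering.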