Ensure fully deleted segments are accounted for correctly (#33757)
We can't rely on the leaf reader ordinal in a wrapped reader since it might not correspond to the ordinal in the SegmentInfos for its SegmentCommitInfo. Relates to #32844 Closes #33689 Closes #33755
This commit is contained in:
parent 7046cc467f
commit 48a5b45d28
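The failure mode is easy to state in isolation: a filtering reader that drops fully deleted leaves renumbers the ordinals of the surviving leaves, so ctx.ord in the wrapped view no longer lines up with positions in the commit's SegmentInfos. The sketch below is illustrative only and is not code from this commit; `wrapper` stands in for whatever filtered view wrapReader produces.

    import org.apache.lucene.index.DirectoryReader;
    import org.apache.lucene.index.LeafReaderContext;
    import org.apache.lucene.index.SegmentCommitInfo;
    import org.apache.lucene.index.SegmentInfos;

    class OrdMismatchSketch {
        // Suppose the commit holds three segments and _1 is fully (soft-)deleted:
        //   SegmentInfos:      _0 (ord 0)   _1 (ord 1, fully deleted)   _2 (ord 2)
        //   wrapper.leaves():  _0 (ord 0)   _2 (ord 1)   <- _1 dropped, _2 renumbered
        static void buggyLookup(DirectoryReader wrapper, SegmentInfos segmentInfos) {
            for (LeafReaderContext ctx : wrapper.leaves()) {
                // Wrong once a leaf was dropped: ctx.ord counts the wrapper's leaves,
                // not positions in SegmentInfos, so here the reader for _2 would be
                // paired with the SegmentCommitInfo of the fully deleted _1.
                SegmentCommitInfo info = segmentInfos.info(ctx.ord);
            }
        }
    }

The fix below therefore iterates the unwrapped reader, whose leaf ordinals do match the SegmentInfos, and applies the filter one leaf at a time.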
SourceOnlySnapshot.java
@@ -82,15 +82,25 @@ public class SourceOnlySnapshot {
         try (Lock writeLock = targetDirectory.obtainLock(IndexWriter.WRITE_LOCK_NAME);
              StandardDirectoryReader reader = (StandardDirectoryReader) DirectoryReader.open(commit)) {
             SegmentInfos segmentInfos = reader.getSegmentInfos();
-            DirectoryReader wrapper = wrapReader(reader);
             List<SegmentCommitInfo> newInfos = new ArrayList<>();
-            for (LeafReaderContext ctx : wrapper.leaves()) {
-                SegmentCommitInfo info = segmentInfos.info(ctx.ord);
+            for (LeafReaderContext ctx : reader.leaves()) {
                 LeafReader leafReader = ctx.reader();
-                LiveDocs liveDocs = getLiveDocs(leafReader);
-                if (leafReader.numDocs() != 0) { // fully deleted segments don't need to be processed
-                    SegmentCommitInfo newInfo = syncSegment(info, liveDocs, leafReader.getFieldInfos(), existingSegments, createdFiles);
-                    newInfos.add(newInfo);
+                SegmentCommitInfo info = reader.getSegmentInfos().info(ctx.ord);
+                assert info.info.equals(Lucene.segmentReader(ctx.reader()).getSegmentInfo().info);
+                /* We could do this totally different without wrapping this dummy directory reader if FilterCodecReader would have a
+                 * getDelegate method. This is fixed in LUCENE-8502 but we need to wait for it to come in 7.5.1 or 7.6.
+                 * The reason here is that the ctx.ord is not guaranteed to be equivalent to the SegmentCommitInfo ord in the SegmentInfo
+                 * object since we might drop fully deleted segments. if that happens we are using the wrong reader for the SI and
+                 * might almost certainly expose deleted documents.
+                 */
+                DirectoryReader wrappedReader = wrapReader(new DummyDirectoryReader(reader.directory(), leafReader));
+                if (wrappedReader.leaves().isEmpty() == false) {
+                    leafReader = wrappedReader.leaves().get(0).reader();
+                    LiveDocs liveDocs = getLiveDocs(leafReader);
+                    if (leafReader.numDocs() != 0) { // fully deleted segments don't need to be processed
+                        SegmentCommitInfo newInfo = syncSegment(info, liveDocs, leafReader.getFieldInfos(), existingSegments, createdFiles);
+                        newInfos.add(newInfo);
+                    }
                 }
             }
             segmentInfos.clear();
@@ -258,4 +268,51 @@ public class SourceOnlySnapshot {
             this.bits = bits;
         }
     }
+
+    private static class DummyDirectoryReader extends DirectoryReader {
+
+        protected DummyDirectoryReader(Directory directory, LeafReader... segmentReaders) throws IOException {
+            super(directory, segmentReaders);
+        }
+
+        @Override
+        protected DirectoryReader doOpenIfChanged() throws IOException {
+            return null;
+        }
+
+        @Override
+        protected DirectoryReader doOpenIfChanged(IndexCommit commit) throws IOException {
+            return null;
+        }
+
+        @Override
+        protected DirectoryReader doOpenIfChanged(IndexWriter writer, boolean applyAllDeletes) throws IOException {
+            return null;
+        }
+
+        @Override
+        public long getVersion() {
+            return 0;
+        }
+
+        @Override
+        public boolean isCurrent() throws IOException {
+            return false;
+        }
+
+        @Override
+        public IndexCommit getIndexCommit() throws IOException {
+            return null;
+        }
+
+        @Override
+        protected void doClose() throws IOException {
+
+        }
+
+        @Override
+        public CacheHelper getReaderCacheHelper() {
+            return null;
+        }
+    }
 }
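As the comment in the first hunk explains, DummyDirectoryReader is a stopgap: wrapReader takes a DirectoryReader, so wrapping one leaf at a time requires re-packaging each LeafReader as a single-leaf DirectoryReader. This keeps the pairing between a leaf and its SegmentCommitInfo explicit, and an empty leaves() on the wrapped view means the filter dropped the segment entirely. The stubbed methods (doOpenIfChanged returning null, getVersion returning 0, and so on) are never exercised on this throwaway reader, and the whole workaround can go away once the getDelegate change from LUCENE-8502 ships in 7.5.1 or 7.6.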
SourceOnlySnapshotRepository.java
@@ -126,7 +126,7 @@ public final class SourceOnlySnapshotRepository extends FilterRepository {
             SourceOnlySnapshot snapshot = new SourceOnlySnapshot(tempStore.directory(), querySupplier);
             snapshot.syncSnapshot(snapshotIndexCommit);
             // we will use the lucene doc ID as the seq ID so we set the local checkpoint to maxDoc with a new index UUID
-            SegmentInfos segmentInfos = store.readLastCommittedSegmentsInfo();
+            SegmentInfos segmentInfos = tempStore.readLastCommittedSegmentsInfo();
             tempStore.bootstrapNewHistory(segmentInfos.totalMaxDoc());
             store.incRef();
             try (DirectoryReader reader = DirectoryReader.open(tempStore.directory())) {
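The one-liner in SourceOnlySnapshotRepository matters for the same bug: the local checkpoint is bootstrapped from totalMaxDoc, so the SegmentInfos must be read from tempStore, the store the snapshot was just written to, rather than from the source shard's store, whose segment geometry (including any fully deleted segments) can differ from what actually landed in the snapshot.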
SourceOnlySnapshotIT.java
@@ -49,7 +49,7 @@ import java.util.List;
 import java.util.Map;
 import java.util.Optional;
 import java.util.concurrent.ExecutionException;
-import java.util.function.Consumer;
+import java.util.function.BiConsumer;
 
 import static org.elasticsearch.common.xcontent.XContentFactory.jsonBuilder;
 import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertAcked;
@@ -97,7 +97,10 @@ public class SourceOnlySnapshotIT extends ESIntegTestCase {
         boolean requireRouting = randomBoolean();
         boolean useNested = randomBoolean();
         IndexRequestBuilder[] builders = snashotAndRestore(sourceIdx, 1, true, requireRouting, useNested);
-        assertHits(sourceIdx, builders.length);
+        IndicesStatsResponse indicesStatsResponse = client().admin().indices().prepareStats(sourceIdx).clear().setDocs(true).get();
+        long deleted = indicesStatsResponse.getTotal().docs.getDeleted();
+        boolean sourceHadDeletions = deleted > 0; // we use indexRandom which might create holes ie. deleted docs
+        assertHits(sourceIdx, builders.length, sourceHadDeletions);
         assertMappings(sourceIdx, requireRouting, useNested);
         SearchPhaseExecutionException e = expectThrows(SearchPhaseExecutionException.class, () -> {
             client().prepareSearch(sourceIdx).setQuery(QueryBuilders.idsQuery()
@@ -116,7 +119,7 @@ public class SourceOnlySnapshotIT extends ESIntegTestCase {
         client().admin().indices().prepareUpdateSettings(sourceIdx)
             .setSettings(Settings.builder().put("index.number_of_replicas", 1)).get();
         ensureGreen(sourceIdx);
-        assertHits(sourceIdx, builders.length);
+        assertHits(sourceIdx, builders.length, sourceHadDeletions);
     }
 
     public void testSnapshotAndRestoreWithNested() throws Exception {
@@ -125,7 +128,7 @@ public class SourceOnlySnapshotIT extends ESIntegTestCase {
         IndexRequestBuilder[] builders = snashotAndRestore(sourceIdx, 1, true, requireRouting, true);
         IndicesStatsResponse indicesStatsResponse = client().admin().indices().prepareStats().clear().setDocs(true).get();
         assertThat(indicesStatsResponse.getTotal().docs.getDeleted(), Matchers.greaterThan(0L));
-        assertHits(sourceIdx, builders.length);
+        assertHits(sourceIdx, builders.length, true);
         assertMappings(sourceIdx, requireRouting, true);
         SearchPhaseExecutionException e = expectThrows(SearchPhaseExecutionException.class, () ->
             client().prepareSearch(sourceIdx).setQuery(QueryBuilders.idsQuery().addIds("" + randomIntBetween(0, builders.length))).get());
@@ -141,7 +144,7 @@ public class SourceOnlySnapshotIT extends ESIntegTestCase {
         client().admin().indices().prepareUpdateSettings(sourceIdx).setSettings(Settings.builder().put("index.number_of_replicas", 1))
             .get();
         ensureGreen(sourceIdx);
-        assertHits(sourceIdx, builders.length);
+        assertHits(sourceIdx, builders.length, true);
     }
 
     private void assertMappings(String sourceIdx, boolean requireRouting, boolean useNested) throws IOException {
@@ -165,15 +168,12 @@ public class SourceOnlySnapshotIT extends ESIntegTestCase {
         }
     }
 
-    private void assertHits(String index, int numDocsExpected) {
+    private void assertHits(String index, int numDocsExpected, boolean sourceHadDeletions) {
         SearchResponse searchResponse = client().prepareSearch(index)
             .addSort(SeqNoFieldMapper.NAME, SortOrder.ASC)
             .setSize(numDocsExpected).get();
-        Consumer<SearchResponse> assertConsumer = res -> {
+        BiConsumer<SearchResponse, Boolean> assertConsumer = (res, allowHoles) -> {
             SearchHits hits = res.getHits();
-            IndicesStatsResponse indicesStatsResponse = client().admin().indices().prepareStats().clear().setDocs(true).get();
-            long deleted = indicesStatsResponse.getTotal().docs.getDeleted();
-            boolean allowHoles = deleted > 0; // we use indexRandom which might create holes ie. deleted docs
             long i = 0;
             for (SearchHit hit : hits) {
                 String id = hit.getId();
@@ -190,18 +190,24 @@ public class SourceOnlySnapshotIT extends ESIntegTestCase {
                 assertEquals("r" + id, hit.field("_routing").getValue());
             }
         };
-        assertConsumer.accept(searchResponse);
+        assertConsumer.accept(searchResponse, sourceHadDeletions);
         assertEquals(numDocsExpected, searchResponse.getHits().totalHits);
         searchResponse = client().prepareSearch(index)
             .addSort(SeqNoFieldMapper.NAME, SortOrder.ASC)
             .setScroll("1m")
             .slice(new SliceBuilder(SeqNoFieldMapper.NAME, randomIntBetween(0,1), 2))
             .setSize(randomIntBetween(1, 10)).get();
-        do {
-            // now do a scroll with a slice
-            assertConsumer.accept(searchResponse);
-            searchResponse = client().prepareSearchScroll(searchResponse.getScrollId()).setScroll(TimeValue.timeValueMinutes(1)).get();
-        } while (searchResponse.getHits().getHits().length > 0);
+        try {
+            do {
+                // now do a scroll with a slice
+                assertConsumer.accept(searchResponse, true);
+                searchResponse = client().prepareSearchScroll(searchResponse.getScrollId()).setScroll(TimeValue.timeValueMinutes(1)).get();
+            } while (searchResponse.getHits().getHits().length > 0);
+        } finally {
+            if (searchResponse.getScrollId() != null) {
+                client().prepareClearScroll().addScrollId(searchResponse.getScrollId()).get();
+            }
+        }
     }
 
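Two consequences for the tests are visible in the hunks above: assertHits now receives a sourceHadDeletions flag computed once by the caller from index stats (indexRandom can leave holes, i.e. deleted docs, so sequence numbers need not be consecutive), instead of re-deriving it inside the consumer on every response, and the scroll loop is wrapped in try/finally so the scroll context is cleared even if an assertion fails mid-scroll.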
SourceOnlySnapshotShardTests.java
@@ -162,7 +162,6 @@ public class SourceOnlySnapshotShardTests extends IndexShardTestCase {
         return "{ \"value\" : \"" + randomAlphaOfLength(10) + "\"}";
     }
 
-    @AwaitsFix(bugUrl = "https://github.com/elastic/elasticsearch/issues/33689")
     public void testRestoreMinmal() throws IOException {
         IndexShard shard = newStartedShard(true);
         int numInitialDocs = randomIntBetween(10, 100);
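The @AwaitsFix annotation pointing at #33689 is removed because this commit is the fix; testRestoreMinmal runs again.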
SourceOnlySnapshotTests.java
@@ -12,10 +12,12 @@ import org.apache.lucene.document.NumericDocValuesField;
 import org.apache.lucene.document.StoredField;
 import org.apache.lucene.document.StringField;
 import org.apache.lucene.document.TextField;
+import org.apache.lucene.index.CodecReader;
 import org.apache.lucene.index.DirectoryReader;
 import org.apache.lucene.index.FilterMergePolicy;
 import org.apache.lucene.index.IndexCommit;
 import org.apache.lucene.index.IndexFileNames;
+import org.apache.lucene.index.IndexReader;
 import org.apache.lucene.index.IndexWriter;
 import org.apache.lucene.index.IndexWriterConfig;
 import org.apache.lucene.index.KeepOnlyLastCommitDeletionPolicy;
|
@ -34,6 +36,7 @@ import org.apache.lucene.search.IndexSearcher;
|
||||||
import org.apache.lucene.search.TermQuery;
|
import org.apache.lucene.search.TermQuery;
|
||||||
import org.apache.lucene.search.TopDocs;
|
import org.apache.lucene.search.TopDocs;
|
||||||
import org.apache.lucene.store.Directory;
|
import org.apache.lucene.store.Directory;
|
||||||
|
import org.apache.lucene.util.IOSupplier;
|
||||||
import org.elasticsearch.common.lucene.Lucene;
|
import org.elasticsearch.common.lucene.Lucene;
|
||||||
import org.elasticsearch.test.ESTestCase;
|
import org.elasticsearch.test.ESTestCase;
|
||||||
|
|
||||||
|
@@ -242,4 +245,55 @@ public class SourceOnlySnapshotTests extends ESTestCase {
             reader.close();
         }
     }
+
+    public void testFullyDeletedSegments() throws IOException {
+        try (Directory dir = newDirectory()) {
+            SnapshotDeletionPolicy deletionPolicy = new SnapshotDeletionPolicy(new KeepOnlyLastCommitDeletionPolicy());
+            IndexWriter writer = new IndexWriter(dir, newIndexWriterConfig()
+                .setSoftDeletesField(Lucene.SOFT_DELETES_FIELD)
+                .setIndexDeletionPolicy(deletionPolicy).setMergePolicy(new FilterMergePolicy(NoMergePolicy.INSTANCE) {
+                    @Override
+                    public boolean useCompoundFile(SegmentInfos infos, SegmentCommitInfo mergedInfo, MergeContext mergeContext) {
+                        return randomBoolean();
+                    }
+
+                    @Override
+                    public boolean keepFullyDeletedSegment(IOSupplier<CodecReader> readerIOSupplier) throws IOException {
+                        return true;
+                    }
+                }));
+            Document doc = new Document();
+            doc.add(new StringField("id", "1", Field.Store.YES));
+            doc.add(new TextField("text", "the quick brown fox", Field.Store.NO));
+            doc.add(new NumericDocValuesField("rank", 1));
+            doc.add(new StoredField("rank", 1));
+            doc.add(new StoredField("src", "the quick brown fox"));
+            writer.addDocument(doc);
+            writer.commit();
+            doc = new Document();
+            doc.add(new StringField("id", "1", Field.Store.YES));
+            doc.add(new TextField("text", "the quick brown fox", Field.Store.NO));
+            doc.add(new NumericDocValuesField("rank", 3));
+            doc.add(new StoredField("rank", 3));
+            doc.add(new StoredField("src", "the quick brown fox"));
+            writer.softUpdateDocument(new Term("id", "1"), doc, new NumericDocValuesField(Lucene.SOFT_DELETES_FIELD, 1));
+            writer.commit();
+            try (Directory targetDir = newDirectory()) {
+                IndexCommit snapshot = deletionPolicy.snapshot();
+                SourceOnlySnapshot snapshoter = new SourceOnlySnapshot(targetDir);
+                snapshoter.syncSnapshot(snapshot);
+
+                try (DirectoryReader snapReader = DirectoryReader.open(targetDir)) {
+                    assertEquals(snapReader.maxDoc(), 1);
+                    assertEquals(snapReader.numDocs(), 1);
+                    assertEquals("3", snapReader.document(0).getField("rank").stringValue());
+                }
+                try (IndexReader writerReader = DirectoryReader.open(writer)) {
+                    assertEquals(writerReader.maxDoc(), 2);
+                    assertEquals(writerReader.numDocs(), 1);
+                }
+            }
+            writer.close();
+        }
+    }
 }
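The new testFullyDeletedSegments test pins down the scenario: keepFullyDeletedSegment is overridden to force the fully soft-deleted first segment to survive in the commit, so a reader on the writer sees maxDoc 2 with numDocs 1, while the source-only snapshot must contain exactly the one live document, the rank 3 version. Before the fix, the ord mismatch could pair that fully deleted segment's SegmentCommitInfo with the wrong leaf and leak the deleted rank 1 document into the snapshot.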