Do not expose hard-deleted docs in Lucene history (#32333)
Today, when reading operation history in Lucene, we read all documents. However, if indexing a document is aborted, IndexWriter hard-deletes it; we therefore need to exclude such documents from the Lucene history. This commit makes sure that aborted documents are excluded by using the hard liveDocs of a SegmentReader whenever the segment has deletes.

Closes #32269
commit 1fdc3f08be (parent 2245812ef7)
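For context, here is a minimal, hypothetical usage sketch of the wrapper this change touches. It is not part of the commit: the class name, the temporary directory, and the sample documents are made up for illustration, and it assumes the Lucene 7.x soft-deletes APIs and the Elasticsearch Lucene helpers (SOFT_DELETE_FIELD, newSoftDeleteField, wrapAllDocsLive) as they exist at this commit. A reader wrapped with Lucene.wrapAllDocsLive sees soft-deleted documents again, which is what reading operation history relies on, while documents that IndexWriter hard-deleted, for example because indexing them was aborted, stay hidden.

import java.nio.file.Files;

import org.apache.lucene.document.Document;
import org.apache.lucene.document.Field.Store;
import org.apache.lucene.document.StringField;
import org.apache.lucene.index.DirectoryReader;
import org.apache.lucene.index.IndexWriter;
import org.apache.lucene.index.IndexWriterConfig;
import org.apache.lucene.index.Term;
import org.apache.lucene.search.IndexSearcher;
import org.apache.lucene.search.MatchAllDocsQuery;
import org.apache.lucene.store.Directory;
import org.apache.lucene.store.FSDirectory;
import org.elasticsearch.common.lucene.Lucene;

public class WrapAllDocsLiveSketch { // illustrative class name, not part of the codebase
    public static void main(String[] args) throws Exception {
        try (Directory dir = FSDirectory.open(Files.createTempDirectory("wrap-all-docs-live"))) {
            // Enable soft deletes the way the Elasticsearch engine does.
            IndexWriterConfig config = new IndexWriterConfig().setSoftDeletesField(Lucene.SOFT_DELETE_FIELD);
            try (IndexWriter writer = new IndexWriter(dir, config)) {
                Document doc = new Document();
                doc.add(new StringField("id", "1", Store.YES));
                writer.addDocument(doc);

                // "Delete" the document by soft-updating it with a tombstone that itself
                // carries the soft-delete marker; the original document is soft-deleted too.
                Document tombstone = new Document();
                tombstone.add(new StringField("id", "1", Store.YES));
                tombstone.add(Lucene.newSoftDeleteField());
                writer.softUpdateDocument(new Term("id", "1"), tombstone, Lucene.newSoftDeleteField());

                try (DirectoryReader reader = DirectoryReader.open(writer)) {
                    // A regular reader hides soft-deleted documents ...
                    int visible = new IndexSearcher(reader).count(new MatchAllDocsQuery());
                    // ... while the wrapped reader exposes them again. Hard-deleted (aborted)
                    // documents would still be hidden, which is the point of this commit.
                    int history = new IndexSearcher(Lucene.wrapAllDocsLive(reader)).count(new MatchAllDocsQuery());
                    System.out.println("visible=" + visible + ", history=" + history); // expected: visible=0, history=2
                }
            }
        }
    }
}

The hard-delete side of this behaviour is exercised by the new testWrapLiveDocsNotExposeAbortedDocuments in the diff below, which deliberately aborts the indexing of some documents and checks that they do not show up through the wrapped reader.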
@@ -339,8 +339,8 @@ setup:
 ---
 "Indexing and Querying without contexts is forbidden":
   - skip:
-      version: "all"
-      reason: AwaitsFix https://github.com/elastic/elasticsearch/issues/32269
+      version: " - 6.99.99"
+      reason: this feature was removed in 7.0

   - do:
       index:

@@ -837,7 +837,8 @@ public class Lucene {
     }
 
     /**
-     * Wraps a directory reader to include all live docs.
+     * Wraps a directory reader to make all documents live except those were rolled back
+     * or hard-deleted due to non-aborting exceptions during indexing.
      * The wrapped reader can be used to query all documents.
      *
      * @param in the input directory reader
@@ -848,17 +849,21 @@ public class Lucene {
     }
 
     private static final class DirectoryReaderWithAllLiveDocs extends FilterDirectoryReader {
-        static final class SubReaderWithAllLiveDocs extends FilterLeafReader {
-            SubReaderWithAllLiveDocs(LeafReader in) {
+        static final class LeafReaderWithLiveDocs extends FilterLeafReader {
+            final Bits liveDocs;
+            final int numDocs;
+            LeafReaderWithLiveDocs(LeafReader in, Bits liveDocs, int numDocs) {
                 super(in);
+                this.liveDocs = liveDocs;
+                this.numDocs = numDocs;
             }
             @Override
             public Bits getLiveDocs() {
-                return null;
+                return liveDocs;
             }
             @Override
             public int numDocs() {
-                return maxDoc();
+                return numDocs;
             }
             @Override
             public CacheHelper getCoreCacheHelper() {
@@ -869,14 +874,28 @@ public class Lucene {
                 return null; // Modifying liveDocs
             }
         }
+
         DirectoryReaderWithAllLiveDocs(DirectoryReader in) throws IOException {
-            super(in, new FilterDirectoryReader.SubReaderWrapper() {
+            super(in, new SubReaderWrapper() {
                 @Override
                 public LeafReader wrap(LeafReader leaf) {
-                    return new SubReaderWithAllLiveDocs(leaf);
+                    SegmentReader segmentReader = segmentReader(leaf);
+                    Bits hardLiveDocs = segmentReader.getHardLiveDocs();
+                    if (hardLiveDocs == null) {
+                        return new LeafReaderWithLiveDocs(leaf, null, leaf.maxDoc());
+                    }
+                    // TODO: Avoid recalculate numDocs everytime.
+                    int numDocs = 0;
+                    for (int i = 0; i < hardLiveDocs.length(); i++) {
+                        if (hardLiveDocs.get(i)) {
+                            numDocs++;
+                        }
+                    }
+                    return new LeafReaderWithLiveDocs(segmentReader, hardLiveDocs, numDocs);
                 }
             });
         }
+
         @Override
         protected DirectoryReader doWrapDirectoryReader(DirectoryReader in) throws IOException {
             return wrapAllDocsLive(in);

@@ -33,18 +33,23 @@ import org.apache.lucene.index.NoDeletionPolicy;
 import org.apache.lucene.index.NoMergePolicy;
 import org.apache.lucene.index.RandomIndexWriter;
 import org.apache.lucene.index.SegmentInfos;
+import org.apache.lucene.index.SoftDeletesRetentionMergePolicy;
 import org.apache.lucene.index.Term;
 import org.apache.lucene.search.IndexSearcher;
+import org.apache.lucene.search.MatchAllDocsQuery;
+import org.apache.lucene.search.ScoreDoc;
 import org.apache.lucene.search.TermQuery;
+import org.apache.lucene.search.TopDocs;
 import org.apache.lucene.search.Weight;
 import org.apache.lucene.store.Directory;
 import org.apache.lucene.store.MMapDirectory;
 import org.apache.lucene.store.MockDirectoryWrapper;
 import org.apache.lucene.util.Bits;
 import org.elasticsearch.core.internal.io.IOUtils;
 import org.elasticsearch.test.ESTestCase;
 
 import java.io.IOException;
+import java.io.StringReader;
 import java.util.ArrayList;
 import java.util.Collections;
 import java.util.HashSet;
@@ -53,6 +58,8 @@ import java.util.Set;
 import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.atomic.AtomicBoolean;
 
+import static org.hamcrest.Matchers.equalTo;
+
 public class LuceneTests extends ESTestCase {
     public void testWaitForIndex() throws Exception {
         final MockDirectoryWrapper dir = newMockDirectory();
@@ -406,4 +413,88 @@ public class LuceneTests extends ESTestCase {
         // add assume's here if needed for certain platforms, but we should know if it does not work.
         assertTrue("MMapDirectory does not support unmapping: " + MMapDirectory.UNMAP_NOT_SUPPORTED_REASON, MMapDirectory.UNMAP_SUPPORTED);
     }
+
+    public void testWrapAllDocsLive() throws Exception {
+        Directory dir = newDirectory();
+        IndexWriterConfig config = newIndexWriterConfig().setSoftDeletesField(Lucene.SOFT_DELETE_FIELD)
+            .setMergePolicy(new SoftDeletesRetentionMergePolicy(Lucene.SOFT_DELETE_FIELD, MatchAllDocsQuery::new, newMergePolicy()));
+        IndexWriter writer = new IndexWriter(dir, config);
+        int numDocs = between(1, 10);
+        Set<String> liveDocs = new HashSet<>();
+        for (int i = 0; i < numDocs; i++) {
+            String id = Integer.toString(i);
+            Document doc = new Document();
+            doc.add(new StringField("id", id, Store.YES));
+            writer.addDocument(doc);
+            liveDocs.add(id);
+        }
+        for (int i = 0; i < numDocs; i++) {
+            if (randomBoolean()) {
+                String id = Integer.toString(i);
+                Document doc = new Document();
+                doc.add(new StringField("id", "v2-" + id, Store.YES));
+                if (randomBoolean()) {
+                    doc.add(Lucene.newSoftDeleteField());
+                }
+                writer.softUpdateDocument(new Term("id", id), doc, Lucene.newSoftDeleteField());
+                liveDocs.add("v2-" + id);
+            }
+        }
+        try (DirectoryReader unwrapped = DirectoryReader.open(writer)) {
+            DirectoryReader reader = Lucene.wrapAllDocsLive(unwrapped);
+            assertThat(reader.numDocs(), equalTo(liveDocs.size()));
+            IndexSearcher searcher = new IndexSearcher(reader);
+            Set<String> actualDocs = new HashSet<>();
+            TopDocs topDocs = searcher.search(new MatchAllDocsQuery(), Integer.MAX_VALUE);
+            for (ScoreDoc scoreDoc : topDocs.scoreDocs) {
+                actualDocs.add(reader.document(scoreDoc.doc).get("id"));
+            }
+            assertThat(actualDocs, equalTo(liveDocs));
+        }
+        IOUtils.close(writer, dir);
+    }
+
+    public void testWrapLiveDocsNotExposeAbortedDocuments() throws Exception {
+        Directory dir = newDirectory();
+        IndexWriterConfig config = newIndexWriterConfig().setSoftDeletesField(Lucene.SOFT_DELETE_FIELD)
+            .setMergePolicy(new SoftDeletesRetentionMergePolicy(Lucene.SOFT_DELETE_FIELD, MatchAllDocsQuery::new, newMergePolicy()));
+        IndexWriter writer = new IndexWriter(dir, config);
+        int numDocs = between(1, 10);
+        List<String> liveDocs = new ArrayList<>();
+        for (int i = 0; i < numDocs; i++) {
+            String id = Integer.toString(i);
+            Document doc = new Document();
+            doc.add(new StringField("id", id, Store.YES));
+            if (randomBoolean()) {
+                doc.add(Lucene.newSoftDeleteField());
+            }
+            writer.addDocument(doc);
+            liveDocs.add(id);
+        }
+        int abortedDocs = between(1, 10);
+        for (int i = 0; i < abortedDocs; i++) {
+            try {
+                Document doc = new Document();
+                doc.add(new StringField("id", "aborted-" + i, Store.YES));
+                StringReader reader = new StringReader("");
+                doc.add(new TextField("other", reader));
+                reader.close(); // mark the indexing hit non-aborting error
+                writer.addDocument(doc);
+                fail("index should have failed");
+            } catch (Exception ignored) { }
+        }
+        try (DirectoryReader unwrapped = DirectoryReader.open(writer)) {
+            DirectoryReader reader = Lucene.wrapAllDocsLive(unwrapped);
+            assertThat(reader.maxDoc(), equalTo(numDocs + abortedDocs));
+            assertThat(reader.numDocs(), equalTo(liveDocs.size()));
+            IndexSearcher searcher = new IndexSearcher(reader);
+            List<String> actualDocs = new ArrayList<>();
+            TopDocs topDocs = searcher.search(new MatchAllDocsQuery(), Integer.MAX_VALUE);
+            for (ScoreDoc scoreDoc : topDocs.scoreDocs) {
+                actualDocs.add(reader.document(scoreDoc.doc).get("id"));
+            }
+            assertThat(actualDocs, equalTo(liveDocs));
+        }
+        IOUtils.close(writer, dir);
+    }
 }

@@ -64,7 +64,6 @@ import org.elasticsearch.common.io.stream.BytesStreamOutput;
 import org.elasticsearch.common.io.stream.StreamInput;
 import org.elasticsearch.common.lease.Releasable;
 import org.elasticsearch.common.lease.Releasables;
-import org.elasticsearch.common.lucene.Lucene;
 import org.elasticsearch.common.settings.IndexScopedSettings;
 import org.elasticsearch.common.settings.Settings;
 import org.elasticsearch.common.unit.TimeValue;
@@ -3228,30 +3227,4 @@ public class IndexShardTests extends IndexShardTestCase {
 
         closeShards(shard);
     }
-
-    public void testSearcherIncludesSoftDeletes() throws Exception {
-        Settings settings = Settings.builder().put(IndexMetaData.SETTING_VERSION_CREATED, Version.CURRENT)
-            .put(IndexMetaData.SETTING_NUMBER_OF_REPLICAS, 1)
-            .put(IndexMetaData.SETTING_NUMBER_OF_SHARDS, 1)
-            .put(IndexSettings.INDEX_SOFT_DELETES_SETTING.getKey(), true)
-            .build();
-        IndexMetaData metaData = IndexMetaData.builder("test")
-            .putMapping("test", "{ \"properties\": { \"foo\": { \"type\": \"text\"}}}")
-            .settings(settings)
-            .primaryTerm(0, 1).build();
-        IndexShard shard = newShard(new ShardId(metaData.getIndex(), 0), true, "n1", metaData, null);
-        recoverShardFromStore(shard);
-        indexDoc(shard, "test", "0", "{\"foo\" : \"bar\"}");
-        indexDoc(shard, "test", "1", "{\"foo\" : \"baz\"}");
-        deleteDoc(shard, "test", "0");
-        shard.refresh("test");
-        try (Engine.Searcher searcher = shard.acquireSearcher("test")) {
-            IndexSearcher searchWithSoftDeletes = new IndexSearcher(Lucene.wrapAllDocsLive(searcher.getDirectoryReader()));
-            assertThat(searcher.searcher().search(new TermQuery(new Term("foo", "bar")), 10).totalHits, equalTo(0L));
-            assertThat(searchWithSoftDeletes.search(new TermQuery(new Term("foo", "bar")), 10).totalHits, equalTo(1L));
-            assertThat(searcher.searcher().search(new TermQuery(new Term("foo", "baz")), 10).totalHits, equalTo(1L));
-            assertThat(searchWithSoftDeletes.search(new TermQuery(new Term("foo", "baz")), 10).totalHits, equalTo(1L));
-        }
-        closeShards(shard);
-    }
 }