Index stale operations to Lucene to have complete history (#29679)

Today, when processing out of order operations, we only add it into
translog but skip adding into Lucene. Translog, therefore, has a
complete history of sequence numbers while Lucene does not.

Since we would like to have a complete history in Lucene, this change
makes sure that stale operations will be added to Lucene as soft-deleted
documents if required.

Relates #29530
This commit is contained in:
Nhat Nguyen 2018-04-27 19:39:29 -04:00 committed by GitHub
parent 112b5f1744
commit 8ebca76cf7
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
5 changed files with 294 additions and 70 deletions

View File

@ -30,6 +30,7 @@ import org.apache.lucene.document.LatLonDocValuesField;
import org.apache.lucene.document.NumericDocValuesField;
import org.apache.lucene.index.CorruptIndexException;
import org.apache.lucene.index.DirectoryReader;
import org.apache.lucene.index.FilterDirectoryReader;
import org.apache.lucene.index.FilterLeafReader;
import org.apache.lucene.index.IndexCommit;
import org.apache.lucene.index.IndexFileNames;
@ -833,6 +834,58 @@ public class Lucene {
};
}
/**
* Wraps a directory reader to include all live docs.
* The wrapped reader can be used to query all documents.
*
* @param in the input directory reader
* @return the wrapped reader
*/
public static DirectoryReader wrapAllDocsLive(DirectoryReader in) throws IOException {
return new DirectoryReaderWithAllLiveDocs(in);
}
private static final class DirectoryReaderWithAllLiveDocs extends FilterDirectoryReader {
static final class SubReaderWithAllLiveDocs extends FilterLeafReader {
SubReaderWithAllLiveDocs(LeafReader in) {
super(in);
}
@Override
public Bits getLiveDocs() {
return null;
}
@Override
public int numDocs() {
return maxDoc();
}
@Override
public CacheHelper getCoreCacheHelper() {
return in.getCoreCacheHelper();
}
@Override
public CacheHelper getReaderCacheHelper() {
return null; // Modifying liveDocs
}
}
DirectoryReaderWithAllLiveDocs(DirectoryReader in) throws IOException {
super(in, new FilterDirectoryReader.SubReaderWrapper() {
@Override
public LeafReader wrap(LeafReader leaf) {
return new SubReaderWithAllLiveDocs(leaf);
}
});
}
@Override
protected DirectoryReader doWrapDirectoryReader(DirectoryReader in) throws IOException {
return wrapAllDocsLive(in);
}
@Override
public CacheHelper getReaderCacheHelper() {
return null; // Modifying liveDocs
}
}
/**
* Returns a numeric docvalues which can be used to soft-delete documents.
*/

View File

@ -21,12 +21,14 @@ package org.elasticsearch.index.engine;
import org.apache.logging.log4j.Logger;
import org.apache.logging.log4j.message.ParameterizedMessage;
import org.apache.lucene.document.Field;
import org.apache.lucene.document.NumericDocValuesField;
import org.apache.lucene.index.DirectoryReader;
import org.apache.lucene.index.IndexCommit;
import org.apache.lucene.index.IndexReader;
import org.apache.lucene.index.IndexWriter;
import org.apache.lucene.index.IndexWriterConfig;
import org.apache.lucene.index.IndexableField;
import org.apache.lucene.index.LeafReader;
import org.apache.lucene.index.LiveIndexWriterConfig;
import org.apache.lucene.index.MergePolicy;
@ -43,6 +45,7 @@ import org.apache.lucene.store.Directory;
import org.apache.lucene.store.LockObtainFailedException;
import org.apache.lucene.util.BytesRef;
import org.apache.lucene.util.InfoStream;
import org.elasticsearch.Assertions;
import org.elasticsearch.ExceptionsHelper;
import org.elasticsearch.action.index.IndexRequest;
import org.elasticsearch.common.Nullable;
@ -769,10 +772,9 @@ public class InternalEngine extends Engine {
if (plan.earlyResultOnPreFlightError.isPresent()) {
indexResult = plan.earlyResultOnPreFlightError.get();
assert indexResult.hasFailure();
} else if (plan.indexIntoLucene) {
} else if (plan.indexIntoLucene || plan.addStaleOpToLucene) {
indexResult = indexIntoLucene(index, plan);
} else {
// TODO: We need to index stale documents to have a full history in Lucene.
indexResult = new IndexResult(
plan.versionForIndexing, plan.seqNoForIndexing, plan.currentNotFoundOrDeleted);
}
@ -838,7 +840,6 @@ public class InternalEngine extends Engine {
// unlike the primary, replicas don't really care to about creation status of documents
// this allows to ignore the case where a document was found in the live version maps in
// a delete state and return false for the created flag in favor of code simplicity
final OpVsLuceneDocStatus opVsLucene;
if (index.seqNo() <= localCheckpointTracker.getCheckpoint()){
// the operation seq# is lower then the current local checkpoint and thus was already put into lucene
// this can happen during recovery where older operations are sent from the translog that are already
@ -847,16 +848,15 @@ public class InternalEngine extends Engine {
// question may have been deleted in an out of order op that is not replayed.
// See testRecoverFromStoreWithOutOfOrderDelete for an example of local recovery
// See testRecoveryWithOutOfOrderDelete for an example of peer recovery
opVsLucene = OpVsLuceneDocStatus.OP_STALE_OR_EQUAL;
} else {
opVsLucene = compareOpToLuceneDocBasedOnSeqNo(index);
}
if (opVsLucene == OpVsLuceneDocStatus.OP_STALE_OR_EQUAL) {
plan = IndexingStrategy.processButSkipLucene(false, index.seqNo(), index.version());
} else {
plan = IndexingStrategy.processNormally(
opVsLucene == OpVsLuceneDocStatus.LUCENE_DOC_NOT_FOUND, index.seqNo(), index.version()
);
final OpVsLuceneDocStatus opVsLucene = compareOpToLuceneDocBasedOnSeqNo(index);
if (opVsLucene == OpVsLuceneDocStatus.OP_STALE_OR_EQUAL) {
plan = IndexingStrategy.processAsStaleOp(softDeleteEnabled, index.seqNo(), index.version());
} else {
plan = IndexingStrategy.processNormally(opVsLucene == OpVsLuceneDocStatus.LUCENE_DOC_NOT_FOUND,
index.seqNo(), index.version());
}
}
}
return plan;
@ -914,7 +914,7 @@ public class InternalEngine extends Engine {
throws IOException {
assert plan.seqNoForIndexing >= 0 : "ops should have an assigned seq no.; origin: " + index.origin();
assert plan.versionForIndexing >= 0 : "version must be set. got " + plan.versionForIndexing;
assert plan.indexIntoLucene;
assert plan.indexIntoLucene || plan.addStaleOpToLucene;
/* Update the document's sequence number and primary term; the sequence number here is derived here from either the sequence
* number service if this is on the primary, or the existing document's sequence number if this is on the replica. The
* primary term here has already been set, see IndexShard#prepareIndex where the Engine$Index operation is created.
@ -922,7 +922,9 @@ public class InternalEngine extends Engine {
index.parsedDoc().updateSeqID(plan.seqNoForIndexing, index.primaryTerm());
index.parsedDoc().version().setLongValue(plan.versionForIndexing);
try {
if (plan.useLuceneUpdateDocument) {
if (plan.addStaleOpToLucene) {
addStaleDocs(index.docs(), indexWriter);
} else if (plan.useLuceneUpdateDocument) {
updateDocs(index.uid(), index.docs(), indexWriter);
} else {
// document does not exists, we can optimize for create, but double check if assertions are running
@ -986,16 +988,27 @@ public class InternalEngine extends Engine {
numDocAppends.inc(docs.size());
}
private void addStaleDocs(final List<ParseContext.Document> docs, final IndexWriter indexWriter) throws IOException {
assert softDeleteEnabled : "Add history documents but soft-deletes is disabled";
docs.forEach(d -> d.add(softDeleteField));
if (docs.size() > 1) {
indexWriter.addDocuments(docs);
} else {
indexWriter.addDocument(docs.get(0));
}
}
protected static final class IndexingStrategy {
final boolean currentNotFoundOrDeleted;
final boolean useLuceneUpdateDocument;
final long seqNoForIndexing;
final long versionForIndexing;
final boolean indexIntoLucene;
final boolean addStaleOpToLucene;
final Optional<IndexResult> earlyResultOnPreFlightError;
private IndexingStrategy(boolean currentNotFoundOrDeleted, boolean useLuceneUpdateDocument,
boolean indexIntoLucene, long seqNoForIndexing,
boolean indexIntoLucene, boolean addStaleOpToLucene, long seqNoForIndexing,
long versionForIndexing, IndexResult earlyResultOnPreFlightError) {
assert useLuceneUpdateDocument == false || indexIntoLucene :
"use lucene update is set to true, but we're not indexing into lucene";
@ -1008,37 +1021,40 @@ public class InternalEngine extends Engine {
this.seqNoForIndexing = seqNoForIndexing;
this.versionForIndexing = versionForIndexing;
this.indexIntoLucene = indexIntoLucene;
this.addStaleOpToLucene = addStaleOpToLucene;
this.earlyResultOnPreFlightError =
earlyResultOnPreFlightError == null ? Optional.empty() :
Optional.of(earlyResultOnPreFlightError);
}
static IndexingStrategy optimizedAppendOnly(long seqNoForIndexing) {
return new IndexingStrategy(true, false, true, seqNoForIndexing, 1, null);
return new IndexingStrategy(true, false, true, false, seqNoForIndexing, 1, null);
}
static IndexingStrategy skipDueToVersionConflict(
VersionConflictEngineException e, boolean currentNotFoundOrDeleted, long currentVersion) {
final IndexResult result = new IndexResult(e, currentVersion);
return new IndexingStrategy(
currentNotFoundOrDeleted, false, false, SequenceNumbers.UNASSIGNED_SEQ_NO, Versions.NOT_FOUND, result);
currentNotFoundOrDeleted, false, false, false, SequenceNumbers.UNASSIGNED_SEQ_NO, Versions.NOT_FOUND, result);
}
static IndexingStrategy processNormally(boolean currentNotFoundOrDeleted,
long seqNoForIndexing, long versionForIndexing) {
return new IndexingStrategy(currentNotFoundOrDeleted, currentNotFoundOrDeleted == false,
true, seqNoForIndexing, versionForIndexing, null);
true, false, seqNoForIndexing, versionForIndexing, null);
}
static IndexingStrategy overrideExistingAsIfNotThere(
long seqNoForIndexing, long versionForIndexing) {
return new IndexingStrategy(true, true, true, seqNoForIndexing, versionForIndexing, null);
return new IndexingStrategy(true, true, true, false, seqNoForIndexing, versionForIndexing, null);
}
static IndexingStrategy processButSkipLucene(boolean currentNotFoundOrDeleted,
long seqNoForIndexing, long versionForIndexing) {
return new IndexingStrategy(currentNotFoundOrDeleted, false,
false, seqNoForIndexing, versionForIndexing, null);
static IndexingStrategy processButSkipLucene(boolean currentNotFoundOrDeleted, long seqNoForIndexing, long versionForIndexing) {
return new IndexingStrategy(currentNotFoundOrDeleted, false, false, false, seqNoForIndexing, versionForIndexing, null);
}
static IndexingStrategy processAsStaleOp(boolean addStaleOpToLucene, long seqNoForIndexing, long versionForIndexing) {
return new IndexingStrategy(false, false, false, addStaleOpToLucene, seqNoForIndexing, versionForIndexing, null);
}
}
@ -1096,7 +1112,7 @@ public class InternalEngine extends Engine {
if (plan.earlyResultOnPreflightError.isPresent()) {
deleteResult = plan.earlyResultOnPreflightError.get();
} else if (plan.deleteFromLucene) {
} else if (plan.deleteFromLucene || plan.addStaleOpToLucene) {
deleteResult = deleteInLucene(delete, plan);
} else {
deleteResult = new DeleteResult(
@ -1152,7 +1168,7 @@ public class InternalEngine extends Engine {
// unlike the primary, replicas don't really care to about found status of documents
// this allows to ignore the case where a document was found in the live version maps in
// a delete state and return true for the found flag in favor of code simplicity
final OpVsLuceneDocStatus opVsLucene;
final DeletionStrategy plan;
if (delete.seqNo() <= localCheckpointTracker.getCheckpoint()) {
// the operation seq# is lower then the current local checkpoint and thus was already put into lucene
// this can happen during recovery where older operations are sent from the translog that are already
@ -1161,18 +1177,15 @@ public class InternalEngine extends Engine {
// question may have been deleted in an out of order op that is not replayed.
// See testRecoverFromStoreWithOutOfOrderDelete for an example of local recovery
// See testRecoveryWithOutOfOrderDelete for an example of peer recovery
opVsLucene = OpVsLuceneDocStatus.OP_STALE_OR_EQUAL;
} else {
opVsLucene = compareOpToLuceneDocBasedOnSeqNo(delete);
}
final DeletionStrategy plan;
if (opVsLucene == OpVsLuceneDocStatus.OP_STALE_OR_EQUAL) {
plan = DeletionStrategy.processButSkipLucene(false, delete.seqNo(), delete.version());
} else {
plan = DeletionStrategy.processNormally(
opVsLucene == OpVsLuceneDocStatus.LUCENE_DOC_NOT_FOUND,
delete.seqNo(), delete.version());
final OpVsLuceneDocStatus opVsLucene = compareOpToLuceneDocBasedOnSeqNo(delete);
if (opVsLucene == OpVsLuceneDocStatus.OP_STALE_OR_EQUAL) {
plan = DeletionStrategy.processAsStaleOp(softDeleteEnabled, false, delete.seqNo(), delete.version());
} else {
plan = DeletionStrategy.processNormally(opVsLucene == OpVsLuceneDocStatus.LUCENE_DOC_NOT_FOUND,
delete.seqNo(), delete.version());
}
}
return plan;
}
@ -1212,25 +1225,29 @@ public class InternalEngine extends Engine {
private DeleteResult deleteInLucene(Delete delete, DeletionStrategy plan)
throws IOException {
try {
if (plan.currentlyDeleted == false) {
if (softDeleteEnabled) {
final ParsedDocument tombstone = engineConfig.getTombstoneDocSupplier().newTombstoneDoc(delete.type(), delete.id());
assert tombstone.docs().size() == 1 : "Tombstone doc should have single doc [" + tombstone + "]";
tombstone.updateSeqID(plan.seqNoOfDeletion, delete.primaryTerm());
tombstone.version().setLongValue(plan.versionOfDeletion);
final ParseContext.Document doc = tombstone.docs().get(0);
doc.add(softDeleteField);
if (plan.addStaleOpToLucene || plan.currentlyDeleted) {
indexWriter.addDocument(doc);
} else {
indexWriter.softUpdateDocument(delete.uid(), doc, softDeleteField);
}
} else if (plan.currentlyDeleted == false) {
// any exception that comes from this is a either an ACE or a fatal exception there
// can't be any document failures coming from this
if (softDeleteEnabled) {
final ParsedDocument tombstone = engineConfig.getTombstoneDocSupplier().newTombstoneDoc(delete.type(), delete.id());
assert tombstone.docs().size() == 1 : "Tombstone doc should have single doc [" + tombstone + "]";
tombstone.updateSeqID(plan.seqNoOfDeletion, delete.primaryTerm());
tombstone.version().setLongValue(plan.versionOfDeletion);
final ParseContext.Document doc = tombstone.docs().get(0);
doc.add(softDeleteField);
indexWriter.softUpdateDocument(delete.uid(), doc, softDeleteField);
} else {
indexWriter.deleteDocuments(delete.uid());
}
numDocDeletes.inc();
indexWriter.deleteDocuments(delete.uid());
}
if (plan.deleteFromLucene) {
numDocDeletes.inc();
versionMap.putDeleteUnderLock(delete.uid().bytes(),
new DeleteVersionValue(plan.versionOfDeletion, plan.seqNoOfDeletion, delete.primaryTerm(),
engineConfig.getThreadPool().relativeTimeInMillis()));
}
versionMap.putDeleteUnderLock(delete.uid().bytes(),
new DeleteVersionValue(plan.versionOfDeletion, plan.seqNoOfDeletion, delete.primaryTerm(),
engineConfig.getThreadPool().relativeTimeInMillis()));
return new DeleteResult(
plan.versionOfDeletion, plan.seqNoOfDeletion, plan.currentlyDeleted == false);
} catch (Exception ex) {
@ -1247,12 +1264,13 @@ public class InternalEngine extends Engine {
protected static final class DeletionStrategy {
// of a rare double delete
final boolean deleteFromLucene;
final boolean addStaleOpToLucene;
final boolean currentlyDeleted;
final long seqNoOfDeletion;
final long versionOfDeletion;
final Optional<DeleteResult> earlyResultOnPreflightError;
private DeletionStrategy(boolean deleteFromLucene, boolean currentlyDeleted,
private DeletionStrategy(boolean deleteFromLucene, boolean addStaleOpToLucene, boolean currentlyDeleted,
long seqNoOfDeletion, long versionOfDeletion,
DeleteResult earlyResultOnPreflightError) {
assert (deleteFromLucene && earlyResultOnPreflightError != null) == false :
@ -1260,6 +1278,7 @@ public class InternalEngine extends Engine {
"deleteFromLucene: " + deleteFromLucene
+ " earlyResultOnPreFlightError:" + earlyResultOnPreflightError;
this.deleteFromLucene = deleteFromLucene;
this.addStaleOpToLucene = addStaleOpToLucene;
this.currentlyDeleted = currentlyDeleted;
this.seqNoOfDeletion = seqNoOfDeletion;
this.versionOfDeletion = versionOfDeletion;
@ -1271,16 +1290,22 @@ public class InternalEngine extends Engine {
VersionConflictEngineException e, long currentVersion, boolean currentlyDeleted) {
final long unassignedSeqNo = SequenceNumbers.UNASSIGNED_SEQ_NO;
final DeleteResult deleteResult = new DeleteResult(e, currentVersion, unassignedSeqNo, currentlyDeleted == false);
return new DeletionStrategy(false, currentlyDeleted, unassignedSeqNo, Versions.NOT_FOUND, deleteResult);
return new DeletionStrategy(false, false, currentlyDeleted, unassignedSeqNo, Versions.NOT_FOUND, deleteResult);
}
static DeletionStrategy processNormally(boolean currentlyDeleted, long seqNoOfDeletion, long versionOfDeletion) {
return new DeletionStrategy(true, currentlyDeleted, seqNoOfDeletion, versionOfDeletion, null);
return new DeletionStrategy(true, false, currentlyDeleted, seqNoOfDeletion, versionOfDeletion, null);
}
public static DeletionStrategy processButSkipLucene(boolean currentlyDeleted, long seqNoOfDeletion, long versionOfDeletion) {
return new DeletionStrategy(false, currentlyDeleted, seqNoOfDeletion, versionOfDeletion, null);
public static DeletionStrategy processButSkipLucene(boolean currentlyDeleted,
long seqNoOfDeletion, long versionOfDeletion) {
return new DeletionStrategy(false, false, currentlyDeleted, seqNoOfDeletion, versionOfDeletion, null);
}
static DeletionStrategy processAsStaleOp(boolean addStaleOpToLucene, boolean currentlyDeleted,
long seqNoOfDeletion, long versionOfDeletion) {
return new DeletionStrategy(false, addStaleOpToLucene, currentlyDeleted, seqNoOfDeletion, versionOfDeletion, null);
}
}
@ -1930,7 +1955,11 @@ public class InternalEngine extends Engine {
// pkg-private for testing
IndexWriter createWriter(Directory directory, IndexWriterConfig iwc) throws IOException {
return new IndexWriter(directory, iwc);
if (Assertions.ENABLED) {
return new AssertingIndexWriter(directory, iwc);
} else {
return new IndexWriter(directory, iwc);
}
}
private IndexWriterConfig getIndexWriterConfig() {
@ -2288,4 +2317,35 @@ public class InternalEngine extends Engine {
}
return commitData;
}
private final class AssertingIndexWriter extends IndexWriter {
AssertingIndexWriter(Directory d, IndexWriterConfig conf) throws IOException {
super(d, conf);
}
@Override
public long updateDocument(Term term, Iterable<? extends IndexableField> doc) throws IOException {
assert softDeleteEnabled == false : "Call #updateDocument but soft-deletes is enabled";
return super.updateDocument(term, doc);
}
@Override
public long updateDocuments(Term delTerm, Iterable<? extends Iterable<? extends IndexableField>> docs) throws IOException {
assert softDeleteEnabled == false : "Call #updateDocuments but soft-deletes is enabled";
return super.updateDocuments(delTerm, docs);
}
@Override
public long deleteDocuments(Term... terms) throws IOException {
assert softDeleteEnabled == false : "Call #deleteDocuments but soft-deletes is enabled";
return super.deleteDocuments(terms);
}
@Override
public long softUpdateDocument(Term term, Iterable<? extends IndexableField> doc, Field... softDeletes) throws IOException {
assert softDeleteEnabled : "Call #softUpdateDocument but soft-deletes is disabled";
return super.softUpdateDocument(term, doc, softDeletes);
}
@Override
public long softUpdateDocuments(Term term, Iterable<? extends Iterable<? extends IndexableField>> docs, Field... softDeletes) throws IOException {
assert softDeleteEnabled : "Call #softUpdateDocuments but soft-deletes is disabled";
return super.softUpdateDocuments(term, docs, softDeletes);
}
}
}

View File

@ -44,10 +44,12 @@ import org.apache.lucene.index.LeafReaderContext;
import org.apache.lucene.index.LiveIndexWriterConfig;
import org.apache.lucene.index.LogByteSizeMergePolicy;
import org.apache.lucene.index.LogDocMergePolicy;
import org.apache.lucene.index.MergePolicy;
import org.apache.lucene.index.NoMergePolicy;
import org.apache.lucene.index.NumericDocValues;
import org.apache.lucene.index.PointValues;
import org.apache.lucene.index.SegmentInfos;
import org.apache.lucene.index.SoftDeletesRetentionMergePolicy;
import org.apache.lucene.index.Term;
import org.apache.lucene.index.TieredMergePolicy;
import org.apache.lucene.search.IndexSearcher;
@ -81,6 +83,7 @@ import org.elasticsearch.common.bytes.BytesArray;
import org.elasticsearch.common.bytes.BytesReference;
import org.elasticsearch.common.collect.Tuple;
import org.elasticsearch.common.logging.Loggers;
import org.elasticsearch.common.lucene.Lucene;
import org.elasticsearch.common.lucene.index.ElasticsearchDirectoryReader;
import org.elasticsearch.common.lucene.uid.Versions;
import org.elasticsearch.common.lucene.uid.VersionsAndSeqNoResolver;
@ -2935,10 +2938,10 @@ public class InternalEngineTests extends EngineTestCase {
}
public void testDoubleDeliveryReplicaAppendingOnly() throws IOException {
final ParsedDocument doc = testParsedDocument("1", null, testDocumentWithTextField(),
final Supplier<ParsedDocument> doc = () -> testParsedDocument("1", null, testDocumentWithTextField(),
new BytesArray("{}".getBytes(Charset.defaultCharset())), null);
Engine.Index operation = appendOnlyReplica(doc, false, 1, randomIntBetween(0, 5));
Engine.Index retry = appendOnlyReplica(doc, true, 1, randomIntBetween(0, 5));
Engine.Index operation = appendOnlyReplica(doc.get(), false, 1, randomIntBetween(0, 5));
Engine.Index retry = appendOnlyReplica(doc.get(), true, 1, randomIntBetween(0, 5));
// operations with a seq# equal or lower to the local checkpoint are not indexed to lucene
// and the version lookup is skipped
final boolean belowLckp = operation.seqNo() == 0 && retry.seqNo() == 0;
@ -2977,8 +2980,8 @@ public class InternalEngineTests extends EngineTestCase {
TopDocs topDocs = searcher.searcher().search(new MatchAllDocsQuery(), 10);
assertEquals(1, topDocs.totalHits);
}
operation = randomAppendOnly(doc, false, 1);
retry = randomAppendOnly(doc, true, 1);
operation = randomAppendOnly(doc.get(), false, 1);
retry = randomAppendOnly(doc.get(), true, 1);
if (randomBoolean()) {
Engine.IndexResult indexResult = engine.index(operation);
assertNotNull(indexResult.getTranslogLocation());
@ -3043,6 +3046,7 @@ public class InternalEngineTests extends EngineTestCase {
TopDocs topDocs = searcher.searcher().search(new MatchAllDocsQuery(), 10);
assertEquals(1, topDocs.totalHits);
}
assertThat(getOperationSeqNoInLucene(engine), contains(20L));
}
public void testRetryWithAutogeneratedIdWorksAndNoDuplicateDocs() throws IOException {
@ -3511,20 +3515,22 @@ public class InternalEngineTests extends EngineTestCase {
final List<Engine.Operation> operations = new ArrayList<>();
final int numberOfOperations = randomIntBetween(16, 32);
final Document document = testDocumentWithTextField();
final AtomicLong sequenceNumber = new AtomicLong();
final Engine.Operation.Origin origin = randomFrom(LOCAL_TRANSLOG_RECOVERY, PEER_RECOVERY, PRIMARY, REPLICA);
final LongSupplier sequenceNumberSupplier =
origin == PRIMARY ? () -> SequenceNumbers.UNASSIGNED_SEQ_NO : sequenceNumber::getAndIncrement;
document.add(new Field(SourceFieldMapper.NAME, BytesReference.toBytes(B_1), SourceFieldMapper.Defaults.FIELD_TYPE));
final ParsedDocument doc = testParsedDocument("1", null, document, B_1, null);
final Term uid = newUid(doc);
final Supplier<ParsedDocument> doc = () -> {
final Document document = testDocumentWithTextField();
document.add(new Field(SourceFieldMapper.NAME, BytesReference.toBytes(B_1), SourceFieldMapper.Defaults.FIELD_TYPE));
return testParsedDocument("1", null, document, B_1, null);
};
final Term uid = newUid("1");
final BiFunction<String, Engine.SearcherScope, Searcher> searcherFactory = engine::acquireSearcher;
for (int i = 0; i < numberOfOperations; i++) {
if (randomBoolean()) {
final Engine.Index index = new Engine.Index(
uid,
doc,
doc.get(),
sequenceNumberSupplier.getAsLong(),
1,
i,
@ -4558,6 +4564,49 @@ public class InternalEngineTests extends EngineTestCase {
}
}
public void testLuceneHistoryOnPrimary() throws Exception {
final List<Engine.Operation> operations = generateSingleDocHistory(false,
randomFrom(VersionType.INTERNAL, VersionType.EXTERNAL), false, 2, 10, 300);
assertOperationHistoryInLucene(operations);
}
public void testLuceneHistoryOnReplica() throws Exception {
final List<Engine.Operation> operations = generateSingleDocHistory(true,
randomFrom(VersionType.INTERNAL, VersionType.EXTERNAL), false, 2, 10, 300);
Randomness.shuffle(operations);
assertOperationHistoryInLucene(operations);
}
private void assertOperationHistoryInLucene(List<Engine.Operation> operations) throws IOException {
final MergePolicy keepSoftDeleteDocsMP = new SoftDeletesRetentionMergePolicy(
Lucene.SOFT_DELETE_FIELD, () -> new MatchAllDocsQuery(), engine.config().getMergePolicy());
Set<Long> expectedSeqNos = new HashSet<>();
try (Store store = createStore();
Engine engine = createEngine(config(defaultSettings, store, createTempDir(), keepSoftDeleteDocsMP, null))) {
for (Engine.Operation op : operations) {
if (op instanceof Engine.Index) {
Engine.IndexResult indexResult = engine.index((Engine.Index) op);
assertThat(indexResult.getFailure(), nullValue());
expectedSeqNos.add(indexResult.getSeqNo());
} else {
Engine.DeleteResult deleteResult = engine.delete((Engine.Delete) op);
assertThat(deleteResult.getFailure(), nullValue());
expectedSeqNos.add(deleteResult.getSeqNo());
}
if (rarely()) {
engine.refresh("test");
}
if (rarely()) {
engine.flush();
}
if (rarely()) {
engine.forceMerge(true);
}
}
assertThat(getOperationSeqNoInLucene(engine), containsInAnyOrder(expectedSeqNos.toArray()));
}
}
private static void trimUnsafeCommits(EngineConfig config) throws IOException {
final Store store = config.getStore();
final TranslogConfig translogConfig = config.getTranslogConfig();

View File

@ -61,6 +61,7 @@ import org.elasticsearch.common.io.stream.BytesStreamOutput;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.lease.Releasable;
import org.elasticsearch.common.lease.Releasables;
import org.elasticsearch.common.lucene.Lucene;
import org.elasticsearch.common.settings.IndexScopedSettings;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.unit.TimeValue;
@ -3089,4 +3090,30 @@ public class IndexShardTests extends IndexShardTestCase {
assertThat(doc.getField(IdFieldMapper.NAME).binaryValue(), equalTo(Uid.encodeId(id)));
closeShards(shard);
}
public void testSearcherIncludesSoftDeletes() throws Exception {
Settings settings = Settings.builder().put(IndexMetaData.SETTING_VERSION_CREATED, Version.CURRENT)
.put(IndexMetaData.SETTING_NUMBER_OF_REPLICAS, 1)
.put(IndexMetaData.SETTING_NUMBER_OF_SHARDS, 1)
.put(IndexSettings.INDEX_SOFT_DELETES_SETTING.getKey(), true)
.build();
IndexMetaData metaData = IndexMetaData.builder("test")
.putMapping("test", "{ \"properties\": { \"foo\": { \"type\": \"text\"}}}")
.settings(settings)
.primaryTerm(0, 1).build();
IndexShard shard = newShard(new ShardId(metaData.getIndex(), 0), true, "n1", metaData, null);
recoverShardFromStore(shard);
indexDoc(shard, "test", "0", "{\"foo\" : \"bar\"}");
indexDoc(shard, "test", "1", "{\"foo\" : \"baz\"}");
deleteDoc(shard, "test", "0");
shard.refresh("test");
try (Engine.Searcher searcher = shard.acquireSearcher("test")) {
IndexSearcher searchWithSoftDeletes = new IndexSearcher(Lucene.wrapAllDocsLive(searcher.getDirectoryReader()));
assertThat(searcher.searcher().search(new TermQuery(new Term("foo", "bar")), 10).totalHits, equalTo(0L));
assertThat(searchWithSoftDeletes.search(new TermQuery(new Term("foo", "bar")), 10).totalHits, equalTo(1L));
assertThat(searcher.searcher().search(new TermQuery(new Term("foo", "baz")), 10).totalHits, equalTo(1L));
assertThat(searchWithSoftDeletes.search(new TermQuery(new Term("foo", "baz")), 10).totalHits, equalTo(1L));
}
closeShards(shard);
}
}

View File

@ -28,18 +28,22 @@ import org.apache.lucene.document.StoredField;
import org.apache.lucene.document.TextField;
import org.apache.lucene.index.IndexWriter;
import org.apache.lucene.index.IndexWriterConfig;
import org.apache.lucene.index.LeafReaderContext;
import org.apache.lucene.index.LiveIndexWriterConfig;
import org.apache.lucene.index.MergePolicy;
import org.apache.lucene.index.NumericDocValues;
import org.apache.lucene.index.ReaderUtil;
import org.apache.lucene.index.Term;
import org.apache.lucene.search.IndexSearcher;
import org.apache.lucene.search.MatchAllDocsQuery;
import org.apache.lucene.search.ReferenceManager;
import org.apache.lucene.search.ScoreDoc;
import org.apache.lucene.search.Sort;
import org.apache.lucene.search.TotalHitCountCollector;
import org.apache.lucene.search.TermQuery;
import org.apache.lucene.search.TopDocs;
import org.apache.lucene.search.TotalHitCountCollector;
import org.apache.lucene.store.Directory;
import org.apache.lucene.util.BytesRef;
import org.elasticsearch.core.internal.io.IOUtils;
import org.elasticsearch.Version;
import org.elasticsearch.action.index.IndexRequest;
import org.elasticsearch.cluster.metadata.IndexMetaData;
@ -53,6 +57,7 @@ import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.common.util.BigArrays;
import org.elasticsearch.common.xcontent.XContentType;
import org.elasticsearch.core.internal.io.IOUtils;
import org.elasticsearch.index.Index;
import org.elasticsearch.index.IndexSettings;
import org.elasticsearch.index.VersionType;
@ -88,7 +93,9 @@ import java.nio.file.Path;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
import java.util.concurrent.atomic.AtomicLong;
import java.util.function.BiFunction;
import java.util.function.LongSupplier;
@ -551,6 +558,7 @@ public abstract class EngineTestCase extends ESTestCase {
} else {
startWithSeqNo = 0;
}
final int seqNoGap = randomBoolean() ? 1 : 2;
final String valuePrefix = forReplica ? "r_" : "p_";
final boolean incrementTermWhenIntroducingSeqNo = randomBoolean();
for (int i = 0; i < numOfOps; i++) {
@ -574,7 +582,7 @@ public abstract class EngineTestCase extends ESTestCase {
}
if (randomBoolean()) {
op = new Engine.Index(id, testParsedDocument("1", null, testDocumentWithTextField(valuePrefix + i), B_1, null),
forReplica && i >= startWithSeqNo ? i * 2 : SequenceNumbers.UNASSIGNED_SEQ_NO,
forReplica && i >= startWithSeqNo ? i * seqNoGap : SequenceNumbers.UNASSIGNED_SEQ_NO,
forReplica && i >= startWithSeqNo && incrementTermWhenIntroducingSeqNo ? primaryTerm + 1 : primaryTerm,
version,
forReplica ? versionType.versionTypeForReplicationAndRecovery() : versionType,
@ -583,7 +591,7 @@ public abstract class EngineTestCase extends ESTestCase {
);
} else {
op = new Engine.Delete("test", "1", id,
forReplica && i >= startWithSeqNo ? i * 2 : SequenceNumbers.UNASSIGNED_SEQ_NO,
forReplica && i >= startWithSeqNo ? i * seqNoGap : SequenceNumbers.UNASSIGNED_SEQ_NO,
forReplica && i >= startWithSeqNo && incrementTermWhenIntroducingSeqNo ? primaryTerm + 1 : primaryTerm,
version,
forReplica ? versionType.versionTypeForReplicationAndRecovery() : versionType,
@ -664,6 +672,33 @@ public abstract class EngineTestCase extends ESTestCase {
}
}
/**
* Returns a list of sequence numbers of all existing documents including soft-deleted documents in Lucene.
*/
public static Set<Long> getOperationSeqNoInLucene(Engine engine) throws IOException {
engine.refresh("test");
final Set<Long> seqNos = new HashSet<>();
try (Engine.Searcher searcher = engine.acquireSearcher("test", Engine.SearcherScope.INTERNAL)) {
IndexSearcher indexSearcher = new IndexSearcher(Lucene.wrapAllDocsLive(searcher.getDirectoryReader()));
List<LeafReaderContext> leaves = indexSearcher.getIndexReader().leaves();
NumericDocValues[] seqNoDocValues = new NumericDocValues[leaves.size()];
for (int i = 0; i < leaves.size(); i++) {
seqNoDocValues[i] = leaves.get(i).reader().getNumericDocValues(SeqNoFieldMapper.NAME);
}
TopDocs allDocs = indexSearcher.search(new MatchAllDocsQuery(), Integer.MAX_VALUE);
for (ScoreDoc scoreDoc : allDocs.scoreDocs) {
int leafIndex = ReaderUtil.subIndex(scoreDoc.doc, leaves);
int segmentDocId = scoreDoc.doc - leaves.get(leafIndex).docBase;
if (seqNoDocValues[leafIndex] != null && seqNoDocValues[leafIndex].advanceExact(segmentDocId)) {
seqNos.add(seqNoDocValues[leafIndex].longValue());
} else {
throw new AssertionError("Segment without seqno DocValues");
}
}
}
return seqNos;
}
/**
* Exposes a translog associated with the given engine for testing purpose.
*/