Use soft deletes to maintain doc history (#29549)

We can now use Lucene's soft-deletes feature to maintain a history of
a document. This change replaces hard deletes with soft deletes in the
Engine.

Besides marking a document as deleted, we also index a tombstone
associated with that delete operation. Storing delete tombstones gives
us a history of sequence-number-based operations that can be used for
recovery or rollback.

Relates #29530
Nhat Nguyen, 2018-04-19 20:45:13 -04:00 (committed by GitHub)
parent 4be1488324
commit ac84879a71
10 changed files with 125 additions and 23 deletions
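
Aside (not part of the commit): the Lucene primitives this change builds on can be exercised on their own. The sketch below is a minimal, self-contained illustration under assumptions — the field names "_id", "_seq_no", "_primary_term" and "__soft_deletes" are illustrative stand-ins rather than the engine's actual constants, and the expected counts in the comments assume no merge has reclaimed the soft-deleted documents yet.

import org.apache.lucene.analysis.standard.StandardAnalyzer;
import org.apache.lucene.document.Document;
import org.apache.lucene.document.Field;
import org.apache.lucene.document.NumericDocValuesField;
import org.apache.lucene.document.StringField;
import org.apache.lucene.index.DirectoryReader;
import org.apache.lucene.index.IndexWriter;
import org.apache.lucene.index.IndexWriterConfig;
import org.apache.lucene.index.Term;
import org.apache.lucene.store.Directory;
import org.apache.lucene.store.RAMDirectory;

public class SoftDeleteSketch {
    public static void main(String[] args) throws Exception {
        try (Directory dir = new RAMDirectory()) { // in-memory directory, fine for a demo
            IndexWriterConfig iwc = new IndexWriterConfig(new StandardAnalyzer())
                .setSoftDeletesField("__soft_deletes"); // readers from this writer hide marked docs
            try (IndexWriter writer = new IndexWriter(dir, iwc)) {
                // Index two live documents.
                for (String id : new String[]{"1", "2"}) {
                    Document doc = new Document();
                    doc.add(new StringField("_id", id, Field.Store.YES));
                    writer.addDocument(doc);
                }

                // "Delete" document 1 by indexing a tombstone instead of calling deleteDocuments().
                // The tombstone records the metadata of the delete operation and carries the
                // soft-delete marker itself, mirroring what InternalEngine does further down.
                Document tombstone = new Document();
                tombstone.add(new StringField("_id", "1", Field.Store.YES));
                tombstone.add(new NumericDocValuesField("_seq_no", 7));        // seq# of the delete op
                tombstone.add(new NumericDocValuesField("_primary_term", 1));
                NumericDocValuesField marker = new NumericDocValuesField("__soft_deletes", 1);
                tombstone.add(marker);
                writer.softUpdateDocument(new Term("_id", "1"), tombstone, marker);
                writer.commit();

                try (DirectoryReader reader = DirectoryReader.open(writer)) {
                    // The writer's reader treats soft-deleted docs as deleted...
                    System.out.println("live docs: " + reader.numDocs()); // expected 1 (doc "2")
                    // ...but they remain physically in the index until a merge reclaims them.
                    System.out.println("all docs:  " + reader.maxDoc());  // expected 3 (doc "1", doc "2", tombstone)
                }
            }
        }
    }
}

Whether and for how long such soft-deleted documents survive merges is governed by the merge policy (Lucene provides SoftDeletesRetentionMergePolicy for that purpose); that is outside the scope of this sketch.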

File: EngineConfig.java

@@ -34,6 +34,7 @@ import org.elasticsearch.common.unit.ByteSizeValue;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.index.IndexSettings;
import org.elasticsearch.index.codec.CodecService;
import org.elasticsearch.index.mapper.ParsedDocument;
import org.elasticsearch.index.shard.ShardId;
import org.elasticsearch.index.store.Store;
import org.elasticsearch.index.translog.Translog;
@@ -80,6 +81,7 @@ public final class EngineConfig {
private final CircuitBreakerService circuitBreakerService;
private final LongSupplier globalCheckpointSupplier;
private final LongSupplier primaryTermSupplier;
private final TombstoneDocSupplier tombstoneDocSupplier;
/**
* Index setting to change the low level lucene codec used for writing new segments.
@@ -126,7 +128,8 @@ public final class EngineConfig {
List<ReferenceManager.RefreshListener> externalRefreshListener,
List<ReferenceManager.RefreshListener> internalRefreshListener, Sort indexSort,
TranslogRecoveryRunner translogRecoveryRunner, CircuitBreakerService circuitBreakerService,
LongSupplier globalCheckpointSupplier, LongSupplier primaryTermSupplier) {
LongSupplier globalCheckpointSupplier, LongSupplier primaryTermSupplier,
TombstoneDocSupplier tombstoneDocSupplier) {
this.shardId = shardId;
this.allocationId = allocationId;
this.indexSettings = indexSettings;
@@ -154,6 +157,7 @@ public final class EngineConfig {
this.circuitBreakerService = circuitBreakerService;
this.globalCheckpointSupplier = globalCheckpointSupplier;
this.primaryTermSupplier = primaryTermSupplier;
this.tombstoneDocSupplier = tombstoneDocSupplier;
}
/**
@@ -363,4 +367,17 @@ public final class EngineConfig {
public LongSupplier getPrimaryTermSupplier() {
return primaryTermSupplier;
}
/**
* A supplier that provides tombstone documents to be used in soft-update methods.
* The returned document consists of only the _uid, _seqno, _term and _version fields; other metadata fields are excluded.
*/
@FunctionalInterface
public interface TombstoneDocSupplier {
ParsedDocument newTombstoneDoc(String type, String id);
}
public TombstoneDocSupplier getTombstoneDocSupplier() {
return tombstoneDocSupplier;
}
}
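
The TombstoneDocSupplier above is a @FunctionalInterface with a single method, so whoever builds the EngineConfig can pass it as a lambda or a method reference. A hedged sketch of that wiring — the mapperService and indexName names here are illustrative placeholders, not code from this change; the actual wiring in this commit is the this::createTombstoneDoc method reference that IndexShard passes further down:

    // Illustrative only: "mapperService" and "indexName" stand in for whatever the caller has at hand.
    EngineConfig.TombstoneDocSupplier tombstoneDocSupplier =
        (type, id) -> mapperService.documentMapper(type).createTombstoneDoc(indexName, type, id);
    // ...handed to the EngineConfig constructor as its new final argument, next to the existing
    // globalCheckpointSupplier and primaryTermSupplier.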

File: InternalEngine.java

@@ -63,6 +63,7 @@ import org.elasticsearch.index.IndexSettings;
import org.elasticsearch.index.VersionType;
import org.elasticsearch.index.mapper.IdFieldMapper;
import org.elasticsearch.index.mapper.ParseContext;
import org.elasticsearch.index.mapper.ParsedDocument;
import org.elasticsearch.index.merge.MergeStats;
import org.elasticsearch.index.merge.OnGoingMerge;
import org.elasticsearch.index.seqno.LocalCheckpointTracker;
@@ -1220,7 +1221,17 @@ public class InternalEngine extends Engine {
if (plan.currentlyDeleted == false) {
// any exception that comes from this is either an ACE or a fatal exception;
// there can't be any document failures coming from this
indexWriter.deleteDocuments(delete.uid());
if (softDeleteEnabled) {
final ParsedDocument tombstone = engineConfig.getTombstoneDocSupplier().newTombstoneDoc(delete.type(), delete.id());
assert tombstone.docs().size() == 1 : "Tombstone doc should have single doc [" + tombstone + "]";
tombstone.updateSeqID(plan.seqNoOfDeletion, delete.primaryTerm());
tombstone.version().setLongValue(plan.versionOfDeletion);
final ParseContext.Document doc = tombstone.docs().get(0);
doc.add(softDeleteField);
indexWriter.softUpdateDocument(delete.uid(), doc, softDeleteField);
} else {
indexWriter.deleteDocuments(delete.uid());
}
numDocDeletes.inc();
}
versionMap.putUnderLock(delete.uid().bytes(),
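
The pay-off of indexing a tombstone rather than just deleting is on the read side: every retained operation, including a delete, now carries a _seq_no, so a reader that does not hide the soft-delete marker can walk operations in sequence-number order for recovery or rollback. A minimal, hedged sketch of that read path (not from this commit), assuming dir is a Directory holding a committed index written as in the sketch near the top of this page and that operations store their sequence number under a "_seq_no" doc-values field:

import java.io.IOException;
import org.apache.lucene.index.DirectoryReader;
import org.apache.lucene.index.LeafReaderContext;
import org.apache.lucene.index.NumericDocValues;
import org.apache.lucene.search.DocIdSetIterator;
import org.apache.lucene.store.Directory;

final class OperationHistoryReader {
    /** Prints the _seq_no of every document that has one, soft-deleted tombstones included. */
    static void printOps(Directory dir) throws IOException {
        // A reader opened directly from the directory knows nothing about the soft-deletes
        // field, so soft-deleted documents and tombstones are still visible to it.
        try (DirectoryReader reader = DirectoryReader.open(dir)) {
            for (LeafReaderContext leaf : reader.leaves()) {
                NumericDocValues seqNos = leaf.reader().getNumericDocValues("_seq_no");
                if (seqNos == null) {
                    continue; // this segment has no sequence numbers
                }
                for (int doc = seqNos.nextDoc(); doc != DocIdSetIterator.NO_MORE_DOCS; doc = seqNos.nextDoc()) {
                    System.out.println("doc " + (leaf.docBase + doc) + " -> seq_no " + seqNos.longValue());
                }
            }
        }
    }
}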

File: DocumentMapper.java

@@ -24,6 +24,7 @@ import org.apache.lucene.search.Query;
import org.apache.lucene.search.Scorer;
import org.apache.lucene.search.Weight;
import org.elasticsearch.ElasticsearchGenerationException;
import org.elasticsearch.common.bytes.BytesArray;
import org.elasticsearch.common.compress.CompressedXContent;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.text.Text;
@@ -39,14 +40,15 @@ import org.elasticsearch.search.internal.SearchContext;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import static java.util.Collections.emptyMap;
import java.util.stream.Stream;
public class DocumentMapper implements ToXContentFragment {
@@ -122,6 +124,7 @@ public class DocumentMapper implements ToXContentFragment {
private final Map<String, ObjectMapper> objectMappers;
private final boolean hasNestedObjects;
private final MetadataFieldMapper[] tombstoneMetadataFieldMappers;
public DocumentMapper(MapperService mapperService, Mapping mapping) {
this.mapperService = mapperService;
@@ -130,6 +133,10 @@ public class DocumentMapper implements ToXContentFragment {
final IndexSettings indexSettings = mapperService.getIndexSettings();
this.mapping = mapping;
this.documentParser = new DocumentParser(indexSettings, mapperService.documentMapperParser(), this);
final Collection<String> tombstoneFields =
Arrays.asList(SeqNoFieldMapper.NAME, SeqNoFieldMapper.PRIMARY_TERM_NAME, VersionFieldMapper.NAME, IdFieldMapper.NAME);
this.tombstoneMetadataFieldMappers = Stream.of(mapping.metadataMappers)
.filter(field -> tombstoneFields.contains(field.name())).toArray(MetadataFieldMapper[]::new);
// collect all the mappers for this type
List<ObjectMapper> newObjectMappers = new ArrayList<>();
@@ -241,7 +248,12 @@ public class DocumentMapper implements ToXContentFragment {
}
public ParsedDocument parse(SourceToParse source) throws MapperParsingException {
return documentParser.parseDocument(source);
return documentParser.parseDocument(source, mapping.metadataMappers);
}
public ParsedDocument createTombstoneDoc(String index, String type, String id) throws MapperParsingException {
final SourceToParse emptySource = SourceToParse.source(index, type, id, new BytesArray("{}"), XContentType.JSON);
return documentParser.parseDocument(emptySource, tombstoneMetadataFieldMappers);
}
/**

File: DocumentParser.java

@@ -55,7 +55,7 @@ final class DocumentParser {
this.docMapper = docMapper;
}
ParsedDocument parseDocument(SourceToParse source) throws MapperParsingException {
ParsedDocument parseDocument(SourceToParse source, MetadataFieldMapper[] metadataFieldsMappers) throws MapperParsingException {
validateType(source);
final Mapping mapping = docMapper.mapping();
@@ -66,7 +66,7 @@ final class DocumentParser {
LoggingDeprecationHandler.INSTANCE, source.source(), xContentType)) {
context = new ParseContext.InternalParseContext(indexSettings.getSettings(), docMapperParser, docMapper, source, parser);
validateStart(parser);
internalParseDocument(mapping, context, parser);
internalParseDocument(mapping, metadataFieldsMappers, context, parser);
validateEnd(parser);
} catch (Exception e) {
throw wrapInMapperParsingException(source, e);
@@ -81,10 +81,11 @@ final class DocumentParser {
return parsedDocument(source, context, createDynamicUpdate(mapping, docMapper, context.getDynamicMappers()));
}
private static void internalParseDocument(Mapping mapping, ParseContext.InternalParseContext context, XContentParser parser) throws IOException {
private static void internalParseDocument(Mapping mapping, MetadataFieldMapper[] metadataFieldsMappers,
ParseContext.InternalParseContext context, XContentParser parser) throws IOException {
final boolean emptyDoc = isEmptyDoc(mapping, parser);
for (MetadataFieldMapper metadataMapper : mapping.metadataMappers) {
for (MetadataFieldMapper metadataMapper : metadataFieldsMappers) {
metadataMapper.preParse(context);
}
@@ -95,7 +96,7 @@ final class DocumentParser {
parseObjectOrNested(context, mapping.root);
}
for (MetadataFieldMapper metadataMapper : mapping.metadataMappers) {
for (MetadataFieldMapper metadataMapper : metadataFieldsMappers) {
metadataMapper.postParse(context);
}
}

File: IndexShard.java

@@ -2158,7 +2158,8 @@ public class IndexShard extends AbstractIndexShardComponent implements IndicesCl
IndexingMemoryController.SHARD_INACTIVE_TIME_SETTING.get(indexSettings.getSettings()),
Collections.singletonList(refreshListeners),
Collections.singletonList(new RefreshMetricUpdater(refreshMetric)),
indexSort, this::runTranslogRecovery, circuitBreakerService, replicationTracker, this::getPrimaryTerm);
indexSort, this::runTranslogRecovery, circuitBreakerService, replicationTracker, this::getPrimaryTerm,
this::createTombstoneDoc);
}
/**
@@ -2586,4 +2587,8 @@ public class IndexShard extends AbstractIndexShardComponent implements IndicesCl
refreshMetric.inc(System.nanoTime() - currentRefreshStartTime);
}
}
private ParsedDocument createTombstoneDoc(String type, String id) {
return docMapper(type).getDocumentMapper().createTombstoneDoc(shardId.getIndexName(), type, id);
}
}

File: InternalEngineTests.java

@@ -119,7 +119,6 @@ import org.elasticsearch.index.translog.SnapshotMatchers;
import org.elasticsearch.index.translog.Translog;
import org.elasticsearch.index.translog.TranslogConfig;
import org.elasticsearch.indices.breaker.NoneCircuitBreakerService;
import org.elasticsearch.test.IndexSettingsModule;
import org.hamcrest.MatcherAssert;
import org.hamcrest.Matchers;
@@ -2584,7 +2583,7 @@ public class InternalEngineTests extends EngineTestCase {
new CodecService(null, logger), config.getEventListener(), IndexSearcher.getDefaultQueryCache(),
IndexSearcher.getDefaultQueryCachingPolicy(), translogConfig, TimeValue.timeValueMinutes(5),
config.getExternalRefreshListener(), config.getInternalRefreshListener(), null, config.getTranslogRecoveryRunner(),
new NoneCircuitBreakerService(), () -> SequenceNumbers.UNASSIGNED_SEQ_NO, primaryTerm::get);
new NoneCircuitBreakerService(), () -> SequenceNumbers.UNASSIGNED_SEQ_NO, primaryTerm::get, EngineTestCase::createTombstoneDoc);
try {
InternalEngine internalEngine = new InternalEngine(brokenConfig);
fail("translog belongs to a different engine");

File: IndexShardTests.java

@@ -22,6 +22,7 @@ import org.apache.logging.log4j.Logger;
import org.apache.lucene.index.CorruptIndexException;
import org.apache.lucene.index.DirectoryReader;
import org.apache.lucene.index.IndexCommit;
import org.apache.lucene.index.IndexableField;
import org.apache.lucene.index.Term;
import org.apache.lucene.search.IndexSearcher;
import org.apache.lucene.search.TermQuery;
@@ -75,8 +76,8 @@ import org.elasticsearch.index.IndexSettings;
import org.elasticsearch.index.VersionType;
import org.elasticsearch.index.engine.Engine;
import org.elasticsearch.index.engine.EngineException;
import org.elasticsearch.index.engine.InternalEngineFactory;
import org.elasticsearch.index.engine.InternalEngine;
import org.elasticsearch.index.engine.InternalEngineFactory;
import org.elasticsearch.index.engine.Segment;
import org.elasticsearch.index.engine.SegmentsStats;
import org.elasticsearch.index.fielddata.FieldDataStats;
@@ -86,8 +87,12 @@ import org.elasticsearch.index.fielddata.IndexFieldDataService;
import org.elasticsearch.index.mapper.IdFieldMapper;
import org.elasticsearch.index.mapper.MappedFieldType;
import org.elasticsearch.index.mapper.Mapping;
import org.elasticsearch.index.mapper.ParseContext;
import org.elasticsearch.index.mapper.ParsedDocument;
import org.elasticsearch.index.mapper.SeqNoFieldMapper;
import org.elasticsearch.index.mapper.SourceToParse;
import org.elasticsearch.index.mapper.Uid;
import org.elasticsearch.index.mapper.VersionFieldMapper;
import org.elasticsearch.index.seqno.SequenceNumbers;
import org.elasticsearch.index.snapshots.IndexShardSnapshotStatus;
import org.elasticsearch.index.store.Store;
@@ -154,6 +159,7 @@ import static org.hamcrest.Matchers.equalTo;
import static org.hamcrest.Matchers.greaterThan;
import static org.hamcrest.Matchers.greaterThanOrEqualTo;
import static org.hamcrest.Matchers.hasKey;
import static org.hamcrest.Matchers.hasSize;
import static org.hamcrest.Matchers.hasToString;
import static org.hamcrest.Matchers.instanceOf;
import static org.hamcrest.Matchers.lessThan;
@@ -2339,8 +2345,15 @@ public class IndexShardTests extends IndexShardTestCase {
assertTrue(searcher.reader().numDocs() <= docStats.getCount());
}
assertThat(docStats.getCount(), equalTo(numDocs));
// Lucene will delete a segment if all docs are deleted from it; this means that we lose the deletes when deleting all docs
assertThat(docStats.getDeleted(), equalTo(numDocsToDelete == numDocs ? 0 : numDocsToDelete));
// Lucene will delete a segment if all docs are deleted from it;
// this means that we lose the deletes when deleting all docs.
// If soft-delete is enabled, each delete op will add a deletion marker.
final long deleteTombstones = indexShard.indexSettings.isSoftDeleteEnabled() ? numDocsToDelete : 0L;
if (numDocsToDelete == numDocs) {
assertThat(docStats.getDeleted(), equalTo(deleteTombstones));
} else {
assertThat(docStats.getDeleted(), equalTo(numDocsToDelete + deleteTombstones));
}
}
// merge them away
@@ -2968,6 +2981,7 @@ public class IndexShardTests extends IndexShardTestCase {
// Close remaining searchers
IOUtils.close(searchers);
primary.refresh("test");
SegmentsStats ss = primary.segmentStats(randomBoolean());
CircuitBreaker breaker = primary.circuitBreakerService.getBreaker(CircuitBreaker.ACCOUNTING);
@@ -3053,4 +3067,16 @@ public class IndexShardTests extends IndexShardTestCase {
closeShards(primary);
}
public void testSupplyTombstoneDoc() throws Exception {
IndexShard shard = newStartedShard();
String id = randomRealisticUnicodeOfLengthBetween(1, 10);
ParsedDocument tombstone = shard.getEngine().config().getTombstoneDocSupplier().newTombstoneDoc("doc", id);
assertThat(tombstone.docs(), hasSize(1));
ParseContext.Document doc = tombstone.docs().get(0);
assertThat(doc.getFields().stream().map(IndexableField::name).collect(Collectors.toList()),
containsInAnyOrder(SeqNoFieldMapper.NAME, SeqNoFieldMapper.NAME, SeqNoFieldMapper.PRIMARY_TERM_NAME,
IdFieldMapper.NAME, VersionFieldMapper.NAME));
assertThat(doc.getField(IdFieldMapper.NAME).binaryValue(), equalTo(Uid.encodeId(id)));
closeShards(shard);
}
}

File: RefreshListenersTests.java

@@ -42,6 +42,7 @@ import org.elasticsearch.index.IndexSettings;
import org.elasticsearch.index.codec.CodecService;
import org.elasticsearch.index.engine.Engine;
import org.elasticsearch.index.engine.EngineConfig;
import org.elasticsearch.index.engine.EngineTestCase;
import org.elasticsearch.index.engine.InternalEngine;
import org.elasticsearch.index.fieldvisitor.SingleFieldsVisitor;
import org.elasticsearch.index.mapper.IdFieldMapper;
@@ -130,7 +131,8 @@ public class RefreshListenersTests extends ESTestCase {
indexSettings, null, store, newMergePolicy(), iwc.getAnalyzer(), iwc.getSimilarity(), new CodecService(null, logger),
eventListener, IndexSearcher.getDefaultQueryCache(), IndexSearcher.getDefaultQueryCachingPolicy(), translogConfig,
TimeValue.timeValueMinutes(5), Collections.singletonList(listeners), Collections.emptyList(), null,
(e, s) -> 0, new NoneCircuitBreakerService(), () -> SequenceNumbers.NO_OPS_PERFORMED, () -> primaryTerm);
(e, s) -> 0, new NoneCircuitBreakerService(), () -> SequenceNumbers.NO_OPS_PERFORMED, () -> primaryTerm,
EngineTestCase::createTombstoneDoc);
engine = new InternalEngine(config);
engine.recoverFromTranslog();
listeners.setCurrentRefreshLocationSupplier(engine::getTranslogLastWriteLocation);

File: SharedClusterSnapshotRestoreIT.java

@@ -76,6 +76,7 @@ import org.elasticsearch.common.xcontent.XContentType;
import org.elasticsearch.core.internal.io.IOUtils;
import org.elasticsearch.index.Index;
import org.elasticsearch.index.IndexService;
import org.elasticsearch.index.IndexSettings;
import org.elasticsearch.index.engine.Engine;
import org.elasticsearch.index.shard.IndexShard;
import org.elasticsearch.index.shard.ShardId;
@@ -119,6 +120,7 @@ import static org.elasticsearch.cluster.metadata.IndexMetaData.SETTING_NUMBER_OF
import static org.elasticsearch.cluster.routing.allocation.decider.MaxRetryAllocationDecider.SETTING_ALLOCATION_MAX_RETRY;
import static org.elasticsearch.common.xcontent.XContentFactory.jsonBuilder;
import static org.elasticsearch.index.IndexSettings.INDEX_REFRESH_INTERVAL_SETTING;
import static org.elasticsearch.index.IndexSettings.INDEX_SOFT_DELETES_SETTING;
import static org.elasticsearch.index.query.QueryBuilders.matchQuery;
import static org.elasticsearch.index.shard.IndexShardTests.getEngineFromShard;
import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertAcked;
@@ -2026,7 +2028,9 @@ public class SharedClusterSnapshotRestoreIT extends AbstractSnapshotIntegTestCas
.put("chunk_size", randomIntBetween(100, 1000), ByteSizeUnit.BYTES)));
// only one shard
assertAcked(prepareCreate("test").setSettings(Settings.builder().put(IndexMetaData.SETTING_NUMBER_OF_SHARDS, 1)));
final Settings indexSettings = Settings.builder()
.put(IndexMetaData.SETTING_NUMBER_OF_SHARDS, 1).build();
assertAcked(prepareCreate("test").setSettings(indexSettings));
ensureGreen();
logger.info("--> indexing");
@@ -2072,7 +2076,13 @@ public class SharedClusterSnapshotRestoreIT extends AbstractSnapshotIntegTestCas
SnapshotStatus snapshotStatus = client.admin().cluster().prepareSnapshotStatus("test-repo").setSnapshots("test-2").get().getSnapshots().get(0);
List<SnapshotIndexShardStatus> shards = snapshotStatus.getShards();
for (SnapshotIndexShardStatus status : shards) {
assertThat(status.getStats().getProcessedFiles(), equalTo(2)); // we flush before the snapshot such that we have to process the segments_N files plus the .del file
// we flush before the snapshot such that we have to process the segments_N files plus the .del file
if (INDEX_SOFT_DELETES_SETTING.get(indexSettings)) {
// soft-delete generates DV files.
assertThat(status.getStats().getProcessedFiles(), greaterThan(2));
} else {
assertThat(status.getStats().getProcessedFiles(), equalTo(2));
}
}
}
}

File: EngineTestCase.java

@@ -53,7 +53,6 @@ import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.common.util.BigArrays;
import org.elasticsearch.common.xcontent.XContentType;
import org.elasticsearch.core.internal.io.IOUtils;
import org.elasticsearch.index.Index;
import org.elasticsearch.index.IndexSettings;
import org.elasticsearch.index.VersionType;
@@ -65,6 +64,7 @@ import org.elasticsearch.index.mapper.ParsedDocument;
import org.elasticsearch.index.mapper.SeqNoFieldMapper;
import org.elasticsearch.index.mapper.SourceFieldMapper;
import org.elasticsearch.index.mapper.Uid;
import org.elasticsearch.index.mapper.VersionFieldMapper;
import org.elasticsearch.index.seqno.LocalCheckpointTracker;
import org.elasticsearch.index.seqno.ReplicationTracker;
import org.elasticsearch.index.seqno.SequenceNumbers;
@@ -189,7 +189,8 @@ public abstract class EngineTestCase extends ESTestCase {
new CodecService(null, logger), config.getEventListener(), config.getQueryCache(), config.getQueryCachingPolicy(),
config.getTranslogConfig(), config.getFlushMergesAfter(),
config.getExternalRefreshListener(), Collections.emptyList(), config.getIndexSort(), config.getTranslogRecoveryRunner(),
config.getCircuitBreakerService(), globalCheckpointSupplier, config.getPrimaryTermSupplier());
config.getCircuitBreakerService(), globalCheckpointSupplier, config.getPrimaryTermSupplier(),
EngineTestCase::createTombstoneDoc);
}
public EngineConfig copy(EngineConfig config, Analyzer analyzer) {
@@ -198,7 +199,8 @@ public abstract class EngineTestCase extends ESTestCase {
new CodecService(null, logger), config.getEventListener(), config.getQueryCache(), config.getQueryCachingPolicy(),
config.getTranslogConfig(), config.getFlushMergesAfter(),
config.getExternalRefreshListener(), Collections.emptyList(), config.getIndexSort(), config.getTranslogRecoveryRunner(),
config.getCircuitBreakerService(), config.getGlobalCheckpointSupplier(), config.getPrimaryTermSupplier());
config.getCircuitBreakerService(), config.getGlobalCheckpointSupplier(), config.getPrimaryTermSupplier(),
config.getTombstoneDocSupplier());
}
@Override
@@ -253,6 +255,23 @@ public abstract class EngineTestCase extends ESTestCase {
mappingUpdate);
}
/**
* Creates a tombstone document that only includes uid, seq#, term and version fields.
*/
public static ParsedDocument createTombstoneDoc(String type, String id) {
final ParseContext.Document document = new ParseContext.Document();
Field uidField = new Field(IdFieldMapper.NAME, Uid.encodeId(id), IdFieldMapper.Defaults.FIELD_TYPE);
document.add(uidField);
Field versionField = new NumericDocValuesField(VersionFieldMapper.NAME, 0);
document.add(versionField);
SeqNoFieldMapper.SequenceIDFields seqID = SeqNoFieldMapper.SequenceIDFields.emptySeqID();
document.add(seqID.seqNo);
document.add(seqID.seqNoDocValue);
document.add(seqID.primaryTerm);
return new ParsedDocument(versionField, seqID, id, type, null, Collections.singletonList(document),
new BytesArray("{}"), XContentType.JSON, null);
}
protected Store createStore() throws IOException {
return createStore(newDirectory());
}
@@ -461,7 +480,7 @@ public abstract class EngineTestCase extends ESTestCase {
new NoneCircuitBreakerService(),
globalCheckpointSupplier == null ?
new ReplicationTracker(shardId, allocationId.getId(), indexSettings, SequenceNumbers.NO_OPS_PERFORMED) :
globalCheckpointSupplier, primaryTerm::get);
globalCheckpointSupplier, primaryTerm::get, EngineTestCase::createTombstoneDoc);
return config;
}