Use soft-update to maintain document history (#29458)
Today we can use the soft-update feature from Lucene to maintain a history of document. This change simply replaces hard-update in the Engine by soft-update methods. Stale operations, delete, and no-ops will be handled in subsequent changes. This change is just a cut-over from hard-update to soft-update, no new functionality has been introduced.
This commit is contained in:
parent
a90b3eadf3
commit
0dd61fcd2c
|
@ -27,6 +27,7 @@ import org.apache.lucene.codecs.CodecUtil;
|
|||
import org.apache.lucene.codecs.DocValuesFormat;
|
||||
import org.apache.lucene.codecs.PostingsFormat;
|
||||
import org.apache.lucene.document.LatLonDocValuesField;
|
||||
import org.apache.lucene.document.NumericDocValuesField;
|
||||
import org.apache.lucene.index.CorruptIndexException;
|
||||
import org.apache.lucene.index.DirectoryReader;
|
||||
import org.apache.lucene.index.FilterLeafReader;
|
||||
|
@ -96,6 +97,8 @@ public class Lucene {
|
|||
assert annotation == null : "DocValuesFormat " + LATEST_DOC_VALUES_FORMAT + " is deprecated" ;
|
||||
}
|
||||
|
||||
public static final String SOFT_DELETE_FIELD = "__soft_delete";
|
||||
|
||||
public static final NamedAnalyzer STANDARD_ANALYZER = new NamedAnalyzer("_standard", AnalyzerScope.GLOBAL, new StandardAnalyzer());
|
||||
public static final NamedAnalyzer KEYWORD_ANALYZER = new NamedAnalyzer("_keyword", AnalyzerScope.GLOBAL, new KeywordAnalyzer());
|
||||
|
||||
|
@ -829,4 +832,11 @@ public class Lucene {
|
|||
}
|
||||
};
|
||||
}
|
||||
|
||||
/**
|
||||
* Returns a numeric docvalues which can be used to soft-delete documents.
|
||||
*/
|
||||
public static NumericDocValuesField newSoftDeleteField() {
|
||||
return new NumericDocValuesField(SOFT_DELETE_FIELD, 1);
|
||||
}
|
||||
}
|
||||
|
|
|
@ -130,6 +130,7 @@ public final class IndexScopedSettings extends AbstractScopedSettings {
|
|||
IndexSettings.MAX_REGEX_LENGTH_SETTING,
|
||||
ShardsLimitAllocationDecider.INDEX_TOTAL_SHARDS_PER_NODE_SETTING,
|
||||
IndexSettings.INDEX_GC_DELETES_SETTING,
|
||||
IndexSettings.INDEX_SOFT_DELETES_SETTING,
|
||||
IndicesRequestCache.INDEX_CACHE_REQUEST_ENABLED_SETTING,
|
||||
UnassignedInfo.INDEX_DELAYED_NODE_LEFT_TIMEOUT_SETTING,
|
||||
EnableAllocationDecider.INDEX_ROUTING_REBALANCE_ENABLE_SETTING,
|
||||
|
|
|
@ -238,6 +238,12 @@ public final class IndexSettings {
|
|||
public static final Setting<TimeValue> INDEX_GC_DELETES_SETTING =
|
||||
Setting.timeSetting("index.gc_deletes", DEFAULT_GC_DELETES, new TimeValue(-1, TimeUnit.MILLISECONDS), Property.Dynamic,
|
||||
Property.IndexScope);
|
||||
|
||||
/**
|
||||
* Specifies if the index should use soft-delete instead of hard-delete for update/delete operations.
|
||||
*/
|
||||
public static final Setting<Boolean> INDEX_SOFT_DELETES_SETTING = Setting.boolSetting("index.soft_deletes", true, Property.IndexScope);
|
||||
|
||||
/**
|
||||
* The maximum number of refresh listeners allows on this shard.
|
||||
*/
|
||||
|
@ -282,6 +288,7 @@ public final class IndexSettings {
|
|||
private final IndexSortConfig indexSortConfig;
|
||||
private final IndexScopedSettings scopedSettings;
|
||||
private long gcDeletesInMillis = DEFAULT_GC_DELETES.millis();
|
||||
private final boolean softDeleteEnabled;
|
||||
private volatile boolean warmerEnabled;
|
||||
private volatile int maxResultWindow;
|
||||
private volatile int maxInnerResultWindow;
|
||||
|
@ -393,6 +400,7 @@ public final class IndexSettings {
|
|||
generationThresholdSize = scopedSettings.get(INDEX_TRANSLOG_GENERATION_THRESHOLD_SIZE_SETTING);
|
||||
mergeSchedulerConfig = new MergeSchedulerConfig(this);
|
||||
gcDeletesInMillis = scopedSettings.get(INDEX_GC_DELETES_SETTING).getMillis();
|
||||
softDeleteEnabled = version.onOrAfter(Version.V_7_0_0_alpha1) && scopedSettings.get(INDEX_SOFT_DELETES_SETTING);
|
||||
warmerEnabled = scopedSettings.get(INDEX_WARMER_ENABLED_SETTING);
|
||||
maxResultWindow = scopedSettings.get(MAX_RESULT_WINDOW_SETTING);
|
||||
maxInnerResultWindow = scopedSettings.get(MAX_INNER_RESULT_WINDOW_SETTING);
|
||||
|
@ -839,4 +847,11 @@ public final class IndexSettings {
|
|||
* Returns the time that an index shard becomes search idle unless it's accessed in between
|
||||
*/
|
||||
public TimeValue getSearchIdleAfter() { return searchIdleAfter; }
|
||||
|
||||
/**
|
||||
* Returns <code>true</code> if soft-delete is enabled.
|
||||
*/
|
||||
public boolean isSoftDeleteEnabled() {
|
||||
return softDeleteEnabled;
|
||||
}
|
||||
}
|
||||
|
|
|
@ -39,13 +39,13 @@ public final class CommitStats implements Streamable, ToXContentFragment {
|
|||
private String id; // lucene commit id in base 64;
|
||||
private int numDocs;
|
||||
|
||||
public CommitStats(SegmentInfos segmentInfos) {
|
||||
public CommitStats(SegmentInfos segmentInfos, int numDocs) {
|
||||
// clone the map to protect against concurrent changes
|
||||
userData = MapBuilder.<String, String>newMapBuilder().putAll(segmentInfos.getUserData()).immutableMap();
|
||||
// lucene calls the current generation, last generation.
|
||||
generation = segmentInfos.getLastGeneration();
|
||||
id = Base64.getEncoder().encodeToString(segmentInfos.getId());
|
||||
numDocs = Lucene.getNumDocs(segmentInfos);
|
||||
this.numDocs = numDocs;
|
||||
}
|
||||
|
||||
private CommitStats() {
|
||||
|
|
|
@ -536,7 +536,9 @@ public abstract class Engine implements Closeable {
|
|||
|
||||
/** get commits stats for the last commit */
|
||||
public CommitStats commitStats() {
|
||||
return new CommitStats(getLastCommittedSegmentInfos());
|
||||
try (Engine.Searcher searcher = acquireSearcher("commit_stats", Engine.SearcherScope.INTERNAL)) {
|
||||
return new CommitStats(getLastCommittedSegmentInfos(), searcher.reader().numDocs());
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
|
|
|
@ -21,6 +21,7 @@ package org.elasticsearch.index.engine;
|
|||
|
||||
import org.apache.logging.log4j.Logger;
|
||||
import org.apache.logging.log4j.message.ParameterizedMessage;
|
||||
import org.apache.lucene.document.NumericDocValuesField;
|
||||
import org.apache.lucene.index.DirectoryReader;
|
||||
import org.apache.lucene.index.IndexCommit;
|
||||
import org.apache.lucene.index.IndexReader;
|
||||
|
@ -67,7 +68,6 @@ import org.elasticsearch.index.merge.OnGoingMerge;
|
|||
import org.elasticsearch.index.seqno.LocalCheckpointTracker;
|
||||
import org.elasticsearch.index.seqno.SequenceNumbers;
|
||||
import org.elasticsearch.index.shard.ElasticsearchMergePolicy;
|
||||
import org.elasticsearch.index.shard.IndexingStats;
|
||||
import org.elasticsearch.index.shard.ShardId;
|
||||
import org.elasticsearch.index.translog.Translog;
|
||||
import org.elasticsearch.index.translog.TranslogConfig;
|
||||
|
@ -138,6 +138,8 @@ public class InternalEngine extends Engine {
|
|||
private final CounterMetric numDocDeletes = new CounterMetric();
|
||||
private final CounterMetric numDocAppends = new CounterMetric();
|
||||
private final CounterMetric numDocUpdates = new CounterMetric();
|
||||
private final NumericDocValuesField softDeleteField = Lucene.newSoftDeleteField();
|
||||
private final boolean softDeleteEnabled;
|
||||
|
||||
/**
|
||||
* How many bytes we are currently moving to disk, via either IndexWriter.flush or refresh. IndexingMemoryController polls this
|
||||
|
@ -161,6 +163,7 @@ public class InternalEngine extends Engine {
|
|||
if (engineConfig.isAutoGeneratedIDsOptimizationEnabled() == false) {
|
||||
maxUnsafeAutoIdTimestamp.set(Long.MAX_VALUE);
|
||||
}
|
||||
this.softDeleteEnabled = engineConfig.getIndexSettings().isSoftDeleteEnabled();
|
||||
final TranslogDeletionPolicy translogDeletionPolicy = new TranslogDeletionPolicy(
|
||||
engineConfig.getIndexSettings().getTranslogRetentionSize().getBytes(),
|
||||
engineConfig.getIndexSettings().getTranslogRetentionAge().getMillis()
|
||||
|
@ -768,6 +771,7 @@ public class InternalEngine extends Engine {
|
|||
} else if (plan.indexIntoLucene) {
|
||||
indexResult = indexIntoLucene(index, plan);
|
||||
} else {
|
||||
// TODO: We need to index stale documents to have a full history in Lucene.
|
||||
indexResult = new IndexResult(
|
||||
plan.versionForIndexing, plan.seqNoForIndexing, plan.currentNotFoundOrDeleted);
|
||||
}
|
||||
|
@ -1066,11 +1070,19 @@ public class InternalEngine extends Engine {
|
|||
}
|
||||
|
||||
private void updateDocs(final Term uid, final List<ParseContext.Document> docs, final IndexWriter indexWriter) throws IOException {
|
||||
if (softDeleteEnabled) {
|
||||
if (docs.size() > 1) {
|
||||
indexWriter.softUpdateDocuments(uid, docs, softDeleteField);
|
||||
} else {
|
||||
indexWriter.softUpdateDocument(uid, docs.get(0), softDeleteField);
|
||||
}
|
||||
} else {
|
||||
if (docs.size() > 1) {
|
||||
indexWriter.updateDocuments(uid, docs);
|
||||
} else {
|
||||
indexWriter.updateDocument(uid, docs.get(0));
|
||||
}
|
||||
}
|
||||
numDocUpdates.inc(docs.size());
|
||||
}
|
||||
|
||||
|
@ -1927,11 +1939,14 @@ public class InternalEngine extends Engine {
|
|||
}
|
||||
iwc.setInfoStream(verbose ? InfoStream.getDefault() : new LoggerInfoStream(logger));
|
||||
iwc.setMergeScheduler(mergeScheduler);
|
||||
MergePolicy mergePolicy = config().getMergePolicy();
|
||||
// Give us the opportunity to upgrade old segments while performing
|
||||
// background merges
|
||||
mergePolicy = new ElasticsearchMergePolicy(mergePolicy);
|
||||
iwc.setMergePolicy(mergePolicy);
|
||||
MergePolicy mergePolicy = config().getMergePolicy();
|
||||
if (softDeleteEnabled) {
|
||||
// TODO: soft-delete retention policy
|
||||
iwc.setSoftDeletesField(Lucene.SOFT_DELETE_FIELD);
|
||||
}
|
||||
iwc.setMergePolicy(new ElasticsearchMergePolicy(mergePolicy));
|
||||
iwc.setSimilarity(engineConfig.getSimilarity());
|
||||
iwc.setRAMBufferSizeMB(engineConfig.getIndexingBufferSize().getMbFrac());
|
||||
iwc.setCodec(engineConfig.getCodec());
|
||||
|
|
|
@ -1007,7 +1007,6 @@ public class Store extends AbstractIndexShardComponent implements Closeable, Ref
|
|||
}
|
||||
final String segmentId = IndexFileNames.parseSegmentName(meta.name());
|
||||
final String extension = IndexFileNames.getExtension(meta.name());
|
||||
assert FIELD_INFOS_FILE_EXTENSION.equals(extension) == false || IndexFileNames.stripExtension(IndexFileNames.stripSegmentName(meta.name())).isEmpty() : "FieldInfos are generational but updateable DV are not supported in elasticsearch";
|
||||
if (IndexFileNames.SEGMENTS.equals(segmentId) || DEL_FILE_EXTENSION.equals(extension) || LIV_FILE_EXTENSION.equals(extension)) {
|
||||
// only treat del files as per-commit files fnm files are generational but only for upgradable DV
|
||||
perCommitStoreFiles.add(meta);
|
||||
|
|
|
@ -119,6 +119,7 @@ import org.elasticsearch.index.translog.SnapshotMatchers;
|
|||
import org.elasticsearch.index.translog.Translog;
|
||||
import org.elasticsearch.index.translog.TranslogConfig;
|
||||
import org.elasticsearch.indices.breaker.NoneCircuitBreakerService;
|
||||
import org.elasticsearch.test.IndexSettingsModule;
|
||||
import org.hamcrest.MatcherAssert;
|
||||
import org.hamcrest.Matchers;
|
||||
|
||||
|
@ -2711,6 +2712,12 @@ public class InternalEngineTests extends EngineTestCase {
|
|||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public long softUpdateDocument(Term term, Iterable<? extends IndexableField> doc, Field... softDeletes) throws IOException {
|
||||
maybeThrowFailure();
|
||||
return super.softUpdateDocument(term, doc, softDeletes);
|
||||
}
|
||||
|
||||
@Override
|
||||
public long deleteDocuments(Term... terms) throws IOException {
|
||||
maybeThrowFailure();
|
||||
|
|
Loading…
Reference in New Issue