Rebuild version map when opening internal engine (#43202)

With this change, we will rebuild the live version map and local
checkpoint using documents (including soft-deleted ones) from the safe
commit when opening an internal engine. This allows us to safely prune
away the _id of all soft-deleted documents, as the version map is always
in sync with the Lucene index.

Relates #40741
Supersedes #42979
Nhat Nguyen 2019-06-17 17:50:54 -04:00
parent 365f87c622
commit 0c5086d2f3
9 changed files with 348 additions and 162 deletions
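The description above boils down to a single startup scan over the safe commit. Below is a minimal, self-contained sketch of that scan under stated assumptions: it uses plain Lucene APIs, the class and method names are hypothetical, a HashMap and a plain long stand in for Elasticsearch's LiveVersionMap and LocalCheckpointTracker, and "_seq_no"/"_version" are the doc-values field names Elasticsearch writes (SeqNoFieldMapper.NAME, VersionFieldMapper.NAME). The real implementation, restoreVersionMapAndCheckpointTracker in the InternalEngine diff below, additionally resolves each doc's _id via a stored-field visitor and distinguishes index entries from delete tombstones.

import org.apache.lucene.document.LongPoint;
import org.apache.lucene.index.DirectoryReader;
import org.apache.lucene.index.LeafReaderContext;
import org.apache.lucene.index.NumericDocValues;
import org.apache.lucene.search.DocIdSetIterator;
import org.apache.lucene.search.IndexSearcher;
import org.apache.lucene.search.Query;
import org.apache.lucene.search.ScoreMode;
import org.apache.lucene.search.Scorer;
import org.apache.lucene.search.Weight;

import java.io.IOException;
import java.util.HashMap;
import java.util.Map;

final class VersionMapRebuildSketch {

    // Scans every doc whose _seq_no is above the local checkpoint (the caller
    // should pass a reader that treats soft-deleted docs as live) and records
    // the version of each matching doc, keyed by its global doc id.
    static Map<Integer, Long> rebuild(DirectoryReader reader, long localCheckpoint) throws IOException {
        final Map<Integer, Long> versionByDoc = new HashMap<>(); // stand-in for the live version map
        final IndexSearcher searcher = new IndexSearcher(reader);
        searcher.setQueryCache(null); // one-shot scan; caching buys nothing
        final Query query = LongPoint.newRangeQuery("_seq_no", localCheckpoint + 1, Long.MAX_VALUE);
        final Weight weight = searcher.createWeight(searcher.rewrite(query), ScoreMode.COMPLETE_NO_SCORES, 1.0f);
        for (LeafReaderContext leaf : reader.leaves()) {
            final Scorer scorer = weight.scorer(leaf);
            if (scorer == null) {
                continue; // no doc in this segment is above the checkpoint
            }
            final NumericDocValues versions = leaf.reader().getNumericDocValues("_version");
            final DocIdSetIterator iterator = scorer.iterator();
            int docId;
            while ((docId = iterator.nextDoc()) != DocIdSetIterator.NO_MORE_DOCS) {
                if (versions != null && versions.advanceExact(docId)) {
                    versionByDoc.put(leaf.docBase + docId, versions.longValue());
                }
            }
        }
        return versionByDoc;
    }
}

In the engine itself this scan runs only when soft deletes are enabled and the safe commit's local checkpoint is behind its max_seq_no; otherwise nothing is missing from the version map.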

View File

@@ -27,7 +27,6 @@ import org.apache.lucene.codecs.CodecUtil;
import org.apache.lucene.codecs.DocValuesFormat;
import org.apache.lucene.codecs.PostingsFormat;
import org.apache.lucene.document.LatLonDocValuesField;
import org.apache.lucene.document.LongPoint;
import org.apache.lucene.document.NumericDocValuesField;
import org.apache.lucene.index.BinaryDocValues;
import org.apache.lucene.index.CorruptIndexException;
@@ -95,7 +94,6 @@ import org.elasticsearch.common.util.iterable.Iterables;
import org.elasticsearch.index.analysis.AnalyzerScope;
import org.elasticsearch.index.analysis.NamedAnalyzer;
import org.elasticsearch.index.fielddata.IndexFieldData;
import org.elasticsearch.index.mapper.SeqNoFieldMapper;
import java.io.IOException;
import java.text.ParseException;
@@ -105,7 +103,6 @@ import java.util.Collection;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.function.LongConsumer;
public class Lucene {
public static final String LATEST_DOC_VALUES_FORMAT = "Lucene70";
@@ -1065,39 +1062,4 @@ public class Lucene {
}
};
}
/**
* Scans sequence numbers (i.e., {@link SeqNoFieldMapper#NAME}) between {@code fromSeqNo} (inclusive) and {@code toSeqNo} (inclusive)
* in the provided directory reader. This method invokes the callback {@code onNewSeqNo} whenever a sequence number value is found.
*
* @param directoryReader the directory reader to scan
* @param fromSeqNo the lower bound of a range of seq_no to scan (inclusive)
* @param toSeqNo the upper bound of a range of seq_no to scan (inclusive)
* @param onNewSeqNo the callback to be called whenever a new valid sequence number is found
*/
public static void scanSeqNosInReader(DirectoryReader directoryReader, long fromSeqNo, long toSeqNo,
LongConsumer onNewSeqNo) throws IOException {
final DirectoryReader reader = Lucene.wrapAllDocsLive(directoryReader);
final IndexSearcher searcher = new IndexSearcher(reader);
searcher.setQueryCache(null);
final Query query = LongPoint.newRangeQuery(SeqNoFieldMapper.NAME, fromSeqNo, toSeqNo);
final Weight weight = searcher.createWeight(query, ScoreMode.COMPLETE_NO_SCORES, 1.0f);
for (LeafReaderContext leaf : reader.leaves()) {
final Scorer scorer = weight.scorer(leaf);
if (scorer == null) {
continue;
}
final DocIdSetIterator docIdSetIterator = scorer.iterator();
final NumericDocValues seqNoDocValues = leaf.reader().getNumericDocValues(SeqNoFieldMapper.NAME);
int docId;
while ((docId = docIdSetIterator.nextDoc()) != DocIdSetIterator.NO_MORE_DOCS) {
if (seqNoDocValues == null || seqNoDocValues.advanceExact(docId) == false) {
throw new IllegalStateException("seq_no doc_values not found for doc_id=" + docId);
}
final long seqNo = seqNoDocValues.longValue();
assert fromSeqNo <= seqNo && seqNo <= toSeqNo : "from_seq_no=" + fromSeqNo + " seq_no=" + seqNo + " to_seq_no=" + toSeqNo;
onNewSeqNo.accept(seqNo);
}
}
}
}

View File

@@ -0,0 +1,90 @@
/*
* Licensed to Elasticsearch under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.elasticsearch.index.engine;
import org.apache.lucene.index.LeafReader;
import org.apache.lucene.index.NumericDocValues;
import org.elasticsearch.index.mapper.SeqNoFieldMapper;
import org.elasticsearch.index.mapper.SourceFieldMapper;
import org.elasticsearch.index.mapper.VersionFieldMapper;
import java.io.IOException;
import java.util.Objects;
final class CombinedDocValues {
private final NumericDocValues versionDV;
private final NumericDocValues seqNoDV;
private final NumericDocValues primaryTermDV;
private final NumericDocValues tombstoneDV;
private final NumericDocValues recoverySource;
CombinedDocValues(LeafReader leafReader) throws IOException {
this.versionDV = Objects.requireNonNull(leafReader.getNumericDocValues(VersionFieldMapper.NAME), "VersionDV is missing");
this.seqNoDV = Objects.requireNonNull(leafReader.getNumericDocValues(SeqNoFieldMapper.NAME), "SeqNoDV is missing");
this.primaryTermDV = Objects.requireNonNull(
leafReader.getNumericDocValues(SeqNoFieldMapper.PRIMARY_TERM_NAME), "PrimaryTermDV is missing");
this.tombstoneDV = leafReader.getNumericDocValues(SeqNoFieldMapper.TOMBSTONE_NAME);
this.recoverySource = leafReader.getNumericDocValues(SourceFieldMapper.RECOVERY_SOURCE_NAME);
}
long docVersion(int segmentDocId) throws IOException {
assert versionDV.docID() < segmentDocId;
if (versionDV.advanceExact(segmentDocId) == false) {
throw new IllegalStateException("DocValues for field [" + VersionFieldMapper.NAME + "] is not found");
}
return versionDV.longValue();
}
long docSeqNo(int segmentDocId) throws IOException {
assert seqNoDV.docID() < segmentDocId;
if (seqNoDV.advanceExact(segmentDocId) == false) {
throw new IllegalStateException("DocValues for field [" + SeqNoFieldMapper.NAME + "] is not found");
}
return seqNoDV.longValue();
}
long docPrimaryTerm(int segmentDocId) throws IOException {
if (primaryTermDV == null) {
return -1L;
}
assert primaryTermDV.docID() < segmentDocId;
// Use -1 for docs which don't have primary term. The caller considers those docs as nested docs.
if (primaryTermDV.advanceExact(segmentDocId) == false) {
return -1;
}
return primaryTermDV.longValue();
}
boolean isTombstone(int segmentDocId) throws IOException {
if (tombstoneDV == null) {
return false;
}
assert tombstoneDV.docID() < segmentDocId;
return tombstoneDV.advanceExact(segmentDocId) && tombstoneDV.longValue() > 0;
}
boolean hasRecoverySource(int segmentDocId) throws IOException {
if (recoverySource == null) {
return false;
}
assert recoverySource.docID() < segmentDocId;
return recoverySource.advanceExact(segmentDocId);
}
}
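This helper is shared between LuceneChangesSnapshot (which previously held an identical private copy, removed below) and the new restoreVersionMapAndCheckpointTracker in InternalEngine. A hypothetical, self-contained usage sketch follows; the demo class is illustrative only, must live in the same org.elasticsearch.index.engine package since CombinedDocValues is package-private, and assumes an Elasticsearch-written index where the _version, _seq_no, and _primary_term doc-values fields exist. Doc IDs are visited in ascending order per segment, as the asserts above expect.

import org.apache.lucene.index.DirectoryReader;
import org.apache.lucene.index.LeafReaderContext;

import java.io.IOException;

final class CombinedDocValuesDemo {

    // Dumps seq_no, version, and tombstone status of every doc in every segment.
    static void dump(DirectoryReader reader) throws IOException {
        for (LeafReaderContext leaf : reader.leaves()) {
            final CombinedDocValues dv = new CombinedDocValues(leaf.reader());
            for (int docId = 0; docId < leaf.reader().maxDoc(); docId++) {
                final long primaryTerm = dv.docPrimaryTerm(docId);
                if (primaryTerm == -1L) {
                    continue; // nested (child) doc: carries no primary term
                }
                System.out.printf("doc=%d seq_no=%d version=%d tombstone=%b%n",
                    docId, dv.docSeqNo(docId), dv.docVersion(docId), dv.isTombstone(docId));
            }
        }
    }
}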

View File

@@ -23,6 +23,7 @@ import org.apache.logging.log4j.Logger;
import org.apache.lucene.codecs.blocktree.BlockTreeTermsReader;
import org.apache.lucene.codecs.blocktree.BlockTreeTermsReader.FSTLoadMode;
import org.apache.lucene.document.Field;
import org.apache.lucene.document.LongPoint;
import org.apache.lucene.document.NumericDocValuesField;
import org.apache.lucene.index.DirectoryReader;
import org.apache.lucene.index.IndexCommit;
@@ -31,17 +32,23 @@ import org.apache.lucene.index.IndexWriter;
import org.apache.lucene.index.IndexWriterConfig;
import org.apache.lucene.index.IndexableField;
import org.apache.lucene.index.LeafReader;
import org.apache.lucene.index.LeafReaderContext;
import org.apache.lucene.index.LiveIndexWriterConfig;
import org.apache.lucene.index.MergePolicy;
import org.apache.lucene.index.SegmentCommitInfo;
import org.apache.lucene.index.SegmentInfos;
import org.apache.lucene.index.SoftDeletesRetentionMergePolicy;
import org.apache.lucene.index.Term;
import org.apache.lucene.search.DocIdSetIterator;
import org.apache.lucene.search.IndexSearcher;
import org.apache.lucene.search.Query;
import org.apache.lucene.search.ReferenceManager;
import org.apache.lucene.search.ScoreMode;
import org.apache.lucene.search.Scorer;
import org.apache.lucene.search.SearcherFactory;
import org.apache.lucene.search.SearcherManager;
import org.apache.lucene.search.TermQuery;
import org.apache.lucene.search.Weight;
import org.apache.lucene.store.AlreadyClosedException;
import org.apache.lucene.store.Directory;
import org.apache.lucene.store.FilterDirectory;
@@ -68,12 +75,14 @@ import org.elasticsearch.common.util.concurrent.ReleasableLock;
import org.elasticsearch.core.internal.io.IOUtils;
import org.elasticsearch.index.IndexSettings;
import org.elasticsearch.index.VersionType;
import org.elasticsearch.index.fieldvisitor.IdOnlyFieldVisitor;
import org.elasticsearch.index.mapper.IdFieldMapper;
import org.elasticsearch.index.mapper.MapperService;
import org.elasticsearch.index.mapper.ParseContext;
import org.elasticsearch.index.mapper.ParsedDocument;
import org.elasticsearch.index.mapper.SeqNoFieldMapper;
import org.elasticsearch.index.mapper.SourceFieldMapper;
import org.elasticsearch.index.mapper.Uid;
import org.elasticsearch.index.merge.MergeStats;
import org.elasticsearch.index.merge.OnGoingMerge;
import org.elasticsearch.index.seqno.LocalCheckpointTracker;
@@ -94,7 +103,6 @@ import java.io.Closeable;
import java.io.IOException;
import java.nio.file.Path;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
@@ -110,7 +118,7 @@ import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
import java.util.function.BiFunction;
import java.util.function.LongSupplier;
import java.util.function.Supplier;
import java.util.stream.Collectors;
import java.util.stream.Stream;
public class InternalEngine extends Engine {
@@ -207,6 +215,7 @@ public class InternalEngine extends Engine {
this.softDeletesPolicy = newSoftDeletesPolicy();
this.combinedDeletionPolicy =
new CombinedDeletionPolicy(logger, translogDeletionPolicy, softDeletesPolicy, translog::getLastSyncedGlobalCheckpoint);
this.localCheckpointTracker = createLocalCheckpointTracker(localCheckpointTrackerSupplier);
writer = createWriter();
bootstrapAppendOnlyInfoFromWriter(writer);
historyUUID = loadHistoryUUID(writer);
@@ -236,11 +245,17 @@
for (ReferenceManager.RefreshListener listener: engineConfig.getInternalRefreshListener()) {
this.internalSearcherManager.addListener(listener);
}
this.localCheckpointTracker = createLocalCheckpointTracker(engineConfig, lastCommittedSegmentInfos, logger,
() -> acquireSearcher("create_local_checkpoint_tracker", SearcherScope.INTERNAL), localCheckpointTrackerSupplier);
this.lastRefreshedCheckpointListener = new LastRefreshedCheckpointListener(localCheckpointTracker.getCheckpoint());
this.internalSearcherManager.addListener(lastRefreshedCheckpointListener);
maxSeqNoOfUpdatesOrDeletes = new AtomicLong(SequenceNumbers.max(localCheckpointTracker.getMaxSeqNo(), translog.getMaxSeqNo()));
if (softDeleteEnabled && localCheckpointTracker.getCheckpoint() < localCheckpointTracker.getMaxSeqNo()) {
try (Searcher searcher = acquireSearcher("restore_version_map_and_checkpoint_tracker", SearcherScope.INTERNAL)) {
restoreVersionMapAndCheckpointTracker(Lucene.wrapAllDocsLive(searcher.getDirectoryReader()));
} catch (IOException e) {
throw new EngineCreationFailureException(config().getShardId(),
"failed to restore version map and local checkpoint tracker", e);
}
}
success = true;
} finally {
if (success == false) {
@@ -254,30 +269,16 @@
logger.trace("created new InternalEngine");
}
private static LocalCheckpointTracker createLocalCheckpointTracker(EngineConfig engineConfig, SegmentInfos lastCommittedSegmentInfos,
Logger logger, Supplier<Searcher> searcherSupplier, BiFunction<Long, Long, LocalCheckpointTracker> localCheckpointTrackerSupplier) {
try {
final SequenceNumbers.CommitInfo seqNoStats =
SequenceNumbers.loadSeqNoInfoFromLuceneCommit(lastCommittedSegmentInfos.userData.entrySet());
final long maxSeqNo = seqNoStats.maxSeqNo;
final long localCheckpoint = seqNoStats.localCheckpoint;
logger.trace("recovered maximum sequence number [{}] and local checkpoint [{}]", maxSeqNo, localCheckpoint);
final LocalCheckpointTracker tracker = localCheckpointTrackerSupplier.apply(maxSeqNo, localCheckpoint);
// Operations that are optimized using max_seq_no_of_updates optimization must not be processed twice; otherwise, they will
// create duplicates in Lucene. To avoid this we check the LocalCheckpointTracker to see if an operation was already processed.
// Thus, we need to restore the LocalCheckpointTracker bit by bit to ensure the consistency between LocalCheckpointTracker and
// Lucene index. This is not the only solution since we can bootstrap max_seq_no_of_updates with max_seq_no of the commit to
// disable the MSU optimization during recovery. Here we prefer to maintain the consistency of LocalCheckpointTracker.
if (localCheckpoint < maxSeqNo && engineConfig.getIndexSettings().isSoftDeleteEnabled()) {
try (Searcher searcher = searcherSupplier.get()) {
Lucene.scanSeqNosInReader(searcher.getDirectoryReader(), localCheckpoint + 1, maxSeqNo,
tracker::markSeqNoAsCompleted);
}
}
return tracker;
} catch (IOException ex) {
throw new EngineCreationFailureException(engineConfig.getShardId(), "failed to create local checkpoint tracker", ex);
}
private LocalCheckpointTracker createLocalCheckpointTracker(
BiFunction<Long, Long, LocalCheckpointTracker> localCheckpointTrackerSupplier) throws IOException {
final long maxSeqNo;
final long localCheckpoint;
final SequenceNumbers.CommitInfo seqNoStats =
SequenceNumbers.loadSeqNoInfoFromLuceneCommit(store.readLastCommittedSegmentsInfo().userData.entrySet());
maxSeqNo = seqNoStats.maxSeqNo;
localCheckpoint = seqNoStats.localCheckpoint;
logger.trace("recovered maximum sequence number [{}] and local checkpoint [{}]", maxSeqNo, localCheckpoint);
return localCheckpointTrackerSupplier.apply(maxSeqNo, localCheckpoint);
}
private SoftDeletesPolicy newSoftDeletesPolicy() throws IOException {
@@ -680,21 +681,26 @@
LUCENE_DOC_NOT_FOUND
}
private static OpVsLuceneDocStatus compareOpToVersionMapOnSeqNo(String id, long seqNo, long primaryTerm, VersionValue versionValue) {
Objects.requireNonNull(versionValue);
if (seqNo > versionValue.seqNo) {
return OpVsLuceneDocStatus.OP_NEWER;
} else if (seqNo == versionValue.seqNo) {
assert versionValue.term == primaryTerm : "primary term not matched; id=" + id + " seq_no=" + seqNo
+ " op_term=" + primaryTerm + " existing_term=" + versionValue.term;
return OpVsLuceneDocStatus.OP_STALE_OR_EQUAL;
} else {
return OpVsLuceneDocStatus.OP_STALE_OR_EQUAL;
}
}
private OpVsLuceneDocStatus compareOpToLuceneDocBasedOnSeqNo(final Operation op) throws IOException {
assert op.seqNo() != SequenceNumbers.UNASSIGNED_SEQ_NO : "resolving ops based on seq# but no seqNo is found";
final OpVsLuceneDocStatus status;
VersionValue versionValue = getVersionFromMap(op.uid().bytes());
assert incrementVersionLookup();
if (versionValue != null) {
if (op.seqNo() > versionValue.seqNo) {
status = OpVsLuceneDocStatus.OP_NEWER;
} else if (op.seqNo() == versionValue.seqNo) {
assert versionValue.term == op.primaryTerm() : "primary term not matched; id=" + op.id() + " seq_no=" + op.seqNo()
+ " op_term=" + op.primaryTerm() + " existing_term=" + versionValue.term;
status = OpVsLuceneDocStatus.OP_STALE_OR_EQUAL;
} else {
status = OpVsLuceneDocStatus.OP_STALE_OR_EQUAL;
}
status = compareOpToVersionMapOnSeqNo(op.id(), op.seqNo(), op.primaryTerm(), versionValue);
} else {
// load from index
assert incrementIndexVersionLookup();
@@ -1879,8 +1885,9 @@
}
// for testing
final Collection<DeleteVersionValue> getDeletedTombstones() {
return versionMap.getAllTombstones().values();
final Map<BytesRef, VersionValue> getVersionMap() {
return Stream.concat(versionMap.getAllCurrent().entrySet().stream(), versionMap.getAllTombstones().entrySet().stream())
.collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue));
}
@Override
@@ -2518,10 +2525,6 @@
return true;
}
int getVersionMapSize() {
return versionMap.getAllCurrent().size();
}
boolean isSafeAccessRequired() {
return versionMap.isSafeAccessRequired();
}
@@ -2784,4 +2787,57 @@
final long minRetainedTranslogGen = Translog.readMinTranslogGeneration(translogPath, translogUUID);
store.trimUnsafeCommits(globalCheckpoint, minRetainedTranslogGen, engineConfig.getIndexSettings().getIndexVersionCreated());
}
/**
* Restores the live version map and local checkpoint of this engine using documents (including soft-deleted ones)
* after the local checkpoint in the safe commit. This step ensures the live version map and checkpoint tracker
* are in sync with the Lucene commit.
*/
private void restoreVersionMapAndCheckpointTracker(DirectoryReader directoryReader) throws IOException {
final IndexSearcher searcher = new IndexSearcher(directoryReader);
searcher.setQueryCache(null);
final Query query = LongPoint.newRangeQuery(SeqNoFieldMapper.NAME, getLocalCheckpoint() + 1, Long.MAX_VALUE);
final Weight weight = searcher.createWeight(query, ScoreMode.COMPLETE_NO_SCORES, 1.0f);
for (LeafReaderContext leaf : directoryReader.leaves()) {
final Scorer scorer = weight.scorer(leaf);
if (scorer == null) {
continue;
}
final CombinedDocValues dv = new CombinedDocValues(leaf.reader());
final IdOnlyFieldVisitor idFieldVisitor = new IdOnlyFieldVisitor();
final DocIdSetIterator iterator = scorer.iterator();
int docId;
while ((docId = iterator.nextDoc()) != DocIdSetIterator.NO_MORE_DOCS) {
final long primaryTerm = dv.docPrimaryTerm(docId);
if (primaryTerm == -1L) {
continue; // skip children docs which do not have primary term
}
final long seqNo = dv.docSeqNo(docId);
localCheckpointTracker.markSeqNoAsCompleted(seqNo);
idFieldVisitor.reset();
leaf.reader().document(docId, idFieldVisitor);
if (idFieldVisitor.getId() == null) {
assert dv.isTombstone(docId);
continue;
}
final BytesRef uid = new Term(IdFieldMapper.NAME, Uid.encodeId(idFieldVisitor.getId())).bytes();
try (Releasable ignored = versionMap.acquireLock(uid)) {
final VersionValue curr = versionMap.getUnderLock(uid);
if (curr == null ||
compareOpToVersionMapOnSeqNo(idFieldVisitor.getId(), seqNo, primaryTerm, curr) == OpVsLuceneDocStatus.OP_NEWER) {
if (dv.isTombstone(docId)) {
// use 0L for the start time so we can prune this delete tombstone quickly
// when the local checkpoint advances (i.e., after a recovery completed).
final long startTime = 0L;
versionMap.putDeleteUnderLock(uid, new DeleteVersionValue(dv.docVersion(docId), seqNo, primaryTerm, startTime));
} else {
versionMap.putIndexUnderLock(uid, new IndexVersionValue(null, dv.docVersion(docId), seqNo, primaryTerm));
}
}
}
}
}
// remove live entries in the version map
refresh("restore_version_map_and_checkpoint_tracker", SearcherScope.INTERNAL, true);
}
}

View File

@@ -40,14 +40,12 @@ import org.elasticsearch.index.mapper.MapperService;
import org.elasticsearch.index.mapper.SeqNoFieldMapper;
import org.elasticsearch.index.mapper.SourceFieldMapper;
import org.elasticsearch.index.mapper.Uid;
import org.elasticsearch.index.mapper.VersionFieldMapper;
import org.elasticsearch.index.translog.Translog;
import java.io.Closeable;
import java.io.IOException;
import java.util.Comparator;
import java.util.List;
import java.util.Objects;
import java.util.concurrent.atomic.AtomicBoolean;
/**
@@ -305,64 +303,4 @@
}
}
private static final class CombinedDocValues {
private final NumericDocValues versionDV;
private final NumericDocValues seqNoDV;
private final NumericDocValues primaryTermDV;
private final NumericDocValues tombstoneDV;
private final NumericDocValues recoverySource;
CombinedDocValues(LeafReader leafReader) throws IOException {
this.versionDV = Objects.requireNonNull(leafReader.getNumericDocValues(VersionFieldMapper.NAME), "VersionDV is missing");
this.seqNoDV = Objects.requireNonNull(leafReader.getNumericDocValues(SeqNoFieldMapper.NAME), "SeqNoDV is missing");
this.primaryTermDV = Objects.requireNonNull(
leafReader.getNumericDocValues(SeqNoFieldMapper.PRIMARY_TERM_NAME), "PrimaryTermDV is missing");
this.tombstoneDV = leafReader.getNumericDocValues(SeqNoFieldMapper.TOMBSTONE_NAME);
this.recoverySource = leafReader.getNumericDocValues(SourceFieldMapper.RECOVERY_SOURCE_NAME);
}
long docVersion(int segmentDocId) throws IOException {
assert versionDV.docID() < segmentDocId;
if (versionDV.advanceExact(segmentDocId) == false) {
throw new IllegalStateException("DocValues for field [" + VersionFieldMapper.NAME + "] is not found");
}
return versionDV.longValue();
}
long docSeqNo(int segmentDocId) throws IOException {
assert seqNoDV.docID() < segmentDocId;
if (seqNoDV.advanceExact(segmentDocId) == false) {
throw new IllegalStateException("DocValues for field [" + SeqNoFieldMapper.NAME + "] is not found");
}
return seqNoDV.longValue();
}
long docPrimaryTerm(int segmentDocId) throws IOException {
if (primaryTermDV == null) {
return -1L;
}
assert primaryTermDV.docID() < segmentDocId;
// Use -1 for docs which don't have primary term. The caller considers those docs as nested docs.
if (primaryTermDV.advanceExact(segmentDocId) == false) {
return -1;
}
return primaryTermDV.longValue();
}
boolean isTombstone(int segmentDocId) throws IOException {
if (tombstoneDV == null) {
return false;
}
assert tombstoneDV.docID() < segmentDocId;
return tombstoneDV.advanceExact(segmentDocId) && tombstoneDV.longValue() > 0;
}
boolean hasRecoverySource(int segmentDocId) throws IOException {
if (recoverySource == null) {
return false;
}
assert recoverySource.docID() < segmentDocId;
return recoverySource.advanceExact(segmentDocId);
}
}
}

View File

@@ -0,0 +1,60 @@
/*
* Licensed to Elasticsearch under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.elasticsearch.index.fieldvisitor;
import org.apache.lucene.index.FieldInfo;
import org.apache.lucene.index.StoredFieldVisitor;
import org.elasticsearch.index.mapper.IdFieldMapper;
import org.elasticsearch.index.mapper.Uid;
public final class IdOnlyFieldVisitor extends StoredFieldVisitor {
private String id = null;
private boolean visited = false;
@Override
public Status needsField(FieldInfo fieldInfo) {
if (visited) {
return Status.STOP;
}
if (IdFieldMapper.NAME.equals(fieldInfo.name)) {
visited = true;
return Status.YES;
} else {
return Status.NO;
}
}
@Override
public void binaryField(FieldInfo fieldInfo, byte[] value) {
assert IdFieldMapper.NAME.equals(fieldInfo.name) : fieldInfo;
if (IdFieldMapper.NAME.equals(fieldInfo.name)) {
id = Uid.decodeId(value);
}
}
public String getId() {
return id;
}
public void reset() {
id = null;
visited = false;
}
}
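A hypothetical snippet of the intended calling pattern, mirroring how restoreVersionMapAndCheckpointTracker (above) drives the visitor; the demo class and method names are illustrative only.

import org.apache.lucene.index.LeafReader;
import org.elasticsearch.index.fieldvisitor.IdOnlyFieldVisitor;

import java.io.IOException;

final class IdOnlyFieldVisitorDemo {

    // Loads only the _id stored field of a single doc; all other stored fields
    // are skipped. Returns null for docs without an _id (e.g. noop tombstones).
    static String readId(LeafReader reader, int docId, IdOnlyFieldVisitor visitor) throws IOException {
        visitor.reset();                  // make the reusable visitor forget the previous doc
        reader.document(docId, visitor);  // stored-field traversal stops right after _id
        return visitor.getId();
    }
}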

View File

@@ -222,7 +222,7 @@ public class InternalEngineTests extends EngineTestCase {
Engine.Index update = indexForDoc(doc);
engine.index(update);
assertTrue(engine.isSafeAccessRequired());
assertEquals(1, engine.getVersionMapSize());
assertThat(engine.getVersionMap().values(), hasSize(1));
try (Engine.Searcher searcher = engine.acquireSearcher("test")) {
assertEquals(0, searcher.reader().numDocs());
}
@@ -254,7 +254,7 @@ public class InternalEngineTests extends EngineTestCase {
: appendOnlyReplica(doc, false, 1, generateNewSeqNo(engine));
engine.index(operation);
assertTrue("safe access should be required", engine.isSafeAccessRequired());
assertEquals(1, engine.getVersionMapSize()); // now we add this to the map
assertThat(engine.getVersionMap().values(), hasSize(1)); // now we add this to the map
engine.refresh("test");
if (randomBoolean()) { // randomly refresh here again
engine.refresh("test");
@@ -3868,7 +3868,7 @@ public class InternalEngineTests extends EngineTestCase {
} catch (InterruptedException e) {
throw new AssertionError(e);
}
assertEquals(0, engine.getVersionMapSize());
assertThat(engine.getVersionMap().values(), empty());
int docOffset;
while ((docOffset = offset.incrementAndGet()) < docs.size()) {
try {
@@ -5167,18 +5167,19 @@
engine.delete(replicaDeleteForDoc(UUIDs.randomBase64UUID(), 1, seqno, threadPool.relativeTimeInMillis()));
}
}
List<DeleteVersionValue> tombstones = new ArrayList<>(engine.getDeletedTombstones());
List<DeleteVersionValue> tombstones = new ArrayList<>(tombstonesInVersionMap(engine).values());
engine.config().setEnableGcDeletes(true);
// Prune tombstones whose seqno < gap_seqno and timestamp < clock-gcInterval.
clock.set(randomLongBetween(gcInterval, deleteBatch + gcInterval));
engine.refresh("test");
tombstones.removeIf(v -> v.seqNo < gapSeqNo && v.time < clock.get() - gcInterval);
assertThat(engine.getDeletedTombstones(), containsInAnyOrder(tombstones.toArray()));
assertThat(tombstonesInVersionMap(engine).values(), containsInAnyOrder(tombstones.toArray()));
// Prune tombstones whose seqno at most the local checkpoint (eg. seqno < gap_seqno).
clock.set(randomLongBetween(deleteBatch + gcInterval * 4/3, 100)); // Need a margin for gcInterval/4.
engine.refresh("test");
tombstones.removeIf(v -> v.seqNo < gapSeqNo);
assertThat(engine.getDeletedTombstones(), containsInAnyOrder(tombstones.toArray()));
assertThat(tombstonesInVersionMap(engine).values(), containsInAnyOrder(tombstones.toArray()));
// Fill the seqno gap - should prune all tombstones.
clock.set(between(0, 100));
if (randomBoolean()) {
@@ -5190,7 +5191,7 @@
}
clock.set(randomLongBetween(100 + gcInterval * 4/3, Long.MAX_VALUE)); // Need a margin for gcInterval/4.
engine.refresh("test");
assertThat(engine.getDeletedTombstones(), empty());
assertThat(tombstonesInVersionMap(engine).values(), empty());
}
}
@@ -5604,9 +5605,10 @@
}
}
public void testRebuildLocalCheckpointTracker() throws Exception {
public void testRebuildLocalCheckpointTrackerAndVersionMap() throws Exception {
Settings.Builder settings = Settings.builder()
.put(defaultSettings.getSettings())
.put(IndexSettings.INDEX_SOFT_DELETES_RETENTION_OPERATIONS_SETTING.getKey(), 10000)
.put(IndexSettings.INDEX_SOFT_DELETES_SETTING.getKey(), true);
final IndexMetaData indexMetaData = IndexMetaData.builder(defaultSettings.getIndexMetaData()).settings(settings).build();
final IndexSettings indexSettings = IndexSettingsModule.newIndexSettings(indexMetaData);
@@ -5623,32 +5625,54 @@
for (Engine.Operation op : operations) {
flushedOperations.add(op);
applyOperation(engine, op);
if (randomBoolean()) {
globalCheckpoint.set(randomLongBetween(globalCheckpoint.get(), engine.getLocalCheckpoint()));
engine.syncTranslog();
}
if (randomInt(100) < 10) {
engine.refresh("test");
}
if (randomInt(100) < 5) {
engine.flush();
engine.flush(true, true);
flushedOperations.sort(Comparator.comparing(Engine.Operation::seqNo));
commits.add(new ArrayList<>(flushedOperations));
globalCheckpoint.set(randomLongBetween(globalCheckpoint.get(), engine.getLocalCheckpoint()));
}
}
globalCheckpoint.set(randomLongBetween(globalCheckpoint.get(), engine.getLocalCheckpoint()));
engine.syncTranslog();
docs = getDocIds(engine, true);
}
Set<Long> seqNosInSafeCommit = null;
List<Engine.Operation> operationsInSafeCommit = null;
for (int i = commits.size() - 1; i >= 0; i--) {
if (commits.get(i).stream().allMatch(op -> op.seqNo() <= globalCheckpoint.get())) {
seqNosInSafeCommit = commits.get(i).stream().map(Engine.Operation::seqNo).collect(Collectors.toSet());
operationsInSafeCommit = commits.get(i);
break;
}
}
assertThat(seqNosInSafeCommit, notNullValue());
assertThat(operationsInSafeCommit, notNullValue());
try (InternalEngine engine = new InternalEngine(config)) { // do not recover from translog
final Map<BytesRef, Engine.Operation> deletesAfterCheckpoint = new HashMap<>();
for (Engine.Operation op : operationsInSafeCommit) {
if (op instanceof Engine.NoOp == false && op.seqNo() > engine.getLocalCheckpoint()) {
deletesAfterCheckpoint.put(new Term(IdFieldMapper.NAME, Uid.encodeId(op.id())).bytes(), op);
}
}
deletesAfterCheckpoint.values().removeIf(o -> o instanceof Engine.Delete == false);
final Map<BytesRef, VersionValue> versionMap = engine.getVersionMap();
for (BytesRef uid : deletesAfterCheckpoint.keySet()) {
final VersionValue versionValue = versionMap.get(uid);
final Engine.Operation op = deletesAfterCheckpoint.get(uid);
final String msg = versionValue + " vs " +
"op[" + op.operationType() + "id=" + op.id() + " seqno=" + op.seqNo() + " term=" + op.primaryTerm() + "]";
assertThat(versionValue, instanceOf(DeleteVersionValue.class));
assertThat(msg, versionValue.seqNo, equalTo(op.seqNo()));
assertThat(msg, versionValue.term, equalTo(op.primaryTerm()));
assertThat(msg, versionValue.version, equalTo(op.version()));
}
assertThat(versionMap.keySet(), equalTo(deletesAfterCheckpoint.keySet()));
final LocalCheckpointTracker tracker = engine.getLocalCheckpointTracker();
final Set<Long> seqNosInSafeCommit = operationsInSafeCommit.stream().map(op -> op.seqNo()).collect(Collectors.toSet());
for (Engine.Operation op : operations) {
assertThat(
"seq_no=" + op.seqNo() + " max_seq_no=" + tracker.getMaxSeqNo() + "checkpoint=" + tracker.getCheckpoint(),
"seq_no=" + op.seqNo() + " max_seq_no=" + tracker.getMaxSeqNo() + " checkpoint=" + tracker.getCheckpoint(),
tracker.contains(op.seqNo()), equalTo(seqNosInSafeCommit.contains(op.seqNo())));
}
engine.recoverFromTranslog(translogHandler, Long.MAX_VALUE);
@@ -5917,4 +5941,43 @@
}
}
}
public void testRecoverFromLocalTranslog() throws Exception {
final AtomicLong globalCheckpoint = new AtomicLong(SequenceNumbers.NO_OPS_PERFORMED);
Path translogPath = createTempDir();
List<Engine.Operation> operations = generateHistoryOnReplica(between(1, 500), randomBoolean(), randomBoolean(), randomBoolean());
try (Store store = createStore()) {
EngineConfig config = config(defaultSettings, store, translogPath, newMergePolicy(), null, null, globalCheckpoint::get);
final List<DocIdSeqNoAndSource> docs;
try (InternalEngine engine = createEngine(config)) {
for (Engine.Operation op : operations) {
applyOperation(engine, op);
if (randomBoolean()) {
globalCheckpoint.set(randomLongBetween(globalCheckpoint.get(), engine.getLocalCheckpoint()));
engine.syncTranslog();
}
if (randomInt(100) < 10) {
engine.refresh("test");
}
if (randomInt(100) < 5) {
engine.flush();
}
if (randomInt(100) < 5) {
engine.forceMerge(randomBoolean(), 1, false, false, false);
}
}
docs = getDocIds(engine, true);
}
try (InternalEngine engine = new InternalEngine(config)) {
engine.recoverFromTranslog(translogHandler, Long.MAX_VALUE);
assertThat(getDocIds(engine, randomBoolean()), equalTo(docs));
}
}
}
private Map<BytesRef, DeleteVersionValue> tombstonesInVersionMap(InternalEngine engine) {
return engine.getVersionMap().entrySet().stream()
.filter(e -> e.getValue() instanceof DeleteVersionValue)
.collect(Collectors.toMap(e -> e.getKey(), e -> (DeleteVersionValue) e.getValue()));
}
}

View File

@@ -26,6 +26,7 @@ import org.apache.lucene.store.AlreadyClosedException;
import org.elasticsearch.Version;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.admin.indices.flush.FlushRequest;
import org.elasticsearch.action.admin.indices.forcemerge.ForceMergeRequest;
import org.elasticsearch.action.bulk.BulkShardRequest;
import org.elasticsearch.action.delete.DeleteRequest;
import org.elasticsearch.action.index.IndexRequest;
@@ -725,6 +726,9 @@ public class RecoveryDuringReplicationTests extends ESIndexLevelReplicationTestCase {
if (randomInt(100) < 10) {
shards.getPrimary().flush(new FlushRequest());
}
if (randomInt(100) < 5) {
shards.getPrimary().forceMerge(new ForceMergeRequest().flush(randomBoolean()).maxNumSegments(1));
}
} catch (Exception ex) {
throw new AssertionError(ex);
}

View File

@@ -37,6 +37,7 @@ import org.apache.lucene.index.LiveIndexWriterConfig;
import org.apache.lucene.index.MergePolicy;
import org.apache.lucene.index.NumericDocValues;
import org.apache.lucene.index.Term;
import org.apache.lucene.search.DocIdSetIterator;
import org.apache.lucene.search.IndexSearcher;
import org.apache.lucene.search.MatchAllDocsQuery;
import org.apache.lucene.search.ReferenceManager;
@@ -1111,10 +1112,8 @@
List<IndexCommit> commits = DirectoryReader.listCommits(engine.store.directory());
for (IndexCommit commit : commits) {
try (DirectoryReader reader = DirectoryReader.open(commit)) {
AtomicLong maxSeqNoFromDocs = new AtomicLong(SequenceNumbers.NO_OPS_PERFORMED);
Lucene.scanSeqNosInReader(reader, 0, Long.MAX_VALUE, n -> maxSeqNoFromDocs.set(Math.max(n, maxSeqNoFromDocs.get())));
assertThat(Long.parseLong(commit.getUserData().get(SequenceNumbers.MAX_SEQ_NO)),
greaterThanOrEqualTo(maxSeqNoFromDocs.get()));
greaterThanOrEqualTo(maxSeqNosInReader(reader)));
}
}
}
@@ -1177,4 +1176,15 @@
return get();
}
}
static long maxSeqNosInReader(DirectoryReader reader) throws IOException {
long maxSeqNo = SequenceNumbers.NO_OPS_PERFORMED;
for (LeafReaderContext leaf : reader.leaves()) {
final NumericDocValues seqNoDocValues = leaf.reader().getNumericDocValues(SeqNoFieldMapper.NAME);
while (seqNoDocValues.nextDoc() != DocIdSetIterator.NO_MORE_DOCS) {
maxSeqNo = SequenceNumbers.max(maxSeqNo, seqNoDocValues.longValue());
}
}
return maxSeqNo;
}
}

View File

@@ -215,6 +215,9 @@ public class FollowerFailOverIT extends CcrIntegTestCase {
if (rarely()) {
followerClient().admin().indices().prepareFlush("follower-index").get();
}
if (rarely()) {
followerClient().admin().indices().prepareForceMerge("follower-index").setMaxNumSegments(1).get();
}
if (rarely()) {
followerClient().admin().indices().prepareRefresh("follower-index").get();
}