Use Lucene soft-deletes in peer recovery (#30522)

This commit adds Lucene soft-deletes as another source of operation history for
peer recovery, in addition to the translog.

Relates #29530
Nhat Nguyen 2018-06-21 09:30:08 -04:00 committed by GitHub
parent d467be300e
commit 601ea76b76
25 changed files with 648 additions and 187 deletions
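
Before the per-file diffs, here is a minimal sketch of the core idea (this is not the actual Elasticsearch code; method names such as luceneChangesSnapshot and translogSnapshot are illustrative stand-ins condensed from the InternalEngine changes below): with soft deletes enabled, the engine serves the operation history needed for peer recovery from the Lucene index, which now retains soft-deleted documents under a SoftDeletesPolicy; otherwise it falls back to the translog as before.

// Sketch only, condensed from the InternalEngine changes below.
Translog.Snapshot readHistoryOperations(long startingSeqNo) throws IOException {
    if (indexSettings.isSoftDeleteEnabled()) {
        // soft-deleted docs stay in the index until the SoftDeletesPolicy lets merges reclaim them,
        // so the index itself can provide all operations at or above startingSeqNo
        return luceneChangesSnapshot(Math.max(0, startingSeqNo), Long.MAX_VALUE);
    } else {
        // previous behaviour: history comes from retained translog generations
        return translogSnapshot(startingSeqNo, Long.MAX_VALUE);
    }
}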

View File

@ -101,7 +101,8 @@ which returns something similar to:
"translog_generation" : "2",
"max_seq_no" : "-1",
"sync_id" : "AVvFY-071siAOuFGEO9P", <1>
"max_unsafe_auto_id_timestamp" : "-1"
"max_unsafe_auto_id_timestamp" : "-1",
"min_retained_seq_no": "0"
},
"num_docs" : 0
}

View File

@ -46,14 +46,17 @@ import java.util.function.LongSupplier;
public final class CombinedDeletionPolicy extends IndexDeletionPolicy {
private final Logger logger;
private final TranslogDeletionPolicy translogDeletionPolicy;
private final SoftDeletesPolicy softDeletesPolicy;
private final LongSupplier globalCheckpointSupplier;
private final ObjectIntHashMap<IndexCommit> snapshottedCommits; // Number of snapshots held against each commit point.
private volatile IndexCommit safeCommit; // the most recent safe commit point - its max_seqno is at most the persisted global checkpoint.
private volatile IndexCommit lastCommit; // the most recent commit point
CombinedDeletionPolicy(Logger logger, TranslogDeletionPolicy translogDeletionPolicy, LongSupplier globalCheckpointSupplier) {
CombinedDeletionPolicy(Logger logger, TranslogDeletionPolicy translogDeletionPolicy,
SoftDeletesPolicy softDeletesPolicy, LongSupplier globalCheckpointSupplier) {
this.logger = logger;
this.translogDeletionPolicy = translogDeletionPolicy;
this.softDeletesPolicy = softDeletesPolicy;
this.globalCheckpointSupplier = globalCheckpointSupplier;
this.snapshottedCommits = new ObjectIntHashMap<>();
}
@ -80,7 +83,7 @@ public final class CombinedDeletionPolicy extends IndexDeletionPolicy {
deleteCommit(commits.get(i));
}
}
updateTranslogDeletionPolicy();
updateRetentionPolicy();
}
private void deleteCommit(IndexCommit commit) throws IOException {
@ -90,7 +93,7 @@ public final class CombinedDeletionPolicy extends IndexDeletionPolicy {
assert commit.isDeleted() : "Deletion commit [" + commitDescription(commit) + "] was suppressed";
}
private void updateTranslogDeletionPolicy() throws IOException {
private void updateRetentionPolicy() throws IOException {
assert Thread.holdsLock(this);
logger.debug("Safe commit [{}], last commit [{}]", commitDescription(safeCommit), commitDescription(lastCommit));
assert safeCommit.isDeleted() == false : "The safe commit must not be deleted";
@ -101,6 +104,9 @@ public final class CombinedDeletionPolicy extends IndexDeletionPolicy {
assert minRequiredGen <= lastGen : "minRequiredGen must not be greater than lastGen";
translogDeletionPolicy.setTranslogGenerationOfLastCommit(lastGen);
translogDeletionPolicy.setMinTranslogGenerationForRecovery(minRequiredGen);
softDeletesPolicy.setLocalCheckpointOfSafeCommit(
Long.parseLong(safeCommit.getUserData().get(SequenceNumbers.LOCAL_CHECKPOINT_KEY)));
}
/**

View File

@ -98,6 +98,7 @@ public abstract class Engine implements Closeable {
public static final String SYNC_COMMIT_ID = "sync_id";
public static final String HISTORY_UUID_KEY = "history_uuid";
public static final String MIN_RETAINED_SEQNO = "min_retained_seq_no";
protected final ShardId shardId;
protected final String allocationId;
@ -578,7 +579,10 @@ public abstract class Engine implements Closeable {
public abstract void syncTranslog() throws IOException;
public abstract Closeable acquireTranslogRetentionLock();
/**
* Acquires a lock on the translog files and Lucene soft-deleted documents to prevent them from being trimmed
*/
public abstract Closeable acquireRetentionLockForPeerRecovery();
/**
* Creates a new translog snapshot from this engine for reading translog operations whose seq# is in the provided range.
@ -586,11 +590,6 @@ public abstract class Engine implements Closeable {
*/
public abstract Translog.Snapshot newTranslogSnapshotBetween(long minSeqNo, long maxSeqNo) throws IOException;
/**
* Returns the estimated number of translog operations in this engine whose seq# is at least the provided seq#.
*/
public abstract int estimateTranslogOperationsFromMinSeq(long minSeqNo);
public abstract TranslogStats getTranslogStats();
/**
@ -604,6 +603,19 @@ public abstract class Engine implements Closeable {
public abstract Translog.Snapshot newLuceneChangesSnapshot(String source, MapperService mapperService,
long minSeqNo, long maxSeqNo, boolean requiredFullRange) throws IOException;
/**
* Creates a new history snapshot for reading operations since the provided seqno.
* The returned snapshot is sourced from either the Lucene index or the translog files.
*/
public abstract Translog.Snapshot readHistoryOperations(String source, MapperService mapperService, long startingSeqNo) throws IOException;
/**
* Returns the estimated number of history operations in this engine whose seq# is at least the provided seq#.
*/
public abstract int estimateNumberOfHistoryOperations(String source, MapperService mapperService, long startingSeqNo) throws IOException;
public abstract boolean hasCompleteOperationHistory(String source, MapperService mapperService, long startingSeqNo) throws IOException;
protected final void ensureOpen(Exception suppressed) {
if (isClosed.get()) {
AlreadyClosedException ace = new AlreadyClosedException(shardId + " engine is closed", failedEngine.get());

View File

@ -22,7 +22,6 @@ package org.elasticsearch.index.engine;
import org.apache.logging.log4j.Logger;
import org.apache.logging.log4j.message.ParameterizedMessage;
import org.apache.lucene.document.Field;
import org.apache.lucene.document.LongPoint;
import org.apache.lucene.document.NumericDocValuesField;
import org.apache.lucene.index.DirectoryReader;
import org.apache.lucene.index.IndexCommit;
@ -38,7 +37,6 @@ import org.apache.lucene.index.SegmentInfos;
import org.apache.lucene.index.SoftDeletesRetentionMergePolicy;
import org.apache.lucene.index.Term;
import org.apache.lucene.search.IndexSearcher;
import org.apache.lucene.search.Query;
import org.apache.lucene.search.ReferenceManager;
import org.apache.lucene.search.SearcherFactory;
import org.apache.lucene.search.SearcherManager;
@ -153,6 +151,7 @@ public class InternalEngine extends Engine {
private final CounterMetric numDocUpdates = new CounterMetric();
private final NumericDocValuesField softDeleteField = Lucene.newSoftDeleteField();
private final boolean softDeleteEnabled;
private final SoftDeletesPolicy softDeletesPolicy;
private final LastRefreshedCheckpointListener lastRefreshedCheckpointListener;
/**
@ -177,7 +176,6 @@ public class InternalEngine extends Engine {
if (engineConfig.isAutoGeneratedIDsOptimizationEnabled() == false) {
maxUnsafeAutoIdTimestamp.set(Long.MAX_VALUE);
}
this.softDeleteEnabled = engineConfig.getIndexSettings().isSoftDeleteEnabled();
final TranslogDeletionPolicy translogDeletionPolicy = new TranslogDeletionPolicy(
engineConfig.getIndexSettings().getTranslogRetentionSize().getBytes(),
engineConfig.getIndexSettings().getTranslogRetentionAge().getMillis()
@ -199,8 +197,10 @@ public class InternalEngine extends Engine {
assert translog.getGeneration() != null;
this.translog = translog;
this.localCheckpointTracker = createLocalCheckpointTracker(localCheckpointTrackerSupplier);
this.softDeleteEnabled = engineConfig.getIndexSettings().isSoftDeleteEnabled();
this.softDeletesPolicy = newSoftDeletesPolicy();
this.combinedDeletionPolicy =
new CombinedDeletionPolicy(logger, translogDeletionPolicy, translog::getLastSyncedGlobalCheckpoint);
new CombinedDeletionPolicy(logger, translogDeletionPolicy, softDeletesPolicy, translog::getLastSyncedGlobalCheckpoint);
writer = createWriter();
bootstrapAppendOnlyInfoFromWriter(writer);
historyUUID = loadHistoryUUID(writer);
@ -257,6 +257,18 @@ public class InternalEngine extends Engine {
return localCheckpointTrackerSupplier.apply(maxSeqNo, localCheckpoint);
}
private SoftDeletesPolicy newSoftDeletesPolicy() throws IOException {
final Map<String, String> commitUserData = store.readLastCommittedSegmentsInfo().userData;
final long lastMinRetainedSeqNo;
if (commitUserData.containsKey(Engine.MIN_RETAINED_SEQNO)) {
lastMinRetainedSeqNo = Long.parseLong(commitUserData.get(Engine.MIN_RETAINED_SEQNO));
} else {
lastMinRetainedSeqNo = Long.parseLong(commitUserData.get(SequenceNumbers.MAX_SEQ_NO)) + 1;
}
return new SoftDeletesPolicy(translog::getLastSyncedGlobalCheckpoint, lastMinRetainedSeqNo,
engineConfig.getIndexSettings().getSoftDeleteRetentionOperations());
}
/**
* This reference manager delegates all its refresh calls to another (internal) SearcherManager
* The main purpose for this is that if we have external refreshes happening we don't issue extra
@ -467,19 +479,40 @@ public class InternalEngine extends Engine {
revisitIndexDeletionPolicyOnTranslogSynced();
}
@Override
public Closeable acquireTranslogRetentionLock() {
return getTranslog().acquireRetentionLock();
}
@Override
public Translog.Snapshot newTranslogSnapshotBetween(long minSeqNo, long maxSeqNo) throws IOException {
return getTranslog().getSnapshotBetween(minSeqNo, maxSeqNo);
}
/**
* Creates a new history snapshot for reading operations since the provided seqno.
* The returned snapshot is sourced from either the Lucene index or the translog files.
*/
@Override
public int estimateTranslogOperationsFromMinSeq(long minSeqNo) {
return getTranslog().estimateTotalOperationsFromMinSeq(minSeqNo);
public Translog.Snapshot readHistoryOperations(String source, MapperService mapperService, long startingSeqNo) throws IOException {
if (engineConfig.getIndexSettings().isSoftDeleteEnabled()) {
return newLuceneChangesSnapshot(source, mapperService, Math.max(0, startingSeqNo), Long.MAX_VALUE, false);
} else {
return getTranslog().getSnapshotBetween(startingSeqNo, Long.MAX_VALUE);
}
}
/**
* Returns the estimated number of history operations in this engine whose seq# is at least the provided seq#.
*/
@Override
public int estimateNumberOfHistoryOperations(String source, MapperService mapperService, long startingSeqNo) throws IOException {
if (engineConfig.getIndexSettings().isSoftDeleteEnabled()) {
try (Translog.Snapshot snapshot =
newLuceneChangesSnapshot(source, mapperService, Math.max(0, startingSeqNo), Long.MAX_VALUE, false)) {
return snapshot.totalOperations();
} catch (IOException ex) {
maybeFailEngine(source, ex);
throw ex;
}
} else {
return getTranslog().estimateTotalOperationsFromMinSeq(startingSeqNo);
}
}
@Override
@ -2070,8 +2103,8 @@ public class InternalEngine extends Engine {
MergePolicy mergePolicy = config().getMergePolicy();
if (softDeleteEnabled) {
iwc.setSoftDeletesField(Lucene.SOFT_DELETE_FIELD);
mergePolicy = new RecoverySourcePruneMergePolicy(SourceFieldMapper.RECOVERY_SOURCE_NAME, this::softDeletesRetentionQuery,
new SoftDeletesRetentionMergePolicy(Lucene.SOFT_DELETE_FIELD, this::softDeletesRetentionQuery, mergePolicy));
mergePolicy = new RecoverySourcePruneMergePolicy(SourceFieldMapper.RECOVERY_SOURCE_NAME, softDeletesPolicy::getRetentionQuery,
new SoftDeletesRetentionMergePolicy(Lucene.SOFT_DELETE_FIELD, softDeletesPolicy::getRetentionQuery, mergePolicy));
}
iwc.setMergePolicy(new ElasticsearchMergePolicy(mergePolicy));
iwc.setSimilarity(engineConfig.getSimilarity());
@ -2084,20 +2117,6 @@ public class InternalEngine extends Engine {
return iwc;
}
/**
* Documents, including tombstones, that are soft-deleted and match this query will be retained and won't be cleaned up by merges.
*/
private Query softDeletesRetentionQuery() {
ensureOpen();
// TODO: We send the safe commit in peer-recovery, thus we need to retain all operations after the local checkpoint of that commit.
final long retainedExtraOps = engineConfig.getIndexSettings().getSoftDeleteRetentionOperations();
// Prefer the global checkpoint that is persisted on disk over an in-memory value.
// If we fail to fsync the checkpoint but have already used a higher global checkpoint value to clean up soft-deleted ops,
// then after a restart we may not have all required operations whose seq# is greater than the global checkpoint.
final long persistedGlobalCheckpoint = translog.getLastSyncedGlobalCheckpoint();
return LongPoint.newRangeQuery(SeqNoFieldMapper.NAME, persistedGlobalCheckpoint + 1 - retainedExtraOps, Long.MAX_VALUE);
}
/** Extended SearcherFactory that warms the segments if needed when acquiring a new searcher */
static final class SearchFactory extends EngineSearcherFactory {
private final Engine.Warmer warmer;
@ -2284,6 +2303,9 @@ public class InternalEngine extends Engine {
commitData.put(SequenceNumbers.MAX_SEQ_NO, Long.toString(localCheckpointTracker.getMaxSeqNo()));
commitData.put(MAX_UNSAFE_AUTO_ID_TIMESTAMP_COMMIT_ID, Long.toString(maxUnsafeAutoIdTimestamp.get()));
commitData.put(HISTORY_UUID_KEY, historyUUID);
if (softDeleteEnabled) {
commitData.put(Engine.MIN_RETAINED_SEQNO, Long.toString(softDeletesPolicy.getMinRetainedSeqNo()));
}
logger.trace("committing writer with commit data [{}]", commitData);
return commitData.entrySet().iterator();
});
@ -2339,6 +2361,8 @@ public class InternalEngine extends Engine {
final IndexSettings indexSettings = engineConfig.getIndexSettings();
translogDeletionPolicy.setRetentionAgeInMillis(indexSettings.getTranslogRetentionAge().getMillis());
translogDeletionPolicy.setRetentionSizeInBytes(indexSettings.getTranslogRetentionSize().getBytes());
softDeletesPolicy.setRetentionOperations(indexSettings.getSoftDeleteRetentionOperations());
}
public MergeStats getMergeStats() {
@ -2452,6 +2476,41 @@ public class InternalEngine extends Engine {
}
}
@Override
public boolean hasCompleteOperationHistory(String source, MapperService mapperService, long startingSeqNo) throws IOException {
if (engineConfig.getIndexSettings().isSoftDeleteEnabled()) {
return getMinRetainedSeqNo() <= startingSeqNo;
} else {
final long currentLocalCheckpoint = getLocalCheckpointTracker().getCheckpoint();
final LocalCheckpointTracker tracker = new LocalCheckpointTracker(startingSeqNo, startingSeqNo - 1);
try (Translog.Snapshot snapshot = getTranslog().getSnapshotBetween(startingSeqNo, Long.MAX_VALUE)) {
Translog.Operation operation;
while ((operation = snapshot.next()) != null) {
if (operation.seqNo() != SequenceNumbers.UNASSIGNED_SEQ_NO) {
tracker.markSeqNoAsCompleted(operation.seqNo());
}
}
}
return tracker.getCheckpoint() >= currentLocalCheckpoint;
}
}
/**
* Returns the minimum seqno that is retained in the Lucene index.
* Operations whose seq# is at least this value should exist in the Lucene index.
*/
final long getMinRetainedSeqNo() {
assert softDeleteEnabled : Thread.currentThread().getName();
return softDeletesPolicy.getMinRetainedSeqNo();
}
@Override
public Closeable acquireRetentionLockForPeerRecovery() {
final Closeable translogLock = translog.acquireRetentionLock();
final Releasable softDeletesLock = softDeletesPolicy.acquireRetentionLock();
return () -> IOUtils.close(translogLock, softDeletesLock);
}
@Override
public boolean isRecovering() {
return pendingTranslogRecovery.get();

View File

@ -116,7 +116,7 @@ final class LuceneChangesSnapshot implements Translog.Snapshot {
}
@Override
public int overriddenOperations() {
public int skippedOperations() {
return skippedOperations;
}

View File

@ -0,0 +1,120 @@
/*
* Licensed to Elasticsearch under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.elasticsearch.index.engine;
import org.apache.lucene.document.LongPoint;
import org.apache.lucene.search.Query;
import org.elasticsearch.common.lease.Releasable;
import org.elasticsearch.index.mapper.SeqNoFieldMapper;
import org.elasticsearch.index.seqno.SequenceNumbers;
import org.elasticsearch.index.translog.Translog;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.LongSupplier;
/**
* A policy that controls how many soft-deleted documents should be retained for peer recovery and for querying history changes.
*/
final class SoftDeletesPolicy {
private final LongSupplier globalCheckpointSupplier;
private long localCheckpointOfSafeCommit;
// This lock count is used to prevent `minRetainedSeqNo` from advancing.
private int retentionLockCount;
// The number of extra operations before the global checkpoint that are retained
private long retentionOperations;
// The min seq_no value that is retained - ops at or above this seq# should exist in the Lucene index.
private long minRetainedSeqNo;
SoftDeletesPolicy(LongSupplier globalCheckpointSupplier, long minRetainedSeqNo, long retentionOperations) {
this.globalCheckpointSupplier = globalCheckpointSupplier;
this.retentionOperations = retentionOperations;
this.minRetainedSeqNo = minRetainedSeqNo;
this.localCheckpointOfSafeCommit = SequenceNumbers.NO_OPS_PERFORMED;
this.retentionLockCount = 0;
}
/**
* Updates the number of soft-deleted documents prior to the global checkpoint to be retained
* See {@link org.elasticsearch.index.IndexSettings#INDEX_SOFT_DELETES_RETENTION_OPERATIONS_SETTING}
*/
synchronized void setRetentionOperations(long retentionOperations) {
this.retentionOperations = retentionOperations;
}
/**
* Sets the local checkpoint of the current safe commit
*/
synchronized void setLocalCheckpointOfSafeCommit(long newCheckpoint) {
if (newCheckpoint < this.localCheckpointOfSafeCommit) {
throw new IllegalArgumentException("Local checkpoint can't go backwards; " +
"new checkpoint [" + newCheckpoint + "]," + "current checkpoint [" + localCheckpointOfSafeCommit + "]");
}
this.localCheckpointOfSafeCommit = newCheckpoint;
}
/**
* Acquires a lock on soft-deleted documents to prevent them from being cleaned up by merges. This is necessary to
* make sure that all operations that are being retained will remain retained until the lock is released.
* This is analogous to the translog's retention lock; see {@link Translog#acquireRetentionLock()}
*/
synchronized Releasable acquireRetentionLock() {
assert retentionLockCount >= 0 : "Invalid number of retention locks [" + retentionLockCount + "]";
retentionLockCount++;
final AtomicBoolean released = new AtomicBoolean();
return () -> {
if (released.compareAndSet(false, true)) {
releaseRetentionLock();
}
};
}
private synchronized void releaseRetentionLock() {
assert retentionLockCount > 0 : "Invalid number of retention locks [" + retentionLockCount + "]";
retentionLockCount--;
}
/**
* Returns the min seqno that is retained in the Lucene index.
* Operations whose seq# is at least this value should exist in the Lucene index.
*/
synchronized long getMinRetainedSeqNo() {
// Do not advance if the retention lock is held
if (retentionLockCount == 0) {
// This policy retains operations for two purposes: peer-recovery and querying changes history.
// - Peer-recovery is driven by the local checkpoint of the safe commit. In peer-recovery, the primary transfers a safe commit,
// then sends ops after the local checkpoint of that commit. This requires keeping all ops after localCheckpointOfSafeCommit;
// - Changes APIs are driven by the combination of the global checkpoint and retention ops. Here we prefer using the global
// checkpoint instead of max_seqno because only operations up to the global checkpoint are exposed in the changes APIs.
final long minSeqNoForQueryingChanges = globalCheckpointSupplier.getAsLong() - retentionOperations;
final long minSeqNoToRetain = Math.min(minSeqNoForQueryingChanges, localCheckpointOfSafeCommit) + 1;
// This can go backward as the retentionOperations value can be changed in settings.
minRetainedSeqNo = Math.max(minRetainedSeqNo, minSeqNoToRetain);
}
return minRetainedSeqNo;
}
/**
* Returns a soft-deletes retention query that will be used in {@link org.apache.lucene.index.SoftDeletesRetentionMergePolicy}
* Documents, including tombstones, that are soft-deleted and match this query will be retained and won't be cleaned up by merges.
*/
Query getRetentionQuery() {
return LongPoint.newRangeQuery(SeqNoFieldMapper.NAME, getMinRetainedSeqNo(), Long.MAX_VALUE);
}
}
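
A worked example of the retention arithmetic above (values are illustrative): with a persisted global checkpoint of 100, the soft-deletes retention setting (INDEX_SOFT_DELETES_RETENTION_OPERATIONS_SETTING) set to 30, and a safe commit whose local checkpoint is 80, getMinRetainedSeqNo() yields min(100 - 30, 80) + 1 = 71, so every operation with seq# >= 71 must stay in the Lucene index. The sketch below condenses the behaviour exercised by SoftDeletesPolicyTests further down and also shows that the value is frozen while a retention lock is held; since the class is package-private, this only compiles next to it, and it assumes java.util.concurrent.atomic.AtomicLong and org.elasticsearch.common.lease.Releasable are imported.

AtomicLong globalCheckpoint = new AtomicLong(100);
SoftDeletesPolicy policy = new SoftDeletesPolicy(globalCheckpoint::get, 0, 30);
policy.setLocalCheckpointOfSafeCommit(80);
assert policy.getMinRetainedSeqNo() == 71;      // min(100 - 30, 80) + 1
try (Releasable lock = policy.acquireRetentionLock()) {
    globalCheckpoint.set(200);                  // e.g. indexing continues during a peer recovery
    policy.setLocalCheckpointOfSafeCommit(150);
    assert policy.getMinRetainedSeqNo() == 71;  // does not advance while the lock is held
}
assert policy.getMinRetainedSeqNo() == 151;     // min(200 - 30, 150) + 1 once the lock is released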

View File

@ -1597,10 +1597,10 @@ public class IndexShard extends AbstractIndexShardComponent implements IndicesCl
}
/**
* Acquires a lock on the translog files, preventing them from being trimmed.
* Acquires a lock on the translog files and Lucene soft-deleted documents to prevent them from being trimmed
*/
public Closeable acquireTranslogRetentionLock() {
return getEngine().acquireTranslogRetentionLock();
public Closeable acquireRetentionLockForPeerRecovery() {
return getEngine().acquireRetentionLockForPeerRecovery();
}
/**
@ -1608,14 +1608,31 @@ public class IndexShard extends AbstractIndexShardComponent implements IndicesCl
* The caller has to close the returned snapshot after finishing the reading.
*/
public Translog.Snapshot newTranslogSnapshotFromMinSeqNo(long minSeqNo) throws IOException {
// TODO: Remove this method after primary-replica resync use soft-deletes
return getEngine().newTranslogSnapshotBetween(minSeqNo, Long.MAX_VALUE);
}
/**
* Returns the estimated number of operations in translog whose seq# is at least the provided seq#.
* Returns the estimated number of history operations in this shard whose seq# is at least the provided seq#.
*/
public int estimateTranslogOperationsFromMinSeq(long minSeqNo) {
return getEngine().estimateTranslogOperationsFromMinSeq(minSeqNo);
public int estimateNumberOfHistoryOperations(String source, long startingSeqNo) throws IOException {
return getEngine().estimateNumberOfHistoryOperations(source, mapperService, startingSeqNo);
}
/**
* Creates a new history snapshot for reading operations since the provided starting seqno (inclusive).
* The returned snapshot is sourced from either the Lucene index or the translog files.
*/
public Translog.Snapshot getHistoryOperations(String source, long startingSeqNo) throws IOException {
return getEngine().readHistoryOperations(source, mapperService, startingSeqNo);
}
/**
* Checks if we have a complete history of operations since the given starting seqno (inclusive).
* This method should be called after acquiring the retention lock; see {@link #acquireRetentionLockForPeerRecovery()}
*/
public boolean hasCompleteHistoryOperations(String source, long startingSeqNo) throws IOException {
return getEngine().hasCompleteOperationHistory(source, mapperService, startingSeqNo);
}
/**

View File

@ -83,6 +83,7 @@ public class PrimaryReplicaSyncer extends AbstractComponent {
ActionListener<ResyncTask> resyncListener = null;
try {
final long startingSeqNo = indexShard.getGlobalCheckpoint() + 1;
// TODO: A follow-up to make resync use soft-deletes
Translog.Snapshot snapshot = indexShard.newTranslogSnapshotFromMinSeqNo(startingSeqNo);
final long maxSeqNo = indexShard.seqNoStats().getMaxSeqNo();
resyncListener = new ActionListener<ResyncTask>() {

View File

@ -40,6 +40,7 @@ import java.nio.file.Path;
import java.nio.file.StandardOpenOption;
import java.util.HashMap;
import java.util.Map;
import java.util.Objects;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.LongSupplier;
@ -202,9 +203,23 @@ public class TranslogWriter extends BaseTranslogReader implements Closeable {
if (previous.v1().equals(data) == false) {
Translog.Operation newOp = Translog.readOperation(new BufferedChecksumStreamInput(data.streamInput()));
Translog.Operation prvOp = Translog.readOperation(new BufferedChecksumStreamInput(previous.v1().streamInput()));
throw new AssertionError(
"seqNo [" + seqNo + "] was processed twice in generation [" + generation + "], with different data. " +
"prvOp [" + prvOp + "], newOp [" + newOp + "]", previous.v2());
// TODO: Index operations in Lucene do not carry a timestamp yet, so we need to loosen this check to not compare timestamps.
final boolean sameOp;
if (newOp instanceof Translog.Index && prvOp instanceof Translog.Index) {
final Translog.Index o1 = (Translog.Index) newOp;
final Translog.Index o2 = (Translog.Index) prvOp;
sameOp = Objects.equals(o1.id(), o2.id()) && Objects.equals(o1.type(), o2.type())
&& Objects.equals(o1.source(), o2.source()) && Objects.equals(o1.routing(), o2.routing())
&& o1.primaryTerm() == o2.primaryTerm() && o1.seqNo() == o2.seqNo()
&& o1.version() == o2.version() && o1.versionType() == o2.versionType();
} else {
sameOp = false;
}
if (sameOp == false) {
throw new AssertionError(
"seqNo [" + seqNo + "] was processed twice in generation [" + generation + "], with different data. " +
"prvOp [" + prvOp + "], newOp [" + newOp + "]", previous.v2());
}
}
} else {
seenSequenceNumbers.put(seqNo,

View File

@ -146,11 +146,11 @@ public class RecoverySourceHandler {
assert targetShardRouting.initializing() : "expected recovery target to be initializing but was " + targetShardRouting;
}, shardId + " validating recovery target ["+ request.targetAllocationId() + "] registered ", shard, cancellableThreads, logger);
try (Closeable ignored = shard.acquireTranslogRetentionLock()) {
try (Closeable ignored = shard.acquireRetentionLockForPeerRecovery()) {
final long startingSeqNo;
final long requiredSeqNoRangeStart;
final boolean isSequenceNumberBasedRecovery = request.startingSeqNo() != SequenceNumbers.UNASSIGNED_SEQ_NO &&
isTargetSameHistory() && isTranslogReadyForSequenceNumberBasedRecovery();
isTargetSameHistory() && shard.hasCompleteHistoryOperations("peer-recovery", request.startingSeqNo());
if (isSequenceNumberBasedRecovery) {
logger.trace("performing sequence numbers based recovery. starting at [{}]", request.startingSeqNo());
startingSeqNo = request.startingSeqNo();
@ -164,12 +164,13 @@ public class RecoverySourceHandler {
}
// we set this to 0 to create a translog roughly according to the retention policy
// on the target. Note that it will still filter out legacy operations with no sequence numbers
startingSeqNo = 0;
startingSeqNo = 0; //TODO: A follow-up to send only ops above the local checkpoint if soft-deletes is enabled.
// but we must have everything above the local checkpoint in the commit
requiredSeqNoRangeStart =
Long.parseLong(phase1Snapshot.getIndexCommit().getUserData().get(SequenceNumbers.LOCAL_CHECKPOINT_KEY)) + 1;
try {
phase1(phase1Snapshot.getIndexCommit(), () -> shard.estimateTranslogOperationsFromMinSeq(startingSeqNo));
final int estimateNumOps = shard.estimateNumberOfHistoryOperations("peer-recovery", startingSeqNo);
phase1(phase1Snapshot.getIndexCommit(), () -> estimateNumOps);
} catch (final Exception e) {
throw new RecoveryEngineException(shard.shardId(), 1, "phase1 failed", e);
} finally {
@ -186,7 +187,8 @@ public class RecoverySourceHandler {
try {
// For a sequence based recovery, the target can keep its local translog
prepareTargetForTranslog(isSequenceNumberBasedRecovery == false, shard.estimateTranslogOperationsFromMinSeq(startingSeqNo));
prepareTargetForTranslog(isSequenceNumberBasedRecovery == false,
shard.estimateNumberOfHistoryOperations("peer-recovery", startingSeqNo));
} catch (final Exception e) {
throw new RecoveryEngineException(shard.shardId(), 1, "prepare target for translog failed", e);
}
@ -207,11 +209,13 @@ public class RecoverySourceHandler {
*/
cancellableThreads.execute(() -> shard.waitForOpsToComplete(endingSeqNo));
logger.trace("all operations up to [{}] completed, which will be used as an ending sequence number", endingSeqNo);
logger.trace("snapshot translog for recovery; current size is [{}]", shard.estimateTranslogOperationsFromMinSeq(startingSeqNo));
if (logger.isTraceEnabled()) {
logger.trace("all operations up to [{}] completed, which will be used as an ending sequence number", endingSeqNo);
logger.trace("snapshot translog for recovery; current size is [{}]",
shard.estimateNumberOfHistoryOperations("peer-recovery", startingSeqNo));
}
final long targetLocalCheckpoint;
try(Translog.Snapshot snapshot = shard.newTranslogSnapshotFromMinSeqNo(startingSeqNo)) {
try (Translog.Snapshot snapshot = shard.getHistoryOperations("peer-recovery", startingSeqNo)) {
targetLocalCheckpoint = phase2(startingSeqNo, requiredSeqNoRangeStart, endingSeqNo, snapshot);
} catch (Exception e) {
throw new RecoveryEngineException(shard.shardId(), 2, "phase2 failed", e);
@ -268,36 +272,6 @@ public class RecoverySourceHandler {
});
}
/**
* Determines if the source translog is ready for a sequence-number-based peer recovery. The main condition here is that the source
* translog contains all operations above the local checkpoint on the target. We already know that the translog contains or will contain
* all ops above the source local checkpoint, so we can stop the check there.
*
* @return {@code true} if the source is ready for a sequence-number-based recovery
* @throws IOException if an I/O exception occurred reading the translog snapshot
*/
boolean isTranslogReadyForSequenceNumberBasedRecovery() throws IOException {
final long startingSeqNo = request.startingSeqNo();
assert startingSeqNo >= 0;
final long localCheckpoint = shard.getLocalCheckpoint();
logger.trace("testing sequence numbers in range: [{}, {}]", startingSeqNo, localCheckpoint);
// the start recovery request is initialized with the starting sequence number set to the target shard's local checkpoint plus one
if (startingSeqNo - 1 <= localCheckpoint) {
final LocalCheckpointTracker tracker = new LocalCheckpointTracker(startingSeqNo, startingSeqNo - 1);
try (Translog.Snapshot snapshot = shard.newTranslogSnapshotFromMinSeqNo(startingSeqNo)) {
Translog.Operation operation;
while ((operation = snapshot.next()) != null) {
if (operation.seqNo() != SequenceNumbers.UNASSIGNED_SEQ_NO) {
tracker.markSeqNoAsCompleted(operation.seqNo());
}
}
}
return tracker.getCheckpoint() >= localCheckpoint;
} else {
return false;
}
}
/**
* Perform phase1 of the recovery operations. Once this {@link IndexCommit}
* snapshot has been performed no commit operations (files being fsync'd)

View File

@ -40,6 +40,7 @@ import org.elasticsearch.index.engine.Engine;
import org.elasticsearch.index.query.QueryBuilders;
import org.elasticsearch.index.shard.ShardId;
import org.elasticsearch.index.shard.ShardPath;
import org.elasticsearch.indices.IndicesService;
import org.elasticsearch.indices.recovery.RecoveryState;
import org.elasticsearch.plugins.Plugin;
import org.elasticsearch.test.ESIntegTestCase;
@ -397,7 +398,8 @@ public class RecoveryFromGatewayIT extends ESIntegTestCase {
.get();
logger.info("--> indexing docs");
for (int i = 0; i < randomIntBetween(1, 1024); i++) {
int numDocs = randomIntBetween(1, 1024);
for (int i = 0; i < numDocs; i++) {
client(primaryNode).prepareIndex("test", "type").setSource("field", "value").execute().actionGet();
}
@ -419,12 +421,15 @@ public class RecoveryFromGatewayIT extends ESIntegTestCase {
}
logger.info("--> restart replica node");
boolean softDeleteEnabled = internalCluster().getInstance(IndicesService.class, primaryNode)
.indexServiceSafe(resolveIndex("test")).getShard(0).indexSettings().isSoftDeleteEnabled();
int moreDocs = randomIntBetween(1, 1024);
internalCluster().restartNode(replicaNode, new RestartCallback() {
@Override
public Settings onNodeStopped(String nodeName) throws Exception {
// index some more documents; we expect to reuse the files that already exist on the replica
for (int i = 0; i < randomIntBetween(1, 1024); i++) {
for (int i = 0; i < moreDocs; i++) {
client(primaryNode).prepareIndex("test", "type").setSource("field", "value").execute().actionGet();
}
@ -432,8 +437,12 @@ public class RecoveryFromGatewayIT extends ESIntegTestCase {
client(primaryNode).admin().indices().prepareUpdateSettings("test").setSettings(Settings.builder()
.put(IndexSettings.INDEX_TRANSLOG_RETENTION_AGE_SETTING.getKey(), "-1")
.put(IndexSettings.INDEX_TRANSLOG_RETENTION_SIZE_SETTING.getKey(), "-1")
.put(IndexSettings.INDEX_SOFT_DELETES_RETENTION_OPERATIONS_SETTING.getKey(), 0)
).get();
client(primaryNode).admin().indices().prepareFlush("test").setForce(true).get();
if (softDeleteEnabled) { // We need an extra flush to advance the min_retained_seqno of the SoftDeletesPolicy
client(primaryNode).admin().indices().prepareFlush("test").setForce(true).get();
}
return super.onNodeStopped(nodeName);
}
});
@ -473,7 +482,9 @@ public class RecoveryFromGatewayIT extends ESIntegTestCase {
assertThat("all existing files should be reused, file count mismatch", recoveryState.getIndex().reusedFileCount(), equalTo(filesReused));
assertThat(recoveryState.getIndex().reusedFileCount(), equalTo(recoveryState.getIndex().totalFileCount() - filesRecovered));
assertThat("> 0 files should be reused", recoveryState.getIndex().reusedFileCount(), greaterThan(0));
assertThat("no translog ops should be recovered", recoveryState.getTranslog().recoveredOperations(), equalTo(0));
// both cases will be zero once we start sending only ops after the local checkpoint of the safe commit
int expectedTranslogOps = softDeleteEnabled ? numDocs + moreDocs : 0;
assertThat("no translog ops should be recovered", recoveryState.getTranslog().recoveredOperations(), equalTo(expectedTranslogOps));
}
}
}

View File

@ -32,6 +32,7 @@ import org.elasticsearch.common.xcontent.XContentType;
import org.elasticsearch.index.engine.Engine;
import org.elasticsearch.index.query.QueryBuilder;
import org.elasticsearch.index.shard.IndexShard;
import org.elasticsearch.index.shard.IndexShardTestCase;
import org.elasticsearch.index.translog.Translog;
import org.elasticsearch.plugins.Plugin;
import org.elasticsearch.test.ESSingleNodeTestCase;
@ -306,7 +307,7 @@ public class IndexServiceTests extends ESSingleNodeTestCase {
.put(IndexSettings.INDEX_TRANSLOG_RETENTION_AGE_SETTING.getKey(), -1))
.get();
IndexShard shard = indexService.getShard(0);
assertBusy(() -> assertThat(shard.estimateTranslogOperationsFromMinSeq(0L), equalTo(0)));
assertBusy(() -> assertThat(IndexShardTestCase.getTranslog(shard).totalOperations(), equalTo(0)));
}
public void testIllegalFsyncInterval() {

View File

@ -51,20 +51,24 @@ public class CombinedDeletionPolicyTests extends ESTestCase {
public void testKeepCommitsAfterGlobalCheckpoint() throws Exception {
final AtomicLong globalCheckpoint = new AtomicLong();
final int extraRetainedOps = between(0, 100);
final SoftDeletesPolicy softDeletesPolicy = new SoftDeletesPolicy(globalCheckpoint::get, -1, extraRetainedOps);
TranslogDeletionPolicy translogPolicy = createTranslogDeletionPolicy();
CombinedDeletionPolicy indexPolicy = new CombinedDeletionPolicy(logger, translogPolicy, globalCheckpoint::get);
CombinedDeletionPolicy indexPolicy = new CombinedDeletionPolicy(logger, translogPolicy, softDeletesPolicy, globalCheckpoint::get);
final LongArrayList maxSeqNoList = new LongArrayList();
final LongArrayList translogGenList = new LongArrayList();
final List<IndexCommit> commitList = new ArrayList<>();
int totalCommits = between(2, 20);
long lastMaxSeqNo = 0;
long lastCheckpoint = lastMaxSeqNo;
long lastTranslogGen = 0;
final UUID translogUUID = UUID.randomUUID();
for (int i = 0; i < totalCommits; i++) {
lastMaxSeqNo += between(1, 10000);
lastCheckpoint = randomLongBetween(lastCheckpoint, lastMaxSeqNo);
lastTranslogGen += between(1, 100);
commitList.add(mockIndexCommit(lastMaxSeqNo, translogUUID, lastTranslogGen));
commitList.add(mockIndexCommit(lastCheckpoint, lastMaxSeqNo, translogUUID, lastTranslogGen));
maxSeqNoList.add(lastMaxSeqNo);
translogGenList.add(lastTranslogGen);
}
@ -85,14 +89,19 @@ public class CombinedDeletionPolicyTests extends ESTestCase {
}
assertThat(translogPolicy.getMinTranslogGenerationForRecovery(), equalTo(translogGenList.get(keptIndex)));
assertThat(translogPolicy.getTranslogGenerationOfLastCommit(), equalTo(lastTranslogGen));
assertThat(softDeletesPolicy.getMinRetainedSeqNo(),
equalTo(Math.min(getLocalCheckpoint(commitList.get(keptIndex)) + 1, globalCheckpoint.get() + 1 - extraRetainedOps)));
}
public void testAcquireIndexCommit() throws Exception {
final AtomicLong globalCheckpoint = new AtomicLong();
final int extraRetainedOps = between(0, 100);
final SoftDeletesPolicy softDeletesPolicy = new SoftDeletesPolicy(globalCheckpoint::get, -1, extraRetainedOps);
final UUID translogUUID = UUID.randomUUID();
TranslogDeletionPolicy translogPolicy = createTranslogDeletionPolicy();
CombinedDeletionPolicy indexPolicy = new CombinedDeletionPolicy(logger, translogPolicy, globalCheckpoint::get);
CombinedDeletionPolicy indexPolicy = new CombinedDeletionPolicy(logger, translogPolicy, softDeletesPolicy, globalCheckpoint::get);
long lastMaxSeqNo = between(1, 1000);
long lastCheckpoint = randomLongBetween(-1, lastMaxSeqNo);
long lastTranslogGen = between(1, 20);
int safeIndex = 0;
List<IndexCommit> commitList = new ArrayList<>();
@ -102,8 +111,9 @@ public class CombinedDeletionPolicyTests extends ESTestCase {
int newCommits = between(1, 10);
for (int n = 0; n < newCommits; n++) {
lastMaxSeqNo += between(1, 1000);
lastCheckpoint = randomLongBetween(lastCheckpoint, lastMaxSeqNo);
lastTranslogGen += between(1, 20);
commitList.add(mockIndexCommit(lastMaxSeqNo, translogUUID, lastTranslogGen));
commitList.add(mockIndexCommit(lastCheckpoint, lastMaxSeqNo, translogUUID, lastTranslogGen));
}
// Advance the global checkpoint to between [safeIndex, safeIndex + 1)
safeIndex = randomIntBetween(safeIndex, commitList.size() - 1);
@ -114,6 +124,9 @@ public class CombinedDeletionPolicyTests extends ESTestCase {
globalCheckpoint.set(randomLongBetween(lower, upper));
commitList.forEach(this::resetDeletion);
indexPolicy.onCommit(commitList);
IndexCommit safeCommit = CombinedDeletionPolicy.findSafeCommitPoint(commitList, globalCheckpoint.get());
assertThat(softDeletesPolicy.getMinRetainedSeqNo(),
equalTo(Math.min(getLocalCheckpoint(safeCommit) + 1, globalCheckpoint.get() + 1 - extraRetainedOps)));
// Captures and releases some commits
int captures = between(0, 5);
for (int n = 0; n < captures; n++) {
@ -132,7 +145,7 @@ public class CombinedDeletionPolicyTests extends ESTestCase {
snapshottingCommits.remove(snapshot);
final long pendingSnapshots = snapshottingCommits.stream().filter(snapshot::equals).count();
final IndexCommit lastCommit = commitList.get(commitList.size() - 1);
final IndexCommit safeCommit = CombinedDeletionPolicy.findSafeCommitPoint(commitList, globalCheckpoint.get());
safeCommit = CombinedDeletionPolicy.findSafeCommitPoint(commitList, globalCheckpoint.get());
assertThat(indexPolicy.releaseCommit(snapshot),
equalTo(pendingSnapshots == 0 && snapshot.equals(lastCommit) == false && snapshot.equals(safeCommit) == false));
}
@ -143,6 +156,8 @@ public class CombinedDeletionPolicyTests extends ESTestCase {
equalTo(Long.parseLong(commitList.get(safeIndex).getUserData().get(Translog.TRANSLOG_GENERATION_KEY))));
assertThat(translogPolicy.getTranslogGenerationOfLastCommit(),
equalTo(Long.parseLong(commitList.get(commitList.size() - 1).getUserData().get(Translog.TRANSLOG_GENERATION_KEY))));
assertThat(softDeletesPolicy.getMinRetainedSeqNo(),
equalTo(Math.min(getLocalCheckpoint(commitList.get(safeIndex)) + 1, globalCheckpoint.get() + 1 - extraRetainedOps)));
}
snapshottingCommits.forEach(indexPolicy::releaseCommit);
globalCheckpoint.set(randomLongBetween(lastMaxSeqNo, Long.MAX_VALUE));
@ -154,25 +169,27 @@ public class CombinedDeletionPolicyTests extends ESTestCase {
assertThat(commitList.get(commitList.size() - 1).isDeleted(), equalTo(false));
assertThat(translogPolicy.getMinTranslogGenerationForRecovery(), equalTo(lastTranslogGen));
assertThat(translogPolicy.getTranslogGenerationOfLastCommit(), equalTo(lastTranslogGen));
IndexCommit safeCommit = CombinedDeletionPolicy.findSafeCommitPoint(commitList, globalCheckpoint.get());
assertThat(softDeletesPolicy.getMinRetainedSeqNo(),
equalTo(Math.min(getLocalCheckpoint(safeCommit) + 1, globalCheckpoint.get() + 1 - extraRetainedOps)));
}
public void testLegacyIndex() throws Exception {
final AtomicLong globalCheckpoint = new AtomicLong();
final SoftDeletesPolicy softDeletesPolicy = new SoftDeletesPolicy(globalCheckpoint::get, -1, 0);
final UUID translogUUID = UUID.randomUUID();
TranslogDeletionPolicy translogPolicy = createTranslogDeletionPolicy();
CombinedDeletionPolicy indexPolicy = new CombinedDeletionPolicy(logger, translogPolicy, globalCheckpoint::get);
CombinedDeletionPolicy indexPolicy = new CombinedDeletionPolicy(logger, translogPolicy, softDeletesPolicy, globalCheckpoint::get);
long legacyTranslogGen = randomNonNegativeLong();
IndexCommit legacyCommit = mockLegacyIndexCommit(translogUUID, legacyTranslogGen);
indexPolicy.onCommit(singletonList(legacyCommit));
verify(legacyCommit, never()).delete();
assertThat(translogPolicy.getMinTranslogGenerationForRecovery(), equalTo(legacyTranslogGen));
assertThat(translogPolicy.getTranslogGenerationOfLastCommit(), equalTo(legacyTranslogGen));
assertThat(CombinedDeletionPolicy.findSafeCommitPoint(singletonList(legacyCommit), globalCheckpoint.get()),
equalTo(legacyCommit));
long safeTranslogGen = randomLongBetween(legacyTranslogGen, Long.MAX_VALUE);
long maxSeqNo = randomLongBetween(1, Long.MAX_VALUE);
final IndexCommit freshCommit = mockIndexCommit(maxSeqNo, translogUUID, safeTranslogGen);
final IndexCommit freshCommit = mockIndexCommit(randomLongBetween(-1, maxSeqNo), maxSeqNo, translogUUID, safeTranslogGen);
globalCheckpoint.set(randomLongBetween(0, maxSeqNo - 1));
indexPolicy.onCommit(Arrays.asList(legacyCommit, freshCommit));
@ -189,25 +206,32 @@ public class CombinedDeletionPolicyTests extends ESTestCase {
verify(freshCommit, times(0)).delete();
assertThat(translogPolicy.getMinTranslogGenerationForRecovery(), equalTo(safeTranslogGen));
assertThat(translogPolicy.getTranslogGenerationOfLastCommit(), equalTo(safeTranslogGen));
assertThat(softDeletesPolicy.getMinRetainedSeqNo(), equalTo(getLocalCheckpoint(freshCommit) + 1));
}
public void testDeleteInvalidCommits() throws Exception {
final AtomicLong globalCheckpoint = new AtomicLong(randomNonNegativeLong());
final SoftDeletesPolicy softDeletesPolicy = new SoftDeletesPolicy(globalCheckpoint::get, -1, 0);
TranslogDeletionPolicy translogPolicy = createTranslogDeletionPolicy();
CombinedDeletionPolicy indexPolicy = new CombinedDeletionPolicy(logger, translogPolicy, globalCheckpoint::get);
CombinedDeletionPolicy indexPolicy = new CombinedDeletionPolicy(logger, translogPolicy, softDeletesPolicy, globalCheckpoint::get);
final int invalidCommits = between(1, 10);
final List<IndexCommit> commitList = new ArrayList<>();
for (int i = 0; i < invalidCommits; i++) {
commitList.add(mockIndexCommit(randomNonNegativeLong(), UUID.randomUUID(), randomNonNegativeLong()));
long maxSeqNo = randomNonNegativeLong();
commitList.add(mockIndexCommit(randomLongBetween(-1, maxSeqNo), maxSeqNo, UUID.randomUUID(), randomNonNegativeLong()));
}
final UUID expectedTranslogUUID = UUID.randomUUID();
long lastTranslogGen = 0;
final int validCommits = between(1, 10);
long lastMaxSeqNo = between(1, 1000);
long lastCheckpoint = randomLongBetween(-1, lastMaxSeqNo);
for (int i = 0; i < validCommits; i++) {
lastTranslogGen += between(1, 1000);
commitList.add(mockIndexCommit(randomNonNegativeLong(), expectedTranslogUUID, lastTranslogGen));
lastMaxSeqNo += between(1, 1000);
lastCheckpoint = randomLongBetween(lastCheckpoint, lastMaxSeqNo);
commitList.add(mockIndexCommit(lastCheckpoint, lastMaxSeqNo, expectedTranslogUUID, lastTranslogGen));
}
// We should never keep invalid commits regardless of the value of the global checkpoint.
@ -215,21 +239,26 @@ public class CombinedDeletionPolicyTests extends ESTestCase {
for (int i = 0; i < invalidCommits - 1; i++) {
verify(commitList.get(i), times(1)).delete();
}
assertThat(softDeletesPolicy.getMinRetainedSeqNo(),
equalTo(getLocalCheckpoint(CombinedDeletionPolicy.findSafeCommitPoint(commitList, globalCheckpoint.get())) + 1));
}
public void testCheckUnreferencedCommits() throws Exception {
final AtomicLong globalCheckpoint = new AtomicLong(SequenceNumbers.UNASSIGNED_SEQ_NO);
final SoftDeletesPolicy softDeletesPolicy = new SoftDeletesPolicy(globalCheckpoint::get, -1, 0);
final UUID translogUUID = UUID.randomUUID();
final TranslogDeletionPolicy translogPolicy = createTranslogDeletionPolicy();
CombinedDeletionPolicy indexPolicy = new CombinedDeletionPolicy(logger, translogPolicy, globalCheckpoint::get);
CombinedDeletionPolicy indexPolicy = new CombinedDeletionPolicy(logger, translogPolicy, softDeletesPolicy, globalCheckpoint::get);
final List<IndexCommit> commitList = new ArrayList<>();
int totalCommits = between(2, 20);
long lastMaxSeqNo = between(1, 1000);
long lastCheckpoint = randomLongBetween(-1, lastMaxSeqNo);
long lastTranslogGen = between(1, 50);
for (int i = 0; i < totalCommits; i++) {
lastMaxSeqNo += between(1, 10000);
lastTranslogGen += between(1, 100);
commitList.add(mockIndexCommit(lastMaxSeqNo, translogUUID, lastTranslogGen));
lastCheckpoint = randomLongBetween(lastCheckpoint, lastMaxSeqNo);
commitList.add(mockIndexCommit(lastCheckpoint, lastMaxSeqNo, translogUUID, lastTranslogGen));
}
IndexCommit safeCommit = randomFrom(commitList);
globalCheckpoint.set(Long.parseLong(safeCommit.getUserData().get(SequenceNumbers.MAX_SEQ_NO)));
@ -256,8 +285,9 @@ public class CombinedDeletionPolicyTests extends ESTestCase {
}
}
IndexCommit mockIndexCommit(long maxSeqNo, UUID translogUUID, long translogGen) throws IOException {
IndexCommit mockIndexCommit(long localCheckpoint, long maxSeqNo, UUID translogUUID, long translogGen) throws IOException {
final Map<String, String> userData = new HashMap<>();
userData.put(SequenceNumbers.LOCAL_CHECKPOINT_KEY, Long.toString(localCheckpoint));
userData.put(SequenceNumbers.MAX_SEQ_NO, Long.toString(maxSeqNo));
userData.put(Translog.TRANSLOG_UUID_KEY, translogUUID.toString());
userData.put(Translog.TRANSLOG_GENERATION_KEY, Long.toString(translogGen));
@ -278,6 +308,10 @@ public class CombinedDeletionPolicyTests extends ESTestCase {
}).when(commit).delete();
}
private long getLocalCheckpoint(IndexCommit commit) throws IOException {
return Long.parseLong(commit.getUserData().get(SequenceNumbers.LOCAL_CHECKPOINT_KEY));
}
IndexCommit mockLegacyIndexCommit(UUID translogUUID, long translogGen) throws IOException {
final Map<String, String> userData = new HashMap<>();
userData.put(Translog.TRANSLOG_UUID_KEY, translogUUID.toString());
@ -287,4 +321,5 @@ public class CombinedDeletionPolicyTests extends ESTestCase {
resetDeletion(commit);
return commit;
}
}

View File

@ -128,6 +128,7 @@ import org.elasticsearch.test.IndexSettingsModule;
import org.hamcrest.MatcherAssert;
import org.hamcrest.Matchers;
import java.io.Closeable;
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.charset.Charset;
@ -254,9 +255,13 @@ public class InternalEngineTests extends EngineTestCase {
}
public void testSegments() throws Exception {
final AtomicLong globalCheckpoint = new AtomicLong(SequenceNumbers.NO_OPS_PERFORMED);
Settings settings = Settings.builder()
.put(defaultSettings.getSettings())
.put(IndexSettings.INDEX_SOFT_DELETES_SETTING.getKey(), false).build();
IndexSettings indexSettings = IndexSettingsModule.newIndexSettings(
IndexMetaData.builder(defaultSettings.getIndexMetaData()).settings(settings).build());
try (Store store = createStore();
InternalEngine engine = createEngine(config(defaultSettings, store, createTempDir(), NoMergePolicy.INSTANCE, null, null, globalCheckpoint::get))) {
InternalEngine engine = createEngine(config(indexSettings, store, createTempDir(), NoMergePolicy.INSTANCE, null))) {
List<Segment> segments = engine.segments(false);
assertThat(segments.isEmpty(), equalTo(true));
assertThat(engine.segmentsStats(false).getCount(), equalTo(0L));
@ -328,8 +333,6 @@ public class InternalEngineTests extends EngineTestCase {
engine.delete(new Engine.Delete("test", "1", newUid(doc), primaryTerm.get()));
globalCheckpoint.set(engine.getLocalCheckpointTracker().getCheckpoint());
engine.getTranslog().sync();
engine.refresh("test");
segments = engine.segments(false);
@ -1360,18 +1363,27 @@ public class InternalEngineTests extends EngineTestCase {
engine.index(indexForDoc(doc));
liveDocs.add(doc.id());
}
if (randomBoolean()) {
engine.flush(randomBoolean(), true);
}
}
engine.flush();
long localCheckpoint = engine.getLocalCheckpoint();
globalCheckpoint.set(randomLongBetween(0, localCheckpoint));
engine.getTranslog().sync();
engine.syncTranslog();
final long safeCommitCheckpoint;
try (Engine.IndexCommitRef safeCommit = engine.acquireSafeIndexCommit()) {
safeCommitCheckpoint = Long.parseLong(safeCommit.getIndexCommit().getUserData().get(SequenceNumbers.LOCAL_CHECKPOINT_KEY));
}
engine.forceMerge(true, 1, false, false, false);
assertConsistentHistoryBetweenTranslogAndLuceneIndex(engine, mapperService);
Map<Long, Translog.Operation> ops = readAllOperationsInLucene(engine, mapperService)
.stream().collect(Collectors.toMap(Translog.Operation::seqNo, Function.identity()));
for (long seqno = 0; seqno <= localCheckpoint; seqno++) {
long keptIndex = globalCheckpoint.get() + 1 - retainedExtraOps;
long minSeqNoToRetain = Math.min(globalCheckpoint.get() + 1 - retainedExtraOps, safeCommitCheckpoint + 1);
String msg = "seq# [" + seqno + "], global checkpoint [" + globalCheckpoint + "], retained-ops [" + retainedExtraOps + "]";
if (seqno < keptIndex) {
if (seqno < minSeqNoToRetain) {
Translog.Operation op = ops.get(seqno);
if (op != null) {
assertThat(op, instanceOf(Translog.Index.class));
@ -1384,8 +1396,10 @@ public class InternalEngineTests extends EngineTestCase {
}
settings.put(IndexSettings.INDEX_SOFT_DELETES_RETENTION_OPERATIONS_SETTING.getKey(), 0);
indexSettings.updateIndexMetaData(IndexMetaData.builder(defaultSettings.getIndexMetaData()).settings(settings).build());
engine.onSettingsChanged();
globalCheckpoint.set(localCheckpoint);
engine.getTranslog().sync();
engine.syncTranslog();
engine.forceMerge(true, 1, false, false, false);
assertConsistentHistoryBetweenTranslogAndLuceneIndex(engine, mapperService);
assertThat(readAllOperationsInLucene(engine, mapperService), hasSize(liveDocs.size()));
@ -1414,7 +1428,6 @@ public class InternalEngineTests extends EngineTestCase {
engine.index(indexForDoc(doc));
liveDocs.add(doc.id());
}
engine.flush();
for (int i = 0; i < numDocs; i++) {
ParsedDocument doc = testParsedDocument(Integer.toString(i), null, testDocument(), B_1, null, randomBoolean()
|| omitSourceAllTheTime);
@ -1426,23 +1439,31 @@ public class InternalEngineTests extends EngineTestCase {
engine.index(indexForDoc(doc));
liveDocs.add(doc.id());
}
if (randomBoolean()) {
engine.flush(randomBoolean(), true);
}
}
engine.flush();
globalCheckpoint.set(randomLongBetween(0, engine.getLocalCheckpoint()));
engine.syncTranslog();
final long minSeqNoToRetain;
try (Engine.IndexCommitRef safeCommit = engine.acquireSafeIndexCommit()) {
long safeCommitLocalCheckpoint = Long.parseLong(
safeCommit.getIndexCommit().getUserData().get(SequenceNumbers.LOCAL_CHECKPOINT_KEY));
minSeqNoToRetain = Math.min(globalCheckpoint.get() + 1 - retainedExtraOps, safeCommitLocalCheckpoint + 1);
}
long localCheckpoint = engine.getLocalCheckpoint();
globalCheckpoint.set(randomLongBetween(0, localCheckpoint));
engine.getTranslog().sync();
long keptIndex = globalCheckpoint.get() + 1 - retainedExtraOps;
engine.forceMerge(true, 1, false, false, false);
assertConsistentHistoryBetweenTranslogAndLuceneIndex(engine, mapperService, (luceneOp, translogOp) -> {
if (luceneOp.seqNo() >= keptIndex) {
if (luceneOp.seqNo() >= minSeqNoToRetain) {
assertNotNull(luceneOp.getSource());
assertThat(luceneOp.getSource().source, equalTo(translogOp.getSource().source));
}
});
Map<Long, Translog.Operation> ops = readAllOperationsInLucene(engine, mapperService)
.stream().collect(Collectors.toMap(Translog.Operation::seqNo, Function.identity()));
for (long seqno = 0; seqno <= localCheckpoint; seqno++) {
for (long seqno = 0; seqno <= engine.getLocalCheckpoint(); seqno++) {
String msg = "seq# [" + seqno + "], global checkpoint [" + globalCheckpoint + "], retained-ops [" + retainedExtraOps + "]";
if (seqno < keptIndex) {
if (seqno < minSeqNoToRetain) {
Translog.Operation op = ops.get(seqno);
if (op != null) {
assertThat(op, instanceOf(Translog.Index.class));
@ -1458,8 +1479,9 @@ public class InternalEngineTests extends EngineTestCase {
}
settings.put(IndexSettings.INDEX_SOFT_DELETES_RETENTION_OPERATIONS_SETTING.getKey(), 0);
indexSettings.updateIndexMetaData(IndexMetaData.builder(defaultSettings.getIndexMetaData()).settings(settings).build());
globalCheckpoint.set(localCheckpoint);
engine.getTranslog().sync();
engine.onSettingsChanged();
globalCheckpoint.set(engine.getLocalCheckpoint());
engine.syncTranslog();
engine.forceMerge(true, 1, false, false, false);
assertConsistentHistoryBetweenTranslogAndLuceneIndex(engine, mapperService, (luceneOp, translogOp) -> {
assertEquals(translogOp.getSource().source, B_1);
@ -4845,6 +4867,76 @@ public class InternalEngineTests extends EngineTestCase {
}
}
public void testKeepMinRetainedSeqNoByMergePolicy() throws IOException {
IOUtils.close(engine, store);
Settings.Builder settings = Settings.builder()
.put(defaultSettings.getSettings())
.put(IndexSettings.INDEX_SOFT_DELETES_SETTING.getKey(), true)
.put(IndexSettings.INDEX_SOFT_DELETES_RETENTION_OPERATIONS_SETTING.getKey(), randomLongBetween(0, 10));
final IndexMetaData indexMetaData = IndexMetaData.builder(defaultSettings.getIndexMetaData()).settings(settings).build();
final IndexSettings indexSettings = IndexSettingsModule.newIndexSettings(indexMetaData);
final AtomicLong globalCheckpoint = new AtomicLong(SequenceNumbers.NO_OPS_PERFORMED);
final List<Engine.Operation> operations = generateSingleDocHistory(true,
randomFrom(VersionType.INTERNAL, VersionType.EXTERNAL), 2, 10, 300, "2");
Randomness.shuffle(operations);
Set<Long> existingSeqNos = new HashSet<>();
store = createStore();
engine = createEngine(config(indexSettings, store, createTempDir(), newMergePolicy(), null, null, globalCheckpoint::get));
assertThat(engine.getMinRetainedSeqNo(), equalTo(0L));
long lastMinRetainedSeqNo = engine.getMinRetainedSeqNo();
for (Engine.Operation op : operations) {
final Engine.Result result;
if (op instanceof Engine.Index) {
result = engine.index((Engine.Index) op);
} else {
result = engine.delete((Engine.Delete) op);
}
existingSeqNos.add(result.getSeqNo());
if (randomBoolean()) {
globalCheckpoint.set(randomLongBetween(globalCheckpoint.get(), engine.getLocalCheckpointTracker().getCheckpoint()));
}
if (rarely()) {
settings.put(IndexSettings.INDEX_SOFT_DELETES_RETENTION_OPERATIONS_SETTING.getKey(), randomLongBetween(0, 10));
indexSettings.updateIndexMetaData(IndexMetaData.builder(defaultSettings.getIndexMetaData()).settings(settings).build());
engine.onSettingsChanged();
}
if (rarely()) {
engine.refresh("test");
}
if (rarely()) {
engine.flush(true, true);
assertThat(Long.parseLong(engine.getLastCommittedSegmentInfos().userData.get(Engine.MIN_RETAINED_SEQNO)),
equalTo(engine.getMinRetainedSeqNo()));
}
if (rarely()) {
engine.forceMerge(randomBoolean());
}
try (Closeable ignored = engine.acquireRetentionLockForPeerRecovery()) {
long minRetainSeqNos = engine.getMinRetainedSeqNo();
assertThat(minRetainSeqNos, lessThanOrEqualTo(globalCheckpoint.get() + 1));
Long[] expectedOps = existingSeqNos.stream().filter(seqno -> seqno >= minRetainSeqNos).toArray(Long[]::new);
Set<Long> actualOps = readAllOperationsInLucene(engine, createMapperService("test")).stream()
.map(Translog.Operation::seqNo).collect(Collectors.toSet());
assertThat(actualOps, containsInAnyOrder(expectedOps));
}
try (Engine.IndexCommitRef commitRef = engine.acquireSafeIndexCommit()) {
IndexCommit safeCommit = commitRef.getIndexCommit();
if (safeCommit.getUserData().containsKey(Engine.MIN_RETAINED_SEQNO)) {
lastMinRetainedSeqNo = Long.parseLong(safeCommit.getUserData().get(Engine.MIN_RETAINED_SEQNO));
}
}
}
if (randomBoolean()) {
engine.close();
} else {
engine.flushAndClose();
}
trimUnsafeCommits(engine.config());
try (InternalEngine recoveringEngine = new InternalEngine(engine.config())) {
assertThat(recoveringEngine.getMinRetainedSeqNo(), equalTo(lastMinRetainedSeqNo));
}
}
private static void trimUnsafeCommits(EngineConfig config) throws IOException {
final Store store = config.getStore();
final TranslogConfig translogConfig = config.getTranslogConfig();

View File

@ -178,7 +178,7 @@ public class LuceneChangesSnapshotTests extends EngineTestCase {
while ((op = snapshot.next()) != null) {
assertThat(op.toString(), op.primaryTerm(), equalTo(latestOperations.get(op.seqNo())));
}
assertThat(snapshot.overriddenOperations(), equalTo(totalOps - latestOperations.size()));
assertThat(snapshot.skippedOperations(), equalTo(totalOps - latestOperations.size()));
}
}

View File

@ -0,0 +1,75 @@
/*
* Licensed to Elasticsearch under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.elasticsearch.index.engine;
import org.elasticsearch.common.lease.Releasable;
import org.elasticsearch.index.seqno.SequenceNumbers;
import org.elasticsearch.test.ESTestCase;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.atomic.AtomicLong;
import static org.hamcrest.Matchers.equalTo;
public class SoftDeletesPolicyTests extends ESTestCase {
/**
* Makes sure we won't advance the retained seq# if the retention lock is held
*/
public void testSoftDeletesRetentionLock() {
long retainedOps = between(0, 10000);
AtomicLong globalCheckpoint = new AtomicLong(SequenceNumbers.NO_OPS_PERFORMED);
long safeCommitCheckpoint = globalCheckpoint.get();
SoftDeletesPolicy policy = new SoftDeletesPolicy(globalCheckpoint::get, between(1, 10000), retainedOps);
long minRetainedSeqNo = policy.getMinRetainedSeqNo();
List<Releasable> locks = new ArrayList<>();
int iters = scaledRandomIntBetween(10, 1000);
for (int i = 0; i < iters; i++) {
if (randomBoolean()) {
locks.add(policy.acquireRetentionLock());
}
// Advances the global checkpoint and the local checkpoint of a safe commit
globalCheckpoint.addAndGet(between(0, 1000));
safeCommitCheckpoint = randomLongBetween(safeCommitCheckpoint, globalCheckpoint.get());
policy.setLocalCheckpointOfSafeCommit(safeCommitCheckpoint);
if (rarely()) {
retainedOps = between(0, 10000);
policy.setRetentionOperations(retainedOps);
}
// Release some locks
List<Releasable> releasingLocks = randomSubsetOf(locks);
locks.removeAll(releasingLocks);
releasingLocks.forEach(Releasable::close);
// We only expose the seqno to the merge policy if the retention lock is not held.
policy.getRetentionQuery();
if (locks.isEmpty()) {
long retainedSeqNo = Math.min(safeCommitCheckpoint, globalCheckpoint.get() - retainedOps) + 1;
minRetainedSeqNo = Math.max(minRetainedSeqNo, retainedSeqNo);
}
assertThat(policy.getMinRetainedSeqNo(), equalTo(minRetainedSeqNo));
}
locks.forEach(Releasable::close);
long retainedSeqNo = Math.min(safeCommitCheckpoint, globalCheckpoint.get() - retainedOps) + 1;
minRetainedSeqNo = Math.max(minRetainedSeqNo, retainedSeqNo);
assertThat(policy.getMinRetainedSeqNo(), equalTo(minRetainedSeqNo));
}
}
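The test above pins down the soft-deletes retention contract: the minimum retained seq# never moves backwards, only advances while no retention lock is held, and then advances to min(local checkpoint of the safe commit, global checkpoint - retained ops) + 1. The sketch below is an illustrative reconstruction of a policy with that behaviour, derived from the assertions in this test; the class name SketchSoftDeletesPolicy is made up, and the "_seq_no" range query is only an assumption about how the retention query might be built, not the actual org.elasticsearch.index.engine.SoftDeletesPolicy.

import org.apache.lucene.document.LongPoint;
import org.apache.lucene.search.Query;
import org.elasticsearch.common.lease.Releasable;
import java.util.function.LongSupplier;

/** Illustrative sketch only; see the note above. */
class SketchSoftDeletesPolicy {
    private final LongSupplier globalCheckpointSupplier;
    private long localCheckpointOfSafeCommit = -1; // i.e. SequenceNumbers.NO_OPS_PERFORMED
    private long retentionOperations;
    private long minRetainedSeqNo;
    private int retentionLocks = 0;

    SketchSoftDeletesPolicy(LongSupplier globalCheckpointSupplier, long minRetainedSeqNo, long retentionOperations) {
        this.globalCheckpointSupplier = globalCheckpointSupplier;
        this.minRetainedSeqNo = minRetainedSeqNo;
        this.retentionOperations = retentionOperations;
    }

    synchronized void setRetentionOperations(long retentionOperations) {
        this.retentionOperations = retentionOperations;
    }

    synchronized void setLocalCheckpointOfSafeCommit(long localCheckpoint) {
        this.localCheckpointOfSafeCommit = localCheckpoint;
    }

    /** While at least one retention lock is held, the retained seq# is frozen. */
    synchronized Releasable acquireRetentionLock() {
        retentionLocks++;
        return this::releaseRetentionLock;
    }

    private synchronized void releaseRetentionLock() {
        retentionLocks--;
    }

    /** Operations with seq# at or above this value are kept in Lucene. */
    synchronized long getMinRetainedSeqNo() {
        if (retentionLocks == 0) {
            // Retain everything above the safe commit (for commit-based recovery) plus the last
            // `retentionOperations` operations below the global checkpoint (for ops-based recovery).
            final long candidate =
                Math.min(localCheckpointOfSafeCommit, globalCheckpointSupplier.getAsLong() - retentionOperations) + 1;
            minRetainedSeqNo = Math.max(minRetainedSeqNo, candidate); // never move backwards
        }
        return minRetainedSeqNo;
    }

    /** Query handed to the soft-deletes merge policy; soft-deleted docs matching it are kept. */
    synchronized Query getRetentionQuery() {
        return LongPoint.newRangeQuery("_seq_no", getMinRetainedSeqNo(), Long.MAX_VALUE);
    }
}

Because the merge policy only sees retained operations through getRetentionQuery(), holding a retention lock effectively stops merges from reclaiming operations that an ongoing peer recovery may still need to replay.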

View File

@ -38,10 +38,12 @@ import org.elasticsearch.index.IndexSettings;
import org.elasticsearch.index.engine.Engine;
import org.elasticsearch.index.engine.EngineConfig;
import org.elasticsearch.index.engine.EngineFactory;
import org.elasticsearch.index.engine.EngineTestCase;
import org.elasticsearch.index.engine.InternalEngine;
import org.elasticsearch.index.engine.InternalEngineTests;
import org.elasticsearch.index.engine.SegmentsStats;
import org.elasticsearch.index.engine.VersionConflictEngineException;
import org.elasticsearch.index.mapper.SeqNoFieldMapper;
import org.elasticsearch.index.seqno.SeqNoStats;
import org.elasticsearch.index.seqno.SequenceNumbers;
import org.elasticsearch.index.shard.IndexShard;
@ -136,7 +138,9 @@ public class IndexLevelReplicationTests extends ESIndexLevelReplicationTestCase
}
public void testInheritMaxValidAutoIDTimestampOnRecovery() throws Exception {
try (ReplicationGroup shards = createGroup(0)) {
//TODO: Enable this test with soft-deletes once we have the timestamp
Settings settings = Settings.builder().put(IndexSettings.INDEX_SOFT_DELETES_SETTING.getKey(), false).build();
try (ReplicationGroup shards = createGroup(0, settings)) {
shards.startAll();
final IndexRequest indexRequest = new IndexRequest(index.getName(), "type").source("{}", XContentType.JSON);
indexRequest.onRetry(); // force an update of the timestamp
@ -239,18 +243,32 @@ public class IndexLevelReplicationTests extends ESIndexLevelReplicationTestCase
* for primary and replica shards
*/
public void testDocumentFailureReplication() throws Exception {
final String failureMessage = "simulated document failure";
final ThrowingDocumentFailureEngineFactory throwingDocumentFailureEngineFactory =
new ThrowingDocumentFailureEngineFactory(failureMessage);
String failureMessage = "simulated document failure";
final EngineFactory failIndexingOpsEngine = new EngineFactory() {
@Override
public Engine newReadWriteEngine(EngineConfig config) {
return EngineTestCase.createInternalEngine((directory, writerConfig) ->
new IndexWriter(directory, writerConfig) {
@Override
public long addDocument(Iterable<? extends IndexableField> doc) throws IOException {
boolean isTombstone = false;
for (IndexableField field : doc) {
if (SeqNoFieldMapper.TOMBSTONE_NAME.equals(field.name())) {
isTombstone = true;
}
}
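// Tombstone documents (e.g. delete markers) must still go through; only regular document writes hit the simulated failure.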
if (isTombstone) {
return super.addDocument(doc);
} else {
throw new IOException(failureMessage);
}
}
}, null, null, config);
}
};
try (ReplicationGroup shards = new ReplicationGroup(buildIndexMetaData(0)) {
@Override
protected EngineFactory getEngineFactory(ShardRouting routing) {
if (routing.primary()){
return throwingDocumentFailureEngineFactory; // Simulate exception only on the primary.
}else {
return InternalEngine::new;
}
}}) {
protected EngineFactory getEngineFactory(ShardRouting routing) { return failIndexingOpsEngine; }}) {
// test only primary
shards.startPrimary();
@ -370,8 +388,9 @@ public class IndexLevelReplicationTests extends ESIndexLevelReplicationTestCase
recoverReplica(replica3, replica2);
try (Translog.Snapshot snapshot = getTranslog(replica3).newSnapshot()) {
assertThat(snapshot.totalOperations(), equalTo(initDocs + 1));
assertThat(snapshot.next(), equalTo(op2));
assertThat("Remaining of snapshot should contain init operations", snapshot, containsOperationsInAnyOrder(initOperations));
final List<Translog.Operation> expectedOps = new ArrayList<>(initOperations);
expectedOps.add(op2);
assertThat(snapshot, containsOperationsInAnyOrder(expectedOps));
assertThat("Peer-recovery should not send overridden operations", snapshot.skippedOperations(), equalTo(0));
}
// TODO: We should assert the content of shards in the ReplicationGroup.
@ -447,27 +466,6 @@ public class IndexLevelReplicationTests extends ESIndexLevelReplicationTestCase
}
}
/** Throws <code>documentFailure</code> on every indexing operation */
static class ThrowingDocumentFailureEngineFactory implements EngineFactory {
final String documentFailureMessage;
ThrowingDocumentFailureEngineFactory(String documentFailureMessage) {
this.documentFailureMessage = documentFailureMessage;
}
@Override
public Engine newReadWriteEngine(EngineConfig config) {
return InternalEngineTests.createInternalEngine((directory, writerConfig) ->
new IndexWriter(directory, writerConfig) {
@Override
public long addDocument(Iterable<? extends IndexableField> doc) throws IOException {
assert documentFailureMessage != null;
throw new IOException(documentFailureMessage);
}
}, null, null, config);
}
}
private static void assertNoOpTranslogOperationForDocumentFailure(
Iterable<IndexShard> replicationGroup,
int expectedOperation,

View File

@ -98,7 +98,8 @@ public class RecoveryDuringReplicationTests extends ESIndexLevelReplicationTestC
}
public void testRecoveryOfDisconnectedReplica() throws Exception {
try (ReplicationGroup shards = createGroup(1)) {
Settings settings = Settings.builder().put(IndexSettings.INDEX_SOFT_DELETES_SETTING.getKey(), false).build();
try (ReplicationGroup shards = createGroup(1, settings)) {
shards.startAll();
int docs = shards.indexDocs(randomInt(50));
shards.flush();
@ -219,7 +220,8 @@ public class RecoveryDuringReplicationTests extends ESIndexLevelReplicationTestC
@TestLogging("org.elasticsearch.index.shard:TRACE,org.elasticsearch.indices.recovery:TRACE")
public void testRecoveryAfterPrimaryPromotion() throws Exception {
try (ReplicationGroup shards = createGroup(2)) {
Settings settings = Settings.builder().put(IndexSettings.INDEX_SOFT_DELETES_SETTING.getKey(), true).build();
try (ReplicationGroup shards = createGroup(2, settings)) {
shards.startAll();
int totalDocs = shards.indexDocs(randomInt(10));
int committedDocs = 0;
@ -231,6 +233,7 @@ public class RecoveryDuringReplicationTests extends ESIndexLevelReplicationTestC
final IndexShard oldPrimary = shards.getPrimary();
final IndexShard newPrimary = shards.getReplicas().get(0);
final IndexShard replica = shards.getReplicas().get(1);
boolean softDeleteEnabled = replica.indexSettings().isSoftDeleteEnabled();
if (randomBoolean()) {
// simulate docs that were inflight when primary failed, these will be rolled back
final int rollbackDocs = randomIntBetween(1, 5);
@ -267,6 +270,7 @@ public class RecoveryDuringReplicationTests extends ESIndexLevelReplicationTestC
builder.settings(Settings.builder().put(newPrimary.indexSettings().getSettings())
.put(IndexSettings.INDEX_TRANSLOG_RETENTION_AGE_SETTING.getKey(), "-1")
.put(IndexSettings.INDEX_TRANSLOG_RETENTION_SIZE_SETTING.getKey(), "-1")
.put(IndexSettings.INDEX_SOFT_DELETES_RETENTION_OPERATIONS_SETTING.getKey(), 0)
);
newPrimary.indexSettings().updateIndexMetaData(builder.build());
newPrimary.onSettingsChanged();
@ -276,9 +280,13 @@ public class RecoveryDuringReplicationTests extends ESIndexLevelReplicationTestC
shards.syncGlobalCheckpoint();
assertThat(newPrimary.getLastSyncedGlobalCheckpoint(), equalTo(newPrimary.seqNoStats().getMaxSeqNo()));
});
newPrimary.flush(new FlushRequest());
newPrimary.flush(new FlushRequest().force(true));
uncommittedOpsOnPrimary = shards.indexDocs(randomIntBetween(0, 10));
totalDocs += uncommittedOpsOnPrimary;
// we need an extra flush or refresh to advance the min_retained_seqno on the new primary so that ops-based recovery won't happen
if (softDeleteEnabled) {
newPrimary.flush(new FlushRequest().force(true));
}
}
if (randomBoolean()) {
@ -298,7 +306,8 @@ public class RecoveryDuringReplicationTests extends ESIndexLevelReplicationTestC
assertThat(newReplica.recoveryState().getTranslog().recoveredOperations(), equalTo(totalDocs - committedDocs));
} else {
assertThat(newReplica.recoveryState().getIndex().fileDetails(), not(empty()));
assertThat(newReplica.recoveryState().getTranslog().recoveredOperations(), equalTo(uncommittedOpsOnPrimary));
int expectOps = softDeleteEnabled ? totalDocs : uncommittedOpsOnPrimary;
assertThat(newReplica.recoveryState().getTranslog().recoveredOperations(), equalTo(expectOps));
}
// roll back the extra ops in the replica

View File

@ -2932,7 +2932,13 @@ public class IndexShardTests extends IndexShardTestCase {
// Deleting a doc causes its memory to be freed from the breaker
deleteDoc(primary, "_doc", "0");
primary.sync(); // need to sync global checkpoint as the soft-deletes retention MergePolicy depends on it.
// Here we are testing that a fully deleted segment is dropped and its memory usage is freed.
// In order to instruct the merge policy not to keep a fully deleted segment,
// we need to flush and make that commit safe so that the SoftDeletesPolicy can drop everything.
if (IndexSettings.INDEX_SOFT_DELETES_SETTING.get(settings)) {
primary.sync();
flushShard(primary);
}
primary.refresh("force refresh");
ss = primary.segmentStats(randomBoolean());

View File

@ -411,12 +411,6 @@ public class RecoverySourceHandlerTests extends ESTestCase {
recoverySettings.getChunkSize().bytesAsInt(),
Settings.EMPTY) {
@Override
boolean isTranslogReadyForSequenceNumberBasedRecovery() throws IOException {
return randomBoolean();
}
@Override
public void phase1(final IndexCommit snapshot, final Supplier<Integer> translogOps) {
phase1Called.set(true);

View File

@ -69,7 +69,7 @@ public class RecoveryTests extends ESIndexLevelReplicationTestCase {
shards.addReplica();
shards.startAll();
final IndexShard replica = shards.getReplicas().get(0);
assertThat(replica.estimateTranslogOperationsFromMinSeq(0), equalTo(docs));
assertThat(getTranslog(replica).totalOperations(), equalTo(docs));
}
}
@ -99,7 +99,7 @@ public class RecoveryTests extends ESIndexLevelReplicationTestCase {
releaseRecovery.countDown();
future.get();
// rolling/flushing is async
assertBusy(() -> assertThat(replica.estimateTranslogOperationsFromMinSeq(0), equalTo(0)));
assertBusy(() -> assertThat(getTranslog(replica).totalOperations(), equalTo(0)));
}
}
@ -113,9 +113,10 @@ public class RecoveryTests extends ESIndexLevelReplicationTestCase {
* - flush (commit point has max_seqno 3, and local checkpoint 1 -> points at gen 2, previous commit point is maintained)
* - index #2
* - index #5
* - If flush and the translog retention disabled, delete #1 will be removed while index #0 is still retained and replayed.
* - If a flush happens and translog/Lucene retention is disabled, delete #1 will be removed while index #0 is still retained and replayed.
*/
try (ReplicationGroup shards = createGroup(1)) {
Settings settings = Settings.builder().put(IndexSettings.INDEX_SOFT_DELETES_RETENTION_OPERATIONS_SETTING.getKey(), 10).build();
try (ReplicationGroup shards = createGroup(1, settings)) {
shards.startAll();
// create out of order delete and index op on replica
final IndexShard orgReplica = shards.getReplicas().get(0);
@ -123,7 +124,7 @@ public class RecoveryTests extends ESIndexLevelReplicationTestCase {
// delete #1
orgReplica.applyDeleteOperationOnReplica(1, 2, "type", "id", VersionType.EXTERNAL);
getTranslog(orgReplica).rollGeneration(); // isolate the delete in it's own generation
orgReplica.flush(new FlushRequest().force(true)); // isolate delete#1 in its own translog generation and lucene segment
// index #0
orgReplica.applyIndexOperationOnReplica(0, 1, VersionType.EXTERNAL, IndexRequest.UNSET_AUTO_GENERATED_TIMESTAMP, false,
SourceToParse.source(indexName, "type", "id", new BytesArray("{}"), XContentType.JSON));
@ -143,16 +144,17 @@ public class RecoveryTests extends ESIndexLevelReplicationTestCase {
final int translogOps;
if (randomBoolean()) {
if (randomBoolean()) {
logger.info("--> flushing shard (translog will be trimmed)");
logger.info("--> flushing shard (translog/soft-deletes will be trimmed)");
IndexMetaData.Builder builder = IndexMetaData.builder(orgReplica.indexSettings().getIndexMetaData());
builder.settings(Settings.builder().put(orgReplica.indexSettings().getSettings())
.put(IndexSettings.INDEX_TRANSLOG_RETENTION_AGE_SETTING.getKey(), "-1")
.put(IndexSettings.INDEX_TRANSLOG_RETENTION_SIZE_SETTING.getKey(), "-1"));
.put(IndexSettings.INDEX_TRANSLOG_RETENTION_SIZE_SETTING.getKey(), "-1")
.put(IndexSettings.INDEX_SOFT_DELETES_RETENTION_OPERATIONS_SETTING.getKey(), 0));
orgReplica.indexSettings().updateIndexMetaData(builder.build());
orgReplica.onSettingsChanged();
translogOps = 5; // 4 ops + seqno gaps (delete #1 is removed but index #0 will be replayed).
} else {
logger.info("--> flushing shard (translog will be retained)");
logger.info("--> flushing shard (translog/soft-deletes will be retained)");
translogOps = 6; // 5 ops + seqno gaps
}
flushShard(orgReplica);
@ -167,7 +169,7 @@ public class RecoveryTests extends ESIndexLevelReplicationTestCase {
shards.recoverReplica(newReplica);
shards.assertAllEqual(3);
assertThat(newReplica.estimateTranslogOperationsFromMinSeq(0), equalTo(translogOps));
assertThat(getTranslog(newReplica).totalOperations(), equalTo(translogOps));
}
}
@ -219,7 +221,7 @@ public class RecoveryTests extends ESIndexLevelReplicationTestCase {
shards.recoverReplica(newReplica);
// file based recovery should be made
assertThat(newReplica.recoveryState().getIndex().fileDetails(), not(empty()));
assertThat(newReplica.estimateTranslogOperationsFromMinSeq(0), equalTo(numDocs));
assertThat(getTranslog(newReplica).totalOperations(), equalTo(numDocs));
// history uuid was restored
assertThat(newReplica.getHistoryUUID(), equalTo(historyUUID));
@ -323,7 +325,7 @@ public class RecoveryTests extends ESIndexLevelReplicationTestCase {
shards.recoverReplica(replica);
// Make sure the flushing will eventually be completed (eg. `shouldPeriodicallyFlush` is false)
assertBusy(() -> assertThat(getEngine(replica).shouldPeriodicallyFlush(), equalTo(false)));
assertThat(replica.estimateTranslogOperationsFromMinSeq(0), equalTo(numDocs));
assertThat(getTranslog(replica).totalOperations(), equalTo(numDocs));
shards.assertAllEqual(numDocs);
}
}

View File

@ -1047,8 +1047,12 @@ public class IndexStatsIT extends ESIntegTestCase {
assertEquals(DocWriteResponse.Result.DELETED, client().prepareDelete("index", "type", "1").get().getResult());
assertEquals(DocWriteResponse.Result.DELETED, client().prepareDelete("index", "type", "2").get().getResult());
// Here we are testing that a fully deleted segment is dropped and its cache entries are evicted.
// In order to instruct the merge policy not to keep a fully deleted segment,
// we need to flush and make that commit safe so that the SoftDeletesPolicy can drop everything.
if (IndexSettings.INDEX_SOFT_DELETES_SETTING.get(settings)) {
persistGlobalCheckpoint("index"); // Need to persist the global checkpoint for the soft-deletes retention MP.
persistGlobalCheckpoint("index");
flush("index");
}
refresh();
response = client().admin().indices().prepareStats("index").setQueryCache(true).get();

View File

@ -839,10 +839,15 @@ public abstract class EngineTestCase extends ESTestCase {
final long globalCheckpoint = EngineTestCase.getTranslog(engine).getLastSyncedGlobalCheckpoint();
final long retainedOps = engine.config().getIndexSettings().getSoftDeleteRetentionOperations();
long maxSeqNo = Math.max(0, ((InternalEngine)engine).getLocalCheckpointTracker().getMaxSeqNo());
final long seqNoForRecovery;
try (Engine.IndexCommitRef safeCommit = engine.acquireSafeIndexCommit()) {
seqNoForRecovery = Long.parseLong(safeCommit.getIndexCommit().getUserData().get(SequenceNumbers.LOCAL_CHECKPOINT_KEY)) + 1;
}
final long minSeqNoToRetain = Math.min(seqNoForRecovery, globalCheckpoint + 1 - retainedOps);
for (Translog.Operation translogOp : translogOps.values()) {
final Translog.Operation luceneOp = luceneOps.get(translogOp.seqNo());
if (luceneOp == null) {
if (globalCheckpoint + 1 - retainedOps <= translogOp.seqNo() && translogOp.seqNo() <= maxSeqNo) {
if (minSeqNoToRetain <= translogOp.seqNo() && translogOp.seqNo() <= maxSeqNo) {
fail("Operation not found seq# [" + translogOp.seqNo() + "], global checkpoint [" + globalCheckpoint + "], " +
"retention policy [" + retainedOps + "], maxSeqNo [" + maxSeqNo + "], translog op [" + translogOp + "]");
} else {

View File

@ -100,7 +100,11 @@ public abstract class ESIndexLevelReplicationTestCase extends IndexShardTestCase
private final Map<String, String> indexMapping = Collections.singletonMap("type", "{ \"type\": {} }");
protected ReplicationGroup createGroup(int replicas) throws IOException {
IndexMetaData metaData = buildIndexMetaData(replicas);
return createGroup(replicas, Settings.EMPTY);
}
protected ReplicationGroup createGroup(int replicas, Settings settings) throws IOException {
IndexMetaData metaData = buildIndexMetaData(replicas, settings, indexMapping);
return new ReplicationGroup(metaData);
}
@ -109,9 +113,14 @@ public abstract class ESIndexLevelReplicationTestCase extends IndexShardTestCase
}
protected IndexMetaData buildIndexMetaData(int replicas, Map<String, String> mappings) throws IOException {
return buildIndexMetaData(replicas, Settings.EMPTY, mappings);
}
protected IndexMetaData buildIndexMetaData(int replicas, Settings indexSettings, Map<String, String> mappings) throws IOException {
Settings settings = Settings.builder().put(IndexMetaData.SETTING_VERSION_CREATED, Version.CURRENT)
.put(IndexMetaData.SETTING_NUMBER_OF_REPLICAS, replicas)
.put(IndexMetaData.SETTING_NUMBER_OF_SHARDS, 1)
.put(indexSettings)
.build();
IndexMetaData.Builder metaData = IndexMetaData.builder(index.getName())
.settings(settings)
@ -203,6 +212,18 @@ public abstract class ESIndexLevelReplicationTestCase extends IndexShardTestCase
return listener.get();
}
public BulkItemResponse delete(DeleteRequest deleteRequest) throws Exception {
PlainActionFuture<BulkItemResponse> listener = new PlainActionFuture<>();
final ActionListener<BulkShardResponse> wrapBulkListener = ActionListener.wrap(
bulkShardResponse -> listener.onResponse(bulkShardResponse.getResponses()[0]),
listener::onFailure);
BulkItemRequest[] items = new BulkItemRequest[1];
items[0] = new BulkItemRequest(0, deleteRequest);
BulkShardRequest request = new BulkShardRequest(shardId, deleteRequest.getRefreshPolicy(), items);
new IndexingAction(request, wrapBulkListener, this).execute();
return listener.get();
}
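// Illustrative usage, mirroring index(IndexRequest) above: shards.delete(new DeleteRequest(index.getName(), "type", "1"));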
public synchronized void startAll() throws IOException {
startReplicas(replicas.size());
}

View File

@ -609,11 +609,14 @@ public abstract class IndexShardTestCase extends ESTestCase {
}
protected Engine.DeleteResult deleteDoc(IndexShard shard, String type, String id) throws IOException {
final Engine.DeleteResult result;
if (shard.routingEntry().primary()) {
return shard.applyDeleteOperationOnPrimary(Versions.MATCH_ANY, type, id, VersionType.INTERNAL);
result = shard.applyDeleteOperationOnPrimary(Versions.MATCH_ANY, type, id, VersionType.INTERNAL);
shard.updateLocalCheckpointForShard(shard.routingEntry().allocationId().getId(), shard.getEngine().getLocalCheckpoint());
} else {
return shard.applyDeleteOperationOnReplica(shard.seqNoStats().getMaxSeqNo() + 1, 0L, type, id, VersionType.EXTERNAL);
result = shard.applyDeleteOperationOnReplica(shard.seqNoStats().getMaxSeqNo() + 1, 0L, type, id, VersionType.EXTERNAL);
}
return result;
}
protected void flushShard(IndexShard shard) {