Only retain reasonable history for peer recoveries (#45208) (#45355)

Today if a shard is not fully allocated we maintain a retention lease for a
lost peer for up to 12 hours, retaining all operations that occur in that time
period so that we can recover this replica using an operations-based recovery
if it returns. However it is not always reasonable to perform an
operations-based recovery on such a replica: if the replica is a very long way
behind the rest of the replication group then it can be much quicker to perform
a file-based recovery instead.

This commit introduces a notion of "reasonable" recoveries. If an
operations-based recovery would involve copying only a small number of
operations, but the index is large, then an operations-based recovery is
reasonable; on the other hand if there are many operations to copy across and
the index itself is relatively small then it makes more sense to perform a
file-based recovery. We measure the size of the index by computing its number
of documents (including deleted documents) in all segments belonging to the
current safe commit, and compare this to the number of operations a lease is
retaining below the local checkpoint of the safe commit. We consider an
operations-based recovery to be reasonable iff it would involve replaying at
most 10% of the documents in the index.

The mechanism for this feature is to expire peer-recovery retention leases
early if they are retaining so much history that an operations-based recovery
using that lease would be unreasonable.

Relates #41536
This commit is contained in:
Armin Braun 2019-08-09 01:56:32 +02:00 committed by GitHub
parent 7b0a8040de
commit 12ed6dc999
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
16 changed files with 363 additions and 41 deletions

View File

@ -301,6 +301,19 @@ public final class IndexSettings {
public static final Setting<Boolean> INDEX_SEARCH_THROTTLED = Setting.boolSetting("index.search.throttled", false,
Property.IndexScope, Property.PrivateIndex, Property.Dynamic);
/**
* Determines a balance between file-based and operations-based peer recoveries. The number of operations that will be used in an
* operations-based peer recovery is limited to this proportion of the total number of documents in the shard (including deleted
* documents) on the grounds that a file-based peer recovery may copy all of the documents in the shard over to the new peer, but is
* significantly faster than replaying the missing operations on the peer, so once a peer falls far enough behind the primary it makes
* more sense to copy all the data over again instead of replaying history.
*
* Defaults to retaining history for up to 10% of the documents in the shard. This can only be changed in tests, since this setting is
* intentionally unregistered.
*/
public static final Setting<Double> FILE_BASED_RECOVERY_THRESHOLD_SETTING
= Setting.doubleSetting("index.recovery.file_based_threshold", 0.1d, 0.0d, Setting.Property.IndexScope);
private final Index index;
private final Version version;
private final Logger logger;

View File

@ -23,6 +23,7 @@ import com.carrotsearch.hppc.ObjectIntHashMap;
import org.apache.logging.log4j.Logger;
import org.apache.lucene.index.IndexCommit;
import org.apache.lucene.index.IndexDeletionPolicy;
import org.apache.lucene.index.SegmentInfos;
import org.apache.lucene.store.Directory;
import org.elasticsearch.index.seqno.SequenceNumbers;
import org.elasticsearch.index.translog.Translog;
@ -43,7 +44,7 @@ import java.util.function.LongSupplier;
* In particular, this policy will delete index commits whose max sequence number is at most
* the current global checkpoint except the index commit which has the highest max sequence number among those.
*/
public final class CombinedDeletionPolicy extends IndexDeletionPolicy {
public class CombinedDeletionPolicy extends IndexDeletionPolicy {
private final Logger logger;
private final TranslogDeletionPolicy translogDeletionPolicy;
private final SoftDeletesPolicy softDeletesPolicy;
@ -51,6 +52,7 @@ public final class CombinedDeletionPolicy extends IndexDeletionPolicy {
private final ObjectIntHashMap<IndexCommit> snapshottedCommits; // Number of snapshots held against each commit point.
private volatile IndexCommit safeCommit; // the most recent safe commit point - its max_seqno at most the persisted global checkpoint.
private volatile IndexCommit lastCommit; // the most recent commit point
private volatile SafeCommitInfo safeCommitInfo = SafeCommitInfo.EMPTY;
CombinedDeletionPolicy(Logger logger, TranslogDeletionPolicy translogDeletionPolicy,
SoftDeletesPolicy softDeletesPolicy, LongSupplier globalCheckpointSupplier) {
@ -62,7 +64,7 @@ public final class CombinedDeletionPolicy extends IndexDeletionPolicy {
}
@Override
public synchronized void onInit(List<? extends IndexCommit> commits) throws IOException {
public void onInit(List<? extends IndexCommit> commits) throws IOException {
assert commits.isEmpty() == false : "index is opened, but we have no commits";
onCommit(commits);
if (safeCommit != commits.get(commits.size() - 1)) {
@ -74,16 +76,32 @@ public final class CombinedDeletionPolicy extends IndexDeletionPolicy {
}
@Override
public synchronized void onCommit(List<? extends IndexCommit> commits) throws IOException {
final int keptPosition = indexOfKeptCommits(commits, globalCheckpointSupplier.getAsLong());
lastCommit = commits.get(commits.size() - 1);
safeCommit = commits.get(keptPosition);
for (int i = 0; i < keptPosition; i++) {
if (snapshottedCommits.containsKey(commits.get(i)) == false) {
deleteCommit(commits.get(i));
public void onCommit(List<? extends IndexCommit> commits) throws IOException {
final IndexCommit safeCommit;
synchronized (this) {
final int keptPosition = indexOfKeptCommits(commits, globalCheckpointSupplier.getAsLong());
this.safeCommitInfo = SafeCommitInfo.EMPTY;
this.lastCommit = commits.get(commits.size() - 1);
this.safeCommit = commits.get(keptPosition);
for (int i = 0; i < keptPosition; i++) {
if (snapshottedCommits.containsKey(commits.get(i)) == false) {
deleteCommit(commits.get(i));
}
}
updateRetentionPolicy();
safeCommit = this.safeCommit;
}
updateRetentionPolicy();
assert Thread.holdsLock(this) == false : "should not block concurrent acquire or relesase";
safeCommitInfo = new SafeCommitInfo(Long.parseLong(
safeCommit.getUserData().get(SequenceNumbers.LOCAL_CHECKPOINT_KEY)), getDocCountOfCommit(safeCommit));
// This is protected from concurrent calls by a lock on the IndexWriter, but this assertion makes sure that we notice if that ceases
// to be true in future. It is not disastrous if safeCommitInfo refers to an older safeCommit, it just means that we might retain a
// bit more history and do a few more ops-based recoveries than we would otherwise.
final IndexCommit newSafeCommit = this.safeCommit;
assert safeCommit == newSafeCommit
: "onCommit called concurrently? " + safeCommit.getGeneration() + " vs " + newSafeCommit.getGeneration();
}
private void deleteCommit(IndexCommit commit) throws IOException {
@ -109,6 +127,14 @@ public final class CombinedDeletionPolicy extends IndexDeletionPolicy {
Long.parseLong(safeCommit.getUserData().get(SequenceNumbers.LOCAL_CHECKPOINT_KEY)));
}
protected int getDocCountOfCommit(IndexCommit indexCommit) throws IOException {
return SegmentInfos.readCommit(indexCommit.getDirectory(), indexCommit.getSegmentsFileName()).totalMaxDoc();
}
SafeCommitInfo getSafeCommitInfo() {
return safeCommitInfo;
}
/**
* Captures the most recent commit point {@link #lastCommit} or the most recent safe commit point {@link #safeCommit}.
* Index files of the capturing commit point won't be released until the commit reference is closed.

View File

@ -1122,6 +1122,11 @@ public abstract class Engine implements Closeable {
*/
public abstract IndexCommitRef acquireSafeIndexCommit() throws EngineException;
/**
* @return a summary of the contents of the current safe commit
*/
public abstract SafeCommitInfo getSafeCommitInfo();
/**
* If the specified throwable contains a fatal error in the throwable graph, such a fatal error will be thrown. Callers should ensure
* that there are no catch statements that would catch an error in the stack as the fatal error here should go uncaught and be handled

View File

@ -2008,6 +2008,11 @@ public class InternalEngine extends Engine {
}
}
@Override
public SafeCommitInfo getSafeCommitInfo() {
return combinedDeletionPolicy.getSafeCommitInfo();
}
private boolean failOnTragicEvent(AlreadyClosedException ex) {
final boolean engineFailed;
// if we are already closed due to some tragic exception

View File

@ -77,6 +77,7 @@ public class ReadOnlyEngine extends Engine {
private final Lock indexWriterLock;
private final DocsStats docsStats;
private final RamAccountingRefreshListener refreshListener;
private final SafeCommitInfo safeCommitInfo;
protected volatile TranslogStats translogStats;
@ -120,6 +121,7 @@ public class ReadOnlyEngine extends Engine {
assert translogStats != null || obtainLock : "mutiple translogs instances should not be opened at the same time";
this.translogStats = translogStats != null ? translogStats : translogStats(config, lastCommittedSegmentInfos);
this.indexWriterLock = indexWriterLock;
this.safeCommitInfo = new SafeCommitInfo(seqNoStats.getLocalCheckpoint(), lastCommittedSegmentInfos.totalMaxDoc());
success = true;
} finally {
if (success == false) {
@ -420,6 +422,11 @@ public class ReadOnlyEngine extends Engine {
return acquireLastIndexCommit(false);
}
@Override
public SafeCommitInfo getSafeCommitInfo() {
return safeCommitInfo;
}
@Override
public void activateThrottling() {
}

View File

@ -0,0 +1,37 @@
/*
* Licensed to Elasticsearch under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.elasticsearch.index.engine;
import org.elasticsearch.index.seqno.SequenceNumbers;
/**
* Information about the safe commit, for making decisions about recoveries.
*/
public class SafeCommitInfo {
public final long localCheckpoint;
public final int docCount;
public SafeCommitInfo(long localCheckpoint, int docCount) {
this.localCheckpoint = localCheckpoint;
this.docCount = docCount;
}
public static final SafeCommitInfo EMPTY = new SafeCommitInfo(SequenceNumbers.NO_OPS_PERFORMED, 0);
}

View File

@ -37,6 +37,7 @@ import org.elasticsearch.common.io.stream.Writeable;
import org.elasticsearch.common.xcontent.NamedXContentRegistry;
import org.elasticsearch.gateway.WriteStateException;
import org.elasticsearch.index.IndexSettings;
import org.elasticsearch.index.engine.SafeCommitInfo;
import org.elasticsearch.index.shard.AbstractIndexShardComponent;
import org.elasticsearch.index.shard.IndexShard;
import org.elasticsearch.index.shard.ReplicationGroup;
@ -57,6 +58,7 @@ import java.util.function.BiConsumer;
import java.util.function.Function;
import java.util.function.LongConsumer;
import java.util.function.LongSupplier;
import java.util.function.Supplier;
import java.util.function.ToLongFunction;
import java.util.stream.Collectors;
import java.util.stream.LongStream;
@ -210,6 +212,17 @@ public class ReplicationTracker extends AbstractIndexShardComponent implements L
*/
private boolean hasAllPeerRecoveryRetentionLeases;
/**
* Supplies information about the current safe commit which may be used to expire peer-recovery retention leases.
*/
private final Supplier<SafeCommitInfo> safeCommitInfoSupplier;
/**
* Threshold for expiring peer-recovery retention leases and falling back to file-based recovery. See
* {@link IndexSettings#FILE_BASED_RECOVERY_THRESHOLD_SETTING}.
*/
private final double fileBasedRecoveryThreshold;
/**
* Get all retention leases tracked on this shard.
*
@ -237,6 +250,8 @@ public class ReplicationTracker extends AbstractIndexShardComponent implements L
final long retentionLeaseMillis = indexSettings.getRetentionLeaseMillis();
final Set<String> leaseIdsForCurrentPeers
= routingTable.assignedShards().stream().map(ReplicationTracker::getPeerRecoveryRetentionLeaseId).collect(Collectors.toSet());
final boolean allShardsStarted = routingTable.allShardsStarted();
final long minimumReasonableRetainedSeqNo = allShardsStarted ? 0L : getMinimumReasonableRetainedSeqNo();
final Map<Boolean, List<RetentionLease>> partitionByExpiration = retentionLeases
.leases()
.stream()
@ -245,7 +260,12 @@ public class ReplicationTracker extends AbstractIndexShardComponent implements L
if (leaseIdsForCurrentPeers.contains(lease.id())) {
return false;
}
if (routingTable.allShardsStarted()) {
if (allShardsStarted) {
logger.trace("expiring unused [{}]", lease);
return true;
}
if (lease.retainingSequenceNumber() < minimumReasonableRetainedSeqNo) {
logger.trace("expiring unreasonable [{}] retaining history before [{}]", lease, minimumReasonableRetainedSeqNo);
return true;
}
}
@ -264,6 +284,17 @@ public class ReplicationTracker extends AbstractIndexShardComponent implements L
return Tuple.tuple(true, retentionLeases);
}
private long getMinimumReasonableRetainedSeqNo() {
final SafeCommitInfo safeCommitInfo = safeCommitInfoSupplier.get();
return safeCommitInfo.localCheckpoint + 1 - Math.round(Math.ceil(safeCommitInfo.docCount * fileBasedRecoveryThreshold));
// NB safeCommitInfo.docCount is a very low-level count of the docs in the index, and in particular if this shard contains nested
// docs then safeCommitInfo.docCount counts every child doc separately from the parent doc. However every part of a nested document
// has the same seqno, so we may be overestimating the cost of a file-based recovery when compared to an ops-based recovery and
// therefore preferring ops-based recoveries inappropriately in this case. Correctly accounting for nested docs seems difficult to
// do cheaply, and the circumstances in which this matters should be relatively rare, so we use this naive calculation regardless.
// TODO improve this measure for when nested docs are in use
}
/**
* Adds a new retention lease.
*
@ -850,7 +881,8 @@ public class ReplicationTracker extends AbstractIndexShardComponent implements L
final long globalCheckpoint,
final LongConsumer onGlobalCheckpointUpdated,
final LongSupplier currentTimeMillisSupplier,
final BiConsumer<RetentionLeases, ActionListener<ReplicationResponse>> onSyncRetentionLeases) {
final BiConsumer<RetentionLeases, ActionListener<ReplicationResponse>> onSyncRetentionLeases,
final Supplier<SafeCommitInfo> safeCommitInfoSupplier) {
super(shardId, indexSettings);
assert globalCheckpoint >= SequenceNumbers.UNASSIGNED_SEQ_NO : "illegal initial global checkpoint: " + globalCheckpoint;
this.shardAllocationId = allocationId;
@ -867,6 +899,8 @@ public class ReplicationTracker extends AbstractIndexShardComponent implements L
this.routingTable = null;
this.replicationGroup = null;
this.hasAllPeerRecoveryRetentionLeases = indexSettings.getIndexVersionCreated().onOrAfter(Version.V_7_4_0);
this.fileBasedRecoveryThreshold = IndexSettings.FILE_BASED_RECOVERY_THRESHOLD_SETTING.get(indexSettings.getSettings());
this.safeCommitInfoSupplier = safeCommitInfoSupplier;
assert Version.V_EMPTY.equals(indexSettings.getIndexVersionCreated()) == false;
assert invariant();
}

View File

@ -92,6 +92,7 @@ import org.elasticsearch.index.engine.EngineException;
import org.elasticsearch.index.engine.EngineFactory;
import org.elasticsearch.index.engine.ReadOnlyEngine;
import org.elasticsearch.index.engine.RefreshFailedEngineException;
import org.elasticsearch.index.engine.SafeCommitInfo;
import org.elasticsearch.index.engine.Segment;
import org.elasticsearch.index.engine.SegmentsStats;
import org.elasticsearch.index.fielddata.FieldDataStats;
@ -336,7 +337,8 @@ public class IndexShard extends AbstractIndexShardComponent implements IndicesCl
UNASSIGNED_SEQ_NO,
globalCheckpointListeners::globalCheckpointUpdated,
threadPool::absoluteTimeInMillis,
(retentionLeases, listener) -> retentionLeaseSyncer.sync(shardId, retentionLeases, listener));
(retentionLeases, listener) -> retentionLeaseSyncer.sync(shardId, retentionLeases, listener),
this::getSafeCommitInfo);
// the query cache is a node-level thing, however we want the most popular filters
// to be computed on a per-shard basis
@ -2612,6 +2614,11 @@ public class IndexShard extends AbstractIndexShardComponent implements IndicesCl
replicationTracker.removePeerRecoveryRetentionLease(nodeId, listener);
}
private SafeCommitInfo getSafeCommitInfo() {
final Engine engine = getEngineOrNull();
return engine == null ? SafeCommitInfo.EMPTY : engine.getSafeCommitInfo();
}
class ShardEventListener implements Engine.EventListener {
private final CopyOnWriteArrayList<Consumer<ShardFailure>> delegates = new CopyOnWriteArrayList<>();

View File

@ -55,7 +55,7 @@ public class CombinedDeletionPolicyTests extends ESTestCase {
final SoftDeletesPolicy softDeletesPolicy =
new SoftDeletesPolicy(globalCheckpoint::get, NO_OPS_PERFORMED, extraRetainedOps, () -> RetentionLeases.EMPTY);
TranslogDeletionPolicy translogPolicy = createTranslogDeletionPolicy();
CombinedDeletionPolicy indexPolicy = new CombinedDeletionPolicy(logger, translogPolicy, softDeletesPolicy, globalCheckpoint::get);
CombinedDeletionPolicy indexPolicy = newCombinedDeletionPolicy(translogPolicy, softDeletesPolicy, globalCheckpoint);
final LongArrayList maxSeqNoList = new LongArrayList();
final LongArrayList translogGenList = new LongArrayList();
@ -102,7 +102,7 @@ public class CombinedDeletionPolicyTests extends ESTestCase {
new SoftDeletesPolicy(globalCheckpoint::get, -1, extraRetainedOps, () -> RetentionLeases.EMPTY);
final UUID translogUUID = UUID.randomUUID();
TranslogDeletionPolicy translogPolicy = createTranslogDeletionPolicy();
CombinedDeletionPolicy indexPolicy = new CombinedDeletionPolicy(logger, translogPolicy, softDeletesPolicy, globalCheckpoint::get);
CombinedDeletionPolicy indexPolicy = newCombinedDeletionPolicy(translogPolicy, softDeletesPolicy, globalCheckpoint);
long lastMaxSeqNo = between(1, 1000);
long lastCheckpoint = randomLongBetween(-1, lastMaxSeqNo);
long lastTranslogGen = between(1, 20);
@ -182,7 +182,7 @@ public class CombinedDeletionPolicyTests extends ESTestCase {
final AtomicLong globalCheckpoint = new AtomicLong(randomNonNegativeLong());
final SoftDeletesPolicy softDeletesPolicy = new SoftDeletesPolicy(globalCheckpoint::get, -1, 0, () -> RetentionLeases.EMPTY);
TranslogDeletionPolicy translogPolicy = createTranslogDeletionPolicy();
CombinedDeletionPolicy indexPolicy = new CombinedDeletionPolicy(logger, translogPolicy, softDeletesPolicy, globalCheckpoint::get);
CombinedDeletionPolicy indexPolicy = newCombinedDeletionPolicy(translogPolicy, softDeletesPolicy, globalCheckpoint);
final int invalidCommits = between(1, 10);
final List<IndexCommit> commitList = new ArrayList<>();
@ -217,7 +217,7 @@ public class CombinedDeletionPolicyTests extends ESTestCase {
final SoftDeletesPolicy softDeletesPolicy = new SoftDeletesPolicy(globalCheckpoint::get, -1, 0, () -> RetentionLeases.EMPTY);
final UUID translogUUID = UUID.randomUUID();
final TranslogDeletionPolicy translogPolicy = createTranslogDeletionPolicy();
CombinedDeletionPolicy indexPolicy = new CombinedDeletionPolicy(logger, translogPolicy, softDeletesPolicy, globalCheckpoint::get);
CombinedDeletionPolicy indexPolicy = newCombinedDeletionPolicy(translogPolicy, softDeletesPolicy, globalCheckpoint);
final List<IndexCommit> commitList = new ArrayList<>();
int totalCommits = between(2, 20);
long lastMaxSeqNo = between(1, 1000);
@ -254,6 +254,17 @@ public class CombinedDeletionPolicyTests extends ESTestCase {
}
}
private CombinedDeletionPolicy newCombinedDeletionPolicy(TranslogDeletionPolicy translogPolicy, SoftDeletesPolicy softDeletesPolicy,
AtomicLong globalCheckpoint) {
return new CombinedDeletionPolicy(logger, translogPolicy, softDeletesPolicy, globalCheckpoint::get)
{
@Override
protected int getDocCountOfCommit(IndexCommit indexCommit) {
return between(0, 1000);
}
};
}
IndexCommit mockIndexCommit(long localCheckpoint, long maxSeqNo, UUID translogUUID, long translogGen) throws IOException {
final Map<String, String> userData = new HashMap<>();
userData.put(SequenceNumbers.LOCAL_CHECKPOINT_KEY, Long.toString(localCheckpoint));

View File

@ -27,6 +27,7 @@ import org.elasticsearch.common.collect.Tuple;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.index.IndexSettings;
import org.elasticsearch.index.engine.SafeCommitInfo;
import org.elasticsearch.index.shard.ShardId;
import org.elasticsearch.test.IndexSettingsModule;
import org.junit.Before;
@ -37,6 +38,7 @@ import java.util.concurrent.atomic.AtomicLong;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import static org.elasticsearch.index.seqno.SequenceNumbers.NO_OPS_PERFORMED;
import static org.elasticsearch.index.seqno.SequenceNumbers.UNASSIGNED_SEQ_NO;
import static org.hamcrest.Matchers.equalTo;
import static org.hamcrest.Matchers.hasSize;
@ -48,6 +50,7 @@ public class PeerRecoveryRetentionLeaseExpiryTests extends ReplicationTrackerTes
private ReplicationTracker replicationTracker;
private AtomicLong currentTimeMillis;
private Settings settings;
private SafeCommitInfo safeCommitInfo;
@Before
public void setUpReplicationTracker() throws InterruptedException {
@ -63,6 +66,8 @@ public class PeerRecoveryRetentionLeaseExpiryTests extends ReplicationTrackerTes
settings = Settings.EMPTY;
}
safeCommitInfo = null; // must be set in each test
final long primaryTerm = randomLongBetween(1, Long.MAX_VALUE);
replicationTracker = new ReplicationTracker(
new ShardId("test", "_na", 0),
@ -72,7 +77,8 @@ public class PeerRecoveryRetentionLeaseExpiryTests extends ReplicationTrackerTes
UNASSIGNED_SEQ_NO,
value -> { },
currentTimeMillis::get,
(leases, listener) -> { });
(leases, listener) -> { },
() -> safeCommitInfo);
replicationTracker.updateFromMaster(1L, Collections.singleton(primaryAllocationId.getId()),
routingTable(Collections.emptySet(), primaryAllocationId));
replicationTracker.activatePrimaryMode(SequenceNumbers.NO_OPS_PERFORMED);
@ -109,6 +115,7 @@ public class PeerRecoveryRetentionLeaseExpiryTests extends ReplicationTrackerTes
}
currentTimeMillis.set(currentTimeMillis.get() + randomLongBetween(0, Long.MAX_VALUE - currentTimeMillis.get()));
safeCommitInfo = randomSafeCommitInfo();
final Tuple<Boolean, RetentionLeases> retentionLeases = replicationTracker.getRetentionLeases(true);
assertFalse(retentionLeases.v1());
@ -121,11 +128,14 @@ public class PeerRecoveryRetentionLeaseExpiryTests extends ReplicationTrackerTes
public void testPeerRecoveryRetentionLeasesForUnassignedCopiesDoNotExpireImmediatelyIfShardsNotAllStarted() {
final String unknownNodeId = randomAlphaOfLength(10);
replicationTracker.addPeerRecoveryRetentionLease(unknownNodeId, randomCheckpoint(), EMPTY_LISTENER);
final long globalCheckpoint = randomNonNegativeLong(); // not NO_OPS_PERFORMED since this always results in file-based recovery
replicationTracker.addPeerRecoveryRetentionLease(unknownNodeId, globalCheckpoint, EMPTY_LISTENER);
currentTimeMillis.set(currentTimeMillis.get()
+ randomLongBetween(0, IndexSettings.INDEX_SOFT_DELETES_RETENTION_LEASE_PERIOD_SETTING.get(settings).millis()));
safeCommitInfo = randomSafeCommitInfoSuitableForOpsBasedRecovery(globalCheckpoint);
final Tuple<Boolean, RetentionLeases> retentionLeases = replicationTracker.getRetentionLeases(true);
assertFalse("should not have expired anything", retentionLeases.v1());
@ -142,12 +152,15 @@ public class PeerRecoveryRetentionLeaseExpiryTests extends ReplicationTrackerTes
}
final String unknownNodeId = randomAlphaOfLength(10);
replicationTracker.addPeerRecoveryRetentionLease(unknownNodeId, randomCheckpoint(), EMPTY_LISTENER);
final long globalCheckpoint = randomCheckpoint();
replicationTracker.addPeerRecoveryRetentionLease(unknownNodeId, globalCheckpoint, EMPTY_LISTENER);
currentTimeMillis.set(randomLongBetween(
currentTimeMillis.get() + IndexSettings.INDEX_SOFT_DELETES_RETENTION_LEASE_PERIOD_SETTING.get(settings).millis() + 1,
Long.MAX_VALUE));
safeCommitInfo = randomSafeCommitInfoSuitableForOpsBasedRecovery(globalCheckpoint);
final Tuple<Boolean, RetentionLeases> retentionLeases = replicationTracker.getRetentionLeases(true);
assertTrue("should have expired something", retentionLeases.v1());
@ -167,6 +180,7 @@ public class PeerRecoveryRetentionLeaseExpiryTests extends ReplicationTrackerTes
(usually()
? randomLongBetween(0, IndexSettings.INDEX_SOFT_DELETES_RETENTION_LEASE_PERIOD_SETTING.get(settings).millis())
: randomLongBetween(0, Long.MAX_VALUE - currentTimeMillis.get())));
safeCommitInfo = randomSafeCommitInfo();
final Tuple<Boolean, RetentionLeases> retentionLeases = replicationTracker.getRetentionLeases(true);
assertTrue(retentionLeases.v1());
@ -176,4 +190,41 @@ public class PeerRecoveryRetentionLeaseExpiryTests extends ReplicationTrackerTes
assertThat(leaseIds, equalTo(replicationTracker.routingTable.shards().stream()
.map(ReplicationTracker::getPeerRecoveryRetentionLeaseId).collect(Collectors.toSet())));
}
public void testPeerRecoveryRetentionLeasesForUnassignedCopiesExpireIfRetainingTooMuchHistory() {
if (randomBoolean()) {
startReplica();
}
final String unknownNodeId = randomAlphaOfLength(10);
final long globalCheckpoint = randomValueOtherThan(SequenceNumbers.NO_OPS_PERFORMED, this::randomCheckpoint);
replicationTracker.addPeerRecoveryRetentionLease(unknownNodeId, globalCheckpoint, EMPTY_LISTENER);
safeCommitInfo = randomSafeCommitInfoSuitableForFileBasedRecovery(globalCheckpoint);
final Tuple<Boolean, RetentionLeases> retentionLeases = replicationTracker.getRetentionLeases(true);
assertTrue("should have expired something", retentionLeases.v1());
final Set<String> leaseIds = retentionLeases.v2().leases().stream().map(RetentionLease::id).collect(Collectors.toSet());
assertThat(leaseIds, hasSize(2));
assertThat(leaseIds, equalTo(replicationTracker.routingTable.shards().stream()
.map(ReplicationTracker::getPeerRecoveryRetentionLeaseId).collect(Collectors.toSet())));
}
private SafeCommitInfo randomSafeCommitInfo() {
return randomBoolean() ? SafeCommitInfo.EMPTY : new SafeCommitInfo(
randomFrom(randomNonNegativeLong(), (long) randomIntBetween(0, Integer.MAX_VALUE)),
randomIntBetween(0, Integer.MAX_VALUE));
}
private SafeCommitInfo randomSafeCommitInfoSuitableForOpsBasedRecovery(long globalCheckpoint) {
// simulate a safe commit that is behind the given global checkpoint, so that no files need to be transferrred
final long localCheckpoint = randomLongBetween(NO_OPS_PERFORMED, globalCheckpoint);
return new SafeCommitInfo(localCheckpoint, between(0, Math.toIntExact(Math.min(localCheckpoint + 1, Integer.MAX_VALUE))));
}
private SafeCommitInfo randomSafeCommitInfoSuitableForFileBasedRecovery(long globalCheckpoint) {
// simulate a later safe commit containing no documents, which is always better to transfer than any ops
return new SafeCommitInfo(randomLongBetween(globalCheckpoint + 1, Long.MAX_VALUE), 0);
}
}

View File

@ -70,7 +70,8 @@ public class ReplicationTrackerRetentionLeaseTests extends ReplicationTrackerTes
UNASSIGNED_SEQ_NO,
value -> {},
() -> 0L,
(leases, listener) -> {});
(leases, listener) -> {},
OPS_BASED_RECOVERY_ALWAYS_REASONABLE);
replicationTracker.updateFromMaster(
randomNonNegativeLong(),
Collections.singleton(allocationId.getId()),
@ -111,7 +112,8 @@ public class ReplicationTrackerRetentionLeaseTests extends ReplicationTrackerTes
UNASSIGNED_SEQ_NO,
value -> {},
() -> 0L,
(leases, listener) -> {});
(leases, listener) -> {},
OPS_BASED_RECOVERY_ALWAYS_REASONABLE);
replicationTracker.updateFromMaster(
randomNonNegativeLong(),
Collections.singleton(allocationId.getId()),
@ -139,7 +141,8 @@ public class ReplicationTrackerRetentionLeaseTests extends ReplicationTrackerTes
UNASSIGNED_SEQ_NO,
value -> {},
() -> 0L,
(leases, listener) -> {});
(leases, listener) -> {},
OPS_BASED_RECOVERY_ALWAYS_REASONABLE);
replicationTracker.updateFromMaster(
randomNonNegativeLong(),
Collections.singleton(allocationId.getId()),
@ -174,7 +177,8 @@ public class ReplicationTrackerRetentionLeaseTests extends ReplicationTrackerTes
.stream()
.collect(Collectors.toMap(RetentionLease::id, RetentionLease::retainingSequenceNumber)),
equalTo(retainingSequenceNumbers));
});
},
OPS_BASED_RECOVERY_ALWAYS_REASONABLE);
reference.set(replicationTracker);
replicationTracker.updateFromMaster(
randomNonNegativeLong(),
@ -210,7 +214,8 @@ public class ReplicationTrackerRetentionLeaseTests extends ReplicationTrackerTes
UNASSIGNED_SEQ_NO,
value -> {},
() -> 0L,
(leases, listener) -> {});
(leases, listener) -> {},
OPS_BASED_RECOVERY_ALWAYS_REASONABLE);
replicationTracker.updateFromMaster(
randomNonNegativeLong(),
Collections.singleton(allocationId.getId()),
@ -266,7 +271,8 @@ public class ReplicationTrackerRetentionLeaseTests extends ReplicationTrackerTes
assertFalse(Thread.holdsLock(replicationTrackerRef.get()));
assertTrue(synced.compareAndSet(false, true));
listener.onResponse(new ReplicationResponse());
});
},
OPS_BASED_RECOVERY_ALWAYS_REASONABLE);
replicationTrackerRef.set(replicationTracker);
replicationTracker.updateFromMaster(
randomNonNegativeLong(),
@ -309,7 +315,8 @@ public class ReplicationTrackerRetentionLeaseTests extends ReplicationTrackerTes
UNASSIGNED_SEQ_NO,
value -> {},
() -> 0L,
(leases, listener) -> { });
(leases, listener) -> { },
OPS_BASED_RECOVERY_ALWAYS_REASONABLE);
replicationTracker.updateFromMaster(
randomNonNegativeLong(),
Collections.singleton(allocationId.getId()),
@ -331,7 +338,8 @@ public class ReplicationTrackerRetentionLeaseTests extends ReplicationTrackerTes
UNASSIGNED_SEQ_NO,
value -> {},
() -> 0L,
(leases, listener) -> { });
(leases, listener) -> { },
OPS_BASED_RECOVERY_ALWAYS_REASONABLE);
replicationTracker.updateFromMaster(
randomNonNegativeLong(),
Collections.singleton(allocationId.getId()),
@ -357,7 +365,8 @@ public class ReplicationTrackerRetentionLeaseTests extends ReplicationTrackerTes
UNASSIGNED_SEQ_NO,
value -> {},
() -> 0L,
(leases, listener) -> {});
(leases, listener) -> {},
OPS_BASED_RECOVERY_ALWAYS_REASONABLE);
replicationTracker.updateFromMaster(
randomNonNegativeLong(),
Collections.singleton(allocationId.getId()),
@ -392,7 +401,8 @@ public class ReplicationTrackerRetentionLeaseTests extends ReplicationTrackerTes
.stream()
.collect(Collectors.toMap(RetentionLease::id, RetentionLease::retainingSequenceNumber)),
equalTo(retainingSequenceNumbers));
});
},
OPS_BASED_RECOVERY_ALWAYS_REASONABLE);
reference.set(replicationTracker);
replicationTracker.updateFromMaster(
randomNonNegativeLong(),
@ -445,7 +455,8 @@ public class ReplicationTrackerRetentionLeaseTests extends ReplicationTrackerTes
UNASSIGNED_SEQ_NO,
value -> {},
currentTimeMillis::get,
(leases, listener) -> {});
(leases, listener) -> {},
OPS_BASED_RECOVERY_ALWAYS_REASONABLE);
replicationTracker.updateFromMaster(
randomNonNegativeLong(),
Collections.singleton(allocationId.getId()),
@ -519,7 +530,8 @@ public class ReplicationTrackerRetentionLeaseTests extends ReplicationTrackerTes
UNASSIGNED_SEQ_NO,
value -> {},
() -> 0L,
(leases, listener) -> {});
(leases, listener) -> {},
OPS_BASED_RECOVERY_ALWAYS_REASONABLE);
replicationTracker.updateFromMaster(
randomNonNegativeLong(),
Collections.singleton(allocationId.getId()),
@ -572,7 +584,8 @@ public class ReplicationTrackerRetentionLeaseTests extends ReplicationTrackerTes
UNASSIGNED_SEQ_NO,
value -> {},
() -> 0L,
(leases, listener) -> {});
(leases, listener) -> {},
OPS_BASED_RECOVERY_ALWAYS_REASONABLE);
replicationTracker.updateFromMaster(
randomNonNegativeLong(),
Collections.singleton(allocationId.getId()),
@ -605,7 +618,8 @@ public class ReplicationTrackerRetentionLeaseTests extends ReplicationTrackerTes
UNASSIGNED_SEQ_NO,
value -> {},
() -> 0L,
(leases, listener) -> {});
(leases, listener) -> {},
OPS_BASED_RECOVERY_ALWAYS_REASONABLE);
replicationTracker.updateFromMaster(
randomNonNegativeLong(),
Collections.singleton(allocationId.getId()),
@ -653,7 +667,8 @@ public class ReplicationTrackerRetentionLeaseTests extends ReplicationTrackerTes
UNASSIGNED_SEQ_NO,
value -> {},
() -> 0L,
(leases, listener) -> {});
(leases, listener) -> {},
OPS_BASED_RECOVERY_ALWAYS_REASONABLE);
replicationTracker.updateFromMaster(
randomNonNegativeLong(),
Collections.singleton(allocationId.getId()),

View File

@ -25,6 +25,7 @@ import org.elasticsearch.cluster.routing.ShardRouting;
import org.elasticsearch.cluster.routing.ShardRoutingState;
import org.elasticsearch.cluster.routing.TestShardRouting;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.index.engine.SafeCommitInfo;
import org.elasticsearch.index.shard.ShardId;
import org.elasticsearch.test.ESTestCase;
import org.elasticsearch.test.IndexSettingsModule;
@ -32,6 +33,7 @@ import org.elasticsearch.test.IndexSettingsModule;
import java.util.Set;
import java.util.function.LongConsumer;
import java.util.function.LongSupplier;
import java.util.function.Supplier;
import static org.elasticsearch.index.seqno.SequenceNumbers.UNASSIGNED_SEQ_NO;
@ -49,9 +51,12 @@ public abstract class ReplicationTrackerTestCase extends ESTestCase {
UNASSIGNED_SEQ_NO,
updatedGlobalCheckpoint,
currentTimeMillisSupplier,
(leases, listener) -> {});
(leases, listener) -> {},
OPS_BASED_RECOVERY_ALWAYS_REASONABLE);
}
static final Supplier<SafeCommitInfo> OPS_BASED_RECOVERY_ALWAYS_REASONABLE = () -> SafeCommitInfo.EMPTY;
static String nodeIdFromAllocationId(final AllocationId allocationId) {
return "n-" + allocationId.getId().substring(0, 8);
}

View File

@ -694,10 +694,10 @@ public class ReplicationTrackerTests extends ReplicationTrackerTestCase {
final long globalCheckpoint = UNASSIGNED_SEQ_NO;
final BiConsumer<RetentionLeases, ActionListener<ReplicationResponse>> onNewRetentionLease =
(leases, listener) -> {};
ReplicationTracker oldPrimary = new ReplicationTracker(
shardId, aId.getId(), indexSettings, primaryTerm, globalCheckpoint, onUpdate, () -> 0L, onNewRetentionLease);
ReplicationTracker newPrimary = new ReplicationTracker(
shardId, aId.getRelocationId(), indexSettings, primaryTerm, globalCheckpoint, onUpdate, () -> 0L, onNewRetentionLease);
ReplicationTracker oldPrimary = new ReplicationTracker(shardId, aId.getId(), indexSettings, primaryTerm, globalCheckpoint,
onUpdate, () -> 0L, onNewRetentionLease, OPS_BASED_RECOVERY_ALWAYS_REASONABLE);
ReplicationTracker newPrimary = new ReplicationTracker(shardId, aId.getRelocationId(), indexSettings, primaryTerm, globalCheckpoint,
onUpdate, () -> 0L, onNewRetentionLease, OPS_BASED_RECOVERY_ALWAYS_REASONABLE);
Set<String> allocationIds = new HashSet<>(Arrays.asList(oldPrimary.shardAllocationId, newPrimary.shardAllocationId));

View File

@ -1206,6 +1206,109 @@ public class IndexRecoveryIT extends ESIntegTestCase {
assertThat(recoveryState.getIndex().totalFileCount(), greaterThan(0));
}
public void testUsesFileBasedRecoveryIfOperationsBasedRecoveryWouldBeUnreasonable() throws Exception {
internalCluster().ensureAtLeastNumDataNodes(2);
String indexName = "test-index";
final Settings.Builder settings = Settings.builder()
.put(IndexMetaData.SETTING_NUMBER_OF_SHARDS, 1)
.put(IndexMetaData.SETTING_NUMBER_OF_REPLICAS, 1)
.put(IndexSettings.INDEX_SOFT_DELETES_SETTING.getKey(), true)
.put(UnassignedInfo.INDEX_DELAYED_NODE_LEFT_TIMEOUT_SETTING.getKey(), "12h")
.put(IndexService.RETENTION_LEASE_SYNC_INTERVAL_SETTING.getKey(), "100ms");
final double reasonableOperationsBasedRecoveryProportion;
if (randomBoolean()) {
reasonableOperationsBasedRecoveryProportion = randomDoubleBetween(0.05, 0.99, true);
settings.put(IndexSettings.FILE_BASED_RECOVERY_THRESHOLD_SETTING.getKey(),
reasonableOperationsBasedRecoveryProportion);
} else {
reasonableOperationsBasedRecoveryProportion
= IndexSettings.FILE_BASED_RECOVERY_THRESHOLD_SETTING.get(Settings.EMPTY);
}
logger.info("--> performing ops-based recoveries up to [{}%] of docs", reasonableOperationsBasedRecoveryProportion * 100.0);
createIndex(indexName, settings.build());
indexRandom(randomBoolean(), false, randomBoolean(), IntStream.range(0, between(0, 100))
.mapToObj(n -> client().prepareIndex(indexName, "_doc").setSource("num", n)).collect(toList()));
ensureGreen(indexName);
flush(indexName);
// wait for all history to be discarded
assertBusy(() -> {
for (ShardStats shardStats : client().admin().indices().prepareStats(indexName).get().getShards()) {
final long maxSeqNo = shardStats.getSeqNoStats().getMaxSeqNo();
assertTrue(shardStats.getRetentionLeaseStats().retentionLeases() + " should discard history up to " + maxSeqNo,
shardStats.getRetentionLeaseStats().retentionLeases().leases().stream().allMatch(
l -> l.retainingSequenceNumber() == maxSeqNo + 1));
}
});
flush(indexName); // ensure that all operations are in the safe commit
final ShardStats shardStats = client().admin().indices().prepareStats(indexName).get().getShards()[0];
final long docCount = shardStats.getStats().docs.getCount();
assertThat(shardStats.getStats().docs.getDeleted(), equalTo(0L));
assertThat(shardStats.getSeqNoStats().getMaxSeqNo() + 1, equalTo(docCount));
final ShardId shardId = new ShardId(resolveIndex(indexName), 0);
final DiscoveryNodes discoveryNodes = clusterService().state().nodes();
final IndexShardRoutingTable indexShardRoutingTable = clusterService().state().routingTable().shardRoutingTable(shardId);
final ShardRouting replicaShardRouting = indexShardRoutingTable.replicaShards().get(0);
assertTrue("should have lease for " + replicaShardRouting,
client().admin().indices().prepareStats(indexName).get().getShards()[0].getRetentionLeaseStats()
.retentionLeases().contains(ReplicationTracker.getPeerRecoveryRetentionLeaseId(replicaShardRouting)));
internalCluster().restartNode(discoveryNodes.get(replicaShardRouting.currentNodeId()).getName(),
new InternalTestCluster.RestartCallback() {
@Override
public Settings onNodeStopped(String nodeName) throws Exception {
assertFalse(client().admin().cluster().prepareHealth()
.setWaitForNodes(Integer.toString(discoveryNodes.getSize() - 1))
.setWaitForEvents(Priority.LANGUID).get().isTimedOut());
final int newDocCount = Math.toIntExact(Math.round(Math.ceil(
(1 + Math.ceil(docCount * reasonableOperationsBasedRecoveryProportion))
/ (1 - reasonableOperationsBasedRecoveryProportion))));
/*
* newDocCount >= (ceil(docCount * p) + 1) / (1-p)
*
* ==> 0 <= newDocCount * (1-p) - ceil(docCount * p) - 1
* = newDocCount - (newDocCount * p + ceil(docCount * p) + 1)
* < newDocCount - (ceil(newDocCount * p) + ceil(docCount * p))
* <= newDocCount - ceil(newDocCount * p + docCount * p)
*
* ==> docCount < newDocCount + docCount - ceil((newDocCount + docCount) * p)
* == localCheckpoint + 1 - ceil((newDocCount + docCount) * p)
* == firstReasonableSeqNo
*
* The replica has docCount docs, i.e. has operations with seqnos [0..docCount-1], so a seqno-based recovery will start
* from docCount < firstReasonableSeqNo
*
* ==> it is unreasonable to recover the replica using a seqno-based recovery
*/
indexRandom(randomBoolean(), randomBoolean(), randomBoolean(), IntStream.range(0, newDocCount)
.mapToObj(n -> client().prepareIndex(indexName, "_doc").setSource("num", n)).collect(toList()));
flush(indexName);
assertBusy(() -> assertFalse("should no longer have lease for " + replicaShardRouting,
client().admin().indices().prepareStats(indexName).get().getShards()[0].getRetentionLeaseStats()
.retentionLeases().contains(ReplicationTracker.getPeerRecoveryRetentionLeaseId(replicaShardRouting))));
return super.onNodeStopped(nodeName);
}
});
ensureGreen(indexName);
//noinspection OptionalGetWithoutIsPresent because it fails the test if absent
final RecoveryState recoveryState = client().admin().indices().prepareRecoveries(indexName).get()
.shardRecoveryStates().get(indexName).stream().filter(rs -> rs.getPrimary() == false).findFirst().get();
assertThat(recoveryState.getIndex().totalFileCount(), greaterThan(0));
}
public void testDoesNotCopyOperationsInSafeCommit() throws Exception {
internalCluster().ensureAtLeastNumDataNodes(2);

View File

@ -674,7 +674,8 @@ public abstract class EngineTestCase extends ESTestCase {
SequenceNumbers.NO_OPS_PERFORMED,
update -> {},
() -> 0L,
(leases, listener) -> listener.onResponse(new ReplicationResponse()));
(leases, listener) -> listener.onResponse(new ReplicationResponse()),
() -> SafeCommitInfo.EMPTY);
globalCheckpointSupplier = replicationTracker;
retentionLeasesSupplier = replicationTracker::getRetentionLeases;
} else {

View File

@ -24,6 +24,7 @@ import org.elasticsearch.common.settings.Setting.Property;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.index.IndexModule;
import org.elasticsearch.index.IndexService;
import org.elasticsearch.index.IndexSettings;
import org.elasticsearch.plugins.Plugin;
import java.util.Arrays;
@ -51,6 +52,7 @@ public final class InternalSettingsPlugin extends Plugin {
TRANSLOG_RETENTION_CHECK_INTERVAL_SETTING,
IndexService.GLOBAL_CHECKPOINT_SYNC_INTERVAL_SETTING,
IndexService.RETENTION_LEASE_SYNC_INTERVAL_SETTING,
IndexSettings.FILE_BASED_RECOVERY_THRESHOLD_SETTING,
IndexModule.INDEX_QUERY_CACHE_EVERYTHING_SETTING
);
}