From 12ed6dc9996b363ecf5a150872dbad1fe6d895d9 Mon Sep 17 00:00:00 2001 From: Armin Braun Date: Fri, 9 Aug 2019 01:56:32 +0200 Subject: [PATCH] Only retain reasonable history for peer recoveries (#45208) (#45355) Today if a shard is not fully allocated we maintain a retention lease for a lost peer for up to 12 hours, retaining all operations that occur in that time period so that we can recover this replica using an operations-based recovery if it returns. However it is not always reasonable to perform an operations-based recovery on such a replica: if the replica is a very long way behind the rest of the replication group then it can be much quicker to perform a file-based recovery instead. This commit introduces a notion of "reasonable" recoveries. If an operations-based recovery would involve copying only a small number of operations, but the index is large, then an operations-based recovery is reasonable; on the other hand if there are many operations to copy across and the index itself is relatively small then it makes more sense to perform a file-based recovery. We measure the size of the index by computing its number of documents (including deleted documents) in all segments belonging to the current safe commit, and compare this to the number of operations a lease is retaining below the local checkpoint of the safe commit. We consider an operations-based recovery to be reasonable iff it would involve replaying at most 10% of the documents in the index. The mechanism for this feature is to expire peer-recovery retention leases early if they are retaining so much history that an operations-based recovery using that lease would be unreasonable. Relates #41536 --- .../elasticsearch/index/IndexSettings.java | 13 +++ .../index/engine/CombinedDeletionPolicy.java | 46 ++++++-- .../elasticsearch/index/engine/Engine.java | 5 + .../index/engine/InternalEngine.java | 5 + .../index/engine/ReadOnlyEngine.java | 7 ++ .../index/engine/SafeCommitInfo.java | 37 +++++++ .../index/seqno/ReplicationTracker.java | 38 ++++++- .../elasticsearch/index/shard/IndexShard.java | 9 +- .../engine/CombinedDeletionPolicyTests.java | 19 +++- ...PeerRecoveryRetentionLeaseExpiryTests.java | 57 +++++++++- ...ReplicationTrackerRetentionLeaseTests.java | 45 +++++--- .../seqno/ReplicationTrackerTestCase.java | 7 +- .../index/seqno/ReplicationTrackerTests.java | 8 +- .../indices/recovery/IndexRecoveryIT.java | 103 ++++++++++++++++++ .../index/engine/EngineTestCase.java | 3 +- .../test/InternalSettingsPlugin.java | 2 + 16 files changed, 363 insertions(+), 41 deletions(-) create mode 100644 server/src/main/java/org/elasticsearch/index/engine/SafeCommitInfo.java diff --git a/server/src/main/java/org/elasticsearch/index/IndexSettings.java b/server/src/main/java/org/elasticsearch/index/IndexSettings.java index d4cc38f0b95..ca8a24ea93d 100644 --- a/server/src/main/java/org/elasticsearch/index/IndexSettings.java +++ b/server/src/main/java/org/elasticsearch/index/IndexSettings.java @@ -301,6 +301,19 @@ public final class IndexSettings { public static final Setting INDEX_SEARCH_THROTTLED = Setting.boolSetting("index.search.throttled", false, Property.IndexScope, Property.PrivateIndex, Property.Dynamic); + /** + * Determines a balance between file-based and operations-based peer recoveries. 
The number of operations that will be used in an + * operations-based peer recovery is limited to this proportion of the total number of documents in the shard (including deleted + * documents) on the grounds that a file-based peer recovery may copy all of the documents in the shard over to the new peer, but is + * significantly faster than replaying the missing operations on the peer, so once a peer falls far enough behind the primary it makes + * more sense to copy all the data over again instead of replaying history. + * + * Defaults to retaining history for up to 10% of the documents in the shard. This can only be changed in tests, since this setting is + * intentionally unregistered. + */ + public static final Setting FILE_BASED_RECOVERY_THRESHOLD_SETTING + = Setting.doubleSetting("index.recovery.file_based_threshold", 0.1d, 0.0d, Setting.Property.IndexScope); + private final Index index; private final Version version; private final Logger logger; diff --git a/server/src/main/java/org/elasticsearch/index/engine/CombinedDeletionPolicy.java b/server/src/main/java/org/elasticsearch/index/engine/CombinedDeletionPolicy.java index 313598e1d8e..8166a0d37d4 100644 --- a/server/src/main/java/org/elasticsearch/index/engine/CombinedDeletionPolicy.java +++ b/server/src/main/java/org/elasticsearch/index/engine/CombinedDeletionPolicy.java @@ -23,6 +23,7 @@ import com.carrotsearch.hppc.ObjectIntHashMap; import org.apache.logging.log4j.Logger; import org.apache.lucene.index.IndexCommit; import org.apache.lucene.index.IndexDeletionPolicy; +import org.apache.lucene.index.SegmentInfos; import org.apache.lucene.store.Directory; import org.elasticsearch.index.seqno.SequenceNumbers; import org.elasticsearch.index.translog.Translog; @@ -43,7 +44,7 @@ import java.util.function.LongSupplier; * In particular, this policy will delete index commits whose max sequence number is at most * the current global checkpoint except the index commit which has the highest max sequence number among those. */ -public final class CombinedDeletionPolicy extends IndexDeletionPolicy { +public class CombinedDeletionPolicy extends IndexDeletionPolicy { private final Logger logger; private final TranslogDeletionPolicy translogDeletionPolicy; private final SoftDeletesPolicy softDeletesPolicy; @@ -51,6 +52,7 @@ public final class CombinedDeletionPolicy extends IndexDeletionPolicy { private final ObjectIntHashMap snapshottedCommits; // Number of snapshots held against each commit point. private volatile IndexCommit safeCommit; // the most recent safe commit point - its max_seqno at most the persisted global checkpoint. 
 private volatile IndexCommit lastCommit; // the most recent commit point
+    private volatile SafeCommitInfo safeCommitInfo = SafeCommitInfo.EMPTY;
 
     CombinedDeletionPolicy(Logger logger, TranslogDeletionPolicy translogDeletionPolicy,
                            SoftDeletesPolicy softDeletesPolicy, LongSupplier globalCheckpointSupplier) {
@@ -62,7 +64,7 @@ public final class CombinedDeletionPolicy extends IndexDeletionPolicy {
     }
 
     @Override
-    public synchronized void onInit(List<? extends IndexCommit> commits) throws IOException {
+    public void onInit(List<? extends IndexCommit> commits) throws IOException {
         assert commits.isEmpty() == false : "index is opened, but we have no commits";
         onCommit(commits);
         if (safeCommit != commits.get(commits.size() - 1)) {
@@ -74,16 +76,32 @@ public final class CombinedDeletionPolicy extends IndexDeletionPolicy {
     }
 
     @Override
-    public synchronized void onCommit(List<? extends IndexCommit> commits) throws IOException {
-        final int keptPosition = indexOfKeptCommits(commits, globalCheckpointSupplier.getAsLong());
-        lastCommit = commits.get(commits.size() - 1);
-        safeCommit = commits.get(keptPosition);
-        for (int i = 0; i < keptPosition; i++) {
-            if (snapshottedCommits.containsKey(commits.get(i)) == false) {
-                deleteCommit(commits.get(i));
+    public void onCommit(List<? extends IndexCommit> commits) throws IOException {
+        final IndexCommit safeCommit;
+        synchronized (this) {
+            final int keptPosition = indexOfKeptCommits(commits, globalCheckpointSupplier.getAsLong());
+            this.safeCommitInfo = SafeCommitInfo.EMPTY;
+            this.lastCommit = commits.get(commits.size() - 1);
+            this.safeCommit = commits.get(keptPosition);
+            for (int i = 0; i < keptPosition; i++) {
+                if (snapshottedCommits.containsKey(commits.get(i)) == false) {
+                    deleteCommit(commits.get(i));
+                }
             }
+            updateRetentionPolicy();
+            safeCommit = this.safeCommit;
         }
-        updateRetentionPolicy();
+
+        assert Thread.holdsLock(this) == false : "should not block concurrent acquire or release";
+        safeCommitInfo = new SafeCommitInfo(Long.parseLong(
+            safeCommit.getUserData().get(SequenceNumbers.LOCAL_CHECKPOINT_KEY)), getDocCountOfCommit(safeCommit));
+
+        // This is protected from concurrent calls by a lock on the IndexWriter, but this assertion makes sure that we notice if that ceases
+        // to be true in future. It is not disastrous if safeCommitInfo refers to an older safeCommit, it just means that we might retain a
+        // bit more history and do a few more ops-based recoveries than we would otherwise.
+        final IndexCommit newSafeCommit = this.safeCommit;
+        assert safeCommit == newSafeCommit
+            : "onCommit called concurrently? " + safeCommit.getGeneration() + " vs " + newSafeCommit.getGeneration();
     }
 
     private void deleteCommit(IndexCommit commit) throws IOException {
@@ -109,6 +127,14 @@ public final class CombinedDeletionPolicy extends IndexDeletionPolicy {
             Long.parseLong(safeCommit.getUserData().get(SequenceNumbers.LOCAL_CHECKPOINT_KEY)));
     }
 
+    protected int getDocCountOfCommit(IndexCommit indexCommit) throws IOException {
+        return SegmentInfos.readCommit(indexCommit.getDirectory(), indexCommit.getSegmentsFileName()).totalMaxDoc();
+    }
+
+    SafeCommitInfo getSafeCommitInfo() {
+        return safeCommitInfo;
+    }
+
     /**
      * Captures the most recent commit point {@link #lastCommit} or the most recent safe commit point {@link #safeCommit}.
      * Index files of the capturing commit point won't be released until the commit reference is closed.
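
The commit message above defines when an operations-based recovery is "reasonable": a peer-recovery retention lease may retain at most a fixed proportion (10% by default) of the documents in the safe commit, measured against the safe commit's local checkpoint. The following standalone sketch illustrates that arithmetic only; the class and method names are hypothetical, and the real implementation is ReplicationTracker#getMinimumReasonableRetainedSeqNo further down in this patch.

    // Illustrative sketch (not part of the patch): when is an ops-based recovery "reasonable"?
    final class RecoveryReasonablenessSketch {

        // A lease is reasonable to keep if the operations it retains, counted from the safe
        // commit's local checkpoint, amount to no more than `threshold` (e.g. 0.1) of the
        // documents (including deleted docs) in the safe commit.
        static boolean isOpsBasedRecoveryReasonable(long safeCommitLocalCheckpoint,
                                                    int safeCommitDocCount,
                                                    long leaseRetainingSeqNo,
                                                    double threshold) {
            // seqnos below this bound would force replaying "too much" history
            final long minReasonableRetainedSeqNo =
                safeCommitLocalCheckpoint + 1 - (long) Math.ceil(safeCommitDocCount * threshold);
            return leaseRetainingSeqNo >= minReasonableRetainedSeqNo;
        }

        public static void main(String[] args) {
            // Safe commit with 1000 docs and local checkpoint 999, default threshold 10%:
            // leases retaining seqnos >= 900 are kept, older ones are expired early.
            System.out.println(isOpsBasedRecoveryReasonable(999, 1000, 950, 0.1)); // true
            System.out.println(isOpsBasedRecoveryReasonable(999, 1000, 850, 0.1)); // false
        }
    }
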
diff --git a/server/src/main/java/org/elasticsearch/index/engine/Engine.java b/server/src/main/java/org/elasticsearch/index/engine/Engine.java index 7a50d3471a3..f26e5b8ad1f 100644 --- a/server/src/main/java/org/elasticsearch/index/engine/Engine.java +++ b/server/src/main/java/org/elasticsearch/index/engine/Engine.java @@ -1122,6 +1122,11 @@ public abstract class Engine implements Closeable { */ public abstract IndexCommitRef acquireSafeIndexCommit() throws EngineException; + /** + * @return a summary of the contents of the current safe commit + */ + public abstract SafeCommitInfo getSafeCommitInfo(); + /** * If the specified throwable contains a fatal error in the throwable graph, such a fatal error will be thrown. Callers should ensure * that there are no catch statements that would catch an error in the stack as the fatal error here should go uncaught and be handled diff --git a/server/src/main/java/org/elasticsearch/index/engine/InternalEngine.java b/server/src/main/java/org/elasticsearch/index/engine/InternalEngine.java index 5a8662845c4..b83c0a70178 100644 --- a/server/src/main/java/org/elasticsearch/index/engine/InternalEngine.java +++ b/server/src/main/java/org/elasticsearch/index/engine/InternalEngine.java @@ -2008,6 +2008,11 @@ public class InternalEngine extends Engine { } } + @Override + public SafeCommitInfo getSafeCommitInfo() { + return combinedDeletionPolicy.getSafeCommitInfo(); + } + private boolean failOnTragicEvent(AlreadyClosedException ex) { final boolean engineFailed; // if we are already closed due to some tragic exception diff --git a/server/src/main/java/org/elasticsearch/index/engine/ReadOnlyEngine.java b/server/src/main/java/org/elasticsearch/index/engine/ReadOnlyEngine.java index 30b3d0221f3..ded39c51b37 100644 --- a/server/src/main/java/org/elasticsearch/index/engine/ReadOnlyEngine.java +++ b/server/src/main/java/org/elasticsearch/index/engine/ReadOnlyEngine.java @@ -77,6 +77,7 @@ public class ReadOnlyEngine extends Engine { private final Lock indexWriterLock; private final DocsStats docsStats; private final RamAccountingRefreshListener refreshListener; + private final SafeCommitInfo safeCommitInfo; protected volatile TranslogStats translogStats; @@ -120,6 +121,7 @@ public class ReadOnlyEngine extends Engine { assert translogStats != null || obtainLock : "mutiple translogs instances should not be opened at the same time"; this.translogStats = translogStats != null ? translogStats : translogStats(config, lastCommittedSegmentInfos); this.indexWriterLock = indexWriterLock; + this.safeCommitInfo = new SafeCommitInfo(seqNoStats.getLocalCheckpoint(), lastCommittedSegmentInfos.totalMaxDoc()); success = true; } finally { if (success == false) { @@ -420,6 +422,11 @@ public class ReadOnlyEngine extends Engine { return acquireLastIndexCommit(false); } + @Override + public SafeCommitInfo getSafeCommitInfo() { + return safeCommitInfo; + } + @Override public void activateThrottling() { } diff --git a/server/src/main/java/org/elasticsearch/index/engine/SafeCommitInfo.java b/server/src/main/java/org/elasticsearch/index/engine/SafeCommitInfo.java new file mode 100644 index 00000000000..37461177c93 --- /dev/null +++ b/server/src/main/java/org/elasticsearch/index/engine/SafeCommitInfo.java @@ -0,0 +1,37 @@ +/* + * Licensed to Elasticsearch under one or more contributor + * license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright + * ownership. 
Elasticsearch licenses this file to you under + * the Apache License, Version 2.0 (the "License"); you may + * not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.elasticsearch.index.engine; + +import org.elasticsearch.index.seqno.SequenceNumbers; + +/** + * Information about the safe commit, for making decisions about recoveries. + */ +public class SafeCommitInfo { + + public final long localCheckpoint; + public final int docCount; + + public SafeCommitInfo(long localCheckpoint, int docCount) { + this.localCheckpoint = localCheckpoint; + this.docCount = docCount; + } + + public static final SafeCommitInfo EMPTY = new SafeCommitInfo(SequenceNumbers.NO_OPS_PERFORMED, 0); +} diff --git a/server/src/main/java/org/elasticsearch/index/seqno/ReplicationTracker.java b/server/src/main/java/org/elasticsearch/index/seqno/ReplicationTracker.java index 465abdd0e27..1ef7c27c517 100644 --- a/server/src/main/java/org/elasticsearch/index/seqno/ReplicationTracker.java +++ b/server/src/main/java/org/elasticsearch/index/seqno/ReplicationTracker.java @@ -37,6 +37,7 @@ import org.elasticsearch.common.io.stream.Writeable; import org.elasticsearch.common.xcontent.NamedXContentRegistry; import org.elasticsearch.gateway.WriteStateException; import org.elasticsearch.index.IndexSettings; +import org.elasticsearch.index.engine.SafeCommitInfo; import org.elasticsearch.index.shard.AbstractIndexShardComponent; import org.elasticsearch.index.shard.IndexShard; import org.elasticsearch.index.shard.ReplicationGroup; @@ -57,6 +58,7 @@ import java.util.function.BiConsumer; import java.util.function.Function; import java.util.function.LongConsumer; import java.util.function.LongSupplier; +import java.util.function.Supplier; import java.util.function.ToLongFunction; import java.util.stream.Collectors; import java.util.stream.LongStream; @@ -210,6 +212,17 @@ public class ReplicationTracker extends AbstractIndexShardComponent implements L */ private boolean hasAllPeerRecoveryRetentionLeases; + /** + * Supplies information about the current safe commit which may be used to expire peer-recovery retention leases. + */ + private final Supplier safeCommitInfoSupplier; + + /** + * Threshold for expiring peer-recovery retention leases and falling back to file-based recovery. See + * {@link IndexSettings#FILE_BASED_RECOVERY_THRESHOLD_SETTING}. + */ + private final double fileBasedRecoveryThreshold; + /** * Get all retention leases tracked on this shard. * @@ -237,6 +250,8 @@ public class ReplicationTracker extends AbstractIndexShardComponent implements L final long retentionLeaseMillis = indexSettings.getRetentionLeaseMillis(); final Set leaseIdsForCurrentPeers = routingTable.assignedShards().stream().map(ReplicationTracker::getPeerRecoveryRetentionLeaseId).collect(Collectors.toSet()); + final boolean allShardsStarted = routingTable.allShardsStarted(); + final long minimumReasonableRetainedSeqNo = allShardsStarted ? 
0L : getMinimumReasonableRetainedSeqNo();
         final Map<Boolean, List<RetentionLease>> partitionByExpiration = retentionLeases
             .leases()
             .stream()
@@ -245,7 +260,12 @@ public class ReplicationTracker extends AbstractIndexShardComponent implements L
                     if (leaseIdsForCurrentPeers.contains(lease.id())) {
                         return false;
                     }
-                    if (routingTable.allShardsStarted()) {
+                    if (allShardsStarted) {
+                        logger.trace("expiring unused [{}]", lease);
+                        return true;
+                    }
+                    if (lease.retainingSequenceNumber() < minimumReasonableRetainedSeqNo) {
+                        logger.trace("expiring unreasonable [{}] retaining history before [{}]", lease, minimumReasonableRetainedSeqNo);
                         return true;
                     }
                 }
@@ -264,6 +284,17 @@ public class ReplicationTracker extends AbstractIndexShardComponent implements L
         return Tuple.tuple(true, retentionLeases);
     }
 
+    private long getMinimumReasonableRetainedSeqNo() {
+        final SafeCommitInfo safeCommitInfo = safeCommitInfoSupplier.get();
+        return safeCommitInfo.localCheckpoint + 1 - Math.round(Math.ceil(safeCommitInfo.docCount * fileBasedRecoveryThreshold));
+        // NB safeCommitInfo.docCount is a very low-level count of the docs in the index, and in particular if this shard contains nested
+        // docs then safeCommitInfo.docCount counts every child doc separately from the parent doc. However every part of a nested document
+        // has the same seqno, so we may be overestimating the cost of a file-based recovery when compared to an ops-based recovery and
+        // therefore preferring ops-based recoveries inappropriately in this case. Correctly accounting for nested docs seems difficult to
+        // do cheaply, and the circumstances in which this matters should be relatively rare, so we use this naive calculation regardless.
+        // TODO improve this measure for when nested docs are in use
+    }
+
     /**
      * Adds a new retention lease.
* @@ -850,7 +881,8 @@ public class ReplicationTracker extends AbstractIndexShardComponent implements L final long globalCheckpoint, final LongConsumer onGlobalCheckpointUpdated, final LongSupplier currentTimeMillisSupplier, - final BiConsumer> onSyncRetentionLeases) { + final BiConsumer> onSyncRetentionLeases, + final Supplier safeCommitInfoSupplier) { super(shardId, indexSettings); assert globalCheckpoint >= SequenceNumbers.UNASSIGNED_SEQ_NO : "illegal initial global checkpoint: " + globalCheckpoint; this.shardAllocationId = allocationId; @@ -867,6 +899,8 @@ public class ReplicationTracker extends AbstractIndexShardComponent implements L this.routingTable = null; this.replicationGroup = null; this.hasAllPeerRecoveryRetentionLeases = indexSettings.getIndexVersionCreated().onOrAfter(Version.V_7_4_0); + this.fileBasedRecoveryThreshold = IndexSettings.FILE_BASED_RECOVERY_THRESHOLD_SETTING.get(indexSettings.getSettings()); + this.safeCommitInfoSupplier = safeCommitInfoSupplier; assert Version.V_EMPTY.equals(indexSettings.getIndexVersionCreated()) == false; assert invariant(); } diff --git a/server/src/main/java/org/elasticsearch/index/shard/IndexShard.java b/server/src/main/java/org/elasticsearch/index/shard/IndexShard.java index dafd379b192..225056e2edc 100644 --- a/server/src/main/java/org/elasticsearch/index/shard/IndexShard.java +++ b/server/src/main/java/org/elasticsearch/index/shard/IndexShard.java @@ -92,6 +92,7 @@ import org.elasticsearch.index.engine.EngineException; import org.elasticsearch.index.engine.EngineFactory; import org.elasticsearch.index.engine.ReadOnlyEngine; import org.elasticsearch.index.engine.RefreshFailedEngineException; +import org.elasticsearch.index.engine.SafeCommitInfo; import org.elasticsearch.index.engine.Segment; import org.elasticsearch.index.engine.SegmentsStats; import org.elasticsearch.index.fielddata.FieldDataStats; @@ -336,7 +337,8 @@ public class IndexShard extends AbstractIndexShardComponent implements IndicesCl UNASSIGNED_SEQ_NO, globalCheckpointListeners::globalCheckpointUpdated, threadPool::absoluteTimeInMillis, - (retentionLeases, listener) -> retentionLeaseSyncer.sync(shardId, retentionLeases, listener)); + (retentionLeases, listener) -> retentionLeaseSyncer.sync(shardId, retentionLeases, listener), + this::getSafeCommitInfo); // the query cache is a node-level thing, however we want the most popular filters // to be computed on a per-shard basis @@ -2612,6 +2614,11 @@ public class IndexShard extends AbstractIndexShardComponent implements IndicesCl replicationTracker.removePeerRecoveryRetentionLease(nodeId, listener); } + private SafeCommitInfo getSafeCommitInfo() { + final Engine engine = getEngineOrNull(); + return engine == null ? 
SafeCommitInfo.EMPTY : engine.getSafeCommitInfo(); + } + class ShardEventListener implements Engine.EventListener { private final CopyOnWriteArrayList> delegates = new CopyOnWriteArrayList<>(); diff --git a/server/src/test/java/org/elasticsearch/index/engine/CombinedDeletionPolicyTests.java b/server/src/test/java/org/elasticsearch/index/engine/CombinedDeletionPolicyTests.java index 110a27ff551..4e82a77ce43 100644 --- a/server/src/test/java/org/elasticsearch/index/engine/CombinedDeletionPolicyTests.java +++ b/server/src/test/java/org/elasticsearch/index/engine/CombinedDeletionPolicyTests.java @@ -55,7 +55,7 @@ public class CombinedDeletionPolicyTests extends ESTestCase { final SoftDeletesPolicy softDeletesPolicy = new SoftDeletesPolicy(globalCheckpoint::get, NO_OPS_PERFORMED, extraRetainedOps, () -> RetentionLeases.EMPTY); TranslogDeletionPolicy translogPolicy = createTranslogDeletionPolicy(); - CombinedDeletionPolicy indexPolicy = new CombinedDeletionPolicy(logger, translogPolicy, softDeletesPolicy, globalCheckpoint::get); + CombinedDeletionPolicy indexPolicy = newCombinedDeletionPolicy(translogPolicy, softDeletesPolicy, globalCheckpoint); final LongArrayList maxSeqNoList = new LongArrayList(); final LongArrayList translogGenList = new LongArrayList(); @@ -102,7 +102,7 @@ public class CombinedDeletionPolicyTests extends ESTestCase { new SoftDeletesPolicy(globalCheckpoint::get, -1, extraRetainedOps, () -> RetentionLeases.EMPTY); final UUID translogUUID = UUID.randomUUID(); TranslogDeletionPolicy translogPolicy = createTranslogDeletionPolicy(); - CombinedDeletionPolicy indexPolicy = new CombinedDeletionPolicy(logger, translogPolicy, softDeletesPolicy, globalCheckpoint::get); + CombinedDeletionPolicy indexPolicy = newCombinedDeletionPolicy(translogPolicy, softDeletesPolicy, globalCheckpoint); long lastMaxSeqNo = between(1, 1000); long lastCheckpoint = randomLongBetween(-1, lastMaxSeqNo); long lastTranslogGen = between(1, 20); @@ -182,7 +182,7 @@ public class CombinedDeletionPolicyTests extends ESTestCase { final AtomicLong globalCheckpoint = new AtomicLong(randomNonNegativeLong()); final SoftDeletesPolicy softDeletesPolicy = new SoftDeletesPolicy(globalCheckpoint::get, -1, 0, () -> RetentionLeases.EMPTY); TranslogDeletionPolicy translogPolicy = createTranslogDeletionPolicy(); - CombinedDeletionPolicy indexPolicy = new CombinedDeletionPolicy(logger, translogPolicy, softDeletesPolicy, globalCheckpoint::get); + CombinedDeletionPolicy indexPolicy = newCombinedDeletionPolicy(translogPolicy, softDeletesPolicy, globalCheckpoint); final int invalidCommits = between(1, 10); final List commitList = new ArrayList<>(); @@ -217,7 +217,7 @@ public class CombinedDeletionPolicyTests extends ESTestCase { final SoftDeletesPolicy softDeletesPolicy = new SoftDeletesPolicy(globalCheckpoint::get, -1, 0, () -> RetentionLeases.EMPTY); final UUID translogUUID = UUID.randomUUID(); final TranslogDeletionPolicy translogPolicy = createTranslogDeletionPolicy(); - CombinedDeletionPolicy indexPolicy = new CombinedDeletionPolicy(logger, translogPolicy, softDeletesPolicy, globalCheckpoint::get); + CombinedDeletionPolicy indexPolicy = newCombinedDeletionPolicy(translogPolicy, softDeletesPolicy, globalCheckpoint); final List commitList = new ArrayList<>(); int totalCommits = between(2, 20); long lastMaxSeqNo = between(1, 1000); @@ -254,6 +254,17 @@ public class CombinedDeletionPolicyTests extends ESTestCase { } } + private CombinedDeletionPolicy newCombinedDeletionPolicy(TranslogDeletionPolicy translogPolicy, SoftDeletesPolicy 
softDeletesPolicy, + AtomicLong globalCheckpoint) { + return new CombinedDeletionPolicy(logger, translogPolicy, softDeletesPolicy, globalCheckpoint::get) + { + @Override + protected int getDocCountOfCommit(IndexCommit indexCommit) { + return between(0, 1000); + } + }; + } + IndexCommit mockIndexCommit(long localCheckpoint, long maxSeqNo, UUID translogUUID, long translogGen) throws IOException { final Map userData = new HashMap<>(); userData.put(SequenceNumbers.LOCAL_CHECKPOINT_KEY, Long.toString(localCheckpoint)); diff --git a/server/src/test/java/org/elasticsearch/index/seqno/PeerRecoveryRetentionLeaseExpiryTests.java b/server/src/test/java/org/elasticsearch/index/seqno/PeerRecoveryRetentionLeaseExpiryTests.java index 22d4f5e86f9..fe2d8f27aa3 100644 --- a/server/src/test/java/org/elasticsearch/index/seqno/PeerRecoveryRetentionLeaseExpiryTests.java +++ b/server/src/test/java/org/elasticsearch/index/seqno/PeerRecoveryRetentionLeaseExpiryTests.java @@ -27,6 +27,7 @@ import org.elasticsearch.common.collect.Tuple; import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.unit.TimeValue; import org.elasticsearch.index.IndexSettings; +import org.elasticsearch.index.engine.SafeCommitInfo; import org.elasticsearch.index.shard.ShardId; import org.elasticsearch.test.IndexSettingsModule; import org.junit.Before; @@ -37,6 +38,7 @@ import java.util.concurrent.atomic.AtomicLong; import java.util.stream.Collectors; import java.util.stream.Stream; +import static org.elasticsearch.index.seqno.SequenceNumbers.NO_OPS_PERFORMED; import static org.elasticsearch.index.seqno.SequenceNumbers.UNASSIGNED_SEQ_NO; import static org.hamcrest.Matchers.equalTo; import static org.hamcrest.Matchers.hasSize; @@ -48,6 +50,7 @@ public class PeerRecoveryRetentionLeaseExpiryTests extends ReplicationTrackerTes private ReplicationTracker replicationTracker; private AtomicLong currentTimeMillis; private Settings settings; + private SafeCommitInfo safeCommitInfo; @Before public void setUpReplicationTracker() throws InterruptedException { @@ -63,6 +66,8 @@ public class PeerRecoveryRetentionLeaseExpiryTests extends ReplicationTrackerTes settings = Settings.EMPTY; } + safeCommitInfo = null; // must be set in each test + final long primaryTerm = randomLongBetween(1, Long.MAX_VALUE); replicationTracker = new ReplicationTracker( new ShardId("test", "_na", 0), @@ -72,7 +77,8 @@ public class PeerRecoveryRetentionLeaseExpiryTests extends ReplicationTrackerTes UNASSIGNED_SEQ_NO, value -> { }, currentTimeMillis::get, - (leases, listener) -> { }); + (leases, listener) -> { }, + () -> safeCommitInfo); replicationTracker.updateFromMaster(1L, Collections.singleton(primaryAllocationId.getId()), routingTable(Collections.emptySet(), primaryAllocationId)); replicationTracker.activatePrimaryMode(SequenceNumbers.NO_OPS_PERFORMED); @@ -109,6 +115,7 @@ public class PeerRecoveryRetentionLeaseExpiryTests extends ReplicationTrackerTes } currentTimeMillis.set(currentTimeMillis.get() + randomLongBetween(0, Long.MAX_VALUE - currentTimeMillis.get())); + safeCommitInfo = randomSafeCommitInfo(); final Tuple retentionLeases = replicationTracker.getRetentionLeases(true); assertFalse(retentionLeases.v1()); @@ -121,11 +128,14 @@ public class PeerRecoveryRetentionLeaseExpiryTests extends ReplicationTrackerTes public void testPeerRecoveryRetentionLeasesForUnassignedCopiesDoNotExpireImmediatelyIfShardsNotAllStarted() { final String unknownNodeId = randomAlphaOfLength(10); - replicationTracker.addPeerRecoveryRetentionLease(unknownNodeId, 
randomCheckpoint(), EMPTY_LISTENER); + final long globalCheckpoint = randomNonNegativeLong(); // not NO_OPS_PERFORMED since this always results in file-based recovery + replicationTracker.addPeerRecoveryRetentionLease(unknownNodeId, globalCheckpoint, EMPTY_LISTENER); currentTimeMillis.set(currentTimeMillis.get() + randomLongBetween(0, IndexSettings.INDEX_SOFT_DELETES_RETENTION_LEASE_PERIOD_SETTING.get(settings).millis())); + safeCommitInfo = randomSafeCommitInfoSuitableForOpsBasedRecovery(globalCheckpoint); + final Tuple retentionLeases = replicationTracker.getRetentionLeases(true); assertFalse("should not have expired anything", retentionLeases.v1()); @@ -142,12 +152,15 @@ public class PeerRecoveryRetentionLeaseExpiryTests extends ReplicationTrackerTes } final String unknownNodeId = randomAlphaOfLength(10); - replicationTracker.addPeerRecoveryRetentionLease(unknownNodeId, randomCheckpoint(), EMPTY_LISTENER); + final long globalCheckpoint = randomCheckpoint(); + replicationTracker.addPeerRecoveryRetentionLease(unknownNodeId, globalCheckpoint, EMPTY_LISTENER); currentTimeMillis.set(randomLongBetween( currentTimeMillis.get() + IndexSettings.INDEX_SOFT_DELETES_RETENTION_LEASE_PERIOD_SETTING.get(settings).millis() + 1, Long.MAX_VALUE)); + safeCommitInfo = randomSafeCommitInfoSuitableForOpsBasedRecovery(globalCheckpoint); + final Tuple retentionLeases = replicationTracker.getRetentionLeases(true); assertTrue("should have expired something", retentionLeases.v1()); @@ -167,6 +180,7 @@ public class PeerRecoveryRetentionLeaseExpiryTests extends ReplicationTrackerTes (usually() ? randomLongBetween(0, IndexSettings.INDEX_SOFT_DELETES_RETENTION_LEASE_PERIOD_SETTING.get(settings).millis()) : randomLongBetween(0, Long.MAX_VALUE - currentTimeMillis.get()))); + safeCommitInfo = randomSafeCommitInfo(); final Tuple retentionLeases = replicationTracker.getRetentionLeases(true); assertTrue(retentionLeases.v1()); @@ -176,4 +190,41 @@ public class PeerRecoveryRetentionLeaseExpiryTests extends ReplicationTrackerTes assertThat(leaseIds, equalTo(replicationTracker.routingTable.shards().stream() .map(ReplicationTracker::getPeerRecoveryRetentionLeaseId).collect(Collectors.toSet()))); } + + public void testPeerRecoveryRetentionLeasesForUnassignedCopiesExpireIfRetainingTooMuchHistory() { + if (randomBoolean()) { + startReplica(); + } + + final String unknownNodeId = randomAlphaOfLength(10); + final long globalCheckpoint = randomValueOtherThan(SequenceNumbers.NO_OPS_PERFORMED, this::randomCheckpoint); + replicationTracker.addPeerRecoveryRetentionLease(unknownNodeId, globalCheckpoint, EMPTY_LISTENER); + + safeCommitInfo = randomSafeCommitInfoSuitableForFileBasedRecovery(globalCheckpoint); + + final Tuple retentionLeases = replicationTracker.getRetentionLeases(true); + assertTrue("should have expired something", retentionLeases.v1()); + + final Set leaseIds = retentionLeases.v2().leases().stream().map(RetentionLease::id).collect(Collectors.toSet()); + assertThat(leaseIds, hasSize(2)); + assertThat(leaseIds, equalTo(replicationTracker.routingTable.shards().stream() + .map(ReplicationTracker::getPeerRecoveryRetentionLeaseId).collect(Collectors.toSet()))); + } + + private SafeCommitInfo randomSafeCommitInfo() { + return randomBoolean() ? 
SafeCommitInfo.EMPTY : new SafeCommitInfo( + randomFrom(randomNonNegativeLong(), (long) randomIntBetween(0, Integer.MAX_VALUE)), + randomIntBetween(0, Integer.MAX_VALUE)); + } + + private SafeCommitInfo randomSafeCommitInfoSuitableForOpsBasedRecovery(long globalCheckpoint) { + // simulate a safe commit that is behind the given global checkpoint, so that no files need to be transferrred + final long localCheckpoint = randomLongBetween(NO_OPS_PERFORMED, globalCheckpoint); + return new SafeCommitInfo(localCheckpoint, between(0, Math.toIntExact(Math.min(localCheckpoint + 1, Integer.MAX_VALUE)))); + } + + private SafeCommitInfo randomSafeCommitInfoSuitableForFileBasedRecovery(long globalCheckpoint) { + // simulate a later safe commit containing no documents, which is always better to transfer than any ops + return new SafeCommitInfo(randomLongBetween(globalCheckpoint + 1, Long.MAX_VALUE), 0); + } } diff --git a/server/src/test/java/org/elasticsearch/index/seqno/ReplicationTrackerRetentionLeaseTests.java b/server/src/test/java/org/elasticsearch/index/seqno/ReplicationTrackerRetentionLeaseTests.java index 7611fad5a7e..bdf7acf478b 100644 --- a/server/src/test/java/org/elasticsearch/index/seqno/ReplicationTrackerRetentionLeaseTests.java +++ b/server/src/test/java/org/elasticsearch/index/seqno/ReplicationTrackerRetentionLeaseTests.java @@ -70,7 +70,8 @@ public class ReplicationTrackerRetentionLeaseTests extends ReplicationTrackerTes UNASSIGNED_SEQ_NO, value -> {}, () -> 0L, - (leases, listener) -> {}); + (leases, listener) -> {}, + OPS_BASED_RECOVERY_ALWAYS_REASONABLE); replicationTracker.updateFromMaster( randomNonNegativeLong(), Collections.singleton(allocationId.getId()), @@ -111,7 +112,8 @@ public class ReplicationTrackerRetentionLeaseTests extends ReplicationTrackerTes UNASSIGNED_SEQ_NO, value -> {}, () -> 0L, - (leases, listener) -> {}); + (leases, listener) -> {}, + OPS_BASED_RECOVERY_ALWAYS_REASONABLE); replicationTracker.updateFromMaster( randomNonNegativeLong(), Collections.singleton(allocationId.getId()), @@ -139,7 +141,8 @@ public class ReplicationTrackerRetentionLeaseTests extends ReplicationTrackerTes UNASSIGNED_SEQ_NO, value -> {}, () -> 0L, - (leases, listener) -> {}); + (leases, listener) -> {}, + OPS_BASED_RECOVERY_ALWAYS_REASONABLE); replicationTracker.updateFromMaster( randomNonNegativeLong(), Collections.singleton(allocationId.getId()), @@ -174,7 +177,8 @@ public class ReplicationTrackerRetentionLeaseTests extends ReplicationTrackerTes .stream() .collect(Collectors.toMap(RetentionLease::id, RetentionLease::retainingSequenceNumber)), equalTo(retainingSequenceNumbers)); - }); + }, + OPS_BASED_RECOVERY_ALWAYS_REASONABLE); reference.set(replicationTracker); replicationTracker.updateFromMaster( randomNonNegativeLong(), @@ -210,7 +214,8 @@ public class ReplicationTrackerRetentionLeaseTests extends ReplicationTrackerTes UNASSIGNED_SEQ_NO, value -> {}, () -> 0L, - (leases, listener) -> {}); + (leases, listener) -> {}, + OPS_BASED_RECOVERY_ALWAYS_REASONABLE); replicationTracker.updateFromMaster( randomNonNegativeLong(), Collections.singleton(allocationId.getId()), @@ -266,7 +271,8 @@ public class ReplicationTrackerRetentionLeaseTests extends ReplicationTrackerTes assertFalse(Thread.holdsLock(replicationTrackerRef.get())); assertTrue(synced.compareAndSet(false, true)); listener.onResponse(new ReplicationResponse()); - }); + }, + OPS_BASED_RECOVERY_ALWAYS_REASONABLE); replicationTrackerRef.set(replicationTracker); replicationTracker.updateFromMaster( randomNonNegativeLong(), @@ -309,7 
+315,8 @@ public class ReplicationTrackerRetentionLeaseTests extends ReplicationTrackerTes UNASSIGNED_SEQ_NO, value -> {}, () -> 0L, - (leases, listener) -> { }); + (leases, listener) -> { }, + OPS_BASED_RECOVERY_ALWAYS_REASONABLE); replicationTracker.updateFromMaster( randomNonNegativeLong(), Collections.singleton(allocationId.getId()), @@ -331,7 +338,8 @@ public class ReplicationTrackerRetentionLeaseTests extends ReplicationTrackerTes UNASSIGNED_SEQ_NO, value -> {}, () -> 0L, - (leases, listener) -> { }); + (leases, listener) -> { }, + OPS_BASED_RECOVERY_ALWAYS_REASONABLE); replicationTracker.updateFromMaster( randomNonNegativeLong(), Collections.singleton(allocationId.getId()), @@ -357,7 +365,8 @@ public class ReplicationTrackerRetentionLeaseTests extends ReplicationTrackerTes UNASSIGNED_SEQ_NO, value -> {}, () -> 0L, - (leases, listener) -> {}); + (leases, listener) -> {}, + OPS_BASED_RECOVERY_ALWAYS_REASONABLE); replicationTracker.updateFromMaster( randomNonNegativeLong(), Collections.singleton(allocationId.getId()), @@ -392,7 +401,8 @@ public class ReplicationTrackerRetentionLeaseTests extends ReplicationTrackerTes .stream() .collect(Collectors.toMap(RetentionLease::id, RetentionLease::retainingSequenceNumber)), equalTo(retainingSequenceNumbers)); - }); + }, + OPS_BASED_RECOVERY_ALWAYS_REASONABLE); reference.set(replicationTracker); replicationTracker.updateFromMaster( randomNonNegativeLong(), @@ -445,7 +455,8 @@ public class ReplicationTrackerRetentionLeaseTests extends ReplicationTrackerTes UNASSIGNED_SEQ_NO, value -> {}, currentTimeMillis::get, - (leases, listener) -> {}); + (leases, listener) -> {}, + OPS_BASED_RECOVERY_ALWAYS_REASONABLE); replicationTracker.updateFromMaster( randomNonNegativeLong(), Collections.singleton(allocationId.getId()), @@ -519,7 +530,8 @@ public class ReplicationTrackerRetentionLeaseTests extends ReplicationTrackerTes UNASSIGNED_SEQ_NO, value -> {}, () -> 0L, - (leases, listener) -> {}); + (leases, listener) -> {}, + OPS_BASED_RECOVERY_ALWAYS_REASONABLE); replicationTracker.updateFromMaster( randomNonNegativeLong(), Collections.singleton(allocationId.getId()), @@ -572,7 +584,8 @@ public class ReplicationTrackerRetentionLeaseTests extends ReplicationTrackerTes UNASSIGNED_SEQ_NO, value -> {}, () -> 0L, - (leases, listener) -> {}); + (leases, listener) -> {}, + OPS_BASED_RECOVERY_ALWAYS_REASONABLE); replicationTracker.updateFromMaster( randomNonNegativeLong(), Collections.singleton(allocationId.getId()), @@ -605,7 +618,8 @@ public class ReplicationTrackerRetentionLeaseTests extends ReplicationTrackerTes UNASSIGNED_SEQ_NO, value -> {}, () -> 0L, - (leases, listener) -> {}); + (leases, listener) -> {}, + OPS_BASED_RECOVERY_ALWAYS_REASONABLE); replicationTracker.updateFromMaster( randomNonNegativeLong(), Collections.singleton(allocationId.getId()), @@ -653,7 +667,8 @@ public class ReplicationTrackerRetentionLeaseTests extends ReplicationTrackerTes UNASSIGNED_SEQ_NO, value -> {}, () -> 0L, - (leases, listener) -> {}); + (leases, listener) -> {}, + OPS_BASED_RECOVERY_ALWAYS_REASONABLE); replicationTracker.updateFromMaster( randomNonNegativeLong(), Collections.singleton(allocationId.getId()), diff --git a/server/src/test/java/org/elasticsearch/index/seqno/ReplicationTrackerTestCase.java b/server/src/test/java/org/elasticsearch/index/seqno/ReplicationTrackerTestCase.java index 5f035a3604f..cc32d5198c8 100644 --- a/server/src/test/java/org/elasticsearch/index/seqno/ReplicationTrackerTestCase.java +++ 
b/server/src/test/java/org/elasticsearch/index/seqno/ReplicationTrackerTestCase.java @@ -25,6 +25,7 @@ import org.elasticsearch.cluster.routing.ShardRouting; import org.elasticsearch.cluster.routing.ShardRoutingState; import org.elasticsearch.cluster.routing.TestShardRouting; import org.elasticsearch.common.settings.Settings; +import org.elasticsearch.index.engine.SafeCommitInfo; import org.elasticsearch.index.shard.ShardId; import org.elasticsearch.test.ESTestCase; import org.elasticsearch.test.IndexSettingsModule; @@ -32,6 +33,7 @@ import org.elasticsearch.test.IndexSettingsModule; import java.util.Set; import java.util.function.LongConsumer; import java.util.function.LongSupplier; +import java.util.function.Supplier; import static org.elasticsearch.index.seqno.SequenceNumbers.UNASSIGNED_SEQ_NO; @@ -49,9 +51,12 @@ public abstract class ReplicationTrackerTestCase extends ESTestCase { UNASSIGNED_SEQ_NO, updatedGlobalCheckpoint, currentTimeMillisSupplier, - (leases, listener) -> {}); + (leases, listener) -> {}, + OPS_BASED_RECOVERY_ALWAYS_REASONABLE); } + static final Supplier OPS_BASED_RECOVERY_ALWAYS_REASONABLE = () -> SafeCommitInfo.EMPTY; + static String nodeIdFromAllocationId(final AllocationId allocationId) { return "n-" + allocationId.getId().substring(0, 8); } diff --git a/server/src/test/java/org/elasticsearch/index/seqno/ReplicationTrackerTests.java b/server/src/test/java/org/elasticsearch/index/seqno/ReplicationTrackerTests.java index afbd560758c..e7d68baf265 100644 --- a/server/src/test/java/org/elasticsearch/index/seqno/ReplicationTrackerTests.java +++ b/server/src/test/java/org/elasticsearch/index/seqno/ReplicationTrackerTests.java @@ -694,10 +694,10 @@ public class ReplicationTrackerTests extends ReplicationTrackerTestCase { final long globalCheckpoint = UNASSIGNED_SEQ_NO; final BiConsumer> onNewRetentionLease = (leases, listener) -> {}; - ReplicationTracker oldPrimary = new ReplicationTracker( - shardId, aId.getId(), indexSettings, primaryTerm, globalCheckpoint, onUpdate, () -> 0L, onNewRetentionLease); - ReplicationTracker newPrimary = new ReplicationTracker( - shardId, aId.getRelocationId(), indexSettings, primaryTerm, globalCheckpoint, onUpdate, () -> 0L, onNewRetentionLease); + ReplicationTracker oldPrimary = new ReplicationTracker(shardId, aId.getId(), indexSettings, primaryTerm, globalCheckpoint, + onUpdate, () -> 0L, onNewRetentionLease, OPS_BASED_RECOVERY_ALWAYS_REASONABLE); + ReplicationTracker newPrimary = new ReplicationTracker(shardId, aId.getRelocationId(), indexSettings, primaryTerm, globalCheckpoint, + onUpdate, () -> 0L, onNewRetentionLease, OPS_BASED_RECOVERY_ALWAYS_REASONABLE); Set allocationIds = new HashSet<>(Arrays.asList(oldPrimary.shardAllocationId, newPrimary.shardAllocationId)); diff --git a/server/src/test/java/org/elasticsearch/indices/recovery/IndexRecoveryIT.java b/server/src/test/java/org/elasticsearch/indices/recovery/IndexRecoveryIT.java index ad791702ffe..77d47d6d241 100644 --- a/server/src/test/java/org/elasticsearch/indices/recovery/IndexRecoveryIT.java +++ b/server/src/test/java/org/elasticsearch/indices/recovery/IndexRecoveryIT.java @@ -1206,6 +1206,109 @@ public class IndexRecoveryIT extends ESIntegTestCase { assertThat(recoveryState.getIndex().totalFileCount(), greaterThan(0)); } + public void testUsesFileBasedRecoveryIfOperationsBasedRecoveryWouldBeUnreasonable() throws Exception { + internalCluster().ensureAtLeastNumDataNodes(2); + + String indexName = "test-index"; + final Settings.Builder settings = Settings.builder() + 
.put(IndexMetaData.SETTING_NUMBER_OF_SHARDS, 1) + .put(IndexMetaData.SETTING_NUMBER_OF_REPLICAS, 1) + .put(IndexSettings.INDEX_SOFT_DELETES_SETTING.getKey(), true) + .put(UnassignedInfo.INDEX_DELAYED_NODE_LEFT_TIMEOUT_SETTING.getKey(), "12h") + .put(IndexService.RETENTION_LEASE_SYNC_INTERVAL_SETTING.getKey(), "100ms"); + + final double reasonableOperationsBasedRecoveryProportion; + if (randomBoolean()) { + reasonableOperationsBasedRecoveryProportion = randomDoubleBetween(0.05, 0.99, true); + settings.put(IndexSettings.FILE_BASED_RECOVERY_THRESHOLD_SETTING.getKey(), + reasonableOperationsBasedRecoveryProportion); + } else { + reasonableOperationsBasedRecoveryProportion + = IndexSettings.FILE_BASED_RECOVERY_THRESHOLD_SETTING.get(Settings.EMPTY); + } + logger.info("--> performing ops-based recoveries up to [{}%] of docs", reasonableOperationsBasedRecoveryProportion * 100.0); + + createIndex(indexName, settings.build()); + indexRandom(randomBoolean(), false, randomBoolean(), IntStream.range(0, between(0, 100)) + .mapToObj(n -> client().prepareIndex(indexName, "_doc").setSource("num", n)).collect(toList())); + ensureGreen(indexName); + + flush(indexName); + // wait for all history to be discarded + assertBusy(() -> { + for (ShardStats shardStats : client().admin().indices().prepareStats(indexName).get().getShards()) { + final long maxSeqNo = shardStats.getSeqNoStats().getMaxSeqNo(); + assertTrue(shardStats.getRetentionLeaseStats().retentionLeases() + " should discard history up to " + maxSeqNo, + shardStats.getRetentionLeaseStats().retentionLeases().leases().stream().allMatch( + l -> l.retainingSequenceNumber() == maxSeqNo + 1)); + } + }); + flush(indexName); // ensure that all operations are in the safe commit + + final ShardStats shardStats = client().admin().indices().prepareStats(indexName).get().getShards()[0]; + final long docCount = shardStats.getStats().docs.getCount(); + assertThat(shardStats.getStats().docs.getDeleted(), equalTo(0L)); + assertThat(shardStats.getSeqNoStats().getMaxSeqNo() + 1, equalTo(docCount)); + + final ShardId shardId = new ShardId(resolveIndex(indexName), 0); + final DiscoveryNodes discoveryNodes = clusterService().state().nodes(); + final IndexShardRoutingTable indexShardRoutingTable = clusterService().state().routingTable().shardRoutingTable(shardId); + + final ShardRouting replicaShardRouting = indexShardRoutingTable.replicaShards().get(0); + assertTrue("should have lease for " + replicaShardRouting, + client().admin().indices().prepareStats(indexName).get().getShards()[0].getRetentionLeaseStats() + .retentionLeases().contains(ReplicationTracker.getPeerRecoveryRetentionLeaseId(replicaShardRouting))); + internalCluster().restartNode(discoveryNodes.get(replicaShardRouting.currentNodeId()).getName(), + new InternalTestCluster.RestartCallback() { + @Override + public Settings onNodeStopped(String nodeName) throws Exception { + assertFalse(client().admin().cluster().prepareHealth() + .setWaitForNodes(Integer.toString(discoveryNodes.getSize() - 1)) + .setWaitForEvents(Priority.LANGUID).get().isTimedOut()); + + final int newDocCount = Math.toIntExact(Math.round(Math.ceil( + (1 + Math.ceil(docCount * reasonableOperationsBasedRecoveryProportion)) + / (1 - reasonableOperationsBasedRecoveryProportion)))); + + /* + * newDocCount >= (ceil(docCount * p) + 1) / (1-p) + * + * ==> 0 <= newDocCount * (1-p) - ceil(docCount * p) - 1 + * = newDocCount - (newDocCount * p + ceil(docCount * p) + 1) + * < newDocCount - (ceil(newDocCount * p) + ceil(docCount * p)) + * <= newDocCount - 
ceil(newDocCount * p + docCount * p) + * + * ==> docCount < newDocCount + docCount - ceil((newDocCount + docCount) * p) + * == localCheckpoint + 1 - ceil((newDocCount + docCount) * p) + * == firstReasonableSeqNo + * + * The replica has docCount docs, i.e. has operations with seqnos [0..docCount-1], so a seqno-based recovery will start + * from docCount < firstReasonableSeqNo + * + * ==> it is unreasonable to recover the replica using a seqno-based recovery + */ + + indexRandom(randomBoolean(), randomBoolean(), randomBoolean(), IntStream.range(0, newDocCount) + .mapToObj(n -> client().prepareIndex(indexName, "_doc").setSource("num", n)).collect(toList())); + + flush(indexName); + + assertBusy(() -> assertFalse("should no longer have lease for " + replicaShardRouting, + client().admin().indices().prepareStats(indexName).get().getShards()[0].getRetentionLeaseStats() + .retentionLeases().contains(ReplicationTracker.getPeerRecoveryRetentionLeaseId(replicaShardRouting)))); + + return super.onNodeStopped(nodeName); + } + }); + + ensureGreen(indexName); + + //noinspection OptionalGetWithoutIsPresent because it fails the test if absent + final RecoveryState recoveryState = client().admin().indices().prepareRecoveries(indexName).get() + .shardRecoveryStates().get(indexName).stream().filter(rs -> rs.getPrimary() == false).findFirst().get(); + assertThat(recoveryState.getIndex().totalFileCount(), greaterThan(0)); + } + public void testDoesNotCopyOperationsInSafeCommit() throws Exception { internalCluster().ensureAtLeastNumDataNodes(2); diff --git a/test/framework/src/main/java/org/elasticsearch/index/engine/EngineTestCase.java b/test/framework/src/main/java/org/elasticsearch/index/engine/EngineTestCase.java index 2c54189e20c..9e1cc211008 100644 --- a/test/framework/src/main/java/org/elasticsearch/index/engine/EngineTestCase.java +++ b/test/framework/src/main/java/org/elasticsearch/index/engine/EngineTestCase.java @@ -674,7 +674,8 @@ public abstract class EngineTestCase extends ESTestCase { SequenceNumbers.NO_OPS_PERFORMED, update -> {}, () -> 0L, - (leases, listener) -> listener.onResponse(new ReplicationResponse())); + (leases, listener) -> listener.onResponse(new ReplicationResponse()), + () -> SafeCommitInfo.EMPTY); globalCheckpointSupplier = replicationTracker; retentionLeasesSupplier = replicationTracker::getRetentionLeases; } else { diff --git a/test/framework/src/main/java/org/elasticsearch/test/InternalSettingsPlugin.java b/test/framework/src/main/java/org/elasticsearch/test/InternalSettingsPlugin.java index fdb623d1d1e..246dac18ef8 100644 --- a/test/framework/src/main/java/org/elasticsearch/test/InternalSettingsPlugin.java +++ b/test/framework/src/main/java/org/elasticsearch/test/InternalSettingsPlugin.java @@ -24,6 +24,7 @@ import org.elasticsearch.common.settings.Setting.Property; import org.elasticsearch.common.unit.TimeValue; import org.elasticsearch.index.IndexModule; import org.elasticsearch.index.IndexService; +import org.elasticsearch.index.IndexSettings; import org.elasticsearch.plugins.Plugin; import java.util.Arrays; @@ -51,6 +52,7 @@ public final class InternalSettingsPlugin extends Plugin { TRANSLOG_RETENTION_CHECK_INTERVAL_SETTING, IndexService.GLOBAL_CHECKPOINT_SYNC_INTERVAL_SETTING, IndexService.RETENTION_LEASE_SYNC_INTERVAL_SETTING, + IndexSettings.FILE_BASED_RECOVERY_THRESHOLD_SETTING, IndexModule.INDEX_QUERY_CACHE_EVERYTHING_SETTING ); }
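
The javadoc for FILE_BASED_RECOVERY_THRESHOLD_SETTING notes that the setting is intentionally unregistered and can only be changed in tests, which is why this patch adds it to InternalSettingsPlugin. A rough usage sketch follows; it is a hypothetical test snippet, not part of the patch, and assumes an ESIntegTestCase whose nodePlugins() includes InternalSettingsPlugin, as in the integration test above.

    // Hypothetical test snippet: create an index whose peer-recovery retention leases are
    // expired once an ops-based recovery would replay more than 5% of the docs in the shard.
    Settings indexSettings = Settings.builder()
        .put(IndexMetaData.SETTING_NUMBER_OF_SHARDS, 1)
        .put(IndexMetaData.SETTING_NUMBER_OF_REPLICAS, 1)
        .put(IndexSettings.INDEX_SOFT_DELETES_SETTING.getKey(), true)
        .put(IndexSettings.FILE_BASED_RECOVERY_THRESHOLD_SETTING.getKey(), 0.05)
        .build();
    createIndex("test-index", indexSettings);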