From 578514e89266bd8f7d032b4a3e62e8a2bbe863a3 Mon Sep 17 00:00:00 2001 From: David Turner Date: Fri, 15 Feb 2019 08:43:24 +0000 Subject: [PATCH] Recover peers from translog, ignoring soft deletes (#38904) Today if soft deletes are enabled then we read the operations needed for peer recovery from Lucene. However we do not currently make any attempt to retain history in Lucene specifically for peer recoveries so we may discard it and fall back to a more expensive file-based recovery. Yet we still retain sufficient history in the translog to perform an operations-based peer recovery. In the long run we would like to fix this by retaining more history in Lucene, possibly using shard history retention leases (#37165). For now, however, this commit reverts to performing peer recoveries using the history retained in the translog regardless of whether soft deletes are enabled or not. --- .../elasticsearch/index/engine/Engine.java | 2 +- .../index/engine/InternalEngine.java | 39 +++++++------- .../recovery/RecoverySourceHandler.java | 7 ++- .../indices/recovery/RecoveryState.java | 4 ++ .../shard/PrimaryReplicaSyncerTests.java | 14 ++--- .../indices/recovery/IndexRecoveryIT.java | 53 +++++++++++++++++++ .../indices/recovery/RecoveryTests.java | 9 ++-- 7 files changed, 87 insertions(+), 41 deletions(-) diff --git a/server/src/main/java/org/elasticsearch/index/engine/Engine.java b/server/src/main/java/org/elasticsearch/index/engine/Engine.java index bac85413a7a..56d8c6bab61 100644 --- a/server/src/main/java/org/elasticsearch/index/engine/Engine.java +++ b/server/src/main/java/org/elasticsearch/index/engine/Engine.java @@ -767,7 +767,7 @@ public abstract class Engine implements Closeable { MapperService mapperService, long startingSeqNo) throws IOException; /** - * Checks if this engine has every operations since {@code startingSeqNo}(inclusive) in its history (either Lucene or translog) + * Checks if this engine has every operations since {@code startingSeqNo}(inclusive) in its translog */ public abstract boolean hasCompleteOperationHistory(String source, MapperService mapperService, long startingSeqNo) throws IOException; diff --git a/server/src/main/java/org/elasticsearch/index/engine/InternalEngine.java b/server/src/main/java/org/elasticsearch/index/engine/InternalEngine.java index 66e0d30f164..832df83fe0f 100644 --- a/server/src/main/java/org/elasticsearch/index/engine/InternalEngine.java +++ b/server/src/main/java/org/elasticsearch/index/engine/InternalEngine.java @@ -502,16 +502,11 @@ public class InternalEngine extends Engine { } /** - * Creates a new history snapshot for reading operations since the provided seqno. - * The returned snapshot can be retrieved from either Lucene index or translog files. + * Creates a new history snapshot for reading operations since the provided seqno from the translog. */ @Override public Translog.Snapshot readHistoryOperations(String source, MapperService mapperService, long startingSeqNo) throws IOException { - if (engineConfig.getIndexSettings().isSoftDeleteEnabled()) { - return newChangesSnapshot(source, mapperService, Math.max(0, startingSeqNo), Long.MAX_VALUE, false); - } else { - return getTranslog().newSnapshotFromMinSeqNo(startingSeqNo); - } + return getTranslog().newSnapshotFromMinSeqNo(startingSeqNo); } /** @@ -2546,21 +2541,17 @@ public class InternalEngine extends Engine { @Override public boolean hasCompleteOperationHistory(String source, MapperService mapperService, long startingSeqNo) throws IOException { - if (engineConfig.getIndexSettings().isSoftDeleteEnabled()) { - return getMinRetainedSeqNo() <= startingSeqNo; - } else { - final long currentLocalCheckpoint = getLocalCheckpointTracker().getCheckpoint(); - final LocalCheckpointTracker tracker = new LocalCheckpointTracker(startingSeqNo, startingSeqNo - 1); - try (Translog.Snapshot snapshot = getTranslog().newSnapshotFromMinSeqNo(startingSeqNo)) { - Translog.Operation operation; - while ((operation = snapshot.next()) != null) { - if (operation.seqNo() != SequenceNumbers.UNASSIGNED_SEQ_NO) { - tracker.markSeqNoAsCompleted(operation.seqNo()); - } + final long currentLocalCheckpoint = getLocalCheckpointTracker().getCheckpoint(); + final LocalCheckpointTracker tracker = new LocalCheckpointTracker(startingSeqNo, startingSeqNo - 1); + try (Translog.Snapshot snapshot = getTranslog().newSnapshotFromMinSeqNo(startingSeqNo)) { + Translog.Operation operation; + while ((operation = snapshot.next()) != null) { + if (operation.seqNo() != SequenceNumbers.UNASSIGNED_SEQ_NO) { + tracker.markSeqNoAsCompleted(operation.seqNo()); } } - return tracker.getCheckpoint() >= currentLocalCheckpoint; } + return tracker.getCheckpoint() >= currentLocalCheckpoint; } /** @@ -2575,7 +2566,15 @@ public class InternalEngine extends Engine { @Override public Closeable acquireRetentionLock() { if (softDeleteEnabled) { - return softDeletesPolicy.acquireRetentionLock(); + final Releasable softDeletesRetentionLock = softDeletesPolicy.acquireRetentionLock(); + final Closeable translogRetentionLock; + try { + translogRetentionLock = translog.acquireRetentionLock(); + } catch (Exception e) { + softDeletesRetentionLock.close(); + throw e; + } + return () -> IOUtils.close(translogRetentionLock, softDeletesRetentionLock); } else { return translog.acquireRetentionLock(); } diff --git a/server/src/main/java/org/elasticsearch/indices/recovery/RecoverySourceHandler.java b/server/src/main/java/org/elasticsearch/indices/recovery/RecoverySourceHandler.java index 40e1a88a349..e7a8fbfb523 100644 --- a/server/src/main/java/org/elasticsearch/indices/recovery/RecoverySourceHandler.java +++ b/server/src/main/java/org/elasticsearch/indices/recovery/RecoverySourceHandler.java @@ -177,10 +177,9 @@ public class RecoverySourceHandler { // We must have everything above the local checkpoint in the commit requiredSeqNoRangeStart = Long.parseLong(phase1Snapshot.getIndexCommit().getUserData().get(SequenceNumbers.LOCAL_CHECKPOINT_KEY)) + 1; - // If soft-deletes enabled, we need to transfer only operations after the local_checkpoint of the commit to have - // the same history on the target. However, with translog, we need to set this to 0 to create a translog roughly - // according to the retention policy on the target. Note that it will still filter out legacy operations without seqNo. - startingSeqNo = shard.indexSettings().isSoftDeleteEnabled() ? requiredSeqNoRangeStart : 0; + // We need to set this to 0 to create a translog roughly according to the retention policy on the target. Note that it will + // still filter out legacy operations without seqNo. + startingSeqNo = 0; try { final int estimateNumOps = shard.estimateNumberOfHistoryOperations("peer-recovery", startingSeqNo); sendFileResult = phase1(phase1Snapshot.getIndexCommit(), () -> estimateNumOps); diff --git a/server/src/main/java/org/elasticsearch/indices/recovery/RecoveryState.java b/server/src/main/java/org/elasticsearch/indices/recovery/RecoveryState.java index 1fed238f8dd..4d27362af22 100644 --- a/server/src/main/java/org/elasticsearch/indices/recovery/RecoveryState.java +++ b/server/src/main/java/org/elasticsearch/indices/recovery/RecoveryState.java @@ -417,6 +417,10 @@ public class RecoveryState implements ToXContentFragment, Streamable, Writeable stopTime = 0; } + // for tests + public long getStartNanoTime() { + return startNanoTime; + } } public static class VerifyIndex extends Timer implements ToXContentFragment, Writeable { diff --git a/server/src/test/java/org/elasticsearch/index/shard/PrimaryReplicaSyncerTests.java b/server/src/test/java/org/elasticsearch/index/shard/PrimaryReplicaSyncerTests.java index d074ef33758..85e381b176c 100644 --- a/server/src/test/java/org/elasticsearch/index/shard/PrimaryReplicaSyncerTests.java +++ b/server/src/test/java/org/elasticsearch/index/shard/PrimaryReplicaSyncerTests.java @@ -115,16 +115,10 @@ public class PrimaryReplicaSyncerTests extends IndexShardTestCase { assertThat(resyncRequest.getMaxSeenAutoIdTimestampOnPrimary(), equalTo(shard.getMaxSeenAutoIdTimestamp())); } if (syncNeeded && globalCheckPoint < numDocs - 1) { - if (shard.indexSettings.isSoftDeleteEnabled()) { - assertThat(resyncTask.getSkippedOperations(), equalTo(0)); - assertThat(resyncTask.getResyncedOperations(), equalTo(resyncTask.getTotalOperations())); - assertThat(resyncTask.getTotalOperations(), equalTo(Math.toIntExact(numDocs - 1 - globalCheckPoint))); - } else { - int skippedOps = Math.toIntExact(globalCheckPoint + 1); // everything up to global checkpoint included - assertThat(resyncTask.getSkippedOperations(), equalTo(skippedOps)); - assertThat(resyncTask.getResyncedOperations(), equalTo(numDocs - skippedOps)); - assertThat(resyncTask.getTotalOperations(), equalTo(globalCheckPoint == numDocs - 1 ? 0 : numDocs)); - } + int skippedOps = Math.toIntExact(globalCheckPoint + 1); // everything up to global checkpoint included + assertThat(resyncTask.getSkippedOperations(), equalTo(skippedOps)); + assertThat(resyncTask.getResyncedOperations(), equalTo(numDocs - skippedOps)); + assertThat(resyncTask.getTotalOperations(), equalTo(globalCheckPoint == numDocs - 1 ? 0 : numDocs)); } else { assertThat(resyncTask.getSkippedOperations(), equalTo(0)); assertThat(resyncTask.getResyncedOperations(), equalTo(0)); diff --git a/server/src/test/java/org/elasticsearch/indices/recovery/IndexRecoveryIT.java b/server/src/test/java/org/elasticsearch/indices/recovery/IndexRecoveryIT.java index ea15eceb8be..ea3e933a883 100644 --- a/server/src/test/java/org/elasticsearch/indices/recovery/IndexRecoveryIT.java +++ b/server/src/test/java/org/elasticsearch/indices/recovery/IndexRecoveryIT.java @@ -26,11 +26,13 @@ import org.elasticsearch.action.admin.cluster.node.stats.NodesStatsResponse; import org.elasticsearch.action.admin.cluster.snapshots.create.CreateSnapshotResponse; import org.elasticsearch.action.admin.cluster.snapshots.restore.RestoreSnapshotResponse; import org.elasticsearch.action.admin.cluster.state.ClusterStateResponse; +import org.elasticsearch.action.admin.indices.recovery.RecoveryRequest; import org.elasticsearch.action.admin.indices.recovery.RecoveryResponse; import org.elasticsearch.action.admin.indices.stats.CommonStatsFlags; import org.elasticsearch.action.admin.indices.stats.IndicesStatsResponse; import org.elasticsearch.action.index.IndexRequestBuilder; import org.elasticsearch.action.search.SearchResponse; +import org.elasticsearch.action.support.WriteRequest.RefreshPolicy; import org.elasticsearch.cluster.action.shard.ShardStateAction; import org.elasticsearch.cluster.metadata.IndexMetaData; import org.elasticsearch.cluster.routing.RecoverySource; @@ -786,4 +788,55 @@ public class IndexRecoveryIT extends ESIntegTestCase { assertHitCount(client().prepareSearch(indexName).get(), numDocs); } } + + @TestLogging("org.elasticsearch.indices.recovery:TRACE") + public void testHistoryRetention() throws Exception { + internalCluster().startNodes(3); + + final String indexName = "test"; + client().admin().indices().prepareCreate(indexName).setSettings(Settings.builder() + .put(IndexMetaData.SETTING_NUMBER_OF_SHARDS, 1) + .put(IndexMetaData.SETTING_NUMBER_OF_REPLICAS, 2)).get(); + ensureGreen(indexName); + + // Perform some replicated operations so the replica isn't simply empty, because ops-based recovery isn't better in that case + final List requests = new ArrayList<>(); + final int replicatedDocCount = scaledRandomIntBetween(25, 250); + while (requests.size() < replicatedDocCount) { + requests.add(client().prepareIndex(indexName, "_doc").setSource("{}", XContentType.JSON)); + } + indexRandom(true, requests); + if (randomBoolean()) { + flush(indexName); + } + + internalCluster().stopRandomNode(s -> true); + internalCluster().stopRandomNode(s -> true); + + final long desyncNanoTime = System.nanoTime(); + while (System.nanoTime() <= desyncNanoTime) { + // time passes + } + + final int numNewDocs = scaledRandomIntBetween(25, 250); + for (int i = 0; i < numNewDocs; i++) { + client().prepareIndex(indexName, "_doc").setSource("{}", XContentType.JSON).setRefreshPolicy(RefreshPolicy.IMMEDIATE).get(); + } + // Flush twice to update the safe commit's local checkpoint + assertThat(client().admin().indices().prepareFlush(indexName).setForce(true).execute().get().getFailedShards(), equalTo(0)); + assertThat(client().admin().indices().prepareFlush(indexName).setForce(true).execute().get().getFailedShards(), equalTo(0)); + + assertAcked(client().admin().indices().prepareUpdateSettings(indexName) + .setSettings(Settings.builder().put(IndexMetaData.SETTING_NUMBER_OF_REPLICAS, 1))); + internalCluster().startNode(); + ensureGreen(indexName); + + final RecoveryResponse recoveryResponse = client().admin().indices().recoveries(new RecoveryRequest(indexName)).get(); + final List recoveryStates = recoveryResponse.shardRecoveryStates().get(indexName); + recoveryStates.removeIf(r -> r.getTimer().getStartNanoTime() <= desyncNanoTime); + + assertThat(recoveryStates, hasSize(1)); + assertThat(recoveryStates.get(0).getIndex().totalFileCount(), is(0)); + assertThat(recoveryStates.get(0).getTranslog().recoveredOperations(), greaterThan(0)); + } } diff --git a/server/src/test/java/org/elasticsearch/indices/recovery/RecoveryTests.java b/server/src/test/java/org/elasticsearch/indices/recovery/RecoveryTests.java index 48061b11d58..2761333ef56 100644 --- a/server/src/test/java/org/elasticsearch/indices/recovery/RecoveryTests.java +++ b/server/src/test/java/org/elasticsearch/indices/recovery/RecoveryTests.java @@ -68,8 +68,7 @@ public class RecoveryTests extends ESIndexLevelReplicationTestCase { shards.addReplica(); shards.startAll(); final IndexShard replica = shards.getReplicas().get(0); - boolean softDeletesEnabled = replica.indexSettings().isSoftDeleteEnabled(); - assertThat(getTranslog(replica).totalOperations(), equalTo(softDeletesEnabled ? moreDocs : docs + moreDocs)); + assertThat(getTranslog(replica).totalOperations(), equalTo(docs + moreDocs)); shards.assertAllEqual(docs + moreDocs); } } @@ -282,8 +281,7 @@ public class RecoveryTests extends ESIndexLevelReplicationTestCase { shards.recoverReplica(newReplica); // file based recovery should be made assertThat(newReplica.recoveryState().getIndex().fileDetails(), not(empty())); - boolean softDeletesEnabled = replica.indexSettings().isSoftDeleteEnabled(); - assertThat(getTranslog(newReplica).totalOperations(), equalTo(softDeletesEnabled ? nonFlushedDocs : numDocs)); + assertThat(getTranslog(newReplica).totalOperations(), equalTo(numDocs)); // history uuid was restored assertThat(newReplica.getHistoryUUID(), equalTo(historyUUID)); @@ -387,8 +385,7 @@ public class RecoveryTests extends ESIndexLevelReplicationTestCase { shards.recoverReplica(replica); // Make sure the flushing will eventually be completed (eg. `shouldPeriodicallyFlush` is false) assertBusy(() -> assertThat(getEngine(replica).shouldPeriodicallyFlush(), equalTo(false))); - boolean softDeletesEnabled = replica.indexSettings().isSoftDeleteEnabled(); - assertThat(getTranslog(replica).totalOperations(), equalTo(softDeletesEnabled ? 0 : numDocs)); + assertThat(getTranslog(replica).totalOperations(), equalTo(numDocs)); shards.assertAllEqual(numDocs); } }