Recover peers from translog, ignoring soft deletes (#38904)
Today if soft deletes are enabled then we read the operations needed for peer recovery from Lucene. However we do not currently make any attempt to retain history in Lucene specifically for peer recoveries so we may discard it and fall back to a more expensive file-based recovery. Yet we still retain sufficient history in the translog to perform an operations-based peer recovery. In the long run we would like to fix this by retaining more history in Lucene, possibly using shard history retention leases (#37165). For now, however, this commit reverts to performing peer recoveries using the history retained in the translog regardless of whether soft deletes are enabled or not.
This commit is contained in:
parent
a211e51343
commit
578514e892
|
@ -767,7 +767,7 @@ public abstract class Engine implements Closeable {
|
|||
MapperService mapperService, long startingSeqNo) throws IOException;
|
||||
|
||||
/**
|
||||
* Checks if this engine has every operations since {@code startingSeqNo}(inclusive) in its history (either Lucene or translog)
|
||||
* Checks if this engine has every operations since {@code startingSeqNo}(inclusive) in its translog
|
||||
*/
|
||||
public abstract boolean hasCompleteOperationHistory(String source, MapperService mapperService, long startingSeqNo) throws IOException;
|
||||
|
||||
|
|
|
@ -502,16 +502,11 @@ public class InternalEngine extends Engine {
|
|||
}
|
||||
|
||||
/**
|
||||
* Creates a new history snapshot for reading operations since the provided seqno.
|
||||
* The returned snapshot can be retrieved from either Lucene index or translog files.
|
||||
* Creates a new history snapshot for reading operations since the provided seqno from the translog.
|
||||
*/
|
||||
@Override
|
||||
public Translog.Snapshot readHistoryOperations(String source, MapperService mapperService, long startingSeqNo) throws IOException {
|
||||
if (engineConfig.getIndexSettings().isSoftDeleteEnabled()) {
|
||||
return newChangesSnapshot(source, mapperService, Math.max(0, startingSeqNo), Long.MAX_VALUE, false);
|
||||
} else {
|
||||
return getTranslog().newSnapshotFromMinSeqNo(startingSeqNo);
|
||||
}
|
||||
return getTranslog().newSnapshotFromMinSeqNo(startingSeqNo);
|
||||
}
|
||||
|
||||
/**
|
||||
|
@ -2546,21 +2541,17 @@ public class InternalEngine extends Engine {
|
|||
|
||||
@Override
|
||||
public boolean hasCompleteOperationHistory(String source, MapperService mapperService, long startingSeqNo) throws IOException {
|
||||
if (engineConfig.getIndexSettings().isSoftDeleteEnabled()) {
|
||||
return getMinRetainedSeqNo() <= startingSeqNo;
|
||||
} else {
|
||||
final long currentLocalCheckpoint = getLocalCheckpointTracker().getCheckpoint();
|
||||
final LocalCheckpointTracker tracker = new LocalCheckpointTracker(startingSeqNo, startingSeqNo - 1);
|
||||
try (Translog.Snapshot snapshot = getTranslog().newSnapshotFromMinSeqNo(startingSeqNo)) {
|
||||
Translog.Operation operation;
|
||||
while ((operation = snapshot.next()) != null) {
|
||||
if (operation.seqNo() != SequenceNumbers.UNASSIGNED_SEQ_NO) {
|
||||
tracker.markSeqNoAsCompleted(operation.seqNo());
|
||||
}
|
||||
final long currentLocalCheckpoint = getLocalCheckpointTracker().getCheckpoint();
|
||||
final LocalCheckpointTracker tracker = new LocalCheckpointTracker(startingSeqNo, startingSeqNo - 1);
|
||||
try (Translog.Snapshot snapshot = getTranslog().newSnapshotFromMinSeqNo(startingSeqNo)) {
|
||||
Translog.Operation operation;
|
||||
while ((operation = snapshot.next()) != null) {
|
||||
if (operation.seqNo() != SequenceNumbers.UNASSIGNED_SEQ_NO) {
|
||||
tracker.markSeqNoAsCompleted(operation.seqNo());
|
||||
}
|
||||
}
|
||||
return tracker.getCheckpoint() >= currentLocalCheckpoint;
|
||||
}
|
||||
return tracker.getCheckpoint() >= currentLocalCheckpoint;
|
||||
}
|
||||
|
||||
/**
|
||||
|
@ -2575,7 +2566,15 @@ public class InternalEngine extends Engine {
|
|||
@Override
|
||||
public Closeable acquireRetentionLock() {
|
||||
if (softDeleteEnabled) {
|
||||
return softDeletesPolicy.acquireRetentionLock();
|
||||
final Releasable softDeletesRetentionLock = softDeletesPolicy.acquireRetentionLock();
|
||||
final Closeable translogRetentionLock;
|
||||
try {
|
||||
translogRetentionLock = translog.acquireRetentionLock();
|
||||
} catch (Exception e) {
|
||||
softDeletesRetentionLock.close();
|
||||
throw e;
|
||||
}
|
||||
return () -> IOUtils.close(translogRetentionLock, softDeletesRetentionLock);
|
||||
} else {
|
||||
return translog.acquireRetentionLock();
|
||||
}
|
||||
|
|
|
@ -177,10 +177,9 @@ public class RecoverySourceHandler {
|
|||
// We must have everything above the local checkpoint in the commit
|
||||
requiredSeqNoRangeStart =
|
||||
Long.parseLong(phase1Snapshot.getIndexCommit().getUserData().get(SequenceNumbers.LOCAL_CHECKPOINT_KEY)) + 1;
|
||||
// If soft-deletes enabled, we need to transfer only operations after the local_checkpoint of the commit to have
|
||||
// the same history on the target. However, with translog, we need to set this to 0 to create a translog roughly
|
||||
// according to the retention policy on the target. Note that it will still filter out legacy operations without seqNo.
|
||||
startingSeqNo = shard.indexSettings().isSoftDeleteEnabled() ? requiredSeqNoRangeStart : 0;
|
||||
// We need to set this to 0 to create a translog roughly according to the retention policy on the target. Note that it will
|
||||
// still filter out legacy operations without seqNo.
|
||||
startingSeqNo = 0;
|
||||
try {
|
||||
final int estimateNumOps = shard.estimateNumberOfHistoryOperations("peer-recovery", startingSeqNo);
|
||||
sendFileResult = phase1(phase1Snapshot.getIndexCommit(), () -> estimateNumOps);
|
||||
|
|
|
@ -417,6 +417,10 @@ public class RecoveryState implements ToXContentFragment, Streamable, Writeable
|
|||
stopTime = 0;
|
||||
}
|
||||
|
||||
// for tests
|
||||
public long getStartNanoTime() {
|
||||
return startNanoTime;
|
||||
}
|
||||
}
|
||||
|
||||
public static class VerifyIndex extends Timer implements ToXContentFragment, Writeable {
|
||||
|
|
|
@ -115,16 +115,10 @@ public class PrimaryReplicaSyncerTests extends IndexShardTestCase {
|
|||
assertThat(resyncRequest.getMaxSeenAutoIdTimestampOnPrimary(), equalTo(shard.getMaxSeenAutoIdTimestamp()));
|
||||
}
|
||||
if (syncNeeded && globalCheckPoint < numDocs - 1) {
|
||||
if (shard.indexSettings.isSoftDeleteEnabled()) {
|
||||
assertThat(resyncTask.getSkippedOperations(), equalTo(0));
|
||||
assertThat(resyncTask.getResyncedOperations(), equalTo(resyncTask.getTotalOperations()));
|
||||
assertThat(resyncTask.getTotalOperations(), equalTo(Math.toIntExact(numDocs - 1 - globalCheckPoint)));
|
||||
} else {
|
||||
int skippedOps = Math.toIntExact(globalCheckPoint + 1); // everything up to global checkpoint included
|
||||
assertThat(resyncTask.getSkippedOperations(), equalTo(skippedOps));
|
||||
assertThat(resyncTask.getResyncedOperations(), equalTo(numDocs - skippedOps));
|
||||
assertThat(resyncTask.getTotalOperations(), equalTo(globalCheckPoint == numDocs - 1 ? 0 : numDocs));
|
||||
}
|
||||
int skippedOps = Math.toIntExact(globalCheckPoint + 1); // everything up to global checkpoint included
|
||||
assertThat(resyncTask.getSkippedOperations(), equalTo(skippedOps));
|
||||
assertThat(resyncTask.getResyncedOperations(), equalTo(numDocs - skippedOps));
|
||||
assertThat(resyncTask.getTotalOperations(), equalTo(globalCheckPoint == numDocs - 1 ? 0 : numDocs));
|
||||
} else {
|
||||
assertThat(resyncTask.getSkippedOperations(), equalTo(0));
|
||||
assertThat(resyncTask.getResyncedOperations(), equalTo(0));
|
||||
|
|
|
@ -26,11 +26,13 @@ import org.elasticsearch.action.admin.cluster.node.stats.NodesStatsResponse;
|
|||
import org.elasticsearch.action.admin.cluster.snapshots.create.CreateSnapshotResponse;
|
||||
import org.elasticsearch.action.admin.cluster.snapshots.restore.RestoreSnapshotResponse;
|
||||
import org.elasticsearch.action.admin.cluster.state.ClusterStateResponse;
|
||||
import org.elasticsearch.action.admin.indices.recovery.RecoveryRequest;
|
||||
import org.elasticsearch.action.admin.indices.recovery.RecoveryResponse;
|
||||
import org.elasticsearch.action.admin.indices.stats.CommonStatsFlags;
|
||||
import org.elasticsearch.action.admin.indices.stats.IndicesStatsResponse;
|
||||
import org.elasticsearch.action.index.IndexRequestBuilder;
|
||||
import org.elasticsearch.action.search.SearchResponse;
|
||||
import org.elasticsearch.action.support.WriteRequest.RefreshPolicy;
|
||||
import org.elasticsearch.cluster.action.shard.ShardStateAction;
|
||||
import org.elasticsearch.cluster.metadata.IndexMetaData;
|
||||
import org.elasticsearch.cluster.routing.RecoverySource;
|
||||
|
@ -786,4 +788,55 @@ public class IndexRecoveryIT extends ESIntegTestCase {
|
|||
assertHitCount(client().prepareSearch(indexName).get(), numDocs);
|
||||
}
|
||||
}
|
||||
|
||||
@TestLogging("org.elasticsearch.indices.recovery:TRACE")
|
||||
public void testHistoryRetention() throws Exception {
|
||||
internalCluster().startNodes(3);
|
||||
|
||||
final String indexName = "test";
|
||||
client().admin().indices().prepareCreate(indexName).setSettings(Settings.builder()
|
||||
.put(IndexMetaData.SETTING_NUMBER_OF_SHARDS, 1)
|
||||
.put(IndexMetaData.SETTING_NUMBER_OF_REPLICAS, 2)).get();
|
||||
ensureGreen(indexName);
|
||||
|
||||
// Perform some replicated operations so the replica isn't simply empty, because ops-based recovery isn't better in that case
|
||||
final List<IndexRequestBuilder> requests = new ArrayList<>();
|
||||
final int replicatedDocCount = scaledRandomIntBetween(25, 250);
|
||||
while (requests.size() < replicatedDocCount) {
|
||||
requests.add(client().prepareIndex(indexName, "_doc").setSource("{}", XContentType.JSON));
|
||||
}
|
||||
indexRandom(true, requests);
|
||||
if (randomBoolean()) {
|
||||
flush(indexName);
|
||||
}
|
||||
|
||||
internalCluster().stopRandomNode(s -> true);
|
||||
internalCluster().stopRandomNode(s -> true);
|
||||
|
||||
final long desyncNanoTime = System.nanoTime();
|
||||
while (System.nanoTime() <= desyncNanoTime) {
|
||||
// time passes
|
||||
}
|
||||
|
||||
final int numNewDocs = scaledRandomIntBetween(25, 250);
|
||||
for (int i = 0; i < numNewDocs; i++) {
|
||||
client().prepareIndex(indexName, "_doc").setSource("{}", XContentType.JSON).setRefreshPolicy(RefreshPolicy.IMMEDIATE).get();
|
||||
}
|
||||
// Flush twice to update the safe commit's local checkpoint
|
||||
assertThat(client().admin().indices().prepareFlush(indexName).setForce(true).execute().get().getFailedShards(), equalTo(0));
|
||||
assertThat(client().admin().indices().prepareFlush(indexName).setForce(true).execute().get().getFailedShards(), equalTo(0));
|
||||
|
||||
assertAcked(client().admin().indices().prepareUpdateSettings(indexName)
|
||||
.setSettings(Settings.builder().put(IndexMetaData.SETTING_NUMBER_OF_REPLICAS, 1)));
|
||||
internalCluster().startNode();
|
||||
ensureGreen(indexName);
|
||||
|
||||
final RecoveryResponse recoveryResponse = client().admin().indices().recoveries(new RecoveryRequest(indexName)).get();
|
||||
final List<RecoveryState> recoveryStates = recoveryResponse.shardRecoveryStates().get(indexName);
|
||||
recoveryStates.removeIf(r -> r.getTimer().getStartNanoTime() <= desyncNanoTime);
|
||||
|
||||
assertThat(recoveryStates, hasSize(1));
|
||||
assertThat(recoveryStates.get(0).getIndex().totalFileCount(), is(0));
|
||||
assertThat(recoveryStates.get(0).getTranslog().recoveredOperations(), greaterThan(0));
|
||||
}
|
||||
}
|
||||
|
|
|
@ -68,8 +68,7 @@ public class RecoveryTests extends ESIndexLevelReplicationTestCase {
|
|||
shards.addReplica();
|
||||
shards.startAll();
|
||||
final IndexShard replica = shards.getReplicas().get(0);
|
||||
boolean softDeletesEnabled = replica.indexSettings().isSoftDeleteEnabled();
|
||||
assertThat(getTranslog(replica).totalOperations(), equalTo(softDeletesEnabled ? moreDocs : docs + moreDocs));
|
||||
assertThat(getTranslog(replica).totalOperations(), equalTo(docs + moreDocs));
|
||||
shards.assertAllEqual(docs + moreDocs);
|
||||
}
|
||||
}
|
||||
|
@ -282,8 +281,7 @@ public class RecoveryTests extends ESIndexLevelReplicationTestCase {
|
|||
shards.recoverReplica(newReplica);
|
||||
// file based recovery should be made
|
||||
assertThat(newReplica.recoveryState().getIndex().fileDetails(), not(empty()));
|
||||
boolean softDeletesEnabled = replica.indexSettings().isSoftDeleteEnabled();
|
||||
assertThat(getTranslog(newReplica).totalOperations(), equalTo(softDeletesEnabled ? nonFlushedDocs : numDocs));
|
||||
assertThat(getTranslog(newReplica).totalOperations(), equalTo(numDocs));
|
||||
|
||||
// history uuid was restored
|
||||
assertThat(newReplica.getHistoryUUID(), equalTo(historyUUID));
|
||||
|
@ -387,8 +385,7 @@ public class RecoveryTests extends ESIndexLevelReplicationTestCase {
|
|||
shards.recoverReplica(replica);
|
||||
// Make sure the flushing will eventually be completed (eg. `shouldPeriodicallyFlush` is false)
|
||||
assertBusy(() -> assertThat(getEngine(replica).shouldPeriodicallyFlush(), equalTo(false)));
|
||||
boolean softDeletesEnabled = replica.indexSettings().isSoftDeleteEnabled();
|
||||
assertThat(getTranslog(replica).totalOperations(), equalTo(softDeletesEnabled ? 0 : numDocs));
|
||||
assertThat(getTranslog(replica).totalOperations(), equalTo(numDocs));
|
||||
shards.assertAllEqual(numDocs);
|
||||
}
|
||||
}
|
||||
|
|
Loading…
Reference in New Issue