Send only ops after checkpoint in file-based recovery with soft-deletes (#33190)
Today a file-based recovery replays all existing translog operations from the primary on a replica so that the replica has the same full translog history as the primary. However, with soft-deletes enabled, we should not do this because:
1. All operations before the local checkpoint of the safe commit already exist in the commit.
2. The number of operations before the local checkpoint may be considerable, and replaying them on a replica may take a significant amount of time.
Relates #30522
Relates #29530
This commit is contained in:
parent
e2b931e80b
commit
e39689a198
|
@ -162,12 +162,13 @@ public class RecoverySourceHandler {
|
||||||
} catch (final Exception e) {
|
} catch (final Exception e) {
|
||||||
throw new RecoveryEngineException(shard.shardId(), 1, "snapshot failed", e);
|
throw new RecoveryEngineException(shard.shardId(), 1, "snapshot failed", e);
|
||||||
}
|
}
|
||||||
// we set this to 0 to create a translog roughly according to the retention policy
|
// We must have everything above the local checkpoint in the commit
|
||||||
// on the target. Note that it will still filter out legacy operations with no sequence numbers
|
|
||||||
startingSeqNo = 0; //TODO: A follow-up to send only ops above the local checkpoint if soft-deletes enabled.
|
|
||||||
// but we must have everything above the local checkpoint in the commit
|
|
||||||
requiredSeqNoRangeStart =
|
requiredSeqNoRangeStart =
|
||||||
Long.parseLong(phase1Snapshot.getIndexCommit().getUserData().get(SequenceNumbers.LOCAL_CHECKPOINT_KEY)) + 1;
|
Long.parseLong(phase1Snapshot.getIndexCommit().getUserData().get(SequenceNumbers.LOCAL_CHECKPOINT_KEY)) + 1;
|
||||||
|
// If soft-deletes enabled, we need to transfer only operations after the local_checkpoint of the commit to have
|
||||||
|
// the same history on the target. However, with translog, we need to set this to 0 to create a translog roughly
|
||||||
|
// according to the retention policy on the target. Note that it will still filter out legacy operations without seqNo.
|
||||||
|
startingSeqNo = shard.indexSettings().isSoftDeleteEnabled() ? requiredSeqNoRangeStart : 0;
|
||||||
try {
|
try {
|
||||||
final int estimateNumOps = shard.estimateNumberOfHistoryOperations("peer-recovery", startingSeqNo);
|
final int estimateNumOps = shard.estimateNumberOfHistoryOperations("peer-recovery", startingSeqNo);
|
||||||
phase1(phase1Snapshot.getIndexCommit(), () -> estimateNumOps);
|
phase1(phase1Snapshot.getIndexCommit(), () -> estimateNumOps);
|
||||||
|
|
|
@ -482,9 +482,7 @@ public class RecoveryFromGatewayIT extends ESIntegTestCase {
|
||||||
assertThat("all existing files should be reused, file count mismatch", recoveryState.getIndex().reusedFileCount(), equalTo(filesReused));
|
assertThat("all existing files should be reused, file count mismatch", recoveryState.getIndex().reusedFileCount(), equalTo(filesReused));
|
||||||
assertThat(recoveryState.getIndex().reusedFileCount(), equalTo(recoveryState.getIndex().totalFileCount() - filesRecovered));
|
assertThat(recoveryState.getIndex().reusedFileCount(), equalTo(recoveryState.getIndex().totalFileCount() - filesRecovered));
|
||||||
assertThat("> 0 files should be reused", recoveryState.getIndex().reusedFileCount(), greaterThan(0));
|
assertThat("> 0 files should be reused", recoveryState.getIndex().reusedFileCount(), greaterThan(0));
|
||||||
// both cases will be zero once we start sending only ops after local checkpoint of the safe commit
|
assertThat("no translog ops should be recovered", recoveryState.getTranslog().recoveredOperations(), equalTo(0));
|
||||||
int expectedTranslogOps = softDeleteEnabled ? numDocs + moreDocs : 0;
|
|
||||||
assertThat("no translog ops should be recovered", recoveryState.getTranslog().recoveredOperations(), equalTo(expectedTranslogOps));
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -219,8 +219,7 @@ public class RecoveryDuringReplicationTests extends ESIndexLevelReplicationTestC
|
||||||
|
|
||||||
@TestLogging("org.elasticsearch.index.shard:TRACE,org.elasticsearch.indices.recovery:TRACE")
|
@TestLogging("org.elasticsearch.index.shard:TRACE,org.elasticsearch.indices.recovery:TRACE")
|
||||||
public void testRecoveryAfterPrimaryPromotion() throws Exception {
|
public void testRecoveryAfterPrimaryPromotion() throws Exception {
|
||||||
Settings settings = Settings.builder().put(IndexSettings.INDEX_SOFT_DELETES_SETTING.getKey(), true).build();
|
try (ReplicationGroup shards = createGroup(2)) {
|
||||||
try (ReplicationGroup shards = createGroup(2, settings)) {
|
|
||||||
shards.startAll();
|
shards.startAll();
|
||||||
int totalDocs = shards.indexDocs(randomInt(10));
|
int totalDocs = shards.indexDocs(randomInt(10));
|
||||||
int committedDocs = 0;
|
int committedDocs = 0;
|
||||||
|
@ -232,7 +231,6 @@ public class RecoveryDuringReplicationTests extends ESIndexLevelReplicationTestC
|
||||||
final IndexShard oldPrimary = shards.getPrimary();
|
final IndexShard oldPrimary = shards.getPrimary();
|
||||||
final IndexShard newPrimary = shards.getReplicas().get(0);
|
final IndexShard newPrimary = shards.getReplicas().get(0);
|
||||||
final IndexShard replica = shards.getReplicas().get(1);
|
final IndexShard replica = shards.getReplicas().get(1);
|
||||||
boolean softDeleteEnabled = replica.indexSettings().isSoftDeleteEnabled();
|
|
||||||
if (randomBoolean()) {
|
if (randomBoolean()) {
|
||||||
// simulate docs that were inflight when primary failed, these will be rolled back
|
// simulate docs that were inflight when primary failed, these will be rolled back
|
||||||
final int rollbackDocs = randomIntBetween(1, 5);
|
final int rollbackDocs = randomIntBetween(1, 5);
|
||||||
|
@ -280,12 +278,13 @@ public class RecoveryDuringReplicationTests extends ESIndexLevelReplicationTestC
|
||||||
assertThat(newPrimary.getLastSyncedGlobalCheckpoint(), equalTo(newPrimary.seqNoStats().getMaxSeqNo()));
|
assertThat(newPrimary.getLastSyncedGlobalCheckpoint(), equalTo(newPrimary.seqNoStats().getMaxSeqNo()));
|
||||||
});
|
});
|
||||||
newPrimary.flush(new FlushRequest().force(true));
|
newPrimary.flush(new FlushRequest().force(true));
|
||||||
uncommittedOpsOnPrimary = shards.indexDocs(randomIntBetween(0, 10));
|
if (replica.indexSettings().isSoftDeleteEnabled()) {
|
||||||
totalDocs += uncommittedOpsOnPrimary;
|
// We need an extra flush to advance the min_retained_seqno on the new primary so ops-based won't happen.
|
||||||
// we need an extra flush or refresh to advance the min_retained_seqno on the new primary so that ops-based won't happen
|
// The min_retained_seqno only advances when a merge asks for the retention query.
|
||||||
if (softDeleteEnabled) {
|
|
||||||
newPrimary.flush(new FlushRequest().force(true));
|
newPrimary.flush(new FlushRequest().force(true));
|
||||||
}
|
}
|
||||||
|
uncommittedOpsOnPrimary = shards.indexDocs(randomIntBetween(0, 10));
|
||||||
|
totalDocs += uncommittedOpsOnPrimary;
|
||||||
}
|
}
|
||||||
|
|
||||||
if (randomBoolean()) {
|
if (randomBoolean()) {
|
||||||
|
@ -305,8 +304,7 @@ public class RecoveryDuringReplicationTests extends ESIndexLevelReplicationTestC
|
||||||
assertThat(newReplica.recoveryState().getTranslog().recoveredOperations(), equalTo(totalDocs - committedDocs));
|
assertThat(newReplica.recoveryState().getTranslog().recoveredOperations(), equalTo(totalDocs - committedDocs));
|
||||||
} else {
|
} else {
|
||||||
assertThat(newReplica.recoveryState().getIndex().fileDetails(), not(empty()));
|
assertThat(newReplica.recoveryState().getIndex().fileDetails(), not(empty()));
|
||||||
int expectOps = softDeleteEnabled ? totalDocs : uncommittedOpsOnPrimary;
|
assertThat(newReplica.recoveryState().getTranslog().recoveredOperations(), equalTo(uncommittedOpsOnPrimary));
|
||||||
assertThat(newReplica.recoveryState().getTranslog().recoveredOperations(), equalTo(expectOps));
|
|
||||||
}
|
}
|
||||||
|
|
||||||
// roll back the extra ops in the replica
|
// roll back the extra ops in the replica
|
||||||
|
|
|
@ -64,13 +64,13 @@ public class RecoveryTests extends ESIndexLevelReplicationTestCase {
|
||||||
int docs = shards.indexDocs(10);
|
int docs = shards.indexDocs(10);
|
||||||
getTranslog(shards.getPrimary()).rollGeneration();
|
getTranslog(shards.getPrimary()).rollGeneration();
|
||||||
shards.flush();
|
shards.flush();
|
||||||
if (randomBoolean()) {
|
int moreDocs = shards.indexDocs(randomInt(10));
|
||||||
docs += shards.indexDocs(10);
|
|
||||||
}
|
|
||||||
shards.addReplica();
|
shards.addReplica();
|
||||||
shards.startAll();
|
shards.startAll();
|
||||||
final IndexShard replica = shards.getReplicas().get(0);
|
final IndexShard replica = shards.getReplicas().get(0);
|
||||||
assertThat(getTranslog(replica).totalOperations(), equalTo(docs));
|
boolean softDeletesEnabled = replica.indexSettings().isSoftDeleteEnabled();
|
||||||
|
assertThat(getTranslog(replica).totalOperations(), equalTo(softDeletesEnabled ? moreDocs : docs + moreDocs));
|
||||||
|
shards.assertAllEqual(docs + moreDocs);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -107,7 +107,7 @@ public class RecoveryTests extends ESIndexLevelReplicationTestCase {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
public void testRecoveryWithOutOfOrderDelete() throws Exception {
|
public void testRecoveryWithOutOfOrderDeleteWithTranslog() throws Exception {
|
||||||
/*
|
/*
|
||||||
* The flow of this test:
|
* The flow of this test:
|
||||||
* - delete #1
|
* - delete #1
|
||||||
|
@ -117,9 +117,69 @@ public class RecoveryTests extends ESIndexLevelReplicationTestCase {
|
||||||
* - flush (commit point has max_seqno 3, and local checkpoint 1 -> points at gen 2, previous commit point is maintained)
|
* - flush (commit point has max_seqno 3, and local checkpoint 1 -> points at gen 2, previous commit point is maintained)
|
||||||
* - index #2
|
* - index #2
|
||||||
* - index #5
|
* - index #5
|
||||||
* - If flush and the translog/lucene retention disabled, delete #1 will be removed while index #0 is still retained and replayed.
|
* - If flush and the translog retention disabled, delete #1 will be removed while index #0 is still retained and replayed.
|
||||||
*/
|
*/
|
||||||
Settings settings = Settings.builder().put(IndexSettings.INDEX_SOFT_DELETES_RETENTION_OPERATIONS_SETTING.getKey(), 10)
|
Settings settings = Settings.builder().put(IndexSettings.INDEX_SOFT_DELETES_SETTING.getKey(), false).build();
|
||||||
|
try (ReplicationGroup shards = createGroup(1, settings)) {
|
||||||
|
shards.startAll();
|
||||||
|
// create out of order delete and index op on replica
|
||||||
|
final IndexShard orgReplica = shards.getReplicas().get(0);
|
||||||
|
final String indexName = orgReplica.shardId().getIndexName();
|
||||||
|
|
||||||
|
// delete #1
|
||||||
|
orgReplica.applyDeleteOperationOnReplica(1, 2, "type", "id");
|
||||||
|
getTranslog(orgReplica).rollGeneration(); // isolate the delete in it's own generation
|
||||||
|
// index #0
|
||||||
|
orgReplica.applyIndexOperationOnReplica(0, 1, IndexRequest.UNSET_AUTO_GENERATED_TIMESTAMP, false,
|
||||||
|
SourceToParse.source(indexName, "type", "id", new BytesArray("{}"), XContentType.JSON));
|
||||||
|
// index #3
|
||||||
|
orgReplica.applyIndexOperationOnReplica(3, 1, IndexRequest.UNSET_AUTO_GENERATED_TIMESTAMP, false,
|
||||||
|
SourceToParse.source(indexName, "type", "id-3", new BytesArray("{}"), XContentType.JSON));
|
||||||
|
// Flushing a new commit with local checkpoint=1 allows to delete the translog gen #1.
|
||||||
|
orgReplica.flush(new FlushRequest().force(true).waitIfOngoing(true));
|
||||||
|
// index #2
|
||||||
|
orgReplica.applyIndexOperationOnReplica(2, 1, IndexRequest.UNSET_AUTO_GENERATED_TIMESTAMP, false,
|
||||||
|
SourceToParse.source(indexName, "type", "id-2", new BytesArray("{}"), XContentType.JSON));
|
||||||
|
orgReplica.updateGlobalCheckpointOnReplica(3L, "test");
|
||||||
|
// index #5 -> force NoOp #4.
|
||||||
|
orgReplica.applyIndexOperationOnReplica(5, 1, IndexRequest.UNSET_AUTO_GENERATED_TIMESTAMP, false,
|
||||||
|
SourceToParse.source(indexName, "type", "id-5", new BytesArray("{}"), XContentType.JSON));
|
||||||
|
|
||||||
|
final int translogOps;
|
||||||
|
if (randomBoolean()) {
|
||||||
|
if (randomBoolean()) {
|
||||||
|
logger.info("--> flushing shard (translog will be trimmed)");
|
||||||
|
IndexMetaData.Builder builder = IndexMetaData.builder(orgReplica.indexSettings().getIndexMetaData());
|
||||||
|
builder.settings(Settings.builder().put(orgReplica.indexSettings().getSettings())
|
||||||
|
.put(IndexSettings.INDEX_TRANSLOG_RETENTION_AGE_SETTING.getKey(), "-1")
|
||||||
|
.put(IndexSettings.INDEX_TRANSLOG_RETENTION_SIZE_SETTING.getKey(), "-1"));
|
||||||
|
orgReplica.indexSettings().updateIndexMetaData(builder.build());
|
||||||
|
orgReplica.onSettingsChanged();
|
||||||
|
translogOps = 5; // 4 ops + seqno gaps (delete #1 is removed but index #0 will be replayed).
|
||||||
|
} else {
|
||||||
|
logger.info("--> flushing shard (translog will be retained)");
|
||||||
|
translogOps = 6; // 5 ops + seqno gaps
|
||||||
|
}
|
||||||
|
flushShard(orgReplica);
|
||||||
|
} else {
|
||||||
|
translogOps = 6; // 5 ops + seqno gaps
|
||||||
|
}
|
||||||
|
|
||||||
|
final IndexShard orgPrimary = shards.getPrimary();
|
||||||
|
shards.promoteReplicaToPrimary(orgReplica).get(); // wait for primary/replica sync to make sure seq# gap is closed.
|
||||||
|
|
||||||
|
IndexShard newReplica = shards.addReplicaWithExistingPath(orgPrimary.shardPath(), orgPrimary.routingEntry().currentNodeId());
|
||||||
|
shards.recoverReplica(newReplica);
|
||||||
|
shards.assertAllEqual(3);
|
||||||
|
|
||||||
|
assertThat(getTranslog(newReplica).totalOperations(), equalTo(translogOps));
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
public void testRecoveryWithOutOfOrderDeleteWithSoftDeletes() throws Exception {
|
||||||
|
Settings settings = Settings.builder()
|
||||||
|
.put(IndexSettings.INDEX_SOFT_DELETES_SETTING.getKey(), true)
|
||||||
|
.put(IndexSettings.INDEX_SOFT_DELETES_RETENTION_OPERATIONS_SETTING.getKey(), 10)
|
||||||
// If soft-deletes is enabled, delete#1 will be reclaimed because its segment (segment_1) is fully deleted
|
// If soft-deletes is enabled, delete#1 will be reclaimed because its segment (segment_1) is fully deleted
|
||||||
// index#0 will be retained if merge is disabled; otherwise it will be reclaimed because gcp=3 and retained_ops=0
|
// index#0 will be retained if merge is disabled; otherwise it will be reclaimed because gcp=3 and retained_ops=0
|
||||||
.put(MergePolicyConfig.INDEX_MERGE_ENABLED, false).build();
|
.put(MergePolicyConfig.INDEX_MERGE_ENABLED, false).build();
|
||||||
|
@ -148,25 +208,16 @@ public class RecoveryTests extends ESIndexLevelReplicationTestCase {
|
||||||
orgReplica.applyIndexOperationOnReplica(5, 1, IndexRequest.UNSET_AUTO_GENERATED_TIMESTAMP, false,
|
orgReplica.applyIndexOperationOnReplica(5, 1, IndexRequest.UNSET_AUTO_GENERATED_TIMESTAMP, false,
|
||||||
SourceToParse.source(indexName, "type", "id-5", new BytesArray("{}"), XContentType.JSON));
|
SourceToParse.source(indexName, "type", "id-5", new BytesArray("{}"), XContentType.JSON));
|
||||||
|
|
||||||
final int translogOps;
|
|
||||||
if (randomBoolean()) {
|
if (randomBoolean()) {
|
||||||
if (randomBoolean()) {
|
if (randomBoolean()) {
|
||||||
logger.info("--> flushing shard (translog/soft-deletes will be trimmed)");
|
logger.info("--> flushing shard (translog/soft-deletes will be trimmed)");
|
||||||
IndexMetaData.Builder builder = IndexMetaData.builder(orgReplica.indexSettings().getIndexMetaData());
|
IndexMetaData.Builder builder = IndexMetaData.builder(orgReplica.indexSettings().getIndexMetaData());
|
||||||
builder.settings(Settings.builder().put(orgReplica.indexSettings().getSettings())
|
builder.settings(Settings.builder().put(orgReplica.indexSettings().getSettings())
|
||||||
.put(IndexSettings.INDEX_TRANSLOG_RETENTION_AGE_SETTING.getKey(), "-1")
|
|
||||||
.put(IndexSettings.INDEX_TRANSLOG_RETENTION_SIZE_SETTING.getKey(), "-1")
|
|
||||||
.put(IndexSettings.INDEX_SOFT_DELETES_RETENTION_OPERATIONS_SETTING.getKey(), 0));
|
.put(IndexSettings.INDEX_SOFT_DELETES_RETENTION_OPERATIONS_SETTING.getKey(), 0));
|
||||||
orgReplica.indexSettings().updateIndexMetaData(builder.build());
|
orgReplica.indexSettings().updateIndexMetaData(builder.build());
|
||||||
orgReplica.onSettingsChanged();
|
orgReplica.onSettingsChanged();
|
||||||
translogOps = 5; // 4 ops + seqno gaps (delete #1 is removed but index #0 will be replayed).
|
|
||||||
} else {
|
|
||||||
logger.info("--> flushing shard (translog/soft-deletes will be retained)");
|
|
||||||
translogOps = 6; // 5 ops + seqno gaps
|
|
||||||
}
|
}
|
||||||
flushShard(orgReplica);
|
flushShard(orgReplica);
|
||||||
} else {
|
|
||||||
translogOps = 6; // 5 ops + seqno gaps
|
|
||||||
}
|
}
|
||||||
|
|
||||||
final IndexShard orgPrimary = shards.getPrimary();
|
final IndexShard orgPrimary = shards.getPrimary();
|
||||||
|
@ -175,8 +226,9 @@ public class RecoveryTests extends ESIndexLevelReplicationTestCase {
|
||||||
IndexShard newReplica = shards.addReplicaWithExistingPath(orgPrimary.shardPath(), orgPrimary.routingEntry().currentNodeId());
|
IndexShard newReplica = shards.addReplicaWithExistingPath(orgPrimary.shardPath(), orgPrimary.routingEntry().currentNodeId());
|
||||||
shards.recoverReplica(newReplica);
|
shards.recoverReplica(newReplica);
|
||||||
shards.assertAllEqual(3);
|
shards.assertAllEqual(3);
|
||||||
|
try (Translog.Snapshot snapshot = newReplica.getHistoryOperations("test", 0)) {
|
||||||
assertThat(getTranslog(newReplica).totalOperations(), equalTo(translogOps));
|
assertThat(snapshot, SnapshotMatchers.size(6));
|
||||||
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -228,7 +280,8 @@ public class RecoveryTests extends ESIndexLevelReplicationTestCase {
|
||||||
shards.recoverReplica(newReplica);
|
shards.recoverReplica(newReplica);
|
||||||
// file based recovery should be made
|
// file based recovery should be made
|
||||||
assertThat(newReplica.recoveryState().getIndex().fileDetails(), not(empty()));
|
assertThat(newReplica.recoveryState().getIndex().fileDetails(), not(empty()));
|
||||||
assertThat(getTranslog(newReplica).totalOperations(), equalTo(numDocs));
|
boolean softDeletesEnabled = replica.indexSettings().isSoftDeleteEnabled();
|
||||||
|
assertThat(getTranslog(newReplica).totalOperations(), equalTo(softDeletesEnabled ? nonFlushedDocs : numDocs));
|
||||||
|
|
||||||
// history uuid was restored
|
// history uuid was restored
|
||||||
assertThat(newReplica.getHistoryUUID(), equalTo(historyUUID));
|
assertThat(newReplica.getHistoryUUID(), equalTo(historyUUID));
|
||||||
|
@ -332,7 +385,8 @@ public class RecoveryTests extends ESIndexLevelReplicationTestCase {
|
||||||
shards.recoverReplica(replica);
|
shards.recoverReplica(replica);
|
||||||
// Make sure the flushing will eventually be completed (eg. `shouldPeriodicallyFlush` is false)
|
// Make sure the flushing will eventually be completed (eg. `shouldPeriodicallyFlush` is false)
|
||||||
assertBusy(() -> assertThat(getEngine(replica).shouldPeriodicallyFlush(), equalTo(false)));
|
assertBusy(() -> assertThat(getEngine(replica).shouldPeriodicallyFlush(), equalTo(false)));
|
||||||
assertThat(getTranslog(replica).totalOperations(), equalTo(numDocs));
|
boolean softDeletesEnabled = replica.indexSettings().isSoftDeleteEnabled();
|
||||||
|
assertThat(getTranslog(replica).totalOperations(), equalTo(softDeletesEnabled ? 0 : numDocs));
|
||||||
shards.assertAllEqual(numDocs);
|
shards.assertAllEqual(numDocs);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
Loading…
Reference in New Issue