Wait for snapshot completion in SLM snapshot invocation (#47051)
* Wait for snapshot completion in SLM snapshot invocation This changes the snapshots internally invoked by SLM to wait for completion. This allows us to capture more snapshotting failure scenarios. For example, previously a snapshot would be created and then registered as a "success", however, the snapshot may have been aborted, or it may have had a subset of its shards fail. These cases are now handled by inspecting the response to the `CreateSnapshotRequest` and ensuring that there are no failures. If any failures are present, the history store now stores the action as a failure instead of a success. Relates to #38461 and #43663
This commit is contained in:
parent
287d96d1a1
commit
a267df30fa
|
@ -53,7 +53,7 @@ public class CreateSnapshotResponse extends ActionResponse implements ToXContent
|
|||
|
||||
CreateSnapshotResponse() {}
|
||||
|
||||
CreateSnapshotResponse(@Nullable SnapshotInfo snapshotInfo) {
|
||||
public CreateSnapshotResponse(@Nullable SnapshotInfo snapshotInfo) {
|
||||
this.snapshotInfo = snapshotInfo;
|
||||
}
|
||||
|
||||
|
|
|
@ -260,7 +260,7 @@ public class SnapshotLifecyclePolicy extends AbstractDiffable<SnapshotLifecycleP
|
|||
Map<String, Object> mergedConfiguration = new HashMap<>(configuration);
|
||||
mergedConfiguration.put("metadata", metadataWithAddedPolicyName);
|
||||
req.source(mergedConfiguration);
|
||||
req.waitForCompletion(false);
|
||||
req.waitForCompletion(true);
|
||||
return req;
|
||||
}
|
||||
|
||||
|
|
|
@ -158,7 +158,8 @@ public class IndexLifecycle extends Plugin implements ActionPlugin {
|
|||
LifecycleSettings.LIFECYCLE_INDEXING_COMPLETE_SETTING,
|
||||
RolloverAction.LIFECYCLE_ROLLOVER_ALIAS_SETTING,
|
||||
LifecycleSettings.SLM_HISTORY_INDEX_ENABLED_SETTING,
|
||||
LifecycleSettings.SLM_RETENTION_SCHEDULE_SETTING);
|
||||
LifecycleSettings.SLM_RETENTION_SCHEDULE_SETTING,
|
||||
LifecycleSettings.SLM_RETENTION_DURATION_SETTING);
|
||||
}
|
||||
|
||||
@Override
|
||||
|
|
|
@ -23,6 +23,8 @@ import org.elasticsearch.common.bytes.BytesReference;
|
|||
import org.elasticsearch.common.xcontent.ToXContent;
|
||||
import org.elasticsearch.common.xcontent.XContentBuilder;
|
||||
import org.elasticsearch.common.xcontent.json.JsonXContent;
|
||||
import org.elasticsearch.snapshots.SnapshotException;
|
||||
import org.elasticsearch.snapshots.SnapshotInfo;
|
||||
import org.elasticsearch.xpack.core.ClientHelper;
|
||||
import org.elasticsearch.xpack.core.scheduler.SchedulerEngine;
|
||||
import org.elasticsearch.xpack.core.slm.SnapshotInvocationRecord;
|
||||
|
@ -91,16 +93,32 @@ public class SnapshotLifecycleTask implements SchedulerEngine.Listener {
|
|||
public void onResponse(CreateSnapshotResponse createSnapshotResponse) {
|
||||
logger.debug("snapshot response for [{}]: {}",
|
||||
policyMetadata.getPolicy().getId(), Strings.toString(createSnapshotResponse));
|
||||
final long timestamp = Instant.now().toEpochMilli();
|
||||
clusterService.submitStateUpdateTask("slm-record-success-" + policyMetadata.getPolicy().getId(),
|
||||
WriteJobStatus.success(policyMetadata.getPolicy().getId(), request.snapshot(), timestamp));
|
||||
historyStore.putAsync(SnapshotHistoryItem.creationSuccessRecord(timestamp, policyMetadata.getPolicy(),
|
||||
request.snapshot()));
|
||||
final SnapshotInfo snapInfo = createSnapshotResponse.getSnapshotInfo();
|
||||
|
||||
// Check that there are no failed shards, since the request may not entirely
|
||||
// fail, but may still have failures (such as in the case of an aborted snapshot)
|
||||
if (snapInfo.failedShards() == 0) {
|
||||
final long timestamp = Instant.now().toEpochMilli();
|
||||
clusterService.submitStateUpdateTask("slm-record-success-" + policyMetadata.getPolicy().getId(),
|
||||
WriteJobStatus.success(policyMetadata.getPolicy().getId(), request.snapshot(), timestamp));
|
||||
historyStore.putAsync(SnapshotHistoryItem.creationSuccessRecord(timestamp, policyMetadata.getPolicy(),
|
||||
request.snapshot()));
|
||||
} else {
|
||||
int failures = snapInfo.failedShards();
|
||||
int total = snapInfo.totalShards();
|
||||
final SnapshotException e = new SnapshotException(request.repository(), request.snapshot(),
|
||||
"failed to create snapshot successfully, " + failures + " out of " + total + " total shards failed");
|
||||
// Add each failed shard's exception as suppressed, the exception contains
|
||||
// information about which shard failed
|
||||
snapInfo.shardFailures().forEach(failure -> e.addSuppressed(failure.getCause()));
|
||||
// Call the failure handler to register this as a failure and persist it
|
||||
onFailure(e);
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public void onFailure(Exception e) {
|
||||
logger.error("failed to issue create snapshot request for snapshot lifecycle policy [{}]: {}",
|
||||
logger.error("failed to create snapshot for snapshot lifecycle policy [{}]: {}",
|
||||
policyMetadata.getPolicy().getId(), e);
|
||||
final long timestamp = Instant.now().toEpochMilli();
|
||||
clusterService.submitStateUpdateTask("slm-record-failure-" + policyMetadata.getPolicy().getId(),
|
||||
|
|
|
@ -339,6 +339,7 @@ public class SnapshotRetentionTask implements SchedulerEngine.Listener {
|
|||
List<SnapshotInfo> snapshots = entry.getValue();
|
||||
for (SnapshotInfo info : snapshots) {
|
||||
final String policyId = getPolicyId(info);
|
||||
final long deleteStartTime = nowNanoSupplier.getAsLong();
|
||||
deleteSnapshot(policyId, repo, info.snapshotId(), slmStats, ActionListener.wrap(acknowledgedResponse -> {
|
||||
deleted.incrementAndGet();
|
||||
if (acknowledgedResponse.isAcknowledged()) {
|
||||
|
@ -364,13 +365,15 @@ public class SnapshotRetentionTask implements SchedulerEngine.Listener {
|
|||
}));
|
||||
// Check whether we have exceeded the maximum time allowed to spend deleting
|
||||
// snapshots, if we have, short-circuit the rest of the deletions
|
||||
TimeValue elapsedDeletionTime = TimeValue.timeValueNanos(nowNanoSupplier.getAsLong() - startTime);
|
||||
logger.debug("elapsed time for deletion of [{}] snapshot: {}", info.snapshotId(), elapsedDeletionTime);
|
||||
if (elapsedDeletionTime.compareTo(maximumTime) > 0) {
|
||||
long finishTime = nowNanoSupplier.getAsLong();
|
||||
TimeValue deletionTime = TimeValue.timeValueNanos(finishTime - deleteStartTime);
|
||||
logger.debug("elapsed time for deletion of [{}] snapshot: {}", info.snapshotId(), deletionTime);
|
||||
TimeValue totalDeletionTime = TimeValue.timeValueNanos(finishTime - startTime);
|
||||
if (totalDeletionTime.compareTo(maximumTime) > 0) {
|
||||
logger.info("maximum snapshot retention deletion time reached, time spent: [{}]," +
|
||||
" maximum allowed time: [{}], deleted [{}] out of [{}] snapshots scheduled for deletion, failed to delete [{}]",
|
||||
elapsedDeletionTime, maximumTime, deleted, count, failed);
|
||||
slmStats.deletionTime(elapsedDeletionTime);
|
||||
totalDeletionTime, maximumTime, deleted, count, failed);
|
||||
slmStats.deletionTime(totalDeletionTime);
|
||||
slmStats.retentionTimedOut();
|
||||
return;
|
||||
}
|
||||
|
@ -402,8 +405,8 @@ public class SnapshotRetentionTask implements SchedulerEngine.Listener {
|
|||
} else {
|
||||
logger.warn("[{}] snapshot [{}] delete issued but the request was not acknowledged", repo, snapshot);
|
||||
}
|
||||
listener.onResponse(acknowledgedResponse);
|
||||
slmStats.snapshotDeleted(slmPolicy);
|
||||
listener.onResponse(acknowledgedResponse);
|
||||
}
|
||||
|
||||
@Override
|
||||
|
|
|
@ -23,6 +23,10 @@ import org.elasticsearch.cluster.service.ClusterService;
|
|||
import org.elasticsearch.common.TriFunction;
|
||||
import org.elasticsearch.common.settings.Settings;
|
||||
import org.elasticsearch.common.xcontent.json.JsonXContent;
|
||||
import org.elasticsearch.index.shard.ShardId;
|
||||
import org.elasticsearch.snapshots.SnapshotId;
|
||||
import org.elasticsearch.snapshots.SnapshotInfo;
|
||||
import org.elasticsearch.snapshots.SnapshotShardFailure;
|
||||
import org.elasticsearch.test.ClusterServiceUtils;
|
||||
import org.elasticsearch.test.ESTestCase;
|
||||
import org.elasticsearch.test.client.NoOpClient;
|
||||
|
@ -47,6 +51,7 @@ import java.util.Optional;
|
|||
import java.util.concurrent.atomic.AtomicBoolean;
|
||||
import java.util.function.Consumer;
|
||||
|
||||
import static org.hamcrest.Matchers.containsString;
|
||||
import static org.hamcrest.Matchers.equalTo;
|
||||
import static org.hamcrest.Matchers.instanceOf;
|
||||
import static org.hamcrest.Matchers.startsWith;
|
||||
|
@ -196,6 +201,83 @@ public class SnapshotLifecycleTaskTests extends ESTestCase {
|
|||
threadPool.shutdownNow();
|
||||
}
|
||||
|
||||
public void testPartialFailureSnapshot() throws Exception {
|
||||
final String id = randomAlphaOfLength(4);
|
||||
final SnapshotLifecyclePolicyMetadata slpm = makePolicyMeta(id);
|
||||
final SnapshotLifecycleMetadata meta =
|
||||
new SnapshotLifecycleMetadata(Collections.singletonMap(id, slpm), OperationMode.RUNNING, new SnapshotLifecycleStats());
|
||||
|
||||
final ClusterState state = ClusterState.builder(new ClusterName("test"))
|
||||
.metaData(MetaData.builder()
|
||||
.putCustom(SnapshotLifecycleMetadata.TYPE, meta)
|
||||
.build())
|
||||
.build();
|
||||
|
||||
final ThreadPool threadPool = new TestThreadPool("test");
|
||||
final AtomicBoolean clientCalled = new AtomicBoolean(false);
|
||||
final SetOnce<String> snapshotName = new SetOnce<>();
|
||||
try (ClusterService clusterService = ClusterServiceUtils.createClusterService(state, threadPool);
|
||||
VerifyingClient client = new VerifyingClient(threadPool,
|
||||
(action, request, listener) -> {
|
||||
assertFalse(clientCalled.getAndSet(true));
|
||||
assertThat(action, instanceOf(CreateSnapshotAction.class));
|
||||
assertThat(request, instanceOf(CreateSnapshotRequest.class));
|
||||
|
||||
CreateSnapshotRequest req = (CreateSnapshotRequest) request;
|
||||
|
||||
SnapshotLifecyclePolicy policy = slpm.getPolicy();
|
||||
assertThat(req.snapshot(), startsWith(policy.getName() + "-"));
|
||||
assertThat(req.repository(), equalTo(policy.getRepository()));
|
||||
snapshotName.set(req.snapshot());
|
||||
if (req.indices().length > 0) {
|
||||
assertThat(Arrays.asList(req.indices()), equalTo(policy.getConfig().get("indices")));
|
||||
}
|
||||
boolean globalState = policy.getConfig().get("include_global_state") == null ||
|
||||
Boolean.parseBoolean((String) policy.getConfig().get("include_global_state"));
|
||||
assertThat(req.includeGlobalState(), equalTo(globalState));
|
||||
|
||||
return new CreateSnapshotResponse(
|
||||
new SnapshotInfo(
|
||||
new SnapshotId(req.snapshot(), "uuid"),
|
||||
Arrays.asList(req.indices()),
|
||||
randomNonNegativeLong(),
|
||||
"snapshot started",
|
||||
randomNonNegativeLong(),
|
||||
3,
|
||||
Collections.singletonList(
|
||||
new SnapshotShardFailure("nodeId", new ShardId("index", "uuid", 0), "forced failure")),
|
||||
req.includeGlobalState(),
|
||||
req.userMetadata()
|
||||
));
|
||||
})) {
|
||||
final AtomicBoolean historyStoreCalled = new AtomicBoolean(false);
|
||||
SnapshotHistoryStore historyStore = new VerifyingHistoryStore(null, ZoneOffset.UTC,
|
||||
item -> {
|
||||
assertFalse(historyStoreCalled.getAndSet(true));
|
||||
final SnapshotLifecyclePolicy policy = slpm.getPolicy();
|
||||
assertEquals(policy.getId(), item.getPolicyId());
|
||||
assertEquals(policy.getRepository(), item.getRepository());
|
||||
assertEquals(policy.getConfig(), item.getSnapshotConfiguration());
|
||||
assertEquals(snapshotName.get(), item.getSnapshotName());
|
||||
assertFalse("item should be a failure", item.isSuccess());
|
||||
assertThat(item.getErrorDetails(),
|
||||
containsString("failed to create snapshot successfully, 1 out of 3 total shards failed"));
|
||||
assertThat(item.getErrorDetails(),
|
||||
containsString("forced failure"));
|
||||
});
|
||||
|
||||
SnapshotLifecycleTask task = new SnapshotLifecycleTask(client, clusterService, historyStore);
|
||||
// Trigger the event with a matching job name for the policy
|
||||
task.triggered(new SchedulerEngine.Event(SnapshotLifecycleService.getJobId(slpm),
|
||||
System.currentTimeMillis(), System.currentTimeMillis()));
|
||||
|
||||
assertTrue("snapshot should be triggered once", clientCalled.get());
|
||||
assertTrue("history store should be called once", historyStoreCalled.get());
|
||||
}
|
||||
|
||||
threadPool.shutdownNow();
|
||||
}
|
||||
|
||||
/**
|
||||
* A client that delegates to a verifying function for action/request/listener
|
||||
*/
|
||||
|
|
Loading…
Reference in New Issue