From 8b7bdae6cb31fdf09ded56b28fc46251e2d68be4 Mon Sep 17 00:00:00 2001 From: Lee Hinman Date: Thu, 16 Apr 2020 10:50:15 -0600 Subject: [PATCH] Ensure error handler is called during SLM retention callback failure (#55252) (#55321) When retrieving the snapshots for a set of repos or deleting a single snapshot, it's possible for the body of the `ActionListener`'s `onResponse` method to throw an Exception. In this case, the `errHandler` passed in may not be executed, resulting in the `running` boolean not being reset back to false. This commit uses `ActionListener.wrap(...)` instead of creating a new ActionListener, which ensures that if the `onResponse` fails in any way, the `onFailure` handler is still called. Resolves #55217 --- .../snapshots/get/GetSnapshotsResponse.java | 2 +- .../xpack/slm/SnapshotRetentionTask.java | 28 ++--- .../xpack/slm/SnapshotRetentionTaskTests.java | 119 ++++++++++++++++++ 3 files changed, 130 insertions(+), 19 deletions(-) diff --git a/server/src/main/java/org/elasticsearch/action/admin/cluster/snapshots/get/GetSnapshotsResponse.java b/server/src/main/java/org/elasticsearch/action/admin/cluster/snapshots/get/GetSnapshotsResponse.java index 6c249b2b4cd..0a917fcc433 100644 --- a/server/src/main/java/org/elasticsearch/action/admin/cluster/snapshots/get/GetSnapshotsResponse.java +++ b/server/src/main/java/org/elasticsearch/action/admin/cluster/snapshots/get/GetSnapshotsResponse.java @@ -57,7 +57,7 @@ public class GetSnapshotsResponse extends ActionResponse implements ToXContentOb GetSnapshotsResponse() { } - GetSnapshotsResponse(List snapshots) { + public GetSnapshotsResponse(List snapshots) { this.snapshots = Collections.unmodifiableList(snapshots); } diff --git a/x-pack/plugin/ilm/src/main/java/org/elasticsearch/xpack/slm/SnapshotRetentionTask.java b/x-pack/plugin/ilm/src/main/java/org/elasticsearch/xpack/slm/SnapshotRetentionTask.java index f233f690df5..e150090b534 100644 --- a/x-pack/plugin/ilm/src/main/java/org/elasticsearch/xpack/slm/SnapshotRetentionTask.java +++ b/x-pack/plugin/ilm/src/main/java/org/elasticsearch/xpack/slm/SnapshotRetentionTask.java @@ -12,7 +12,6 @@ import org.apache.logging.log4j.message.ParameterizedMessage; import org.elasticsearch.ElasticsearchException; import org.elasticsearch.action.ActionListener; import org.elasticsearch.action.LatchedActionListener; -import org.elasticsearch.action.admin.cluster.snapshots.get.GetSnapshotsResponse; import org.elasticsearch.action.support.master.AcknowledgedResponse; import org.elasticsearch.client.Client; import org.elasticsearch.client.OriginSettingClient; @@ -254,6 +253,7 @@ public class SnapshotRetentionTask implements SchedulerEngine.Listener { if (repositories.isEmpty()) { // Skip retrieving anything if there are no repositories to fetch listener.onResponse(Collections.emptyMap()); + return; } threadPool.generic().execute(() -> { @@ -270,9 +270,7 @@ public class SnapshotRetentionTask implements SchedulerEngine.Listener { for (String repository : repositories) { client.admin().cluster() .prepareGetSnapshots(repository) - .execute(new ActionListener() { - @Override - public void onResponse(GetSnapshotsResponse resp) { + .execute(ActionListener.wrap(resp -> { final Set retainableStates = new HashSet<>(Arrays.asList(SnapshotState.SUCCESS, SnapshotState.FAILED, SnapshotState.PARTIAL)); try { @@ -289,14 +287,13 @@ public class SnapshotRetentionTask implements SchedulerEngine.Listener { logger.error(new ParameterizedMessage("exception computing snapshots for repository {}", repository), e); throw e; } - } - - @Override - public void onFailure(Exception e) { + }, + e -> { logger.warn(new ParameterizedMessage("unable to retrieve snapshots for repository [{}]", repository), e); onComplete.run(); + errorHandler.accept(e); } - }); + )); } }); } @@ -439,9 +436,7 @@ public class SnapshotRetentionTask implements SchedulerEngine.Listener { logger.info("[{}] snapshot retention deleting snapshot [{}]", repo, snapshot); CountDownLatch latch = new CountDownLatch(1); client.admin().cluster().prepareDeleteSnapshot(repo, snapshot.getName()) - .execute(new LatchedActionListener<>(new ActionListener() { - @Override - public void onResponse(AcknowledgedResponse acknowledgedResponse) { + .execute(new LatchedActionListener<>(ActionListener.wrap(acknowledgedResponse -> { if (acknowledgedResponse.isAcknowledged()) { logger.debug("[{}] snapshot [{}] deleted successfully", repo, snapshot); } else { @@ -449,16 +444,13 @@ public class SnapshotRetentionTask implements SchedulerEngine.Listener { } slmStats.snapshotDeleted(slmPolicy); listener.onResponse(acknowledgedResponse); - } - - @Override - public void onFailure(Exception e) { + }, + e -> { logger.warn(new ParameterizedMessage("[{}] failed to delete snapshot [{}] for retention", repo, snapshot), e); slmStats.snapshotDeleteFailure(slmPolicy); listener.onFailure(e); - } - }, latch)); + }), latch)); try { // Deletes cannot occur simultaneously, so wait for this // deletion to complete before attempting the next one diff --git a/x-pack/plugin/ilm/src/test/java/org/elasticsearch/xpack/slm/SnapshotRetentionTaskTests.java b/x-pack/plugin/ilm/src/test/java/org/elasticsearch/xpack/slm/SnapshotRetentionTaskTests.java index 817c986162e..2cbc2e02cdd 100644 --- a/x-pack/plugin/ilm/src/test/java/org/elasticsearch/xpack/slm/SnapshotRetentionTaskTests.java +++ b/x-pack/plugin/ilm/src/test/java/org/elasticsearch/xpack/slm/SnapshotRetentionTaskTests.java @@ -6,7 +6,14 @@ package org.elasticsearch.xpack.slm; +import org.elasticsearch.ElasticsearchException; import org.elasticsearch.action.ActionListener; +import org.elasticsearch.action.ActionRequest; +import org.elasticsearch.action.ActionResponse; +import org.elasticsearch.action.ActionType; +import org.elasticsearch.action.admin.cluster.snapshots.delete.DeleteSnapshotRequest; +import org.elasticsearch.action.admin.cluster.snapshots.get.GetSnapshotsRequest; +import org.elasticsearch.action.admin.cluster.snapshots.get.GetSnapshotsResponse; import org.elasticsearch.action.support.master.AcknowledgedResponse; import org.elasticsearch.client.Client; import org.elasticsearch.cluster.ClusterName; @@ -45,6 +52,7 @@ import java.util.ArrayList; import java.util.Arrays; import java.util.Collection; import java.util.Collections; +import java.util.HashMap; import java.util.List; import java.util.Map; import java.util.Set; @@ -53,6 +61,7 @@ import java.util.concurrent.CountDownLatch; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicLong; +import java.util.concurrent.atomic.AtomicReference; import java.util.function.Consumer; import java.util.function.Function; import java.util.function.LongSupplier; @@ -375,6 +384,116 @@ public class SnapshotRetentionTaskTests extends ESTestCase { assertThat(SnapshotRetentionTask.okayToDeleteSnapshots(state), equalTo(true)); } + public void testErrStillRunsFailureHandlerWhenRetrieving() throws Exception { + ThreadPool threadPool = new TestThreadPool("slm-test"); + final String policyId = "policy"; + final String repoId = "repo"; + try (ClusterService clusterService = ClusterServiceUtils.createClusterService(threadPool); + Client noOpClient = new NoOpClient("slm-test") { + + @Override + @SuppressWarnings("unchecked") + protected + void doExecute(ActionType action, Request request, ActionListener listener) { + if (request instanceof GetSnapshotsRequest) { + logger.info("--> called"); + listener.onResponse((Response) new GetSnapshotsResponse(Collections.emptyList())); + } else { + super.doExecute(action, request, listener); + } + } + }) { + SnapshotLifecyclePolicy policy = new SnapshotLifecyclePolicy(policyId, "snap", "1 * * * * ?", + repoId, null, new SnapshotRetentionConfiguration(TimeValue.timeValueDays(30), null, null)); + + ClusterState state = createState(policy); + ClusterServiceUtils.setState(clusterService, state); + + SnapshotRetentionTask task = new SnapshotRetentionTask(noOpClient, clusterService, + System::nanoTime, + new SnapshotLifecycleTaskTests.VerifyingHistoryStore(noOpClient, ZoneOffset.UTC, + (historyItem) -> fail("should never write history")), + threadPool); + + AtomicReference errHandlerCalled = new AtomicReference<>(null); + task.getAllRetainableSnapshots(Collections.singleton(repoId), new ActionListener>>() { + @Override + public void onResponse(Map> stringListMap) { + logger.info("--> forcing failure"); + throw new ElasticsearchException("forced failure"); + } + + @Override + public void onFailure(Exception e) { + fail("we have another err handler that should have been called"); + } + }, errHandlerCalled::set); + + assertBusy(() -> { + assertNotNull(errHandlerCalled.get()); + assertThat(errHandlerCalled.get().getMessage(), equalTo("forced failure")); + }); + + } finally { + threadPool.shutdownNow(); + threadPool.awaitTermination(10, TimeUnit.SECONDS); + } + } + + public void testErrStillRunsFailureHandlerWhenDeleting() throws Exception { + ThreadPool threadPool = new TestThreadPool("slm-test"); + try (ClusterService clusterService = ClusterServiceUtils.createClusterService(threadPool); + Client noOpClient = new NoOpClient("slm-test") { + + @Override + @SuppressWarnings("unchecked") + protected + void doExecute(ActionType action, Request request, ActionListener listener) { + if (request instanceof DeleteSnapshotRequest) { + logger.info("--> called"); + listener.onResponse((Response) new AcknowledgedResponse(true)); + } else { + super.doExecute(action, request, listener); + } + } + }) { + final String policyId = "policy"; + final String repoId = "repo"; + SnapshotLifecyclePolicy policy = new SnapshotLifecyclePolicy(policyId, "snap", "1 * * * * ?", + repoId, null, new SnapshotRetentionConfiguration(TimeValue.timeValueDays(30), null, null)); + + ClusterState state = createState(policy); + ClusterServiceUtils.setState(clusterService, state); + + SnapshotRetentionTask task = new SnapshotRetentionTask(noOpClient, clusterService, + System::nanoTime, + new SnapshotLifecycleTaskTests.VerifyingHistoryStore(noOpClient, ZoneOffset.UTC, + (historyItem) -> fail("should never write history")), + threadPool); + + AtomicBoolean onFailureCalled = new AtomicBoolean(false); + AtomicReference errHandlerCalled = new AtomicReference<>(null); + task.deleteSnapshot("policy", "foo", new SnapshotId("name", "uuid"), + new SnapshotLifecycleStats(0, 0, 0, 0, new HashMap<>()), new ActionListener() { + @Override + public void onResponse(AcknowledgedResponse acknowledgedResponse) { + logger.info("--> forcing failure"); + throw new ElasticsearchException("forced failure"); + } + + @Override + public void onFailure(Exception e) { + onFailureCalled.set(true); + } + }); + + assertThat(onFailureCalled.get(), equalTo(true)); + } finally { + threadPool.shutdownNow(); + threadPool.awaitTermination(10, TimeUnit.SECONDS); + } + } + public void testSkipWhileStopping() throws Exception { doTestSkipDuringMode(OperationMode.STOPPING); }