Ensure error handler is called during SLM retention callback failure (#55252) (#55321)

When retrieving the snapshots for a set of repos or deleting a single snapshot, it's possible for
the body of the `ActionListener`'s `onResponse` method to throw an Exception. In this case, the
`errHandler` passed in may not be executed, resulting in the `running` boolean not being reset back
to false.

This commit uses `ActionListener.wrap(...)` instead of creating a new ActionListener, which ensures
that if the `onResponse` fails in any way, the `onFailure` handler is still called.

Resolves #55217
This commit is contained in:
Lee Hinman 2020-04-16 10:50:15 -06:00 committed by GitHub
parent 9c2865b28d
commit 8b7bdae6cb
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
3 changed files with 130 additions and 19 deletions

View File

@ -57,7 +57,7 @@ public class GetSnapshotsResponse extends ActionResponse implements ToXContentOb
GetSnapshotsResponse() {
}
GetSnapshotsResponse(List<SnapshotInfo> snapshots) {
public GetSnapshotsResponse(List<SnapshotInfo> snapshots) {
this.snapshots = Collections.unmodifiableList(snapshots);
}

View File

@ -12,7 +12,6 @@ import org.apache.logging.log4j.message.ParameterizedMessage;
import org.elasticsearch.ElasticsearchException;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.LatchedActionListener;
import org.elasticsearch.action.admin.cluster.snapshots.get.GetSnapshotsResponse;
import org.elasticsearch.action.support.master.AcknowledgedResponse;
import org.elasticsearch.client.Client;
import org.elasticsearch.client.OriginSettingClient;
@ -254,6 +253,7 @@ public class SnapshotRetentionTask implements SchedulerEngine.Listener {
if (repositories.isEmpty()) {
// Skip retrieving anything if there are no repositories to fetch
listener.onResponse(Collections.emptyMap());
return;
}
threadPool.generic().execute(() -> {
@ -270,9 +270,7 @@ public class SnapshotRetentionTask implements SchedulerEngine.Listener {
for (String repository : repositories) {
client.admin().cluster()
.prepareGetSnapshots(repository)
.execute(new ActionListener<GetSnapshotsResponse>() {
@Override
public void onResponse(GetSnapshotsResponse resp) {
.execute(ActionListener.wrap(resp -> {
final Set<SnapshotState> retainableStates =
new HashSet<>(Arrays.asList(SnapshotState.SUCCESS, SnapshotState.FAILED, SnapshotState.PARTIAL));
try {
@ -289,14 +287,13 @@ public class SnapshotRetentionTask implements SchedulerEngine.Listener {
logger.error(new ParameterizedMessage("exception computing snapshots for repository {}", repository), e);
throw e;
}
}
@Override
public void onFailure(Exception e) {
},
e -> {
logger.warn(new ParameterizedMessage("unable to retrieve snapshots for repository [{}]", repository), e);
onComplete.run();
errorHandler.accept(e);
}
});
));
}
});
}
@ -439,9 +436,7 @@ public class SnapshotRetentionTask implements SchedulerEngine.Listener {
logger.info("[{}] snapshot retention deleting snapshot [{}]", repo, snapshot);
CountDownLatch latch = new CountDownLatch(1);
client.admin().cluster().prepareDeleteSnapshot(repo, snapshot.getName())
.execute(new LatchedActionListener<>(new ActionListener<AcknowledgedResponse>() {
@Override
public void onResponse(AcknowledgedResponse acknowledgedResponse) {
.execute(new LatchedActionListener<>(ActionListener.wrap(acknowledgedResponse -> {
if (acknowledgedResponse.isAcknowledged()) {
logger.debug("[{}] snapshot [{}] deleted successfully", repo, snapshot);
} else {
@ -449,16 +444,13 @@ public class SnapshotRetentionTask implements SchedulerEngine.Listener {
}
slmStats.snapshotDeleted(slmPolicy);
listener.onResponse(acknowledgedResponse);
}
@Override
public void onFailure(Exception e) {
},
e -> {
logger.warn(new ParameterizedMessage("[{}] failed to delete snapshot [{}] for retention",
repo, snapshot), e);
slmStats.snapshotDeleteFailure(slmPolicy);
listener.onFailure(e);
}
}, latch));
}), latch));
try {
// Deletes cannot occur simultaneously, so wait for this
// deletion to complete before attempting the next one

View File

@ -6,7 +6,14 @@
package org.elasticsearch.xpack.slm;
import org.elasticsearch.ElasticsearchException;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.ActionRequest;
import org.elasticsearch.action.ActionResponse;
import org.elasticsearch.action.ActionType;
import org.elasticsearch.action.admin.cluster.snapshots.delete.DeleteSnapshotRequest;
import org.elasticsearch.action.admin.cluster.snapshots.get.GetSnapshotsRequest;
import org.elasticsearch.action.admin.cluster.snapshots.get.GetSnapshotsResponse;
import org.elasticsearch.action.support.master.AcknowledgedResponse;
import org.elasticsearch.client.Client;
import org.elasticsearch.cluster.ClusterName;
@ -45,6 +52,7 @@ import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;
@ -53,6 +61,7 @@ import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Consumer;
import java.util.function.Function;
import java.util.function.LongSupplier;
@ -375,6 +384,116 @@ public class SnapshotRetentionTaskTests extends ESTestCase {
assertThat(SnapshotRetentionTask.okayToDeleteSnapshots(state), equalTo(true));
}
public void testErrStillRunsFailureHandlerWhenRetrieving() throws Exception {
ThreadPool threadPool = new TestThreadPool("slm-test");
final String policyId = "policy";
final String repoId = "repo";
try (ClusterService clusterService = ClusterServiceUtils.createClusterService(threadPool);
Client noOpClient = new NoOpClient("slm-test") {
@Override
@SuppressWarnings("unchecked")
protected <Request extends ActionRequest, Response extends ActionResponse>
void doExecute(ActionType<Response> action, Request request, ActionListener<Response> listener) {
if (request instanceof GetSnapshotsRequest) {
logger.info("--> called");
listener.onResponse((Response) new GetSnapshotsResponse(Collections.emptyList()));
} else {
super.doExecute(action, request, listener);
}
}
}) {
SnapshotLifecyclePolicy policy = new SnapshotLifecyclePolicy(policyId, "snap", "1 * * * * ?",
repoId, null, new SnapshotRetentionConfiguration(TimeValue.timeValueDays(30), null, null));
ClusterState state = createState(policy);
ClusterServiceUtils.setState(clusterService, state);
SnapshotRetentionTask task = new SnapshotRetentionTask(noOpClient, clusterService,
System::nanoTime,
new SnapshotLifecycleTaskTests.VerifyingHistoryStore(noOpClient, ZoneOffset.UTC,
(historyItem) -> fail("should never write history")),
threadPool);
AtomicReference<Exception> errHandlerCalled = new AtomicReference<>(null);
task.getAllRetainableSnapshots(Collections.singleton(repoId), new ActionListener<Map<String, List<SnapshotInfo>>>() {
@Override
public void onResponse(Map<String, List<SnapshotInfo>> stringListMap) {
logger.info("--> forcing failure");
throw new ElasticsearchException("forced failure");
}
@Override
public void onFailure(Exception e) {
fail("we have another err handler that should have been called");
}
}, errHandlerCalled::set);
assertBusy(() -> {
assertNotNull(errHandlerCalled.get());
assertThat(errHandlerCalled.get().getMessage(), equalTo("forced failure"));
});
} finally {
threadPool.shutdownNow();
threadPool.awaitTermination(10, TimeUnit.SECONDS);
}
}
public void testErrStillRunsFailureHandlerWhenDeleting() throws Exception {
ThreadPool threadPool = new TestThreadPool("slm-test");
try (ClusterService clusterService = ClusterServiceUtils.createClusterService(threadPool);
Client noOpClient = new NoOpClient("slm-test") {
@Override
@SuppressWarnings("unchecked")
protected <Request extends ActionRequest, Response extends ActionResponse>
void doExecute(ActionType<Response> action, Request request, ActionListener<Response> listener) {
if (request instanceof DeleteSnapshotRequest) {
logger.info("--> called");
listener.onResponse((Response) new AcknowledgedResponse(true));
} else {
super.doExecute(action, request, listener);
}
}
}) {
final String policyId = "policy";
final String repoId = "repo";
SnapshotLifecyclePolicy policy = new SnapshotLifecyclePolicy(policyId, "snap", "1 * * * * ?",
repoId, null, new SnapshotRetentionConfiguration(TimeValue.timeValueDays(30), null, null));
ClusterState state = createState(policy);
ClusterServiceUtils.setState(clusterService, state);
SnapshotRetentionTask task = new SnapshotRetentionTask(noOpClient, clusterService,
System::nanoTime,
new SnapshotLifecycleTaskTests.VerifyingHistoryStore(noOpClient, ZoneOffset.UTC,
(historyItem) -> fail("should never write history")),
threadPool);
AtomicBoolean onFailureCalled = new AtomicBoolean(false);
AtomicReference<Exception> errHandlerCalled = new AtomicReference<>(null);
task.deleteSnapshot("policy", "foo", new SnapshotId("name", "uuid"),
new SnapshotLifecycleStats(0, 0, 0, 0, new HashMap<>()), new ActionListener<AcknowledgedResponse>() {
@Override
public void onResponse(AcknowledgedResponse acknowledgedResponse) {
logger.info("--> forcing failure");
throw new ElasticsearchException("forced failure");
}
@Override
public void onFailure(Exception e) {
onFailureCalled.set(true);
}
});
assertThat(onFailureCalled.get(), equalTo(true));
} finally {
threadPool.shutdownNow();
threadPool.awaitTermination(10, TimeUnit.SECONDS);
}
}
public void testSkipWhileStopping() throws Exception {
doTestSkipDuringMode(OperationMode.STOPPING);
}