[ML] Report errors back to listener in DeleteExpiredDataAction (elastic/x-pack-elasticsearch#3072)

Currently, any errors that occur during the DeleteExpiredDataAction are only logged, and the deletion proceeds to the next job. The user gets no indication in the response that something went wrong, although nothing should really go wrong unless the cluster is messed up.

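This commit changes that behaviour so that errors are reported back to the action's listener: the first failure stops the deletion and is returned to the caller instead of being logged and skipped.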

Original commit: elastic/x-pack-elasticsearch@489cf03c3e
This commit is contained in:
Dimitris Athanasiou 2017-11-21 12:03:04 +00:00 committed by GitHub
parent 754623753a
commit 83ca6e8064
9 changed files with 67 additions and 86 deletions

View File

@ -6,6 +6,7 @@
package org.elasticsearch.xpack.ml;
import org.apache.logging.log4j.Logger;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.client.Client;
import org.elasticsearch.cluster.ClusterName;
import org.elasticsearch.common.lease.Releasable;
@ -106,11 +107,9 @@ public class MlDailyMaintenanceService implements Releasable {
private void triggerTasks() {
LOGGER.info("triggering scheduled [ML] maintenance tasks");
try {
client.execute(DeleteExpiredDataAction.INSTANCE, new DeleteExpiredDataAction.Request());
} catch (Exception e) {
LOGGER.error("An error occurred during maintenance tasks execution", e);
}
client.execute(DeleteExpiredDataAction.INSTANCE, new DeleteExpiredDataAction.Request(), ActionListener.wrap(
response -> LOGGER.info("Successfully completed [ML] maintenance tasks"),
e -> LOGGER.error("An error occurred during maintenance tasks execution", e)));
scheduleNext();
}
}

View File

@ -38,7 +38,6 @@ import java.util.Arrays;
import java.util.Iterator;
import java.util.List;
import java.util.Objects;
import java.util.concurrent.atomic.AtomicInteger;
public class DeleteExpiredDataAction extends Action<DeleteExpiredDataAction.Request, DeleteExpiredDataAction.Response,
DeleteExpiredDataAction.RequestBuilder> {
@ -156,11 +155,11 @@ public class DeleteExpiredDataAction extends Action<DeleteExpiredDataAction.Requ
private void deleteExpiredData(Iterator<MlDataRemover> mlDataRemoversIterator, ActionListener<Response> listener) {
if (mlDataRemoversIterator.hasNext()) {
MlDataRemover remover = mlDataRemoversIterator.next();
remover.remove(() -> {
deleteExpiredData(mlDataRemoversIterator, listener);
});
remover.remove(ActionListener.wrap(
booleanResponse -> deleteExpiredData(mlDataRemoversIterator, listener),
listener::onFailure));
} else {
logger.debug("Finished deleting expired data");
logger.info("Completed deletion of expired data");
listener.onResponse(new Response(true));
}
}

View File

@ -5,6 +5,7 @@
*/
package org.elasticsearch.xpack.ml.job.retention;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.service.ClusterService;
import org.elasticsearch.common.unit.TimeValue;
@ -39,23 +40,23 @@ abstract class AbstractExpiredJobDataRemover implements MlDataRemover {
}
@Override
public void remove(Runnable onFinish) {
removeData(newJobIterator(), onFinish);
public void remove(ActionListener<Boolean> listener) {
removeData(newJobIterator(), listener);
}
private void removeData(Iterator<Job> jobIterator, Runnable onFinish) {
private void removeData(Iterator<Job> jobIterator, ActionListener<Boolean> listener) {
if (jobIterator.hasNext() == false) {
onFinish.run();
listener.onResponse(true);
return;
}
Job job = jobIterator.next();
Long retentionDays = getRetentionDays(job);
if (retentionDays == null) {
removeData(jobIterator, () -> removeData(jobIterator, onFinish));
removeData(jobIterator, listener);
return;
}
long cutoffEpochMs = calcCutoffEpochMs(retentionDays);
removeDataBefore(job, cutoffEpochMs, () -> removeData(jobIterator, onFinish));
removeDataBefore(job, cutoffEpochMs, ActionListener.wrap(response -> removeData(jobIterator, listener), listener::onFailure));
}
private Iterator<Job> newJobIterator() {
@ -81,9 +82,9 @@ abstract class AbstractExpiredJobDataRemover implements MlDataRemover {
/**
* Template method to allow implementation details of various types of data (e.g. results, model snapshots).
* Implementors need to call {@code onFinish} when they are done in order to continue to the next job.
* Implementors need to call {@code listener.onResponse} when they are done in order to continue to the next job.
*/
protected abstract void removeDataBefore(Job job, long cutoffEpochMs, Runnable onFinish);
protected abstract void removeDataBefore(Job job, long cutoffEpochMs, ActionListener<Boolean> listener);
protected static BoolQueryBuilder createQuery(String jobId, long cutoffEpochMs) {
return QueryBuilders.boolQuery()

View File

@ -6,6 +6,7 @@
package org.elasticsearch.xpack.ml.job.retention;
import org.apache.logging.log4j.Logger;
import org.elasticsearch.ElasticsearchException;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.search.SearchAction;
import org.elasticsearch.action.search.SearchRequest;
@ -57,10 +58,10 @@ public class ExpiredModelSnapshotsRemover extends AbstractExpiredJobDataRemover
}
@Override
protected void removeDataBefore(Job job, long cutoffEpochMs, Runnable onFinish) {
protected void removeDataBefore(Job job, long cutoffEpochMs, ActionListener<Boolean> listener) {
if (job.getModelSnapshotId() == null) {
// No snapshot to remove
onFinish.run();
listener.onResponse(true);
return;
}
LOGGER.debug("Removing model snapshots of job [{}] that have a timestamp before [{}]", job.getId(), cutoffEpochMs);
@ -86,7 +87,7 @@ public class ExpiredModelSnapshotsRemover extends AbstractExpiredJobDataRemover
for (SearchHit hit : searchResponse.getHits()) {
modelSnapshots.add(ModelSnapshot.fromJson(hit.getSourceRef()));
}
deleteModelSnapshots(createVolatileCursorIterator(modelSnapshots), onFinish);
deleteModelSnapshots(createVolatileCursorIterator(modelSnapshots), listener);
} catch (Exception e) {
onFailure(e);
}
@ -94,15 +95,14 @@ public class ExpiredModelSnapshotsRemover extends AbstractExpiredJobDataRemover
@Override
public void onFailure(Exception e) {
LOGGER.error("[" + job.getId() + "] Search for expired snapshots failed", e);
onFinish.run();
listener.onFailure(new ElasticsearchException("[" + job.getId() + "] Search for expired snapshots failed", e));
}
});
}
private void deleteModelSnapshots(Iterator<ModelSnapshot> modelSnapshotIterator, Runnable onFinish) {
private void deleteModelSnapshots(Iterator<ModelSnapshot> modelSnapshotIterator, ActionListener<Boolean> listener) {
if (modelSnapshotIterator.hasNext() == false) {
onFinish.run();
listener.onResponse(true);
return;
}
ModelSnapshot modelSnapshot = modelSnapshotIterator.next();
@ -112,7 +112,7 @@ public class ExpiredModelSnapshotsRemover extends AbstractExpiredJobDataRemover
@Override
public void onResponse(DeleteModelSnapshotAction.Response response) {
try {
deleteModelSnapshots(modelSnapshotIterator, onFinish);
deleteModelSnapshots(modelSnapshotIterator, listener);
} catch (Exception e) {
onFailure(e);
}
@ -120,9 +120,8 @@ public class ExpiredModelSnapshotsRemover extends AbstractExpiredJobDataRemover
@Override
public void onFailure(Exception e) {
LOGGER.error("[" + modelSnapshot.getJobId() + "] Failed to delete snapshot ["
+ modelSnapshot.getSnapshotId() + "]", e);
deleteModelSnapshots(modelSnapshotIterator, onFinish);
listener.onFailure(new ElasticsearchException("[" + modelSnapshot.getJobId() + "] Failed to delete snapshot ["
+ modelSnapshot.getSnapshotId() + "]", e));
}
});
}

View File

@ -6,6 +6,7 @@
package org.elasticsearch.xpack.ml.job.retention;
import org.apache.logging.log4j.Logger;
import org.elasticsearch.ElasticsearchException;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.search.SearchRequest;
import org.elasticsearch.client.Client;
@ -55,7 +56,7 @@ public class ExpiredResultsRemover extends AbstractExpiredJobDataRemover {
}
@Override
protected void removeDataBefore(Job job, long cutoffEpochMs, Runnable onFinish) {
protected void removeDataBefore(Job job, long cutoffEpochMs, ActionListener<Boolean> listener) {
LOGGER.debug("Removing results of job [{}] that have a timestamp before [{}]", job.getId(), cutoffEpochMs);
DeleteByQueryRequest request = createDBQRequest(job, cutoffEpochMs);
@ -66,7 +67,7 @@ public class ExpiredResultsRemover extends AbstractExpiredJobDataRemover {
if (bulkByScrollResponse.getDeleted() > 0) {
auditResultsWereDeleted(job.getId(), cutoffEpochMs);
}
onFinish.run();
listener.onResponse(true);
} catch (Exception e) {
onFailure(e);
}
@ -74,8 +75,7 @@ public class ExpiredResultsRemover extends AbstractExpiredJobDataRemover {
@Override
public void onFailure(Exception e) {
LOGGER.error("Failed to remove expired results for job [" + job.getId() + "]", e);
onFinish.run();
listener.onFailure(new ElasticsearchException("Failed to remove expired results for job [" + job.getId() + "]", e));
}
});
}

View File

@ -5,6 +5,8 @@
*/
package org.elasticsearch.xpack.ml.job.retention;
import org.elasticsearch.action.ActionListener;
public interface MlDataRemover {
void remove(Runnable onFinish);
void remove(ActionListener<Boolean> listener);
}

View File

@ -13,6 +13,7 @@ import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.xpack.ml.action.DeleteExpiredDataAction;
import org.junit.After;
import org.junit.Before;
import org.mockito.Mockito;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
@ -46,7 +47,7 @@ public class MlDailyManagementServiceTests extends ESTestCase {
latch.await(1, TimeUnit.SECONDS);
}
verify(client, org.mockito.Mockito.atLeast(triggerCount - 1)).execute(same(DeleteExpiredDataAction.INSTANCE), any());
verify(client, Mockito.atLeast(triggerCount - 1)).execute(same(DeleteExpiredDataAction.INSTANCE), any(), any());
}
private MlDailyMaintenanceService createService(CountDownLatch latch, Client client) {

View File

@ -53,7 +53,7 @@ public class ExpiredModelSnapshotsRemoverTests extends ESTestCase {
private List<SearchRequest> capturedSearchRequests;
private List<DeleteModelSnapshotAction.Request> capturedDeleteModelSnapshotRequests;
private List<SearchResponse> searchResponsesPerCall;
private Runnable onFinish;
private ActionListener<Boolean> listener;
@Before
public void setUpTests() {
@ -64,7 +64,7 @@ public class ExpiredModelSnapshotsRemoverTests extends ESTestCase {
clusterState = mock(ClusterState.class);
when(clusterService.state()).thenReturn(clusterState);
client = mock(Client.class);
onFinish = mock(Runnable.class);
listener = mock(ActionListener.class);
}
public void testRemove_GivenJobsWithoutRetentionPolicy() {
@ -74,9 +74,9 @@ public class ExpiredModelSnapshotsRemoverTests extends ESTestCase {
JobTests.buildJobBuilder("bar").build()
));
createExpiredModelSnapshotsRemover().remove(onFinish);
createExpiredModelSnapshotsRemover().remove(listener);
verify(onFinish).run();
verify(listener).onResponse(true);
Mockito.verifyNoMoreInteractions(client);
}
@ -84,9 +84,9 @@ public class ExpiredModelSnapshotsRemoverTests extends ESTestCase {
givenClientRequestsSucceed();
givenJobs(Arrays.asList(JobTests.buildJobBuilder("foo").setModelSnapshotRetentionDays(7L).build()));
createExpiredModelSnapshotsRemover().remove(onFinish);
createExpiredModelSnapshotsRemover().remove(listener);
verify(onFinish).run();
verify(listener).onResponse(true);
Mockito.verifyNoMoreInteractions(client);
}
@ -104,7 +104,7 @@ public class ExpiredModelSnapshotsRemoverTests extends ESTestCase {
searchResponsesPerCall.add(createSearchResponse(snapshots1JobSnapshots));
searchResponsesPerCall.add(createSearchResponse(snapshots2JobSnapshots));
createExpiredModelSnapshotsRemover().remove(onFinish);
createExpiredModelSnapshotsRemover().remove(listener);
assertThat(capturedSearchRequests.size(), equalTo(2));
SearchRequest searchRequest = capturedSearchRequests.get(0);
@ -123,7 +123,7 @@ public class ExpiredModelSnapshotsRemoverTests extends ESTestCase {
assertThat(deleteSnapshotRequest.getJobId(), equalTo("snapshots-2"));
assertThat(deleteSnapshotRequest.getSnapshotId(), equalTo("snapshots-2_1"));
verify(onFinish).run();
verify(listener).onResponse(true);
}
public void testRemove_GivenClientSearchRequestsFail() throws IOException {
@ -140,17 +140,15 @@ public class ExpiredModelSnapshotsRemoverTests extends ESTestCase {
searchResponsesPerCall.add(createSearchResponse(snapshots1JobSnapshots));
searchResponsesPerCall.add(createSearchResponse(snapshots2JobSnapshots));
createExpiredModelSnapshotsRemover().remove(onFinish);
createExpiredModelSnapshotsRemover().remove(listener);
assertThat(capturedSearchRequests.size(), equalTo(2));
assertThat(capturedSearchRequests.size(), equalTo(1));
SearchRequest searchRequest = capturedSearchRequests.get(0);
assertThat(searchRequest.indices(), equalTo(new String[] {AnomalyDetectorsIndex.jobResultsAliasedName("snapshots-1")}));
searchRequest = capturedSearchRequests.get(1);
assertThat(searchRequest.indices(), equalTo(new String[] {AnomalyDetectorsIndex.jobResultsAliasedName("snapshots-2")}));
assertThat(capturedDeleteModelSnapshotRequests.size(), equalTo(0));
verify(onFinish).run();
verify(listener).onFailure(any());
}
public void testRemove_GivenClientDeleteSnapshotRequestsFail() throws IOException {
@ -167,26 +165,18 @@ public class ExpiredModelSnapshotsRemoverTests extends ESTestCase {
searchResponsesPerCall.add(createSearchResponse(snapshots1JobSnapshots));
searchResponsesPerCall.add(createSearchResponse(snapshots2JobSnapshots));
createExpiredModelSnapshotsRemover().remove(onFinish);
createExpiredModelSnapshotsRemover().remove(listener);
assertThat(capturedSearchRequests.size(), equalTo(2));
assertThat(capturedSearchRequests.size(), equalTo(1));
SearchRequest searchRequest = capturedSearchRequests.get(0);
assertThat(searchRequest.indices(), equalTo(new String[] {AnomalyDetectorsIndex.jobResultsAliasedName("snapshots-1")}));
searchRequest = capturedSearchRequests.get(1);
assertThat(searchRequest.indices(), equalTo(new String[] {AnomalyDetectorsIndex.jobResultsAliasedName("snapshots-2")}));
assertThat(capturedDeleteModelSnapshotRequests.size(), equalTo(3));
assertThat(capturedDeleteModelSnapshotRequests.size(), equalTo(1));
DeleteModelSnapshotAction.Request deleteSnapshotRequest = capturedDeleteModelSnapshotRequests.get(0);
assertThat(deleteSnapshotRequest.getJobId(), equalTo("snapshots-1"));
assertThat(deleteSnapshotRequest.getSnapshotId(), equalTo("snapshots-1_1"));
deleteSnapshotRequest = capturedDeleteModelSnapshotRequests.get(1);
assertThat(deleteSnapshotRequest.getJobId(), equalTo("snapshots-1"));
assertThat(deleteSnapshotRequest.getSnapshotId(), equalTo("snapshots-1_2"));
deleteSnapshotRequest = capturedDeleteModelSnapshotRequests.get(2);
assertThat(deleteSnapshotRequest.getJobId(), equalTo("snapshots-2"));
assertThat(deleteSnapshotRequest.getSnapshotId(), equalTo("snapshots-2_1"));
verify(onFinish).run();
verify(listener).onFailure(any());
}
private void givenJobs(List<Job> jobs) {

View File

@ -46,7 +46,7 @@ public class ExpiredResultsRemoverTests extends ESTestCase {
private ClusterService clusterService;
private ClusterState clusterState;
private List<DeleteByQueryRequest> capturedDeleteByQueryRequests;
private Runnable onFinish;
private ActionListener<Boolean> listener;
@Before
public void setUpTests() {
@ -55,26 +55,16 @@ public class ExpiredResultsRemoverTests extends ESTestCase {
clusterState = mock(ClusterState.class);
when(clusterService.state()).thenReturn(clusterState);
client = mock(Client.class);
doAnswer(new Answer<Void>() {
@Override
public Void answer(InvocationOnMock invocationOnMock) throws Throwable {
capturedDeleteByQueryRequests.add((DeleteByQueryRequest) invocationOnMock.getArguments()[1]);
ActionListener<BulkByScrollResponse> listener =
(ActionListener<BulkByScrollResponse>) invocationOnMock.getArguments()[2];
listener.onResponse(null);
return null;
}
}).when(client).execute(same(DeleteByQueryAction.INSTANCE), any(), any());
onFinish = mock(Runnable.class);
listener = mock(ActionListener.class);
}
public void testRemove_GivenNoJobs() {
givenClientRequestsSucceed();
givenJobs(Collections.emptyList());
createExpiredResultsRemover().remove(onFinish);
createExpiredResultsRemover().remove(listener);
verify(onFinish).run();
verify(listener).onResponse(true);
Mockito.verifyNoMoreInteractions(client);
}
@ -85,13 +75,13 @@ public class ExpiredResultsRemoverTests extends ESTestCase {
JobTests.buildJobBuilder("bar").build()
));
createExpiredResultsRemover().remove(onFinish);
createExpiredResultsRemover().remove(listener);
verify(onFinish).run();
verify(listener).onResponse(true);
Mockito.verifyNoMoreInteractions(client);
}
public void testRemove_GivenJobsWithAndWithoutRetentionPolicy() throws IOException {
public void testRemove_GivenJobsWithAndWithoutRetentionPolicy() throws Exception {
givenClientRequestsSucceed();
givenJobs(Arrays.asList(
JobTests.buildJobBuilder("none").build(),
@ -99,17 +89,17 @@ public class ExpiredResultsRemoverTests extends ESTestCase {
JobTests.buildJobBuilder("results-2").setResultsRetentionDays(20L).build()
));
createExpiredResultsRemover().remove(onFinish);
createExpiredResultsRemover().remove(listener);
assertThat(capturedDeleteByQueryRequests.size(), equalTo(2));
DeleteByQueryRequest dbqRequest = capturedDeleteByQueryRequests.get(0);
assertThat(dbqRequest.indices(), equalTo(new String[] {AnomalyDetectorsIndex.jobResultsAliasedName("results-1")}));
dbqRequest = capturedDeleteByQueryRequests.get(1);
assertThat(dbqRequest.indices(), equalTo(new String[] {AnomalyDetectorsIndex.jobResultsAliasedName("results-2")}));
verify(onFinish).run();
verify(listener).onResponse(true);
}
public void testRemove_GivenClientRequestsFailed_StillIteratesThroughJobs() throws IOException {
public void testRemove_GivenClientRequestsFailed() throws IOException {
givenClientRequestsFailed();
givenJobs(Arrays.asList(
JobTests.buildJobBuilder("none").build(),
@ -117,14 +107,12 @@ public class ExpiredResultsRemoverTests extends ESTestCase {
JobTests.buildJobBuilder("results-2").setResultsRetentionDays(20L).build()
));
createExpiredResultsRemover().remove(onFinish);
createExpiredResultsRemover().remove(listener);
assertThat(capturedDeleteByQueryRequests.size(), equalTo(2));
assertThat(capturedDeleteByQueryRequests.size(), equalTo(1));
DeleteByQueryRequest dbqRequest = capturedDeleteByQueryRequests.get(0);
assertThat(dbqRequest.indices(), equalTo(new String[] {AnomalyDetectorsIndex.jobResultsAliasedName("results-1")}));
dbqRequest = capturedDeleteByQueryRequests.get(1);
assertThat(dbqRequest.indices(), equalTo(new String[] {AnomalyDetectorsIndex.jobResultsAliasedName("results-2")}));
verify(onFinish).run();
verify(listener).onFailure(any());
}
private void givenClientRequestsSucceed() {
@ -143,7 +131,9 @@ public class ExpiredResultsRemoverTests extends ESTestCase {
ActionListener<BulkByScrollResponse> listener =
(ActionListener<BulkByScrollResponse>) invocationOnMock.getArguments()[2];
if (shouldSucceed) {
listener.onResponse(null);
BulkByScrollResponse bulkByScrollResponse = mock(BulkByScrollResponse.class);
when(bulkByScrollResponse.getDeleted()).thenReturn(42L);
listener.onResponse(bulkByScrollResponse);
} else {
listener.onFailure(new RuntimeException("failed"));
}