diff --git a/plugin/src/main/java/org/elasticsearch/xpack/ml/MlDailyMaintenanceService.java b/plugin/src/main/java/org/elasticsearch/xpack/ml/MlDailyMaintenanceService.java index 6c926d61b22..346773411e7 100644 --- a/plugin/src/main/java/org/elasticsearch/xpack/ml/MlDailyMaintenanceService.java +++ b/plugin/src/main/java/org/elasticsearch/xpack/ml/MlDailyMaintenanceService.java @@ -6,6 +6,7 @@ package org.elasticsearch.xpack.ml; import org.apache.logging.log4j.Logger; +import org.elasticsearch.action.ActionListener; import org.elasticsearch.client.Client; import org.elasticsearch.cluster.ClusterName; import org.elasticsearch.common.lease.Releasable; @@ -106,11 +107,9 @@ public class MlDailyMaintenanceService implements Releasable { private void triggerTasks() { LOGGER.info("triggering scheduled [ML] maintenance tasks"); - try { - client.execute(DeleteExpiredDataAction.INSTANCE, new DeleteExpiredDataAction.Request()); - } catch (Exception e) { - LOGGER.error("An error occurred during maintenance tasks execution", e); - } + client.execute(DeleteExpiredDataAction.INSTANCE, new DeleteExpiredDataAction.Request(), ActionListener.wrap( + response -> LOGGER.info("Successfully completed [ML] maintenance tasks"), + e -> LOGGER.error("An error occurred during maintenance tasks execution", e))); scheduleNext(); } } diff --git a/plugin/src/main/java/org/elasticsearch/xpack/ml/action/DeleteExpiredDataAction.java b/plugin/src/main/java/org/elasticsearch/xpack/ml/action/DeleteExpiredDataAction.java index 73f43a38c12..174cc5dd3c7 100644 --- a/plugin/src/main/java/org/elasticsearch/xpack/ml/action/DeleteExpiredDataAction.java +++ b/plugin/src/main/java/org/elasticsearch/xpack/ml/action/DeleteExpiredDataAction.java @@ -38,7 +38,6 @@ import java.util.Arrays; import java.util.Iterator; import java.util.List; import java.util.Objects; -import java.util.concurrent.atomic.AtomicInteger; public class DeleteExpiredDataAction extends Action { @@ -156,11 +155,11 @@ public class DeleteExpiredDataAction extends Action mlDataRemoversIterator, ActionListener listener) { if (mlDataRemoversIterator.hasNext()) { MlDataRemover remover = mlDataRemoversIterator.next(); - remover.remove(() -> { - deleteExpiredData(mlDataRemoversIterator, listener); - }); + remover.remove(ActionListener.wrap( + booleanResponse -> deleteExpiredData(mlDataRemoversIterator, listener), + listener::onFailure)); } else { - logger.debug("Finished deleting expired data"); + logger.info("Completed deletion of expired data"); listener.onResponse(new Response(true)); } } diff --git a/plugin/src/main/java/org/elasticsearch/xpack/ml/job/retention/AbstractExpiredJobDataRemover.java b/plugin/src/main/java/org/elasticsearch/xpack/ml/job/retention/AbstractExpiredJobDataRemover.java index efb30a1e378..44ff73d465b 100644 --- a/plugin/src/main/java/org/elasticsearch/xpack/ml/job/retention/AbstractExpiredJobDataRemover.java +++ b/plugin/src/main/java/org/elasticsearch/xpack/ml/job/retention/AbstractExpiredJobDataRemover.java @@ -5,6 +5,7 @@ */ package org.elasticsearch.xpack.ml.job.retention; +import org.elasticsearch.action.ActionListener; import org.elasticsearch.cluster.ClusterState; import org.elasticsearch.cluster.service.ClusterService; import org.elasticsearch.common.unit.TimeValue; @@ -39,23 +40,23 @@ abstract class AbstractExpiredJobDataRemover implements MlDataRemover { } @Override - public void remove(Runnable onFinish) { - removeData(newJobIterator(), onFinish); + public void remove(ActionListener listener) { + removeData(newJobIterator(), listener); } - private void removeData(Iterator jobIterator, Runnable onFinish) { + private void removeData(Iterator jobIterator, ActionListener listener) { if (jobIterator.hasNext() == false) { - onFinish.run(); + listener.onResponse(true); return; } Job job = jobIterator.next(); Long retentionDays = getRetentionDays(job); if (retentionDays == null) { - removeData(jobIterator, () -> removeData(jobIterator, onFinish)); + removeData(jobIterator, listener); return; } long cutoffEpochMs = calcCutoffEpochMs(retentionDays); - removeDataBefore(job, cutoffEpochMs, () -> removeData(jobIterator, onFinish)); + removeDataBefore(job, cutoffEpochMs, ActionListener.wrap(response -> removeData(jobIterator, listener), listener::onFailure)); } private Iterator newJobIterator() { @@ -81,9 +82,9 @@ abstract class AbstractExpiredJobDataRemover implements MlDataRemover { /** * Template method to allow implementation details of various types of data (e.g. results, model snapshots). - * Implementors need to call {@code onFinish} when they are done in order to continue to the next job. + * Implementors need to call {@code listener.onResponse} when they are done in order to continue to the next job. */ - protected abstract void removeDataBefore(Job job, long cutoffEpochMs, Runnable onFinish); + protected abstract void removeDataBefore(Job job, long cutoffEpochMs, ActionListener listener); protected static BoolQueryBuilder createQuery(String jobId, long cutoffEpochMs) { return QueryBuilders.boolQuery() diff --git a/plugin/src/main/java/org/elasticsearch/xpack/ml/job/retention/ExpiredModelSnapshotsRemover.java b/plugin/src/main/java/org/elasticsearch/xpack/ml/job/retention/ExpiredModelSnapshotsRemover.java index 34bc90ab7b2..0f2b9aa2b73 100644 --- a/plugin/src/main/java/org/elasticsearch/xpack/ml/job/retention/ExpiredModelSnapshotsRemover.java +++ b/plugin/src/main/java/org/elasticsearch/xpack/ml/job/retention/ExpiredModelSnapshotsRemover.java @@ -6,6 +6,7 @@ package org.elasticsearch.xpack.ml.job.retention; import org.apache.logging.log4j.Logger; +import org.elasticsearch.ElasticsearchException; import org.elasticsearch.action.ActionListener; import org.elasticsearch.action.search.SearchAction; import org.elasticsearch.action.search.SearchRequest; @@ -57,10 +58,10 @@ public class ExpiredModelSnapshotsRemover extends AbstractExpiredJobDataRemover } @Override - protected void removeDataBefore(Job job, long cutoffEpochMs, Runnable onFinish) { + protected void removeDataBefore(Job job, long cutoffEpochMs, ActionListener listener) { if (job.getModelSnapshotId() == null) { // No snapshot to remove - onFinish.run(); + listener.onResponse(true); return; } LOGGER.debug("Removing model snapshots of job [{}] that have a timestamp before [{}]", job.getId(), cutoffEpochMs); @@ -86,7 +87,7 @@ public class ExpiredModelSnapshotsRemover extends AbstractExpiredJobDataRemover for (SearchHit hit : searchResponse.getHits()) { modelSnapshots.add(ModelSnapshot.fromJson(hit.getSourceRef())); } - deleteModelSnapshots(createVolatileCursorIterator(modelSnapshots), onFinish); + deleteModelSnapshots(createVolatileCursorIterator(modelSnapshots), listener); } catch (Exception e) { onFailure(e); } @@ -94,15 +95,14 @@ public class ExpiredModelSnapshotsRemover extends AbstractExpiredJobDataRemover @Override public void onFailure(Exception e) { - LOGGER.error("[" + job.getId() + "] Search for expired snapshots failed", e); - onFinish.run(); + listener.onFailure(new ElasticsearchException("[" + job.getId() + "] Search for expired snapshots failed", e)); } }); } - private void deleteModelSnapshots(Iterator modelSnapshotIterator, Runnable onFinish) { + private void deleteModelSnapshots(Iterator modelSnapshotIterator, ActionListener listener) { if (modelSnapshotIterator.hasNext() == false) { - onFinish.run(); + listener.onResponse(true); return; } ModelSnapshot modelSnapshot = modelSnapshotIterator.next(); @@ -112,7 +112,7 @@ public class ExpiredModelSnapshotsRemover extends AbstractExpiredJobDataRemover @Override public void onResponse(DeleteModelSnapshotAction.Response response) { try { - deleteModelSnapshots(modelSnapshotIterator, onFinish); + deleteModelSnapshots(modelSnapshotIterator, listener); } catch (Exception e) { onFailure(e); } @@ -120,9 +120,8 @@ public class ExpiredModelSnapshotsRemover extends AbstractExpiredJobDataRemover @Override public void onFailure(Exception e) { - LOGGER.error("[" + modelSnapshot.getJobId() + "] Failed to delete snapshot [" - + modelSnapshot.getSnapshotId() + "]", e); - deleteModelSnapshots(modelSnapshotIterator, onFinish); + listener.onFailure(new ElasticsearchException("[" + modelSnapshot.getJobId() + "] Failed to delete snapshot [" + + modelSnapshot.getSnapshotId() + "]", e)); } }); } diff --git a/plugin/src/main/java/org/elasticsearch/xpack/ml/job/retention/ExpiredResultsRemover.java b/plugin/src/main/java/org/elasticsearch/xpack/ml/job/retention/ExpiredResultsRemover.java index acd17eb4d52..738fc28c1c6 100644 --- a/plugin/src/main/java/org/elasticsearch/xpack/ml/job/retention/ExpiredResultsRemover.java +++ b/plugin/src/main/java/org/elasticsearch/xpack/ml/job/retention/ExpiredResultsRemover.java @@ -6,6 +6,7 @@ package org.elasticsearch.xpack.ml.job.retention; import org.apache.logging.log4j.Logger; +import org.elasticsearch.ElasticsearchException; import org.elasticsearch.action.ActionListener; import org.elasticsearch.action.search.SearchRequest; import org.elasticsearch.client.Client; @@ -55,7 +56,7 @@ public class ExpiredResultsRemover extends AbstractExpiredJobDataRemover { } @Override - protected void removeDataBefore(Job job, long cutoffEpochMs, Runnable onFinish) { + protected void removeDataBefore(Job job, long cutoffEpochMs, ActionListener listener) { LOGGER.debug("Removing results of job [{}] that have a timestamp before [{}]", job.getId(), cutoffEpochMs); DeleteByQueryRequest request = createDBQRequest(job, cutoffEpochMs); @@ -66,7 +67,7 @@ public class ExpiredResultsRemover extends AbstractExpiredJobDataRemover { if (bulkByScrollResponse.getDeleted() > 0) { auditResultsWereDeleted(job.getId(), cutoffEpochMs); } - onFinish.run(); + listener.onResponse(true); } catch (Exception e) { onFailure(e); } @@ -74,8 +75,7 @@ public class ExpiredResultsRemover extends AbstractExpiredJobDataRemover { @Override public void onFailure(Exception e) { - LOGGER.error("Failed to remove expired results for job [" + job.getId() + "]", e); - onFinish.run(); + listener.onFailure(new ElasticsearchException("Failed to remove expired results for job [" + job.getId() + "]", e)); } }); } diff --git a/plugin/src/main/java/org/elasticsearch/xpack/ml/job/retention/MlDataRemover.java b/plugin/src/main/java/org/elasticsearch/xpack/ml/job/retention/MlDataRemover.java index 14decf04cf6..03238cdfc88 100644 --- a/plugin/src/main/java/org/elasticsearch/xpack/ml/job/retention/MlDataRemover.java +++ b/plugin/src/main/java/org/elasticsearch/xpack/ml/job/retention/MlDataRemover.java @@ -5,6 +5,8 @@ */ package org.elasticsearch.xpack.ml.job.retention; +import org.elasticsearch.action.ActionListener; + public interface MlDataRemover { - void remove(Runnable onFinish); + void remove(ActionListener listener); } diff --git a/plugin/src/test/java/org/elasticsearch/xpack/ml/MlDailyManagementServiceTests.java b/plugin/src/test/java/org/elasticsearch/xpack/ml/MlDailyManagementServiceTests.java index 54e21079512..907929114a1 100644 --- a/plugin/src/test/java/org/elasticsearch/xpack/ml/MlDailyManagementServiceTests.java +++ b/plugin/src/test/java/org/elasticsearch/xpack/ml/MlDailyManagementServiceTests.java @@ -13,6 +13,7 @@ import org.elasticsearch.threadpool.ThreadPool; import org.elasticsearch.xpack.ml.action.DeleteExpiredDataAction; import org.junit.After; import org.junit.Before; +import org.mockito.Mockito; import java.util.concurrent.CountDownLatch; import java.util.concurrent.TimeUnit; @@ -46,7 +47,7 @@ public class MlDailyManagementServiceTests extends ESTestCase { latch.await(1, TimeUnit.SECONDS); } - verify(client, org.mockito.Mockito.atLeast(triggerCount - 1)).execute(same(DeleteExpiredDataAction.INSTANCE), any()); + verify(client, Mockito.atLeast(triggerCount - 1)).execute(same(DeleteExpiredDataAction.INSTANCE), any(), any()); } private MlDailyMaintenanceService createService(CountDownLatch latch, Client client) { diff --git a/plugin/src/test/java/org/elasticsearch/xpack/ml/job/retention/ExpiredModelSnapshotsRemoverTests.java b/plugin/src/test/java/org/elasticsearch/xpack/ml/job/retention/ExpiredModelSnapshotsRemoverTests.java index c157d97813e..bef60071a61 100644 --- a/plugin/src/test/java/org/elasticsearch/xpack/ml/job/retention/ExpiredModelSnapshotsRemoverTests.java +++ b/plugin/src/test/java/org/elasticsearch/xpack/ml/job/retention/ExpiredModelSnapshotsRemoverTests.java @@ -53,7 +53,7 @@ public class ExpiredModelSnapshotsRemoverTests extends ESTestCase { private List capturedSearchRequests; private List capturedDeleteModelSnapshotRequests; private List searchResponsesPerCall; - private Runnable onFinish; + private ActionListener listener; @Before public void setUpTests() { @@ -64,7 +64,7 @@ public class ExpiredModelSnapshotsRemoverTests extends ESTestCase { clusterState = mock(ClusterState.class); when(clusterService.state()).thenReturn(clusterState); client = mock(Client.class); - onFinish = mock(Runnable.class); + listener = mock(ActionListener.class); } public void testRemove_GivenJobsWithoutRetentionPolicy() { @@ -74,9 +74,9 @@ public class ExpiredModelSnapshotsRemoverTests extends ESTestCase { JobTests.buildJobBuilder("bar").build() )); - createExpiredModelSnapshotsRemover().remove(onFinish); + createExpiredModelSnapshotsRemover().remove(listener); - verify(onFinish).run(); + verify(listener).onResponse(true); Mockito.verifyNoMoreInteractions(client); } @@ -84,9 +84,9 @@ public class ExpiredModelSnapshotsRemoverTests extends ESTestCase { givenClientRequestsSucceed(); givenJobs(Arrays.asList(JobTests.buildJobBuilder("foo").setModelSnapshotRetentionDays(7L).build())); - createExpiredModelSnapshotsRemover().remove(onFinish); + createExpiredModelSnapshotsRemover().remove(listener); - verify(onFinish).run(); + verify(listener).onResponse(true); Mockito.verifyNoMoreInteractions(client); } @@ -104,7 +104,7 @@ public class ExpiredModelSnapshotsRemoverTests extends ESTestCase { searchResponsesPerCall.add(createSearchResponse(snapshots1JobSnapshots)); searchResponsesPerCall.add(createSearchResponse(snapshots2JobSnapshots)); - createExpiredModelSnapshotsRemover().remove(onFinish); + createExpiredModelSnapshotsRemover().remove(listener); assertThat(capturedSearchRequests.size(), equalTo(2)); SearchRequest searchRequest = capturedSearchRequests.get(0); @@ -123,7 +123,7 @@ public class ExpiredModelSnapshotsRemoverTests extends ESTestCase { assertThat(deleteSnapshotRequest.getJobId(), equalTo("snapshots-2")); assertThat(deleteSnapshotRequest.getSnapshotId(), equalTo("snapshots-2_1")); - verify(onFinish).run(); + verify(listener).onResponse(true); } public void testRemove_GivenClientSearchRequestsFail() throws IOException { @@ -140,17 +140,15 @@ public class ExpiredModelSnapshotsRemoverTests extends ESTestCase { searchResponsesPerCall.add(createSearchResponse(snapshots1JobSnapshots)); searchResponsesPerCall.add(createSearchResponse(snapshots2JobSnapshots)); - createExpiredModelSnapshotsRemover().remove(onFinish); + createExpiredModelSnapshotsRemover().remove(listener); - assertThat(capturedSearchRequests.size(), equalTo(2)); + assertThat(capturedSearchRequests.size(), equalTo(1)); SearchRequest searchRequest = capturedSearchRequests.get(0); assertThat(searchRequest.indices(), equalTo(new String[] {AnomalyDetectorsIndex.jobResultsAliasedName("snapshots-1")})); - searchRequest = capturedSearchRequests.get(1); - assertThat(searchRequest.indices(), equalTo(new String[] {AnomalyDetectorsIndex.jobResultsAliasedName("snapshots-2")})); assertThat(capturedDeleteModelSnapshotRequests.size(), equalTo(0)); - verify(onFinish).run(); + verify(listener).onFailure(any()); } public void testRemove_GivenClientDeleteSnapshotRequestsFail() throws IOException { @@ -167,26 +165,18 @@ public class ExpiredModelSnapshotsRemoverTests extends ESTestCase { searchResponsesPerCall.add(createSearchResponse(snapshots1JobSnapshots)); searchResponsesPerCall.add(createSearchResponse(snapshots2JobSnapshots)); - createExpiredModelSnapshotsRemover().remove(onFinish); + createExpiredModelSnapshotsRemover().remove(listener); - assertThat(capturedSearchRequests.size(), equalTo(2)); + assertThat(capturedSearchRequests.size(), equalTo(1)); SearchRequest searchRequest = capturedSearchRequests.get(0); assertThat(searchRequest.indices(), equalTo(new String[] {AnomalyDetectorsIndex.jobResultsAliasedName("snapshots-1")})); - searchRequest = capturedSearchRequests.get(1); - assertThat(searchRequest.indices(), equalTo(new String[] {AnomalyDetectorsIndex.jobResultsAliasedName("snapshots-2")})); - assertThat(capturedDeleteModelSnapshotRequests.size(), equalTo(3)); + assertThat(capturedDeleteModelSnapshotRequests.size(), equalTo(1)); DeleteModelSnapshotAction.Request deleteSnapshotRequest = capturedDeleteModelSnapshotRequests.get(0); assertThat(deleteSnapshotRequest.getJobId(), equalTo("snapshots-1")); assertThat(deleteSnapshotRequest.getSnapshotId(), equalTo("snapshots-1_1")); - deleteSnapshotRequest = capturedDeleteModelSnapshotRequests.get(1); - assertThat(deleteSnapshotRequest.getJobId(), equalTo("snapshots-1")); - assertThat(deleteSnapshotRequest.getSnapshotId(), equalTo("snapshots-1_2")); - deleteSnapshotRequest = capturedDeleteModelSnapshotRequests.get(2); - assertThat(deleteSnapshotRequest.getJobId(), equalTo("snapshots-2")); - assertThat(deleteSnapshotRequest.getSnapshotId(), equalTo("snapshots-2_1")); - verify(onFinish).run(); + verify(listener).onFailure(any()); } private void givenJobs(List jobs) { diff --git a/plugin/src/test/java/org/elasticsearch/xpack/ml/job/retention/ExpiredResultsRemoverTests.java b/plugin/src/test/java/org/elasticsearch/xpack/ml/job/retention/ExpiredResultsRemoverTests.java index 01ce1de6a8a..8e8eb7fdea8 100644 --- a/plugin/src/test/java/org/elasticsearch/xpack/ml/job/retention/ExpiredResultsRemoverTests.java +++ b/plugin/src/test/java/org/elasticsearch/xpack/ml/job/retention/ExpiredResultsRemoverTests.java @@ -46,7 +46,7 @@ public class ExpiredResultsRemoverTests extends ESTestCase { private ClusterService clusterService; private ClusterState clusterState; private List capturedDeleteByQueryRequests; - private Runnable onFinish; + private ActionListener listener; @Before public void setUpTests() { @@ -55,26 +55,16 @@ public class ExpiredResultsRemoverTests extends ESTestCase { clusterState = mock(ClusterState.class); when(clusterService.state()).thenReturn(clusterState); client = mock(Client.class); - doAnswer(new Answer() { - @Override - public Void answer(InvocationOnMock invocationOnMock) throws Throwable { - capturedDeleteByQueryRequests.add((DeleteByQueryRequest) invocationOnMock.getArguments()[1]); - ActionListener listener = - (ActionListener) invocationOnMock.getArguments()[2]; - listener.onResponse(null); - return null; - } - }).when(client).execute(same(DeleteByQueryAction.INSTANCE), any(), any()); - onFinish = mock(Runnable.class); + listener = mock(ActionListener.class); } public void testRemove_GivenNoJobs() { givenClientRequestsSucceed(); givenJobs(Collections.emptyList()); - createExpiredResultsRemover().remove(onFinish); + createExpiredResultsRemover().remove(listener); - verify(onFinish).run(); + verify(listener).onResponse(true); Mockito.verifyNoMoreInteractions(client); } @@ -85,13 +75,13 @@ public class ExpiredResultsRemoverTests extends ESTestCase { JobTests.buildJobBuilder("bar").build() )); - createExpiredResultsRemover().remove(onFinish); + createExpiredResultsRemover().remove(listener); - verify(onFinish).run(); + verify(listener).onResponse(true); Mockito.verifyNoMoreInteractions(client); } - public void testRemove_GivenJobsWithAndWithoutRetentionPolicy() throws IOException { + public void testRemove_GivenJobsWithAndWithoutRetentionPolicy() throws Exception { givenClientRequestsSucceed(); givenJobs(Arrays.asList( JobTests.buildJobBuilder("none").build(), @@ -99,17 +89,17 @@ public class ExpiredResultsRemoverTests extends ESTestCase { JobTests.buildJobBuilder("results-2").setResultsRetentionDays(20L).build() )); - createExpiredResultsRemover().remove(onFinish); + createExpiredResultsRemover().remove(listener); assertThat(capturedDeleteByQueryRequests.size(), equalTo(2)); DeleteByQueryRequest dbqRequest = capturedDeleteByQueryRequests.get(0); assertThat(dbqRequest.indices(), equalTo(new String[] {AnomalyDetectorsIndex.jobResultsAliasedName("results-1")})); dbqRequest = capturedDeleteByQueryRequests.get(1); assertThat(dbqRequest.indices(), equalTo(new String[] {AnomalyDetectorsIndex.jobResultsAliasedName("results-2")})); - verify(onFinish).run(); + verify(listener).onResponse(true); } - public void testRemove_GivenClientRequestsFailed_StillIteratesThroughJobs() throws IOException { + public void testRemove_GivenClientRequestsFailed() throws IOException { givenClientRequestsFailed(); givenJobs(Arrays.asList( JobTests.buildJobBuilder("none").build(), @@ -117,14 +107,12 @@ public class ExpiredResultsRemoverTests extends ESTestCase { JobTests.buildJobBuilder("results-2").setResultsRetentionDays(20L).build() )); - createExpiredResultsRemover().remove(onFinish); + createExpiredResultsRemover().remove(listener); - assertThat(capturedDeleteByQueryRequests.size(), equalTo(2)); + assertThat(capturedDeleteByQueryRequests.size(), equalTo(1)); DeleteByQueryRequest dbqRequest = capturedDeleteByQueryRequests.get(0); assertThat(dbqRequest.indices(), equalTo(new String[] {AnomalyDetectorsIndex.jobResultsAliasedName("results-1")})); - dbqRequest = capturedDeleteByQueryRequests.get(1); - assertThat(dbqRequest.indices(), equalTo(new String[] {AnomalyDetectorsIndex.jobResultsAliasedName("results-2")})); - verify(onFinish).run(); + verify(listener).onFailure(any()); } private void givenClientRequestsSucceed() { @@ -143,7 +131,9 @@ public class ExpiredResultsRemoverTests extends ESTestCase { ActionListener listener = (ActionListener) invocationOnMock.getArguments()[2]; if (shouldSucceed) { - listener.onResponse(null); + BulkByScrollResponse bulkByScrollResponse = mock(BulkByScrollResponse.class); + when(bulkByScrollResponse.getDeleted()).thenReturn(42L); + listener.onResponse(bulkByScrollResponse); } else { listener.onFailure(new RuntimeException("failed")); }