diff --git a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ml/action/DeleteJobAction.java b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ml/action/DeleteJobAction.java index a8bd7373335..35d2de746c3 100644 --- a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ml/action/DeleteJobAction.java +++ b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ml/action/DeleteJobAction.java @@ -27,6 +27,7 @@ public class DeleteJobAction extends ActionType { public static final DeleteJobAction INSTANCE = new DeleteJobAction(); public static final String NAME = "cluster:admin/xpack/ml/job/delete"; + public static final String DELETION_TASK_DESCRIPTION_PREFIX = "delete-job-"; private DeleteJobAction() { super(NAME, AcknowledgedResponse::new); @@ -88,8 +85,8 @@ public class DeleteJobAction extends ActionType { } @Override - public Task createTask(long id, String type, String action, TaskId parentTaskId, Map headers) { - return new JobDeletionTask(id, type, action, "delete-job-" + jobId, parentTaskId, headers); + public String getDescription() { + return DELETION_TASK_DESCRIPTION_PREFIX + jobId; } @Override diff --git a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ml/job/config/DataDescription.java b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ml/job/config/DataDescription.java index 8e133a5cfa1..62dcf9297bb 100644 --- a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ml/job/config/DataDescription.java +++ b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ml/job/config/DataDescription.java @@ -318,19 +318,22 @@ public class DataDescription implements ToXContentObject, Writeable { private Character fieldDelimiter; private Character quoteCharacter; - public void setFormat(DataFormat format) { + public Builder setFormat(DataFormat format) { dataFormat = ExceptionsHelper.requireNonNull(format, 
FORMAT_FIELD.getPreferredName() + " must not be null"); + return this; } - private void setFormat(String format) { + private Builder setFormat(String format) { setFormat(DataFormat.forString(format)); + return this; } - public void setTimeField(String fieldName) { + public Builder setTimeField(String fieldName) { timeFieldName = ExceptionsHelper.requireNonNull(fieldName, TIME_FIELD_NAME_FIELD.getPreferredName() + " must not be null"); + return this; } - public void setTimeFormat(String format) { + public Builder setTimeFormat(String format) { ExceptionsHelper.requireNonNull(format, TIME_FORMAT_FIELD.getPreferredName() + " must not be null"); switch (format) { case EPOCH: @@ -345,14 +348,17 @@ public class DataDescription implements ToXContentObject, Writeable { } } timeFormat = format; + return this; } - public void setFieldDelimiter(Character delimiter) { + public Builder setFieldDelimiter(Character delimiter) { fieldDelimiter = delimiter; + return this; } - public void setQuoteCharacter(Character value) { + public Builder setQuoteCharacter(Character value) { quoteCharacter = value; + return this; } public DataDescription build() { diff --git a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ml/job/persistence/JobDeletionTask.java b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ml/job/persistence/JobDeletionTask.java deleted file mode 100644 index f3cd2abf461..00000000000 --- a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ml/job/persistence/JobDeletionTask.java +++ /dev/null @@ -1,28 +0,0 @@ -/* - * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one - * or more contributor license agreements. Licensed under the Elastic License; - * you may not use this file except in compliance with the Elastic License. 
- */ -package org.elasticsearch.xpack.core.ml.job.persistence; - -import org.elasticsearch.tasks.Task; -import org.elasticsearch.tasks.TaskId; - -import java.util.Map; - -public class JobDeletionTask extends Task { - - private volatile boolean started; - - public JobDeletionTask(long id, String type, String action, String description, TaskId parentTask, Map headers) { - super(id, type, action, description, parentTask, headers); - } - - public void start() { - started = true; - } - - public boolean isStarted() { - return started; - } -} diff --git a/x-pack/plugin/ml/qa/native-multi-node-tests/src/test/java/org/elasticsearch/xpack/ml/integration/DeleteJobIT.java b/x-pack/plugin/ml/qa/native-multi-node-tests/src/test/java/org/elasticsearch/xpack/ml/integration/DeleteJobIT.java index a395845ccbd..69f5392b05b 100644 --- a/x-pack/plugin/ml/qa/native-multi-node-tests/src/test/java/org/elasticsearch/xpack/ml/integration/DeleteJobIT.java +++ b/x-pack/plugin/ml/qa/native-multi-node-tests/src/test/java/org/elasticsearch/xpack/ml/integration/DeleteJobIT.java @@ -27,6 +27,7 @@ import java.util.Collections; import static org.elasticsearch.index.mapper.MapperService.SINGLE_MAPPING_NAME; import static org.elasticsearch.xpack.core.ml.annotations.AnnotationTests.randomAnnotation; +import static org.hamcrest.Matchers.containsString; public class DeleteJobIT extends MlNativeAutodetectIntegTestCase { @@ -79,6 +80,13 @@ public class DeleteJobIT extends MlNativeAutodetectIntegTestCase { assertThatNumberOfAnnotationsIsEqualTo(2); } + public void testDeletingMultipleJobsInOneRequestIsImpossible() { + String jobIdA = "delete-multiple-jobs-a"; + String jobIdB = "delete-multiple-jobs-b"; + IllegalArgumentException e = expectThrows(IllegalArgumentException.class, () -> deleteJob(jobIdA + "," + jobIdB)); + assertThat(e.getMessage(), containsString("Invalid job_id")); + } + private void runJob(String jobId, String datafeedId) throws Exception { Detector.Builder detector = new 
Detector.Builder().setFunction("count"); AnalysisConfig.Builder analysisConfig = new AnalysisConfig.Builder(Collections.singletonList(detector.build())) diff --git a/x-pack/plugin/ml/qa/native-multi-node-tests/src/test/java/org/elasticsearch/xpack/ml/integration/MlDailyMaintenanceServiceIT.java b/x-pack/plugin/ml/qa/native-multi-node-tests/src/test/java/org/elasticsearch/xpack/ml/integration/MlDailyMaintenanceServiceIT.java new file mode 100644 index 00000000000..36d77d5c9a7 --- /dev/null +++ b/x-pack/plugin/ml/qa/native-multi-node-tests/src/test/java/org/elasticsearch/xpack/ml/integration/MlDailyMaintenanceServiceIT.java @@ -0,0 +1,121 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License; + * you may not use this file except in compliance with the Elastic License. + */ +package org.elasticsearch.xpack.ml.integration; + +import org.elasticsearch.Version; +import org.elasticsearch.action.ActionListener; +import org.elasticsearch.cluster.ClusterName; +import org.elasticsearch.cluster.service.ClusterService; +import org.elasticsearch.common.unit.TimeValue; +import org.elasticsearch.common.util.concurrent.EsExecutors; +import org.elasticsearch.threadpool.ThreadPool; +import org.elasticsearch.xpack.core.ml.job.config.AnalysisConfig; +import org.elasticsearch.xpack.core.ml.job.config.DataDescription; +import org.elasticsearch.xpack.core.ml.job.config.Detector; +import org.elasticsearch.xpack.core.ml.job.config.Job; +import org.elasticsearch.xpack.ml.MlAssignmentNotifier; +import org.elasticsearch.xpack.ml.MlDailyMaintenanceService; +import org.elasticsearch.xpack.ml.job.persistence.JobConfigProvider; +import org.junit.Before; + +import java.util.Collections; +import java.util.List; +import java.util.Set; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.atomic.AtomicReference; +import 
java.util.function.Consumer; + +import static java.util.stream.Collectors.toSet; +import static org.hamcrest.Matchers.containsInAnyOrder; +import static org.hamcrest.Matchers.is; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.when; + +public class MlDailyMaintenanceServiceIT extends MlNativeAutodetectIntegTestCase { + + private JobConfigProvider jobConfigProvider; + private ThreadPool threadPool; + + @Before + public void setUpMocks() { + jobConfigProvider = new JobConfigProvider(client(), xContentRegistry()); + threadPool = mock(ThreadPool.class); + ExecutorService directExecutorService = EsExecutors.newDirectExecutorService(); + when(threadPool.executor(ThreadPool.Names.SAME)).thenReturn(directExecutorService); + } + + public void testTriggerDeleteJobsInStateDeletingWithoutDeletionTask() throws InterruptedException { + MlDailyMaintenanceService maintenanceService = + new MlDailyMaintenanceService( + settings(Version.CURRENT).build(), + ClusterName.DEFAULT, + threadPool, + client(), + mock(ClusterService.class), + mock(MlAssignmentNotifier.class)); + + putJob("maintenance-test-1"); + putJob("maintenance-test-2"); + putJob("maintenance-test-3"); + assertThat(getJobIds(), containsInAnyOrder("maintenance-test-1", "maintenance-test-2", "maintenance-test-3")); + + blockingCall(maintenanceService::triggerDeleteJobsInStateDeletingWithoutDeletionTask); + assertThat(getJobIds(), containsInAnyOrder("maintenance-test-1", "maintenance-test-2", "maintenance-test-3")); + + this.blockingCall(listener -> jobConfigProvider.markJobAsDeleting("maintenance-test-2", listener)); + this.blockingCall(listener -> jobConfigProvider.markJobAsDeleting("maintenance-test-3", listener)); + assertThat(getJobIds(), containsInAnyOrder("maintenance-test-1", "maintenance-test-2", "maintenance-test-3")); + assertThat(getJob("maintenance-test-1").get(0).isDeleting(), is(false)); + assertThat(getJob("maintenance-test-2").get(0).isDeleting(), is(true)); + 
assertThat(getJob("maintenance-test-3").get(0).isDeleting(), is(true)); + + blockingCall(maintenanceService::triggerDeleteJobsInStateDeletingWithoutDeletionTask); + assertThat(getJobIds(), containsInAnyOrder("maintenance-test-1")); + } + + private void blockingCall(Consumer> function) throws InterruptedException { + AtomicReference exceptionHolder = new AtomicReference<>(); + CountDownLatch latch = new CountDownLatch(1); + ActionListener listener = ActionListener.wrap( + r -> { + latch.countDown(); + }, + e -> { + exceptionHolder.set(e); + latch.countDown(); + } + ); + function.accept(listener); + latch.await(); + if (exceptionHolder.get() != null) { + fail(exceptionHolder.get().getMessage()); + } + } + + private void putJob(String jobId) { + Job.Builder job = + new Job.Builder(jobId) + .setAnalysisConfig( + new AnalysisConfig.Builder((List) null) + .setBucketSpan(TimeValue.timeValueHours(1)) + .setDetectors( + Collections.singletonList( + new Detector.Builder("count", null) + .setPartitionFieldName("user") + .build()))) + .setDataDescription( + new DataDescription.Builder() + .setTimeFormat("epoch")); + + registerJob(job); + putJob(job); + } + + private Set getJobIds() { + return getJob("*").stream().map(Job::getId).collect(toSet()); + } +} diff --git a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/MlDailyMaintenanceService.java b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/MlDailyMaintenanceService.java index b529495b7c3..3bfa0c965fc 100644 --- a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/MlDailyMaintenanceService.java +++ b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/MlDailyMaintenanceService.java @@ -7,27 +7,42 @@ package org.elasticsearch.xpack.ml; import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; +import org.apache.lucene.util.SetOnce; import org.elasticsearch.action.ActionListener; +import org.elasticsearch.action.admin.cluster.node.tasks.list.ListTasksAction; +import 
org.elasticsearch.action.admin.cluster.node.tasks.list.ListTasksRequest; +import org.elasticsearch.action.admin.cluster.node.tasks.list.ListTasksResponse; +import org.elasticsearch.action.support.master.AcknowledgedResponse; import org.elasticsearch.client.Client; import org.elasticsearch.cluster.ClusterName; import org.elasticsearch.cluster.ClusterState; import org.elasticsearch.cluster.service.ClusterService; +import org.elasticsearch.common.collect.Tuple; import org.elasticsearch.common.lease.Releasable; import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.unit.TimeValue; import org.elasticsearch.common.util.concurrent.EsRejectedExecutionException; +import org.elasticsearch.common.util.set.Sets; import org.elasticsearch.persistent.PersistentTasksCustomMetadata; import org.elasticsearch.threadpool.Scheduler; import org.elasticsearch.threadpool.ThreadPool; import org.elasticsearch.xpack.core.ml.MlMetadata; import org.elasticsearch.xpack.core.ml.action.DeleteExpiredDataAction; +import org.elasticsearch.xpack.core.ml.action.DeleteJobAction; +import org.elasticsearch.xpack.core.ml.action.GetJobsAction; +import org.elasticsearch.xpack.core.ml.job.config.Job; +import org.elasticsearch.xpack.ml.utils.TypedChainTaskExecutor; import java.time.Clock; import java.time.ZonedDateTime; +import java.util.List; import java.util.Objects; import java.util.Random; +import java.util.Set; import java.util.function.Supplier; +import static java.util.stream.Collectors.toList; +import static java.util.stream.Collectors.toSet; import static org.elasticsearch.xpack.core.ClientHelper.ML_ORIGIN; import static org.elasticsearch.xpack.core.ClientHelper.executeAsyncWithOrigin; @@ -136,32 +151,141 @@ public class MlDailyMaintenanceService implements Releasable { return; } LOGGER.info("triggering scheduled [ML] maintenance tasks"); - executeAsyncWithOrigin(client, - ML_ORIGIN, - DeleteExpiredDataAction.INSTANCE, - new 
DeleteExpiredDataAction.Request(deleteExpiredDataRequestsPerSecond, TimeValue.timeValueHours(8)), - ActionListener.wrap( - response -> { - if (response.isDeleted()) { - LOGGER.info("Successfully completed [ML] maintenance tasks"); - } else { - LOGGER.info("Halting [ML] maintenance tasks before completion as elapsed time is too great"); - } - }, - e -> LOGGER.error("An error occurred during maintenance tasks execution", e))); - auditUnassignedMlTasks(clusterService.state()); + + // Step 3: Log any error that could have happened + ActionListener finalListener = ActionListener.wrap( + unused -> {}, + e -> LOGGER.error("An error occurred during [ML] maintenance tasks execution", e) + ); + + // Step 2: Delete expired data + ActionListener deleteJobsListener = ActionListener.wrap( + unused -> triggerDeleteExpiredDataTask(finalListener), + e -> { + LOGGER.info("[ML] maintenance task: triggerDeleteJobsInStateDeletingWithoutDeletionTask failed", e); + // Note: Steps 1 and 2 are independent of each other and step 2 is executed even if step 1 failed. 
+ triggerDeleteExpiredDataTask(finalListener); + } + ); + + // Step 1: Delete jobs that are in deleting state + triggerDeleteJobsInStateDeletingWithoutDeletionTask(deleteJobsListener); + + auditUnassignedMlTasks(); } finally { scheduleNext(); } } + private void triggerDeleteExpiredDataTask(ActionListener finalListener) { + ActionListener deleteExpiredDataActionListener = ActionListener.wrap( + deleteExpiredDataResponse -> { + if (deleteExpiredDataResponse.isDeleted()) { + LOGGER.info("Successfully completed [ML] maintenance task: triggerDeleteExpiredDataTask"); + } else { + LOGGER.info("Halting [ML] maintenance tasks before completion as elapsed time is too great"); + } + finalListener.onResponse(new AcknowledgedResponse(true)); + }, + finalListener::onFailure + ); + + executeAsyncWithOrigin( + client, + ML_ORIGIN, + DeleteExpiredDataAction.INSTANCE, + new DeleteExpiredDataAction.Request(deleteExpiredDataRequestsPerSecond, TimeValue.timeValueHours(8)), + deleteExpiredDataActionListener); + } + + // Visible for testing + public void triggerDeleteJobsInStateDeletingWithoutDeletionTask(ActionListener finalListener) { + SetOnce> jobsInStateDeletingHolder = new SetOnce<>(); + + ActionListener>> deleteJobsActionListener = ActionListener.wrap( + deleteJobsResponses -> { + List jobIds = + deleteJobsResponses.stream() + .filter(t -> t.v2().isAcknowledged() == false) + .map(Tuple::v1) + .map(DeleteJobAction.Request::getJobId) + .collect(toList()); + if (jobIds.isEmpty()) { + LOGGER.info("Successfully completed [ML] maintenance task: triggerDeleteJobsInStateDeletingWithoutDeletionTask"); + } else { + LOGGER.info("The following ML jobs could not be deleted: [" + String.join(",", jobIds) + "]"); + } + finalListener.onResponse(new AcknowledgedResponse(true)); + }, + finalListener::onFailure + ); + + ActionListener listTasksActionListener = ActionListener.wrap( + listTasksResponse -> { + Set jobsInStateDeleting = jobsInStateDeletingHolder.get(); + Set jobsWithDeletionTask = + 
listTasksResponse.getTasks().stream() + .filter(t -> t.getDescription() != null) + .filter(t -> t.getDescription().startsWith(DeleteJobAction.DELETION_TASK_DESCRIPTION_PREFIX)) + .map(t -> t.getDescription().substring(DeleteJobAction.DELETION_TASK_DESCRIPTION_PREFIX.length())) + .collect(toSet()); + Set jobsInStateDeletingWithoutDeletionTask = Sets.difference(jobsInStateDeleting, jobsWithDeletionTask); + if (jobsInStateDeletingWithoutDeletionTask.isEmpty()) { + finalListener.onResponse(new AcknowledgedResponse(true)); + return; + } + TypedChainTaskExecutor> chainTaskExecutor = + new TypedChainTaskExecutor<>(threadPool.executor(ThreadPool.Names.SAME), unused -> true, unused -> true); + for (String jobId : jobsInStateDeletingWithoutDeletionTask) { + DeleteJobAction.Request request = new DeleteJobAction.Request(jobId); + chainTaskExecutor.add( + listener -> + executeAsyncWithOrigin( + client, + ML_ORIGIN, + DeleteJobAction.INSTANCE, + request, + ActionListener.wrap(response -> listener.onResponse(Tuple.tuple(request, response)), listener::onFailure)) + ); + } + chainTaskExecutor.execute(deleteJobsActionListener); + }, + finalListener::onFailure + ); + + ActionListener getJobsActionListener = ActionListener.wrap( + getJobsResponse -> { + Set jobsInStateDeleting = + getJobsResponse.getResponse().results().stream() + .filter(Job::isDeleting) + .map(Job::getId) + .collect(toSet()); + if (jobsInStateDeleting.isEmpty()) { + finalListener.onResponse(new AcknowledgedResponse(true)); + return; + } + jobsInStateDeletingHolder.set(jobsInStateDeleting); + executeAsyncWithOrigin( + client, + ML_ORIGIN, + ListTasksAction.INSTANCE, + new ListTasksRequest().setActions(DeleteJobAction.NAME), + listTasksActionListener); + }, + finalListener::onFailure + ); + + executeAsyncWithOrigin(client, ML_ORIGIN, GetJobsAction.INSTANCE, new GetJobsAction.Request("*"), getJobsActionListener); + } + /** * The idea of this is that if tasks are unassigned for days on end then they'll get a duplicate * 
audit warning every day, and that will mean they'll permanently have a yellow triangle next * to their entries in the UI jobs list. (This functionality may need revisiting if the condition * for displaying a yellow triangle in the UI jobs list changes.) */ - private void auditUnassignedMlTasks(ClusterState state) { + private void auditUnassignedMlTasks() { + ClusterState state = clusterService.state(); PersistentTasksCustomMetadata tasks = state.getMetadata().custom(PersistentTasksCustomMetadata.TYPE); if (tasks != null) { mlAssignmentNotifier.auditUnassignedMlTasks(state.nodes(), tasks); diff --git a/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/MlDailyMaintenanceServiceTests.java b/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/MlDailyMaintenanceServiceTests.java index c7bc4b50679..f25834ecf26 100644 --- a/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/MlDailyMaintenanceServiceTests.java +++ b/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/MlDailyMaintenanceServiceTests.java @@ -5,30 +5,44 @@ */ package org.elasticsearch.xpack.ml; +import org.elasticsearch.ElasticsearchException; +import org.elasticsearch.action.ActionListener; +import org.elasticsearch.action.admin.cluster.node.tasks.list.ListTasksAction; +import org.elasticsearch.action.admin.cluster.node.tasks.list.ListTasksResponse; +import org.elasticsearch.action.support.master.AcknowledgedResponse; import org.elasticsearch.client.Client; import org.elasticsearch.cluster.ClusterName; import org.elasticsearch.cluster.ClusterState; import org.elasticsearch.cluster.metadata.Metadata; import org.elasticsearch.cluster.node.DiscoveryNodes; import org.elasticsearch.cluster.service.ClusterService; +import org.elasticsearch.common.ParseField; import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.unit.TimeValue; import org.elasticsearch.persistent.PersistentTasksCustomMetadata; +import org.elasticsearch.tasks.TaskId; +import 
org.elasticsearch.tasks.TaskInfo; import org.elasticsearch.test.ESTestCase; import org.elasticsearch.threadpool.TestThreadPool; import org.elasticsearch.threadpool.ThreadPool; +import org.elasticsearch.xpack.core.action.util.QueryPage; import org.elasticsearch.xpack.core.ml.MlMetadata; import org.elasticsearch.xpack.core.ml.action.DeleteExpiredDataAction; +import org.elasticsearch.xpack.core.ml.action.DeleteJobAction; +import org.elasticsearch.xpack.core.ml.action.GetJobsAction; +import org.elasticsearch.xpack.core.ml.job.config.Job; import org.junit.After; import org.junit.Before; -import org.mockito.Mockito; +import org.mockito.stubbing.Answer; +import java.util.Collections; import java.util.concurrent.CountDownLatch; import java.util.concurrent.TimeUnit; import static org.elasticsearch.mock.orig.Mockito.verify; import static org.mockito.Matchers.any; import static org.mockito.Matchers.same; +import static org.mockito.Mockito.doAnswer; import static org.mockito.Mockito.mock; import static org.mockito.Mockito.times; import static org.mockito.Mockito.verifyNoMoreInteractions; @@ -57,6 +71,10 @@ public class MlDailyMaintenanceServiceTests extends ESTestCase { public void testScheduledTriggering() throws InterruptedException { when(clusterService.state()).thenReturn(createClusterState(false)); + doAnswer(withResponse(new DeleteExpiredDataAction.Response(true))) + .when(client).execute(same(DeleteExpiredDataAction.INSTANCE), any(), any()); + doAnswer(withResponse(new GetJobsAction.Response(new QueryPage<>(Collections.emptyList(), 0, new ParseField(""))))) + .when(client).execute(same(GetJobsAction.INSTANCE), any(), any()); int triggerCount = randomIntBetween(2, 4); CountDownLatch latch = new CountDownLatch(triggerCount); @@ -65,8 +83,9 @@ public class MlDailyMaintenanceServiceTests extends ESTestCase { latch.await(5, TimeUnit.SECONDS); } - verify(client, Mockito.atLeast(triggerCount - 1)).execute(same(DeleteExpiredDataAction.INSTANCE), any(), any()); - 
verify(mlAssignmentNotifier, Mockito.atLeast(triggerCount - 1)).auditUnassignedMlTasks(any(), any()); + verify(client, times(triggerCount - 1)).execute(same(DeleteExpiredDataAction.INSTANCE), any(), any()); + verify(client, times(triggerCount - 1)).execute(same(GetJobsAction.INSTANCE), any(), any()); + verify(mlAssignmentNotifier, times(triggerCount - 1)).auditUnassignedMlTasks(any(), any()); } public void testScheduledTriggeringWhileUpgradeModeIsEnabled() throws InterruptedException { @@ -83,6 +102,132 @@ public class MlDailyMaintenanceServiceTests extends ESTestCase { verifyNoMoreInteractions(client, clusterService, mlAssignmentNotifier); } + public void testBothTasksAreTriggered_BothTasksSucceed() throws InterruptedException { + assertThatBothTasksAreTriggered( + withResponse(new DeleteExpiredDataAction.Response(true)), + withResponse(new GetJobsAction.Response(new QueryPage<>(Collections.emptyList(), 0, new ParseField(""))))); + } + + public void testBothTasksAreTriggered_DeleteExpiredDataTaskFails() throws InterruptedException { + assertThatBothTasksAreTriggered( + withResponse(new DeleteExpiredDataAction.Response(false)), + withResponse(new GetJobsAction.Response(new QueryPage<>(Collections.emptyList(), 0, new ParseField(""))))); + } + + public void testBothTasksAreTriggered_DeleteExpiredDataTaskFailsWithException() throws InterruptedException { + assertThatBothTasksAreTriggered( + withException(new ElasticsearchException("exception thrown by DeleteExpiredDataAction")), + withResponse(new GetJobsAction.Response(new QueryPage<>(Collections.emptyList(), 0, new ParseField(""))))); + } + + public void testBothTasksAreTriggered_DeleteJobsTaskFails() throws InterruptedException { + assertThatBothTasksAreTriggered( + withResponse(new DeleteExpiredDataAction.Response(true)), + withException(new ElasticsearchException("exception thrown by GetJobsAction"))); + } + + public void testBothTasksAreTriggered_BothTasksFail() throws InterruptedException { + 
assertThatBothTasksAreTriggered( + withException(new ElasticsearchException("exception thrown by DeleteExpiredDataAction")), + withException(new ElasticsearchException("exception thrown by GetJobsAction"))); + } + + private void assertThatBothTasksAreTriggered(Answer deleteExpiredDataAnswer, Answer getJobsAnswer) throws InterruptedException { + when(clusterService.state()).thenReturn(createClusterState(false)); + doAnswer(deleteExpiredDataAnswer).when(client).execute(same(DeleteExpiredDataAction.INSTANCE), any(), any()); + doAnswer(getJobsAnswer).when(client).execute(same(GetJobsAction.INSTANCE), any(), any()); + + CountDownLatch latch = new CountDownLatch(2); + try (MlDailyMaintenanceService service = createService(latch, client)) { + service.start(); + latch.await(5, TimeUnit.SECONDS); + } + + verify(client, times(2)).threadPool(); + verify(client).execute(same(DeleteExpiredDataAction.INSTANCE), any(), any()); + verify(client).execute(same(GetJobsAction.INSTANCE), any(), any()); + verify(mlAssignmentNotifier).auditUnassignedMlTasks(any(), any()); + verifyNoMoreInteractions(client, mlAssignmentNotifier); + } + + public void testJobInDeletingStateAlreadyHasDeletionTask() throws InterruptedException { + String jobId = "job-in-state-deleting"; + TaskInfo taskInfo = + new TaskInfo( + new TaskId("test", 123), + "test", + DeleteJobAction.NAME, + "delete-job-" + jobId, + null, + 0, + 0, + true, + new TaskId("test", 456), + Collections.emptyMap()); + + when(clusterService.state()).thenReturn(createClusterState(false)); + doAnswer(withResponse(new DeleteExpiredDataAction.Response(true))) + .when(client).execute(same(DeleteExpiredDataAction.INSTANCE), any(), any()); + Job job = mock(Job.class); + when(job.getId()).thenReturn(jobId); + when(job.isDeleting()).thenReturn(true); + doAnswer(withResponse(new GetJobsAction.Response(new QueryPage<>(Collections.singletonList(job), 1, new ParseField(""))))) + .when(client).execute(same(GetJobsAction.INSTANCE), any(), any()); + 
doAnswer(withResponse(new ListTasksResponse(Collections.singletonList(taskInfo), Collections.emptyList(), Collections.emptyList()))) + .when(client).execute(same(ListTasksAction.INSTANCE), any(), any()); + + CountDownLatch latch = new CountDownLatch(2); + try (MlDailyMaintenanceService service = createService(latch, client)) { + service.start(); + latch.await(5, TimeUnit.SECONDS); + } + + verify(client, times(3)).threadPool(); + verify(client).execute(same(GetJobsAction.INSTANCE), any(), any()); + verify(client).execute(same(ListTasksAction.INSTANCE), any(), any()); + verify(client).execute(same(DeleteExpiredDataAction.INSTANCE), any(), any()); + verify(mlAssignmentNotifier).auditUnassignedMlTasks(any(), any()); + verifyNoMoreInteractions(client, mlAssignmentNotifier); + } + + public void testJobGetsDeleted() throws InterruptedException { + testJobInDeletingStateDoesNotHaveDeletionTask(true); + } + + public void testJobDoesNotGetDeleted() throws InterruptedException { + testJobInDeletingStateDoesNotHaveDeletionTask(false); + } + + private void testJobInDeletingStateDoesNotHaveDeletionTask(boolean deleted) throws InterruptedException { + String jobId = "job-in-state-deleting"; + when(clusterService.state()).thenReturn(createClusterState(false)); + doAnswer(withResponse(new DeleteExpiredDataAction.Response(true))) + .when(client).execute(same(DeleteExpiredDataAction.INSTANCE), any(), any()); + Job job = mock(Job.class); + when(job.getId()).thenReturn(jobId); + when(job.isDeleting()).thenReturn(true); + doAnswer(withResponse(new GetJobsAction.Response(new QueryPage<>(Collections.singletonList(job), 1, new ParseField(""))))) + .when(client).execute(same(GetJobsAction.INSTANCE), any(), any()); + doAnswer(withResponse(new ListTasksResponse(Collections.emptyList(), Collections.emptyList(), Collections.emptyList()))) + .when(client).execute(same(ListTasksAction.INSTANCE), any(), any()); + doAnswer(withResponse(new AcknowledgedResponse(deleted))) + 
.when(client).execute(same(DeleteJobAction.INSTANCE), any(), any()); + + CountDownLatch latch = new CountDownLatch(2); + try (MlDailyMaintenanceService service = createService(latch, client)) { + service.start(); + latch.await(5, TimeUnit.SECONDS); + } + + verify(client, times(4)).threadPool(); + verify(client).execute(same(GetJobsAction.INSTANCE), any(), any()); + verify(client).execute(same(ListTasksAction.INSTANCE), any(), any()); + verify(client).execute(same(DeleteJobAction.INSTANCE), any(), any()); + verify(client).execute(same(DeleteExpiredDataAction.INSTANCE), any(), any()); + verify(mlAssignmentNotifier).auditUnassignedMlTasks(any(), any()); + verifyNoMoreInteractions(client, mlAssignmentNotifier); + } + private MlDailyMaintenanceService createService(CountDownLatch latch, Client client) { return new MlDailyMaintenanceService(Settings.EMPTY, threadPool, client, clusterService, mlAssignmentNotifier, () -> { latch.countDown(); @@ -98,4 +243,22 @@ public class MlDailyMaintenanceServiceTests extends ESTestCase { .nodes(DiscoveryNodes.builder().build()) .build(); } + + @SuppressWarnings("unchecked") + private static Answer withResponse(Response response) { + return invocationOnMock -> { + ActionListener listener = (ActionListener) invocationOnMock.getArguments()[2]; + listener.onResponse(response); + return null; + }; + } + + @SuppressWarnings("unchecked") + private static Answer withException(Exception e) { + return invocationOnMock -> { + ActionListener listener = (ActionListener) invocationOnMock.getArguments()[2]; + listener.onFailure(e); + return null; + }; + } }