Make MlDailyMaintenanceService delete jobs that are in deleting state anyway (#60121) (#60439)

This commit is contained in:
Przemysław Witek 2020-07-30 09:53:11 +02:00 committed by GitHub
parent aaed6b59d6
commit 9e27f7474c
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
8 changed files with 455 additions and 58 deletions

View File

@ -27,6 +27,7 @@ public class DeleteDataFrameAnalyticsAction extends ActionType<AcknowledgedRespo
public static final DeleteDataFrameAnalyticsAction INSTANCE = new DeleteDataFrameAnalyticsAction();
public static final String NAME = "cluster:admin/xpack/ml/data_frame/analytics/delete";
public static final String DELETION_TASK_DESCRIPTION_PREFIX = "delete-analytics-";
private DeleteDataFrameAnalyticsAction() {
super(NAME, AcknowledgedResponse::new);
@ -79,6 +80,11 @@ public class DeleteDataFrameAnalyticsAction extends ActionType<AcknowledgedRespo
return null;
}
@Override
public String getDescription() {
return DELETION_TASK_DESCRIPTION_PREFIX + id;
}
@Override
public boolean equals(Object o) {
if (this == o) return true;

View File

@ -13,20 +13,17 @@ import org.elasticsearch.action.support.master.MasterNodeOperationRequestBuilder
import org.elasticsearch.client.ElasticsearchClient;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput;
import org.elasticsearch.tasks.Task;
import org.elasticsearch.tasks.TaskId;
import org.elasticsearch.xpack.core.ml.job.config.Job;
import org.elasticsearch.xpack.core.ml.job.persistence.JobDeletionTask;
import org.elasticsearch.xpack.core.ml.utils.ExceptionsHelper;
import java.io.IOException;
import java.util.Map;
import java.util.Objects;
public class DeleteJobAction extends ActionType<AcknowledgedResponse> {
public static final DeleteJobAction INSTANCE = new DeleteJobAction();
public static final String NAME = "cluster:admin/xpack/ml/job/delete";
public static final String DELETION_TASK_DESCRIPTION_PREFIX = "delete-job-";
private DeleteJobAction() {
super(NAME, AcknowledgedResponse::new);
@ -88,8 +85,8 @@ public class DeleteJobAction extends ActionType<AcknowledgedResponse> {
}
@Override
public Task createTask(long id, String type, String action, TaskId parentTaskId, Map<String, String> headers) {
return new JobDeletionTask(id, type, action, "delete-job-" + jobId, parentTaskId, headers);
public String getDescription() {
return DELETION_TASK_DESCRIPTION_PREFIX + jobId;
}
@Override

View File

@ -318,19 +318,22 @@ public class DataDescription implements ToXContentObject, Writeable {
private Character fieldDelimiter;
private Character quoteCharacter;
public void setFormat(DataFormat format) {
public Builder setFormat(DataFormat format) {
dataFormat = ExceptionsHelper.requireNonNull(format, FORMAT_FIELD.getPreferredName() + " must not be null");
return this;
}
private void setFormat(String format) {
private Builder setFormat(String format) {
setFormat(DataFormat.forString(format));
return this;
}
public void setTimeField(String fieldName) {
public Builder setTimeField(String fieldName) {
timeFieldName = ExceptionsHelper.requireNonNull(fieldName, TIME_FIELD_NAME_FIELD.getPreferredName() + " must not be null");
return this;
}
public void setTimeFormat(String format) {
public Builder setTimeFormat(String format) {
ExceptionsHelper.requireNonNull(format, TIME_FORMAT_FIELD.getPreferredName() + " must not be null");
switch (format) {
case EPOCH:
@ -345,14 +348,17 @@ public class DataDescription implements ToXContentObject, Writeable {
}
}
timeFormat = format;
return this;
}
public void setFieldDelimiter(Character delimiter) {
public Builder setFieldDelimiter(Character delimiter) {
fieldDelimiter = delimiter;
return this;
}
public void setQuoteCharacter(Character value) {
public Builder setQuoteCharacter(Character value) {
quoteCharacter = value;
return this;
}
public DataDescription build() {

View File

@ -1,28 +0,0 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License;
* you may not use this file except in compliance with the Elastic License.
*/
package org.elasticsearch.xpack.core.ml.job.persistence;
import org.elasticsearch.tasks.Task;
import org.elasticsearch.tasks.TaskId;
import java.util.Map;
public class JobDeletionTask extends Task {
private volatile boolean started;
public JobDeletionTask(long id, String type, String action, String description, TaskId parentTask, Map<String, String> headers) {
super(id, type, action, description, parentTask, headers);
}
public void start() {
started = true;
}
public boolean isStarted() {
return started;
}
}

View File

@ -27,6 +27,7 @@ import java.util.Collections;
import static org.elasticsearch.index.mapper.MapperService.SINGLE_MAPPING_NAME;
import static org.elasticsearch.xpack.core.ml.annotations.AnnotationTests.randomAnnotation;
import static org.hamcrest.Matchers.containsString;
public class DeleteJobIT extends MlNativeAutodetectIntegTestCase {
@ -79,6 +80,13 @@ public class DeleteJobIT extends MlNativeAutodetectIntegTestCase {
assertThatNumberOfAnnotationsIsEqualTo(2);
}
public void testDeletingMultipleJobsInOneRequestIsImpossible() {
String jobIdA = "delete-multiple-jobs-a";
String jobIdB = "delete-multiple-jobs-b";
IllegalArgumentException e = expectThrows(IllegalArgumentException.class, () -> deleteJob(jobIdA + "," + jobIdB));
assertThat(e.getMessage(), containsString("Invalid job_id"));
}
private void runJob(String jobId, String datafeedId) throws Exception {
Detector.Builder detector = new Detector.Builder().setFunction("count");
AnalysisConfig.Builder analysisConfig = new AnalysisConfig.Builder(Collections.singletonList(detector.build()))

View File

@ -0,0 +1,121 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License;
* you may not use this file except in compliance with the Elastic License.
*/
package org.elasticsearch.xpack.ml.integration;
import org.elasticsearch.Version;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.cluster.ClusterName;
import org.elasticsearch.cluster.service.ClusterService;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.common.util.concurrent.EsExecutors;
import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.xpack.core.ml.job.config.AnalysisConfig;
import org.elasticsearch.xpack.core.ml.job.config.DataDescription;
import org.elasticsearch.xpack.core.ml.job.config.Detector;
import org.elasticsearch.xpack.core.ml.job.config.Job;
import org.elasticsearch.xpack.ml.MlAssignmentNotifier;
import org.elasticsearch.xpack.ml.MlDailyMaintenanceService;
import org.elasticsearch.xpack.ml.job.persistence.JobConfigProvider;
import org.junit.Before;
import java.util.Collections;
import java.util.List;
import java.util.Set;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Consumer;
import static java.util.stream.Collectors.toSet;
import static org.hamcrest.Matchers.containsInAnyOrder;
import static org.hamcrest.Matchers.is;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;
public class MlDailyMaintenanceServiceIT extends MlNativeAutodetectIntegTestCase {
private JobConfigProvider jobConfigProvider;
private ThreadPool threadPool;
@Before
public void setUpMocks() {
jobConfigProvider = new JobConfigProvider(client(), xContentRegistry());
threadPool = mock(ThreadPool.class);
ExecutorService directExecutorService = EsExecutors.newDirectExecutorService();
when(threadPool.executor(ThreadPool.Names.SAME)).thenReturn(directExecutorService);
}
public void testTriggerDeleteJobsInStateDeletingWithoutDeletionTask() throws InterruptedException {
MlDailyMaintenanceService maintenanceService =
new MlDailyMaintenanceService(
settings(Version.CURRENT).build(),
ClusterName.DEFAULT,
threadPool,
client(),
mock(ClusterService.class),
mock(MlAssignmentNotifier.class));
putJob("maintenance-test-1");
putJob("maintenance-test-2");
putJob("maintenance-test-3");
assertThat(getJobIds(), containsInAnyOrder("maintenance-test-1", "maintenance-test-2", "maintenance-test-3"));
blockingCall(maintenanceService::triggerDeleteJobsInStateDeletingWithoutDeletionTask);
assertThat(getJobIds(), containsInAnyOrder("maintenance-test-1", "maintenance-test-2", "maintenance-test-3"));
this.<Boolean>blockingCall(listener -> jobConfigProvider.markJobAsDeleting("maintenance-test-2", listener));
this.<Boolean>blockingCall(listener -> jobConfigProvider.markJobAsDeleting("maintenance-test-3", listener));
assertThat(getJobIds(), containsInAnyOrder("maintenance-test-1", "maintenance-test-2", "maintenance-test-3"));
assertThat(getJob("maintenance-test-1").get(0).isDeleting(), is(false));
assertThat(getJob("maintenance-test-2").get(0).isDeleting(), is(true));
assertThat(getJob("maintenance-test-3").get(0).isDeleting(), is(true));
blockingCall(maintenanceService::triggerDeleteJobsInStateDeletingWithoutDeletionTask);
assertThat(getJobIds(), containsInAnyOrder("maintenance-test-1"));
}
private <T> void blockingCall(Consumer<ActionListener<T>> function) throws InterruptedException {
AtomicReference<Exception> exceptionHolder = new AtomicReference<>();
CountDownLatch latch = new CountDownLatch(1);
ActionListener<T> listener = ActionListener.wrap(
r -> {
latch.countDown();
},
e -> {
exceptionHolder.set(e);
latch.countDown();
}
);
function.accept(listener);
latch.await();
if (exceptionHolder.get() != null) {
fail(exceptionHolder.get().getMessage());
}
}
private void putJob(String jobId) {
Job.Builder job =
new Job.Builder(jobId)
.setAnalysisConfig(
new AnalysisConfig.Builder((List<Detector>) null)
.setBucketSpan(TimeValue.timeValueHours(1))
.setDetectors(
Collections.singletonList(
new Detector.Builder("count", null)
.setPartitionFieldName("user")
.build())))
.setDataDescription(
new DataDescription.Builder()
.setTimeFormat("epoch"));
registerJob(job);
putJob(job);
}
private Set<String> getJobIds() {
return getJob("*").stream().map(Job::getId).collect(toSet());
}
}

View File

@ -7,27 +7,42 @@ package org.elasticsearch.xpack.ml;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.apache.lucene.util.SetOnce;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.admin.cluster.node.tasks.list.ListTasksAction;
import org.elasticsearch.action.admin.cluster.node.tasks.list.ListTasksRequest;
import org.elasticsearch.action.admin.cluster.node.tasks.list.ListTasksResponse;
import org.elasticsearch.action.support.master.AcknowledgedResponse;
import org.elasticsearch.client.Client;
import org.elasticsearch.cluster.ClusterName;
import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.service.ClusterService;
import org.elasticsearch.common.collect.Tuple;
import org.elasticsearch.common.lease.Releasable;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.common.util.concurrent.EsRejectedExecutionException;
import org.elasticsearch.common.util.set.Sets;
import org.elasticsearch.persistent.PersistentTasksCustomMetadata;
import org.elasticsearch.threadpool.Scheduler;
import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.xpack.core.ml.MlMetadata;
import org.elasticsearch.xpack.core.ml.action.DeleteExpiredDataAction;
import org.elasticsearch.xpack.core.ml.action.DeleteJobAction;
import org.elasticsearch.xpack.core.ml.action.GetJobsAction;
import org.elasticsearch.xpack.core.ml.job.config.Job;
import org.elasticsearch.xpack.ml.utils.TypedChainTaskExecutor;
import java.time.Clock;
import java.time.ZonedDateTime;
import java.util.List;
import java.util.Objects;
import java.util.Random;
import java.util.Set;
import java.util.function.Supplier;
import static java.util.stream.Collectors.toList;
import static java.util.stream.Collectors.toSet;
import static org.elasticsearch.xpack.core.ClientHelper.ML_ORIGIN;
import static org.elasticsearch.xpack.core.ClientHelper.executeAsyncWithOrigin;
@ -136,32 +151,141 @@ public class MlDailyMaintenanceService implements Releasable {
return;
}
LOGGER.info("triggering scheduled [ML] maintenance tasks");
executeAsyncWithOrigin(client,
ML_ORIGIN,
DeleteExpiredDataAction.INSTANCE,
new DeleteExpiredDataAction.Request(deleteExpiredDataRequestsPerSecond, TimeValue.timeValueHours(8)),
ActionListener.wrap(
response -> {
if (response.isDeleted()) {
LOGGER.info("Successfully completed [ML] maintenance tasks");
} else {
LOGGER.info("Halting [ML] maintenance tasks before completion as elapsed time is too great");
}
},
e -> LOGGER.error("An error occurred during maintenance tasks execution", e)));
auditUnassignedMlTasks(clusterService.state());
// Step 3: Log any error that could have happened
ActionListener<AcknowledgedResponse> finalListener = ActionListener.wrap(
unused -> {},
e -> LOGGER.error("An error occurred during [ML] maintenance tasks execution", e)
);
// Step 2: Delete expired data
ActionListener<AcknowledgedResponse> deleteJobsListener = ActionListener.wrap(
unused -> triggerDeleteExpiredDataTask(finalListener),
e -> {
LOGGER.info("[ML] maintenance task: triggerDeleteJobsInStateDeletingWithoutDeletionTask failed", e);
// Note: Steps 1 and 2 are independent of each other and step 2 is executed even if step 1 failed.
triggerDeleteExpiredDataTask(finalListener);
}
);
// Step 1: Delete jobs that are in deleting state
triggerDeleteJobsInStateDeletingWithoutDeletionTask(deleteJobsListener);
auditUnassignedMlTasks();
} finally {
scheduleNext();
}
}
private void triggerDeleteExpiredDataTask(ActionListener<AcknowledgedResponse> finalListener) {
ActionListener<DeleteExpiredDataAction.Response> deleteExpiredDataActionListener = ActionListener.wrap(
deleteExpiredDataResponse -> {
if (deleteExpiredDataResponse.isDeleted()) {
LOGGER.info("Successfully completed [ML] maintenance task: triggerDeleteExpiredDataTask");
} else {
LOGGER.info("Halting [ML] maintenance tasks before completion as elapsed time is too great");
}
finalListener.onResponse(new AcknowledgedResponse(true));
},
finalListener::onFailure
);
executeAsyncWithOrigin(
client,
ML_ORIGIN,
DeleteExpiredDataAction.INSTANCE,
new DeleteExpiredDataAction.Request(deleteExpiredDataRequestsPerSecond, TimeValue.timeValueHours(8)),
deleteExpiredDataActionListener);
}
// Visible for testing
public void triggerDeleteJobsInStateDeletingWithoutDeletionTask(ActionListener<AcknowledgedResponse> finalListener) {
SetOnce<Set<String>> jobsInStateDeletingHolder = new SetOnce<>();
ActionListener<List<Tuple<DeleteJobAction.Request, AcknowledgedResponse>>> deleteJobsActionListener = ActionListener.wrap(
deleteJobsResponses -> {
List<String> jobIds =
deleteJobsResponses.stream()
.filter(t -> t.v2().isAcknowledged() == false)
.map(Tuple::v1)
.map(DeleteJobAction.Request::getJobId)
.collect(toList());
if (jobIds.isEmpty()) {
LOGGER.info("Successfully completed [ML] maintenance task: triggerDeleteJobsInStateDeletingWithoutDeletionTask");
} else {
LOGGER.info("The following ML jobs could not be deleted: [" + String.join(",", jobIds) + "]");
}
finalListener.onResponse(new AcknowledgedResponse(true));
},
finalListener::onFailure
);
ActionListener<ListTasksResponse> listTasksActionListener = ActionListener.wrap(
listTasksResponse -> {
Set<String> jobsInStateDeleting = jobsInStateDeletingHolder.get();
Set<String> jobsWithDeletionTask =
listTasksResponse.getTasks().stream()
.filter(t -> t.getDescription() != null)
.filter(t -> t.getDescription().startsWith(DeleteJobAction.DELETION_TASK_DESCRIPTION_PREFIX))
.map(t -> t.getDescription().substring(DeleteJobAction.DELETION_TASK_DESCRIPTION_PREFIX.length()))
.collect(toSet());
Set<String> jobsInStateDeletingWithoutDeletionTask = Sets.difference(jobsInStateDeleting, jobsWithDeletionTask);
if (jobsInStateDeletingWithoutDeletionTask.isEmpty()) {
finalListener.onResponse(new AcknowledgedResponse(true));
return;
}
TypedChainTaskExecutor<Tuple<DeleteJobAction.Request, AcknowledgedResponse>> chainTaskExecutor =
new TypedChainTaskExecutor<>(threadPool.executor(ThreadPool.Names.SAME), unused -> true, unused -> true);
for (String jobId : jobsInStateDeletingWithoutDeletionTask) {
DeleteJobAction.Request request = new DeleteJobAction.Request(jobId);
chainTaskExecutor.add(
listener ->
executeAsyncWithOrigin(
client,
ML_ORIGIN,
DeleteJobAction.INSTANCE,
request,
ActionListener.wrap(response -> listener.onResponse(Tuple.tuple(request, response)), listener::onFailure))
);
}
chainTaskExecutor.execute(deleteJobsActionListener);
},
finalListener::onFailure
);
ActionListener<GetJobsAction.Response> getJobsActionListener = ActionListener.wrap(
getJobsResponse -> {
Set<String> jobsInStateDeleting =
getJobsResponse.getResponse().results().stream()
.filter(Job::isDeleting)
.map(Job::getId)
.collect(toSet());
if (jobsInStateDeleting.isEmpty()) {
finalListener.onResponse(new AcknowledgedResponse(true));
return;
}
jobsInStateDeletingHolder.set(jobsInStateDeleting);
executeAsyncWithOrigin(
client,
ML_ORIGIN,
ListTasksAction.INSTANCE,
new ListTasksRequest().setActions(DeleteJobAction.NAME),
listTasksActionListener);
},
finalListener::onFailure
);
executeAsyncWithOrigin(client, ML_ORIGIN, GetJobsAction.INSTANCE, new GetJobsAction.Request("*"), getJobsActionListener);
}
/**
* The idea of this is that if tasks are unassigned for days on end then they'll get a duplicate
* audit warning every day, and that will mean they'll permanently have a yellow triangle next
* to their entries in the UI jobs list. (This functionality may need revisiting if the condition
* for displaying a yellow triangle in the UI jobs list changes.)
*/
private void auditUnassignedMlTasks(ClusterState state) {
private void auditUnassignedMlTasks() {
ClusterState state = clusterService.state();
PersistentTasksCustomMetadata tasks = state.getMetadata().custom(PersistentTasksCustomMetadata.TYPE);
if (tasks != null) {
mlAssignmentNotifier.auditUnassignedMlTasks(state.nodes(), tasks);

View File

@ -5,30 +5,44 @@
*/
package org.elasticsearch.xpack.ml;
import org.elasticsearch.ElasticsearchException;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.admin.cluster.node.tasks.list.ListTasksAction;
import org.elasticsearch.action.admin.cluster.node.tasks.list.ListTasksResponse;
import org.elasticsearch.action.support.master.AcknowledgedResponse;
import org.elasticsearch.client.Client;
import org.elasticsearch.cluster.ClusterName;
import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.metadata.Metadata;
import org.elasticsearch.cluster.node.DiscoveryNodes;
import org.elasticsearch.cluster.service.ClusterService;
import org.elasticsearch.common.ParseField;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.persistent.PersistentTasksCustomMetadata;
import org.elasticsearch.tasks.TaskId;
import org.elasticsearch.tasks.TaskInfo;
import org.elasticsearch.test.ESTestCase;
import org.elasticsearch.threadpool.TestThreadPool;
import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.xpack.core.action.util.QueryPage;
import org.elasticsearch.xpack.core.ml.MlMetadata;
import org.elasticsearch.xpack.core.ml.action.DeleteExpiredDataAction;
import org.elasticsearch.xpack.core.ml.action.DeleteJobAction;
import org.elasticsearch.xpack.core.ml.action.GetJobsAction;
import org.elasticsearch.xpack.core.ml.job.config.Job;
import org.junit.After;
import org.junit.Before;
import org.mockito.Mockito;
import org.mockito.stubbing.Answer;
import java.util.Collections;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import static org.elasticsearch.mock.orig.Mockito.verify;
import static org.mockito.Matchers.any;
import static org.mockito.Matchers.same;
import static org.mockito.Mockito.doAnswer;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verifyNoMoreInteractions;
@ -57,6 +71,10 @@ public class MlDailyMaintenanceServiceTests extends ESTestCase {
public void testScheduledTriggering() throws InterruptedException {
when(clusterService.state()).thenReturn(createClusterState(false));
doAnswer(withResponse(new DeleteExpiredDataAction.Response(true)))
.when(client).execute(same(DeleteExpiredDataAction.INSTANCE), any(), any());
doAnswer(withResponse(new GetJobsAction.Response(new QueryPage<>(Collections.emptyList(), 0, new ParseField("")))))
.when(client).execute(same(GetJobsAction.INSTANCE), any(), any());
int triggerCount = randomIntBetween(2, 4);
CountDownLatch latch = new CountDownLatch(triggerCount);
@ -65,8 +83,9 @@ public class MlDailyMaintenanceServiceTests extends ESTestCase {
latch.await(5, TimeUnit.SECONDS);
}
verify(client, Mockito.atLeast(triggerCount - 1)).execute(same(DeleteExpiredDataAction.INSTANCE), any(), any());
verify(mlAssignmentNotifier, Mockito.atLeast(triggerCount - 1)).auditUnassignedMlTasks(any(), any());
verify(client, times(triggerCount - 1)).execute(same(DeleteExpiredDataAction.INSTANCE), any(), any());
verify(client, times(triggerCount - 1)).execute(same(GetJobsAction.INSTANCE), any(), any());
verify(mlAssignmentNotifier, times(triggerCount - 1)).auditUnassignedMlTasks(any(), any());
}
public void testScheduledTriggeringWhileUpgradeModeIsEnabled() throws InterruptedException {
@ -83,6 +102,132 @@ public class MlDailyMaintenanceServiceTests extends ESTestCase {
verifyNoMoreInteractions(client, clusterService, mlAssignmentNotifier);
}
public void testBothTasksAreTriggered_BothTasksSucceed() throws InterruptedException {
assertThatBothTasksAreTriggered(
withResponse(new DeleteExpiredDataAction.Response(true)),
withResponse(new GetJobsAction.Response(new QueryPage<>(Collections.emptyList(), 0, new ParseField("")))));
}
public void testBothTasksAreTriggered_DeleteExpiredDataTaskFails() throws InterruptedException {
assertThatBothTasksAreTriggered(
withResponse(new DeleteExpiredDataAction.Response(false)),
withResponse(new GetJobsAction.Response(new QueryPage<>(Collections.emptyList(), 0, new ParseField("")))));
}
public void testBothTasksAreTriggered_DeleteExpiredDataTaskFailsWithException() throws InterruptedException {
assertThatBothTasksAreTriggered(
withException(new ElasticsearchException("exception thrown by DeleteExpiredDataAction")),
withResponse(new GetJobsAction.Response(new QueryPage<>(Collections.emptyList(), 0, new ParseField("")))));
}
public void testBothTasksAreTriggered_DeleteJobsTaskFails() throws InterruptedException {
assertThatBothTasksAreTriggered(
withResponse(new DeleteExpiredDataAction.Response(true)),
withException(new ElasticsearchException("exception thrown by GetJobsAction")));
}
public void testBothTasksAreTriggered_BothTasksFail() throws InterruptedException {
assertThatBothTasksAreTriggered(
withException(new ElasticsearchException("exception thrown by DeleteExpiredDataAction")),
withException(new ElasticsearchException("exception thrown by GetJobsAction")));
}
private void assertThatBothTasksAreTriggered(Answer<?> deleteExpiredDataAnswer, Answer<?> getJobsAnswer) throws InterruptedException {
when(clusterService.state()).thenReturn(createClusterState(false));
doAnswer(deleteExpiredDataAnswer).when(client).execute(same(DeleteExpiredDataAction.INSTANCE), any(), any());
doAnswer(getJobsAnswer).when(client).execute(same(GetJobsAction.INSTANCE), any(), any());
CountDownLatch latch = new CountDownLatch(2);
try (MlDailyMaintenanceService service = createService(latch, client)) {
service.start();
latch.await(5, TimeUnit.SECONDS);
}
verify(client, times(2)).threadPool();
verify(client).execute(same(DeleteExpiredDataAction.INSTANCE), any(), any());
verify(client).execute(same(GetJobsAction.INSTANCE), any(), any());
verify(mlAssignmentNotifier).auditUnassignedMlTasks(any(), any());
verifyNoMoreInteractions(client, mlAssignmentNotifier);
}
public void testJobInDeletingStateAlreadyHasDeletionTask() throws InterruptedException {
String jobId = "job-in-state-deleting";
TaskInfo taskInfo =
new TaskInfo(
new TaskId("test", 123),
"test",
DeleteJobAction.NAME,
"delete-job-" + jobId,
null,
0,
0,
true,
new TaskId("test", 456),
Collections.emptyMap());
when(clusterService.state()).thenReturn(createClusterState(false));
doAnswer(withResponse(new DeleteExpiredDataAction.Response(true)))
.when(client).execute(same(DeleteExpiredDataAction.INSTANCE), any(), any());
Job job = mock(Job.class);
when(job.getId()).thenReturn(jobId);
when(job.isDeleting()).thenReturn(true);
doAnswer(withResponse(new GetJobsAction.Response(new QueryPage<>(Collections.singletonList(job), 1, new ParseField("")))))
.when(client).execute(same(GetJobsAction.INSTANCE), any(), any());
doAnswer(withResponse(new ListTasksResponse(Collections.singletonList(taskInfo), Collections.emptyList(), Collections.emptyList())))
.when(client).execute(same(ListTasksAction.INSTANCE), any(), any());
CountDownLatch latch = new CountDownLatch(2);
try (MlDailyMaintenanceService service = createService(latch, client)) {
service.start();
latch.await(5, TimeUnit.SECONDS);
}
verify(client, times(3)).threadPool();
verify(client).execute(same(GetJobsAction.INSTANCE), any(), any());
verify(client).execute(same(ListTasksAction.INSTANCE), any(), any());
verify(client).execute(same(DeleteExpiredDataAction.INSTANCE), any(), any());
verify(mlAssignmentNotifier).auditUnassignedMlTasks(any(), any());
verifyNoMoreInteractions(client, mlAssignmentNotifier);
}
public void testJobGetsDeleted() throws InterruptedException {
testJobInDeletingStateDoesNotHaveDeletionTask(true);
}
public void testJobDoesNotGetDeleted() throws InterruptedException {
testJobInDeletingStateDoesNotHaveDeletionTask(false);
}
private void testJobInDeletingStateDoesNotHaveDeletionTask(boolean deleted) throws InterruptedException {
String jobId = "job-in-state-deleting";
when(clusterService.state()).thenReturn(createClusterState(false));
doAnswer(withResponse(new DeleteExpiredDataAction.Response(true)))
.when(client).execute(same(DeleteExpiredDataAction.INSTANCE), any(), any());
Job job = mock(Job.class);
when(job.getId()).thenReturn(jobId);
when(job.isDeleting()).thenReturn(true);
doAnswer(withResponse(new GetJobsAction.Response(new QueryPage<>(Collections.singletonList(job), 1, new ParseField("")))))
.when(client).execute(same(GetJobsAction.INSTANCE), any(), any());
doAnswer(withResponse(new ListTasksResponse(Collections.emptyList(), Collections.emptyList(), Collections.emptyList())))
.when(client).execute(same(ListTasksAction.INSTANCE), any(), any());
doAnswer(withResponse(new AcknowledgedResponse(deleted)))
.when(client).execute(same(DeleteJobAction.INSTANCE), any(), any());
CountDownLatch latch = new CountDownLatch(2);
try (MlDailyMaintenanceService service = createService(latch, client)) {
service.start();
latch.await(5, TimeUnit.SECONDS);
}
verify(client, times(4)).threadPool();
verify(client).execute(same(GetJobsAction.INSTANCE), any(), any());
verify(client).execute(same(ListTasksAction.INSTANCE), any(), any());
verify(client).execute(same(DeleteJobAction.INSTANCE), any(), any());
verify(client).execute(same(DeleteExpiredDataAction.INSTANCE), any(), any());
verify(mlAssignmentNotifier).auditUnassignedMlTasks(any(), any());
verifyNoMoreInteractions(client, mlAssignmentNotifier);
}
private MlDailyMaintenanceService createService(CountDownLatch latch, Client client) {
return new MlDailyMaintenanceService(Settings.EMPTY, threadPool, client, clusterService, mlAssignmentNotifier, () -> {
latch.countDown();
@ -98,4 +243,22 @@ public class MlDailyMaintenanceServiceTests extends ESTestCase {
.nodes(DiscoveryNodes.builder().build())
.build();
}
@SuppressWarnings("unchecked")
private static <Response> Answer<Response> withResponse(Response response) {
return invocationOnMock -> {
ActionListener<Response> listener = (ActionListener<Response>) invocationOnMock.getArguments()[2];
listener.onResponse(response);
return null;
};
}
@SuppressWarnings("unchecked")
private static <Response> Answer<Response> withException(Exception e) {
return invocationOnMock -> {
ActionListener<Response> listener = (ActionListener<Response>) invocationOnMock.getArguments()[2];
listener.onFailure(e);
return null;
};
}
}