diff --git a/plugin/src/main/java/org/elasticsearch/xpack/ml/action/CloseJobAction.java b/plugin/src/main/java/org/elasticsearch/xpack/ml/action/CloseJobAction.java index 4a9b57ad928..343fd1cd128 100644 --- a/plugin/src/main/java/org/elasticsearch/xpack/ml/action/CloseJobAction.java +++ b/plugin/src/main/java/org/elasticsearch/xpack/ml/action/CloseJobAction.java @@ -29,6 +29,7 @@ import org.elasticsearch.common.io.stream.StreamOutput; import org.elasticsearch.common.io.stream.Writeable; import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.unit.TimeValue; +import org.elasticsearch.common.util.ArrayUtils; import org.elasticsearch.common.util.concurrent.AbstractRunnable; import org.elasticsearch.common.util.concurrent.AtomicArray; import org.elasticsearch.common.xcontent.ObjectParser; @@ -57,7 +58,6 @@ import org.elasticsearch.xpack.security.InternalClient; import java.io.IOException; import java.util.ArrayList; -import java.util.HashMap; import java.util.HashSet; import java.util.List; import java.util.Map; @@ -344,7 +344,7 @@ public class CloseJobAction extends Action openJobs = new ArrayList<>(); List closingJobs = new ArrayList<>(); - resolveAndValidateJobId(request.getJobId(), state, openJobs, closingJobs); + resolveAndValidateJobId(request.getJobId(), state, openJobs, closingJobs, request.isForce()); request.setOpenJobIds(openJobs.toArray(new String[0])); request.setClosingJobIds(closingJobs.toArray(new String[0])); if (request.openJobIds.length == 0 && request.closingJobIds.length == 0) { @@ -432,10 +432,11 @@ public class CloseJobAction extends Action listener) { PersistentTasksCustomMetaData tasks = currentState.getMetaData().custom(PersistentTasksCustomMetaData.TYPE); - final int numberOfJobs = request.openJobIds.length; + final int numberOfJobs = request.openJobIds.length + request.closingJobIds.length; final AtomicInteger counter = new AtomicInteger(); final AtomicArray failures = new AtomicArray<>(numberOfJobs); - for (String jobId : request.openJobIds) { + + for (String jobId : ArrayUtils.concat(request.openJobIds, request.closingJobIds)) { PersistentTask jobTask = MlMetadata.getJobTask(jobId, tasks); if (jobTask != null) { auditor.info(jobId, Messages.JOB_AUDIT_FORCE_CLOSING); @@ -560,14 +561,16 @@ public class CloseJobAction extends Action openJobs, List closingJobs) { + static void resolveAndValidateJobId(String jobId, ClusterState state, List openJobs, List closingJobs, + boolean allowFailed) { MlMetadata mlMetadata = state.metaData().custom(MlMetadata.TYPE); PersistentTasksCustomMetaData tasksMetaData = state.getMetaData().custom(PersistentTasksCustomMetaData.TYPE); @@ -575,26 +578,40 @@ public class CloseJobAction extends Action failedJobs = new ArrayList<>(); + Consumer jobIdProcessor = id -> { validateJobAndTaskState(id, mlMetadata, tasksMetaData); Job job = mlMetadata.getJobs().get(id); if (job.isDeleted()) { return; } - addJobAccordingToState(id, tasksMetaData, openJobs, closingJobs); + addJobAccordingToState(id, tasksMetaData, openJobs, closingJobs, failedJobs); }; if (!Job.ALL.equals(jobId)) { jobIdProcessor.accept(jobId); + + if (allowFailed == false && failedJobs.size() > 0) { + throw ExceptionsHelper.conflictStatusException("cannot close job [{}] because it failed, use force close", jobId); + } + } else { for (Map.Entry jobEntry : mlMetadata.getJobs().entrySet()) { jobIdProcessor.accept(jobEntry.getKey()); } + + if (allowFailed == false && failedJobs.size() > 0) { + throw ExceptionsHelper.conflictStatusException("one or more jobs have state failed, use force close"); + } } + + // allowFailed == true + openJobs.addAll(failedJobs); } private static void addJobAccordingToState(String jobId, PersistentTasksCustomMetaData tasksMetaData, - List openJobs, List closingJobs) { + List openJobs, List closingJobs, List failedJobs) { JobState jobState = MlMetadata.getJobState(jobId, tasksMetaData); switch (jobState) { @@ -602,6 +619,8 @@ public class CloseJobAction extends Action>() { @Override diff --git a/plugin/src/main/java/org/elasticsearch/xpack/persistent/PersistentTasksService.java b/plugin/src/main/java/org/elasticsearch/xpack/persistent/PersistentTasksService.java index f6524a47c74..8026d6fe2ac 100644 --- a/plugin/src/main/java/org/elasticsearch/xpack/persistent/PersistentTasksService.java +++ b/plugin/src/main/java/org/elasticsearch/xpack/persistent/PersistentTasksService.java @@ -167,7 +167,7 @@ public class PersistentTasksService extends AbstractComponent { public void onTimeout(TimeValue timeout) { listener.onFailure(new IllegalStateException("timed out after " + timeout)); } - }, clusterState -> predicate.test(clusterState.metaData().custom(PersistentTasksCustomMetaData.TYPE))); + }, clusterState -> predicate.test(clusterState.metaData().custom(PersistentTasksCustomMetaData.TYPE)), timeout); } } diff --git a/plugin/src/test/java/org/elasticsearch/xpack/ml/action/CloseJobActionRequestTests.java b/plugin/src/test/java/org/elasticsearch/xpack/ml/action/CloseJobActionRequestTests.java index 89e44117453..ccc7aa0826b 100644 --- a/plugin/src/test/java/org/elasticsearch/xpack/ml/action/CloseJobActionRequestTests.java +++ b/plugin/src/test/java/org/elasticsearch/xpack/ml/action/CloseJobActionRequestTests.java @@ -138,9 +138,12 @@ public class CloseJobActionRequestTests extends AbstractStreamableXContentTestCa List openJobs = new ArrayList<>(); List closingJobs = new ArrayList<>(); - CloseJobAction.resolveAndValidateJobId("_all", cs1, openJobs, closingJobs); + CloseJobAction.resolveAndValidateJobId("_all", cs1, openJobs, closingJobs, true); assertEquals(Arrays.asList("job_id_1", "job_id_2", "job_id_3"), openJobs); assertEquals(Arrays.asList("job_id_4"), closingJobs); + + expectThrows(ElasticsearchStatusException.class, + () -> CloseJobAction.resolveAndValidateJobId("_all", cs1, openJobs, closingJobs, false)); } public void testResolve_givenJobId() { @@ -158,7 +161,7 @@ public class CloseJobActionRequestTests extends AbstractStreamableXContentTestCa List openJobs = new ArrayList<>(); List closingJobs = new ArrayList<>(); - CloseJobAction.resolveAndValidateJobId("job_id_1", cs1, openJobs, closingJobs); + CloseJobAction.resolveAndValidateJobId("job_id_1", cs1, openJobs, closingJobs, false); assertEquals(Arrays.asList("job_id_1"), openJobs); assertEquals(Collections.emptyList(), closingJobs); @@ -169,11 +172,35 @@ public class CloseJobActionRequestTests extends AbstractStreamableXContentTestCa openJobs.clear(); closingJobs.clear(); - CloseJobAction.resolveAndValidateJobId("job_id_1", cs1, openJobs, closingJobs); + CloseJobAction.resolveAndValidateJobId("job_id_1", cs1, openJobs, closingJobs, false); assertEquals(Collections.emptyList(), openJobs); assertEquals(Collections.emptyList(), closingJobs); } + public void testResolve_givenJobIdFailed() { + MlMetadata.Builder mlBuilder = new MlMetadata.Builder(); + mlBuilder.putJob(BaseMlIntegTestCase.createFareQuoteJob("job_id_failed").build(new Date()), false); + + PersistentTasksCustomMetaData.Builder tasksBuilder = PersistentTasksCustomMetaData.builder(); + addJobTask("job_id_failed", null, JobState.FAILED, tasksBuilder); + + ClusterState cs1 = ClusterState.builder(new ClusterName("_name")).metaData(new MetaData.Builder() + .putCustom(MlMetadata.TYPE, mlBuilder.build()).putCustom(PersistentTasksCustomMetaData.TYPE, tasksBuilder.build())).build(); + + List openJobs = new ArrayList<>(); + List closingJobs = new ArrayList<>(); + + CloseJobAction.resolveAndValidateJobId("job_id_failed", cs1, openJobs, closingJobs, true); + assertEquals(Arrays.asList("job_id_failed"), openJobs); + assertEquals(Collections.emptyList(), closingJobs); + + openJobs.clear(); + closingJobs.clear(); + + expectThrows(ElasticsearchStatusException.class, + () -> CloseJobAction.resolveAndValidateJobId("job_id_failed", cs1, openJobs, closingJobs, false)); + } + public void testResolve_withSpecificJobIds() { MlMetadata.Builder mlBuilder = new MlMetadata.Builder(); mlBuilder.putJob(BaseMlIntegTestCase.createFareQuoteJob("job_id_closing").build(new Date()), false); @@ -193,19 +220,19 @@ public class CloseJobActionRequestTests extends AbstractStreamableXContentTestCa List openJobs = new ArrayList<>(); List closingJobs = new ArrayList<>(); - CloseJobAction.resolveAndValidateJobId("_all", cs1, openJobs, closingJobs); + CloseJobAction.resolveAndValidateJobId("_all", cs1, openJobs, closingJobs, false); assertEquals(Arrays.asList("job_id_open"), openJobs); assertEquals(Arrays.asList("job_id_closing"), closingJobs); openJobs.clear(); closingJobs.clear(); - CloseJobAction.resolveAndValidateJobId("job_id_closing", cs1, openJobs, closingJobs); + CloseJobAction.resolveAndValidateJobId("job_id_closing", cs1, openJobs, closingJobs, false); assertEquals(Collections.emptyList(), openJobs); assertEquals(Arrays.asList("job_id_closing"), closingJobs); openJobs.clear(); closingJobs.clear(); - CloseJobAction.resolveAndValidateJobId("job_id_open", cs1, openJobs, closingJobs); + CloseJobAction.resolveAndValidateJobId("job_id_open", cs1, openJobs, closingJobs, false); assertEquals(Arrays.asList("job_id_open"), openJobs); assertEquals(Collections.emptyList(), closingJobs); openJobs.clear(); diff --git a/plugin/src/test/java/org/elasticsearch/xpack/ml/job/process/autodetect/AutodetectProcessManagerTests.java b/plugin/src/test/java/org/elasticsearch/xpack/ml/job/process/autodetect/AutodetectProcessManagerTests.java index ccf82fe8c40..d45b0f0565e 100644 --- a/plugin/src/test/java/org/elasticsearch/xpack/ml/job/process/autodetect/AutodetectProcessManagerTests.java +++ b/plugin/src/test/java/org/elasticsearch/xpack/ml/job/process/autodetect/AutodetectProcessManagerTests.java @@ -5,6 +5,7 @@ */ package org.elasticsearch.xpack.ml.job.process.autodetect; +import org.elasticsearch.ElasticsearchException; import org.elasticsearch.client.Client; import org.elasticsearch.common.CheckedConsumer; import org.elasticsearch.common.settings.Settings; @@ -47,7 +48,6 @@ import java.io.ByteArrayInputStream; import java.io.IOException; import java.io.InputStream; import java.nio.charset.StandardCharsets; -import java.util.Arrays; import java.util.Collections; import java.util.Date; import java.util.HashSet; @@ -322,6 +322,32 @@ public class AutodetectProcessManagerTests extends ESTestCase { assertEquals("[foo] exception while flushing job", holder[0].getMessage()); } + public void testCloseThrows() throws IOException { + AutodetectCommunicator communicator = mock(AutodetectCommunicator.class); + AutodetectProcessManager manager = createManager(communicator); + + // let the communicator throw, simulating a problem with the underlying + // autodetect, e.g. a crash + doThrow(Exception.class).when(communicator).close(anyBoolean(), anyString()); + + // create a jobtask + JobTask jobTask = mock(JobTask.class); + when(jobTask.getJobId()).thenReturn("foo"); + manager.openJob(jobTask, false, e -> { + }); + manager.processData(jobTask, createInputStream(""), randomFrom(XContentType.values()), mock(DataLoadParams.class), + (dataCounts1, e) -> { + }); + + // job is created + assertEquals(1, manager.numberOfOpenJobs()); + expectThrows(ElasticsearchException.class, () -> manager.closeJob(jobTask, false, null)); + assertEquals(0, manager.numberOfOpenJobs()); + + verify(manager).setJobState(any(), eq(JobState.OPENED)); + verify(manager).setJobState(any(), eq(JobState.FAILED)); + } + public void testwriteUpdateProcessMessage() throws IOException { AutodetectCommunicator communicator = mock(AutodetectCommunicator.class); AutodetectProcessManager manager = createManagerAndCallProcessData(communicator, "foo");