From 527dcfd98dcf700b247f0c587221e9171ed47444 Mon Sep 17 00:00:00 2001 From: Hendrik Muhs Date: Mon, 22 May 2017 08:48:33 +0200 Subject: [PATCH] [ML] Make job closing robust against crashes in autodetect and other misbehavior (elastic/x-pack-elasticsearch#1480) Set job to failed if autodetect manager fails closing, fix force closing of jobs that hang in closing state, set timeout when waiting for clusterstate update, disallow closing of failed jobs with normal close relates elastic/x-pack-elasticsearch#1453 Original commit: elastic/x-pack-elasticsearch@493cf85e225f4977d5b82d3801a94c362b48649a --- .../xpack/ml/action/CloseJobAction.java | 37 +++++++++++++----- .../autodetect/AutodetectProcessManager.java | 3 +- .../persistent/PersistentTasksService.java | 2 +- .../ml/action/CloseJobActionRequestTests.java | 39 ++++++++++++++++--- .../AutodetectProcessManagerTests.java | 28 ++++++++++++- 5 files changed, 91 insertions(+), 18 deletions(-) diff --git a/plugin/src/main/java/org/elasticsearch/xpack/ml/action/CloseJobAction.java b/plugin/src/main/java/org/elasticsearch/xpack/ml/action/CloseJobAction.java index 4a9b57ad928..343fd1cd128 100644 --- a/plugin/src/main/java/org/elasticsearch/xpack/ml/action/CloseJobAction.java +++ b/plugin/src/main/java/org/elasticsearch/xpack/ml/action/CloseJobAction.java @@ -29,6 +29,7 @@ import org.elasticsearch.common.io.stream.StreamOutput; import org.elasticsearch.common.io.stream.Writeable; import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.unit.TimeValue; +import org.elasticsearch.common.util.ArrayUtils; import org.elasticsearch.common.util.concurrent.AbstractRunnable; import org.elasticsearch.common.util.concurrent.AtomicArray; import org.elasticsearch.common.xcontent.ObjectParser; @@ -57,7 +58,6 @@ import org.elasticsearch.xpack.security.InternalClient; import java.io.IOException; import java.util.ArrayList; -import java.util.HashMap; import java.util.HashSet; import java.util.List; import java.util.Map; @@ -344,7 +344,7 @@ public class CloseJobAction extends Action openJobs = new ArrayList<>(); List closingJobs = new ArrayList<>(); - resolveAndValidateJobId(request.getJobId(), state, openJobs, closingJobs); + resolveAndValidateJobId(request.getJobId(), state, openJobs, closingJobs, request.isForce()); request.setOpenJobIds(openJobs.toArray(new String[0])); request.setClosingJobIds(closingJobs.toArray(new String[0])); if (request.openJobIds.length == 0 && request.closingJobIds.length == 0) { @@ -432,10 +432,11 @@ public class CloseJobAction extends Action listener) { PersistentTasksCustomMetaData tasks = currentState.getMetaData().custom(PersistentTasksCustomMetaData.TYPE); - final int numberOfJobs = request.openJobIds.length; + final int numberOfJobs = request.openJobIds.length + request.closingJobIds.length; final AtomicInteger counter = new AtomicInteger(); final AtomicArray failures = new AtomicArray<>(numberOfJobs); - for (String jobId : request.openJobIds) { + + for (String jobId : ArrayUtils.concat(request.openJobIds, request.closingJobIds)) { PersistentTask jobTask = MlMetadata.getJobTask(jobId, tasks); if (jobTask != null) { auditor.info(jobId, Messages.JOB_AUDIT_FORCE_CLOSING); @@ -560,14 +561,16 @@ public class CloseJobAction extends Action openJobs, List closingJobs) { + static void resolveAndValidateJobId(String jobId, ClusterState state, List openJobs, List closingJobs, + boolean allowFailed) { MlMetadata mlMetadata = state.metaData().custom(MlMetadata.TYPE); PersistentTasksCustomMetaData tasksMetaData = state.getMetaData().custom(PersistentTasksCustomMetaData.TYPE); @@ -575,26 +578,40 @@ public class CloseJobAction extends Action failedJobs = new ArrayList<>(); + Consumer jobIdProcessor = id -> { validateJobAndTaskState(id, mlMetadata, tasksMetaData); Job job = mlMetadata.getJobs().get(id); if (job.isDeleted()) { return; } - addJobAccordingToState(id, tasksMetaData, openJobs, closingJobs); + addJobAccordingToState(id, tasksMetaData, openJobs, closingJobs, failedJobs); }; if (!Job.ALL.equals(jobId)) { jobIdProcessor.accept(jobId); + + if (allowFailed == false && failedJobs.size() > 0) { + throw ExceptionsHelper.conflictStatusException("cannot close job [{}] because it failed, use force close", jobId); + } + } else { for (Map.Entry jobEntry : mlMetadata.getJobs().entrySet()) { jobIdProcessor.accept(jobEntry.getKey()); } + + if (allowFailed == false && failedJobs.size() > 0) { + throw ExceptionsHelper.conflictStatusException("one or more jobs have state failed, use force close"); + } } + + // allowFailed == true + openJobs.addAll(failedJobs); } private static void addJobAccordingToState(String jobId, PersistentTasksCustomMetaData tasksMetaData, - List openJobs, List closingJobs) { + List openJobs, List closingJobs, List failedJobs) { JobState jobState = MlMetadata.getJobState(jobId, tasksMetaData); switch (jobState) { @@ -602,6 +619,8 @@ public class CloseJobAction extends Action>() { @Override diff --git a/plugin/src/main/java/org/elasticsearch/xpack/persistent/PersistentTasksService.java b/plugin/src/main/java/org/elasticsearch/xpack/persistent/PersistentTasksService.java index f6524a47c74..8026d6fe2ac 100644 --- a/plugin/src/main/java/org/elasticsearch/xpack/persistent/PersistentTasksService.java +++ b/plugin/src/main/java/org/elasticsearch/xpack/persistent/PersistentTasksService.java @@ -167,7 +167,7 @@ public class PersistentTasksService extends AbstractComponent { public void onTimeout(TimeValue timeout) { listener.onFailure(new IllegalStateException("timed out after " + timeout)); } - }, clusterState -> predicate.test(clusterState.metaData().custom(PersistentTasksCustomMetaData.TYPE))); + }, clusterState -> predicate.test(clusterState.metaData().custom(PersistentTasksCustomMetaData.TYPE)), timeout); } } diff --git a/plugin/src/test/java/org/elasticsearch/xpack/ml/action/CloseJobActionRequestTests.java b/plugin/src/test/java/org/elasticsearch/xpack/ml/action/CloseJobActionRequestTests.java index 89e44117453..ccc7aa0826b 100644 --- a/plugin/src/test/java/org/elasticsearch/xpack/ml/action/CloseJobActionRequestTests.java +++ b/plugin/src/test/java/org/elasticsearch/xpack/ml/action/CloseJobActionRequestTests.java @@ -138,9 +138,12 @@ public class CloseJobActionRequestTests extends AbstractStreamableXContentTestCa List openJobs = new ArrayList<>(); List closingJobs = new ArrayList<>(); - CloseJobAction.resolveAndValidateJobId("_all", cs1, openJobs, closingJobs); + CloseJobAction.resolveAndValidateJobId("_all", cs1, openJobs, closingJobs, true); assertEquals(Arrays.asList("job_id_1", "job_id_2", "job_id_3"), openJobs); assertEquals(Arrays.asList("job_id_4"), closingJobs); + + expectThrows(ElasticsearchStatusException.class, + () -> CloseJobAction.resolveAndValidateJobId("_all", cs1, openJobs, closingJobs, false)); } public void testResolve_givenJobId() { @@ -158,7 +161,7 @@ public class CloseJobActionRequestTests extends AbstractStreamableXContentTestCa List openJobs = new ArrayList<>(); List closingJobs = new ArrayList<>(); - CloseJobAction.resolveAndValidateJobId("job_id_1", cs1, openJobs, closingJobs); + CloseJobAction.resolveAndValidateJobId("job_id_1", cs1, openJobs, closingJobs, false); assertEquals(Arrays.asList("job_id_1"), openJobs); assertEquals(Collections.emptyList(), closingJobs); @@ -169,11 +172,35 @@ public class CloseJobActionRequestTests extends AbstractStreamableXContentTestCa openJobs.clear(); closingJobs.clear(); - CloseJobAction.resolveAndValidateJobId("job_id_1", cs1, openJobs, closingJobs); + CloseJobAction.resolveAndValidateJobId("job_id_1", cs1, openJobs, closingJobs, false); assertEquals(Collections.emptyList(), openJobs); assertEquals(Collections.emptyList(), closingJobs); } + public void testResolve_givenJobIdFailed() { + MlMetadata.Builder mlBuilder = new MlMetadata.Builder(); + mlBuilder.putJob(BaseMlIntegTestCase.createFareQuoteJob("job_id_failed").build(new Date()), false); + + PersistentTasksCustomMetaData.Builder tasksBuilder = PersistentTasksCustomMetaData.builder(); + addJobTask("job_id_failed", null, JobState.FAILED, tasksBuilder); + + ClusterState cs1 = ClusterState.builder(new ClusterName("_name")).metaData(new MetaData.Builder() + .putCustom(MlMetadata.TYPE, mlBuilder.build()).putCustom(PersistentTasksCustomMetaData.TYPE, tasksBuilder.build())).build(); + + List openJobs = new ArrayList<>(); + List closingJobs = new ArrayList<>(); + + CloseJobAction.resolveAndValidateJobId("job_id_failed", cs1, openJobs, closingJobs, true); + assertEquals(Arrays.asList("job_id_failed"), openJobs); + assertEquals(Collections.emptyList(), closingJobs); + + openJobs.clear(); + closingJobs.clear(); + + expectThrows(ElasticsearchStatusException.class, + () -> CloseJobAction.resolveAndValidateJobId("job_id_failed", cs1, openJobs, closingJobs, false)); + } + public void testResolve_withSpecificJobIds() { MlMetadata.Builder mlBuilder = new MlMetadata.Builder(); mlBuilder.putJob(BaseMlIntegTestCase.createFareQuoteJob("job_id_closing").build(new Date()), false); @@ -193,19 +220,19 @@ public class CloseJobActionRequestTests extends AbstractStreamableXContentTestCa List openJobs = new ArrayList<>(); List closingJobs = new ArrayList<>(); - CloseJobAction.resolveAndValidateJobId("_all", cs1, openJobs, closingJobs); + CloseJobAction.resolveAndValidateJobId("_all", cs1, openJobs, closingJobs, false); assertEquals(Arrays.asList("job_id_open"), openJobs); assertEquals(Arrays.asList("job_id_closing"), closingJobs); openJobs.clear(); closingJobs.clear(); - CloseJobAction.resolveAndValidateJobId("job_id_closing", cs1, openJobs, closingJobs); + CloseJobAction.resolveAndValidateJobId("job_id_closing", cs1, openJobs, closingJobs, false); assertEquals(Collections.emptyList(), openJobs); assertEquals(Arrays.asList("job_id_closing"), closingJobs); openJobs.clear(); closingJobs.clear(); - CloseJobAction.resolveAndValidateJobId("job_id_open", cs1, openJobs, closingJobs); + CloseJobAction.resolveAndValidateJobId("job_id_open", cs1, openJobs, closingJobs, false); assertEquals(Arrays.asList("job_id_open"), openJobs); assertEquals(Collections.emptyList(), closingJobs); openJobs.clear(); diff --git a/plugin/src/test/java/org/elasticsearch/xpack/ml/job/process/autodetect/AutodetectProcessManagerTests.java b/plugin/src/test/java/org/elasticsearch/xpack/ml/job/process/autodetect/AutodetectProcessManagerTests.java index ccf82fe8c40..d45b0f0565e 100644 --- a/plugin/src/test/java/org/elasticsearch/xpack/ml/job/process/autodetect/AutodetectProcessManagerTests.java +++ b/plugin/src/test/java/org/elasticsearch/xpack/ml/job/process/autodetect/AutodetectProcessManagerTests.java @@ -5,6 +5,7 @@ */ package org.elasticsearch.xpack.ml.job.process.autodetect; +import org.elasticsearch.ElasticsearchException; import org.elasticsearch.client.Client; import org.elasticsearch.common.CheckedConsumer; import org.elasticsearch.common.settings.Settings; @@ -47,7 +48,6 @@ import java.io.ByteArrayInputStream; import java.io.IOException; import java.io.InputStream; import java.nio.charset.StandardCharsets; -import java.util.Arrays; import java.util.Collections; import java.util.Date; import java.util.HashSet; @@ -322,6 +322,32 @@ public class AutodetectProcessManagerTests extends ESTestCase { assertEquals("[foo] exception while flushing job", holder[0].getMessage()); } + public void testCloseThrows() throws IOException { + AutodetectCommunicator communicator = mock(AutodetectCommunicator.class); + AutodetectProcessManager manager = createManager(communicator); + + // let the communicator throw, simulating a problem with the underlying + // autodetect, e.g. a crash + doThrow(Exception.class).when(communicator).close(anyBoolean(), anyString()); + + // create a jobtask + JobTask jobTask = mock(JobTask.class); + when(jobTask.getJobId()).thenReturn("foo"); + manager.openJob(jobTask, false, e -> { + }); + manager.processData(jobTask, createInputStream(""), randomFrom(XContentType.values()), mock(DataLoadParams.class), + (dataCounts1, e) -> { + }); + + // job is created + assertEquals(1, manager.numberOfOpenJobs()); + expectThrows(ElasticsearchException.class, () -> manager.closeJob(jobTask, false, null)); + assertEquals(0, manager.numberOfOpenJobs()); + + verify(manager).setJobState(any(), eq(JobState.OPENED)); + verify(manager).setJobState(any(), eq(JobState.FAILED)); + } + public void testwriteUpdateProcessMessage() throws IOException { AutodetectCommunicator communicator = mock(AutodetectCommunicator.class); AutodetectProcessManager manager = createManagerAndCallProcessData(communicator, "foo");