[ML] Make job closing robust against crashes in autodetect and other misbehavior (elastic/x-pack-elasticsearch#1480)

Set job to failed if autodetect manager fails closing, fix force closing of jobs that hang in closing 
state, set timeout when waiting for clusterstate update, disallow closing of failed jobs with normal 
close

relates elastic/x-pack-elasticsearch#1453 

Original commit: elastic/x-pack-elasticsearch@493cf85e22
This commit is contained in:
Hendrik Muhs 2017-05-22 08:48:33 +02:00 committed by GitHub
parent 392e67851e
commit 527dcfd98d
5 changed files with 91 additions and 18 deletions

View File

@ -29,6 +29,7 @@ import org.elasticsearch.common.io.stream.StreamOutput;
import org.elasticsearch.common.io.stream.Writeable; import org.elasticsearch.common.io.stream.Writeable;
import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.unit.TimeValue; import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.common.util.ArrayUtils;
import org.elasticsearch.common.util.concurrent.AbstractRunnable; import org.elasticsearch.common.util.concurrent.AbstractRunnable;
import org.elasticsearch.common.util.concurrent.AtomicArray; import org.elasticsearch.common.util.concurrent.AtomicArray;
import org.elasticsearch.common.xcontent.ObjectParser; import org.elasticsearch.common.xcontent.ObjectParser;
@ -57,7 +58,6 @@ import org.elasticsearch.xpack.security.InternalClient;
import java.io.IOException; import java.io.IOException;
import java.util.ArrayList; import java.util.ArrayList;
import java.util.HashMap;
import java.util.HashSet; import java.util.HashSet;
import java.util.List; import java.util.List;
import java.util.Map; import java.util.Map;
@ -344,7 +344,7 @@ public class CloseJobAction extends Action<CloseJobAction.Request, CloseJobActio
List<String> openJobs = new ArrayList<>(); List<String> openJobs = new ArrayList<>();
List<String> closingJobs = new ArrayList<>(); List<String> closingJobs = new ArrayList<>();
resolveAndValidateJobId(request.getJobId(), state, openJobs, closingJobs); resolveAndValidateJobId(request.getJobId(), state, openJobs, closingJobs, request.isForce());
request.setOpenJobIds(openJobs.toArray(new String[0])); request.setOpenJobIds(openJobs.toArray(new String[0]));
request.setClosingJobIds(closingJobs.toArray(new String[0])); request.setClosingJobIds(closingJobs.toArray(new String[0]));
if (request.openJobIds.length == 0 && request.closingJobIds.length == 0) { if (request.openJobIds.length == 0 && request.closingJobIds.length == 0) {
@ -432,10 +432,11 @@ public class CloseJobAction extends Action<CloseJobAction.Request, CloseJobActio
private void forceCloseJob(ClusterState currentState, Request request, ActionListener<Response> listener) { private void forceCloseJob(ClusterState currentState, Request request, ActionListener<Response> listener) {
PersistentTasksCustomMetaData tasks = currentState.getMetaData().custom(PersistentTasksCustomMetaData.TYPE); PersistentTasksCustomMetaData tasks = currentState.getMetaData().custom(PersistentTasksCustomMetaData.TYPE);
final int numberOfJobs = request.openJobIds.length; final int numberOfJobs = request.openJobIds.length + request.closingJobIds.length;
final AtomicInteger counter = new AtomicInteger(); final AtomicInteger counter = new AtomicInteger();
final AtomicArray<Exception> failures = new AtomicArray<>(numberOfJobs); final AtomicArray<Exception> failures = new AtomicArray<>(numberOfJobs);
for (String jobId : request.openJobIds) {
for (String jobId : ArrayUtils.concat(request.openJobIds, request.closingJobIds)) {
PersistentTask<?> jobTask = MlMetadata.getJobTask(jobId, tasks); PersistentTask<?> jobTask = MlMetadata.getJobTask(jobId, tasks);
if (jobTask != null) { if (jobTask != null) {
auditor.info(jobId, Messages.JOB_AUDIT_FORCE_CLOSING); auditor.info(jobId, Messages.JOB_AUDIT_FORCE_CLOSING);
@ -560,14 +561,16 @@ public class CloseJobAction extends Action<CloseJobAction.Request, CloseJobActio
* Expand the {@code jobId} parameter and add the job Id the the list arguments * Expand the {@code jobId} parameter and add the job Id the the list arguments
* depending on job state. * depending on job state.
* *
* Opened or failed jobs are added to {@code openJobs} and closing jobs added to * Opened jobs are added to {@code openJobs} and closing jobs added to {@code closingJobs}. Failed jobs are added
* {@code closingJobs} * to {@code openJobs} if allowFailed is set otherwise an exception is thrown.
* @param jobId The job Id. If jobId == {@link Job#ALL} then expand the job list. * @param jobId The job Id. If jobId == {@link Job#ALL} then expand the job list.
* @param state Cluster state * @param state Cluster state
* @param openJobs Opened or failed jobs are added to this list * @param openJobs Opened or failed jobs are added to this list
* @param closingJobs Closing jobs are added to this list * @param closingJobs Closing jobs are added to this list
* @param allowFailed Whether failed jobs are allowed, if yes, they are added to {@code openJobs}
*/ */
static void resolveAndValidateJobId(String jobId, ClusterState state, List<String> openJobs, List<String> closingJobs) { static void resolveAndValidateJobId(String jobId, ClusterState state, List<String> openJobs, List<String> closingJobs,
boolean allowFailed) {
MlMetadata mlMetadata = state.metaData().custom(MlMetadata.TYPE); MlMetadata mlMetadata = state.metaData().custom(MlMetadata.TYPE);
PersistentTasksCustomMetaData tasksMetaData = state.getMetaData().custom(PersistentTasksCustomMetaData.TYPE); PersistentTasksCustomMetaData tasksMetaData = state.getMetaData().custom(PersistentTasksCustomMetaData.TYPE);
@ -575,26 +578,40 @@ public class CloseJobAction extends Action<CloseJobAction.Request, CloseJobActio
return; return;
} }
List<String> failedJobs = new ArrayList<>();
Consumer<String> jobIdProcessor = id -> { Consumer<String> jobIdProcessor = id -> {
validateJobAndTaskState(id, mlMetadata, tasksMetaData); validateJobAndTaskState(id, mlMetadata, tasksMetaData);
Job job = mlMetadata.getJobs().get(id); Job job = mlMetadata.getJobs().get(id);
if (job.isDeleted()) { if (job.isDeleted()) {
return; return;
} }
addJobAccordingToState(id, tasksMetaData, openJobs, closingJobs); addJobAccordingToState(id, tasksMetaData, openJobs, closingJobs, failedJobs);
}; };
if (!Job.ALL.equals(jobId)) { if (!Job.ALL.equals(jobId)) {
jobIdProcessor.accept(jobId); jobIdProcessor.accept(jobId);
if (allowFailed == false && failedJobs.size() > 0) {
throw ExceptionsHelper.conflictStatusException("cannot close job [{}] because it failed, use force close", jobId);
}
} else { } else {
for (Map.Entry<String, Job> jobEntry : mlMetadata.getJobs().entrySet()) { for (Map.Entry<String, Job> jobEntry : mlMetadata.getJobs().entrySet()) {
jobIdProcessor.accept(jobEntry.getKey()); jobIdProcessor.accept(jobEntry.getKey());
} }
if (allowFailed == false && failedJobs.size() > 0) {
throw ExceptionsHelper.conflictStatusException("one or more jobs have state failed, use force close");
}
} }
// allowFailed == true
openJobs.addAll(failedJobs);
} }
private static void addJobAccordingToState(String jobId, PersistentTasksCustomMetaData tasksMetaData, private static void addJobAccordingToState(String jobId, PersistentTasksCustomMetaData tasksMetaData,
List<String> openJobs, List<String> closingJobs) { List<String> openJobs, List<String> closingJobs, List<String> failedJobs) {
JobState jobState = MlMetadata.getJobState(jobId, tasksMetaData); JobState jobState = MlMetadata.getJobState(jobId, tasksMetaData);
switch (jobState) { switch (jobState) {
@ -602,6 +619,8 @@ public class CloseJobAction extends Action<CloseJobAction.Request, CloseJobActio
closingJobs.add(jobId); closingJobs.add(jobId);
break; break;
case FAILED: case FAILED:
failedJobs.add(jobId);
break;
case OPENED: case OPENED:
openJobs.add(jobId); openJobs.add(jobId);
break; break;

View File

@ -358,6 +358,7 @@ public class AutodetectProcessManager extends AbstractComponent {
communicator.close(restart, reason); communicator.close(restart, reason);
} catch (Exception e) { } catch (Exception e) {
logger.warn("Exception closing stopped process input stream", e); logger.warn("Exception closing stopped process input stream", e);
setJobState(jobTask, JobState.FAILED);
throw ExceptionsHelper.serverError("Exception closing stopped process input stream", e); throw ExceptionsHelper.serverError("Exception closing stopped process input stream", e);
} }
} }
@ -378,7 +379,7 @@ public class AutodetectProcessManager extends AbstractComponent {
return Optional.of(Duration.between(communicator.getProcessStartTime(), ZonedDateTime.now())); return Optional.of(Duration.between(communicator.getProcessStartTime(), ZonedDateTime.now()));
} }
private void setJobState(JobTask jobTask, JobState state) { void setJobState(JobTask jobTask, JobState state) {
JobTaskStatus taskStatus = new JobTaskStatus(state, jobTask.getAllocationId()); JobTaskStatus taskStatus = new JobTaskStatus(state, jobTask.getAllocationId());
jobTask.updatePersistentStatus(taskStatus, new ActionListener<PersistentTask<?>>() { jobTask.updatePersistentStatus(taskStatus, new ActionListener<PersistentTask<?>>() {
@Override @Override

View File

@ -167,7 +167,7 @@ public class PersistentTasksService extends AbstractComponent {
public void onTimeout(TimeValue timeout) { public void onTimeout(TimeValue timeout) {
listener.onFailure(new IllegalStateException("timed out after " + timeout)); listener.onFailure(new IllegalStateException("timed out after " + timeout));
} }
}, clusterState -> predicate.test(clusterState.metaData().custom(PersistentTasksCustomMetaData.TYPE))); }, clusterState -> predicate.test(clusterState.metaData().custom(PersistentTasksCustomMetaData.TYPE)), timeout);
} }
} }

View File

@ -138,9 +138,12 @@ public class CloseJobActionRequestTests extends AbstractStreamableXContentTestCa
List<String> openJobs = new ArrayList<>(); List<String> openJobs = new ArrayList<>();
List<String> closingJobs = new ArrayList<>(); List<String> closingJobs = new ArrayList<>();
CloseJobAction.resolveAndValidateJobId("_all", cs1, openJobs, closingJobs); CloseJobAction.resolveAndValidateJobId("_all", cs1, openJobs, closingJobs, true);
assertEquals(Arrays.asList("job_id_1", "job_id_2", "job_id_3"), openJobs); assertEquals(Arrays.asList("job_id_1", "job_id_2", "job_id_3"), openJobs);
assertEquals(Arrays.asList("job_id_4"), closingJobs); assertEquals(Arrays.asList("job_id_4"), closingJobs);
expectThrows(ElasticsearchStatusException.class,
() -> CloseJobAction.resolveAndValidateJobId("_all", cs1, openJobs, closingJobs, false));
} }
public void testResolve_givenJobId() { public void testResolve_givenJobId() {
@ -158,7 +161,7 @@ public class CloseJobActionRequestTests extends AbstractStreamableXContentTestCa
List<String> openJobs = new ArrayList<>(); List<String> openJobs = new ArrayList<>();
List<String> closingJobs = new ArrayList<>(); List<String> closingJobs = new ArrayList<>();
CloseJobAction.resolveAndValidateJobId("job_id_1", cs1, openJobs, closingJobs); CloseJobAction.resolveAndValidateJobId("job_id_1", cs1, openJobs, closingJobs, false);
assertEquals(Arrays.asList("job_id_1"), openJobs); assertEquals(Arrays.asList("job_id_1"), openJobs);
assertEquals(Collections.emptyList(), closingJobs); assertEquals(Collections.emptyList(), closingJobs);
@ -169,11 +172,35 @@ public class CloseJobActionRequestTests extends AbstractStreamableXContentTestCa
openJobs.clear(); openJobs.clear();
closingJobs.clear(); closingJobs.clear();
CloseJobAction.resolveAndValidateJobId("job_id_1", cs1, openJobs, closingJobs); CloseJobAction.resolveAndValidateJobId("job_id_1", cs1, openJobs, closingJobs, false);
assertEquals(Collections.emptyList(), openJobs); assertEquals(Collections.emptyList(), openJobs);
assertEquals(Collections.emptyList(), closingJobs); assertEquals(Collections.emptyList(), closingJobs);
} }
public void testResolve_givenJobIdFailed() {
MlMetadata.Builder mlBuilder = new MlMetadata.Builder();
mlBuilder.putJob(BaseMlIntegTestCase.createFareQuoteJob("job_id_failed").build(new Date()), false);
PersistentTasksCustomMetaData.Builder tasksBuilder = PersistentTasksCustomMetaData.builder();
addJobTask("job_id_failed", null, JobState.FAILED, tasksBuilder);
ClusterState cs1 = ClusterState.builder(new ClusterName("_name")).metaData(new MetaData.Builder()
.putCustom(MlMetadata.TYPE, mlBuilder.build()).putCustom(PersistentTasksCustomMetaData.TYPE, tasksBuilder.build())).build();
List<String> openJobs = new ArrayList<>();
List<String> closingJobs = new ArrayList<>();
CloseJobAction.resolveAndValidateJobId("job_id_failed", cs1, openJobs, closingJobs, true);
assertEquals(Arrays.asList("job_id_failed"), openJobs);
assertEquals(Collections.emptyList(), closingJobs);
openJobs.clear();
closingJobs.clear();
expectThrows(ElasticsearchStatusException.class,
() -> CloseJobAction.resolveAndValidateJobId("job_id_failed", cs1, openJobs, closingJobs, false));
}
public void testResolve_withSpecificJobIds() { public void testResolve_withSpecificJobIds() {
MlMetadata.Builder mlBuilder = new MlMetadata.Builder(); MlMetadata.Builder mlBuilder = new MlMetadata.Builder();
mlBuilder.putJob(BaseMlIntegTestCase.createFareQuoteJob("job_id_closing").build(new Date()), false); mlBuilder.putJob(BaseMlIntegTestCase.createFareQuoteJob("job_id_closing").build(new Date()), false);
@ -193,19 +220,19 @@ public class CloseJobActionRequestTests extends AbstractStreamableXContentTestCa
List<String> openJobs = new ArrayList<>(); List<String> openJobs = new ArrayList<>();
List<String> closingJobs = new ArrayList<>(); List<String> closingJobs = new ArrayList<>();
CloseJobAction.resolveAndValidateJobId("_all", cs1, openJobs, closingJobs); CloseJobAction.resolveAndValidateJobId("_all", cs1, openJobs, closingJobs, false);
assertEquals(Arrays.asList("job_id_open"), openJobs); assertEquals(Arrays.asList("job_id_open"), openJobs);
assertEquals(Arrays.asList("job_id_closing"), closingJobs); assertEquals(Arrays.asList("job_id_closing"), closingJobs);
openJobs.clear(); openJobs.clear();
closingJobs.clear(); closingJobs.clear();
CloseJobAction.resolveAndValidateJobId("job_id_closing", cs1, openJobs, closingJobs); CloseJobAction.resolveAndValidateJobId("job_id_closing", cs1, openJobs, closingJobs, false);
assertEquals(Collections.emptyList(), openJobs); assertEquals(Collections.emptyList(), openJobs);
assertEquals(Arrays.asList("job_id_closing"), closingJobs); assertEquals(Arrays.asList("job_id_closing"), closingJobs);
openJobs.clear(); openJobs.clear();
closingJobs.clear(); closingJobs.clear();
CloseJobAction.resolveAndValidateJobId("job_id_open", cs1, openJobs, closingJobs); CloseJobAction.resolveAndValidateJobId("job_id_open", cs1, openJobs, closingJobs, false);
assertEquals(Arrays.asList("job_id_open"), openJobs); assertEquals(Arrays.asList("job_id_open"), openJobs);
assertEquals(Collections.emptyList(), closingJobs); assertEquals(Collections.emptyList(), closingJobs);
openJobs.clear(); openJobs.clear();

View File

@ -5,6 +5,7 @@
*/ */
package org.elasticsearch.xpack.ml.job.process.autodetect; package org.elasticsearch.xpack.ml.job.process.autodetect;
import org.elasticsearch.ElasticsearchException;
import org.elasticsearch.client.Client; import org.elasticsearch.client.Client;
import org.elasticsearch.common.CheckedConsumer; import org.elasticsearch.common.CheckedConsumer;
import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.settings.Settings;
@ -47,7 +48,6 @@ import java.io.ByteArrayInputStream;
import java.io.IOException; import java.io.IOException;
import java.io.InputStream; import java.io.InputStream;
import java.nio.charset.StandardCharsets; import java.nio.charset.StandardCharsets;
import java.util.Arrays;
import java.util.Collections; import java.util.Collections;
import java.util.Date; import java.util.Date;
import java.util.HashSet; import java.util.HashSet;
@ -322,6 +322,32 @@ public class AutodetectProcessManagerTests extends ESTestCase {
assertEquals("[foo] exception while flushing job", holder[0].getMessage()); assertEquals("[foo] exception while flushing job", holder[0].getMessage());
} }
public void testCloseThrows() throws IOException {
AutodetectCommunicator communicator = mock(AutodetectCommunicator.class);
AutodetectProcessManager manager = createManager(communicator);
// let the communicator throw, simulating a problem with the underlying
// autodetect, e.g. a crash
doThrow(Exception.class).when(communicator).close(anyBoolean(), anyString());
// create a jobtask
JobTask jobTask = mock(JobTask.class);
when(jobTask.getJobId()).thenReturn("foo");
manager.openJob(jobTask, false, e -> {
});
manager.processData(jobTask, createInputStream(""), randomFrom(XContentType.values()), mock(DataLoadParams.class),
(dataCounts1, e) -> {
});
// job is created
assertEquals(1, manager.numberOfOpenJobs());
expectThrows(ElasticsearchException.class, () -> manager.closeJob(jobTask, false, null));
assertEquals(0, manager.numberOfOpenJobs());
verify(manager).setJobState(any(), eq(JobState.OPENED));
verify(manager).setJobState(any(), eq(JobState.FAILED));
}
public void testwriteUpdateProcessMessage() throws IOException { public void testwriteUpdateProcessMessage() throws IOException {
AutodetectCommunicator communicator = mock(AutodetectCommunicator.class); AutodetectCommunicator communicator = mock(AutodetectCommunicator.class);
AutodetectProcessManager manager = createManagerAndCallProcessData(communicator, "foo"); AutodetectProcessManager manager = createManagerAndCallProcessData(communicator, "foo");