From 9782aaa1b85b37933d08daa41388a5f47dcf3309 Mon Sep 17 00:00:00 2001 From: Benjamin Trent Date: Wed, 30 Jan 2019 11:56:24 -0600 Subject: [PATCH] ML: Add reason field in JobTaskState (#38029) * ML: adding reason to job failure status * marking reason as nullable * Update AutodetectProcessManager.java --- .../core/ml/job/config/JobTaskState.java | 33 ++++++++-- .../xpack/core/ml/MlTasksTests.java | 2 +- .../ml/action/TransportCloseJobAction.java | 2 +- .../autodetect/AutodetectProcessFactory.java | 3 +- .../autodetect/AutodetectProcessManager.java | 60 ++++++++++--------- .../autodetect/NativeAutodetectProcess.java | 3 +- .../NativeAutodetectProcessFactory.java | 3 +- .../normalizer/NativeNormalizerProcess.java | 2 +- .../ml/process/AbstractNativeProcess.java | 11 ++-- .../action/TransportOpenJobActionTests.java | 2 +- .../datafeed/DatafeedNodeSelectorTests.java | 2 +- .../ml/job/config/JobTaskStateTests.java | 2 +- .../AutodetectProcessManagerTests.java | 8 +-- .../NativeAutodetectProcessTests.java | 11 ++-- 14 files changed, 90 insertions(+), 54 deletions(-) diff --git a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ml/job/config/JobTaskState.java b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ml/job/config/JobTaskState.java index 08f73d791e5..2651e475ff5 100644 --- a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ml/job/config/JobTaskState.java +++ b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ml/job/config/JobTaskState.java @@ -5,6 +5,8 @@ */ package org.elasticsearch.xpack.core.ml.job.config; +import org.elasticsearch.Version; +import org.elasticsearch.common.Nullable; import org.elasticsearch.common.ParseField; import org.elasticsearch.common.io.stream.StreamInput; import org.elasticsearch.common.io.stream.StreamOutput; @@ -20,6 +22,7 @@ import java.io.IOException; import java.util.Objects; import static org.elasticsearch.common.xcontent.ConstructingObjectParser.constructorArg; +import static org.elasticsearch.common.xcontent.ConstructingObjectParser.optionalConstructorArg; public class JobTaskState implements PersistentTaskState { @@ -27,9 +30,11 @@ public class JobTaskState implements PersistentTaskState { private static ParseField STATE = new ParseField("state"); private static ParseField ALLOCATION_ID = new ParseField("allocation_id"); + private static ParseField REASON = new ParseField("reason"); private static final ConstructingObjectParser PARSER = - new ConstructingObjectParser<>(NAME, true, args -> new JobTaskState((JobState) args[0], (Long) args[1])); + new ConstructingObjectParser<>(NAME, true, + args -> new JobTaskState((JobState) args[0], (Long) args[1], (String) args[2])); static { PARSER.declareField(constructorArg(), p -> { @@ -39,6 +44,7 @@ public class JobTaskState implements PersistentTaskState { throw new IllegalArgumentException("Unsupported token [" + p.currentToken() + "]"); }, STATE, ObjectParser.ValueType.STRING); PARSER.declareLong(constructorArg(), ALLOCATION_ID); + PARSER.declareString(optionalConstructorArg(), REASON); } public static JobTaskState fromXContent(XContentParser parser) { @@ -51,21 +57,33 @@ public class JobTaskState implements PersistentTaskState { private final JobState state; private final long allocationId; + private final String reason; - public JobTaskState(JobState state, long allocationId) { + public JobTaskState(JobState state, long allocationId, @Nullable String reason) { this.state = Objects.requireNonNull(state); this.allocationId = allocationId; + this.reason = reason; } public JobTaskState(StreamInput in) throws IOException { state = JobState.fromStream(in); allocationId = in.readLong(); + if (in.getVersion().onOrAfter(Version.V_7_0_0)) { + reason = in.readOptionalString(); + } else { + reason = null; + } } public JobState getState() { return state; } + @Nullable + public String getReason() { + return reason; + } + /** * The job state stores the allocation ID at the time it was last set. * This method compares the allocation ID in the state with the allocation @@ -90,6 +108,9 @@ public class JobTaskState implements PersistentTaskState { public void writeTo(StreamOutput out) throws IOException { state.writeTo(out); out.writeLong(allocationId); + if (out.getVersion().onOrAfter(Version.V_7_0_0)) { + out.writeOptionalString(reason); + } } @Override @@ -102,6 +123,9 @@ public class JobTaskState implements PersistentTaskState { builder.startObject(); builder.field(STATE.getPreferredName(), state.value()); builder.field(ALLOCATION_ID.getPreferredName(), allocationId); + if (reason != null) { + builder.field(REASON.getPreferredName(), reason); + } builder.endObject(); return builder; } @@ -112,11 +136,12 @@ public class JobTaskState implements PersistentTaskState { if (o == null || getClass() != o.getClass()) return false; JobTaskState that = (JobTaskState) o; return state == that.state && - Objects.equals(allocationId, that.allocationId); + Objects.equals(allocationId, that.allocationId) && + Objects.equals(reason, that.reason); } @Override public int hashCode() { - return Objects.hash(state, allocationId); + return Objects.hash(state, allocationId, reason); } } diff --git a/x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/ml/MlTasksTests.java b/x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/ml/MlTasksTests.java index e80b47b057b..3afe76b8b17 100644 --- a/x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/ml/MlTasksTests.java +++ b/x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/ml/MlTasksTests.java @@ -33,7 +33,7 @@ public class MlTasksTests extends ESTestCase { new PersistentTasksCustomMetaData.Assignment("bar", "test assignment")); assertEquals(JobState.OPENING, MlTasks.getJobState("foo", tasksBuilder.build())); - tasksBuilder.updateTaskState(MlTasks.jobTaskId("foo"), new JobTaskState(JobState.OPENED, tasksBuilder.getLastAllocationId())); + tasksBuilder.updateTaskState(MlTasks.jobTaskId("foo"), new JobTaskState(JobState.OPENED, tasksBuilder.getLastAllocationId(), null)); assertEquals(JobState.OPENED, MlTasks.getJobState("foo", tasksBuilder.build())); } diff --git a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/action/TransportCloseJobAction.java b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/action/TransportCloseJobAction.java index 3149928f6af..10765336602 100644 --- a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/action/TransportCloseJobAction.java +++ b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/action/TransportCloseJobAction.java @@ -266,7 +266,7 @@ public class TransportCloseJobAction extends TransportTasksAction listener) { - JobTaskState taskState = new JobTaskState(JobState.CLOSING, jobTask.getAllocationId()); + JobTaskState taskState = new JobTaskState(JobState.CLOSING, jobTask.getAllocationId(), "close job (api)"); jobTask.updatePersistentTaskState(taskState, ActionListener.wrap(task -> { // we need to fork because we are now on a network threadpool and closeJob method may take a while to complete: threadPool.executor(MachineLearning.UTILITY_THREAD_POOL_NAME).execute(new AbstractRunnable() { diff --git a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/process/autodetect/AutodetectProcessFactory.java b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/process/autodetect/AutodetectProcessFactory.java index c95e3a5f6e3..d76593eea89 100644 --- a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/process/autodetect/AutodetectProcessFactory.java +++ b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/process/autodetect/AutodetectProcessFactory.java @@ -9,6 +9,7 @@ import org.elasticsearch.xpack.core.ml.job.config.Job; import org.elasticsearch.xpack.ml.job.process.autodetect.params.AutodetectParams; import java.util.concurrent.ExecutorService; +import java.util.function.Consumer; /** * Factory interface for creating implementations of {@link AutodetectProcess} @@ -28,5 +29,5 @@ public interface AutodetectProcessFactory { AutodetectProcess createAutodetectProcess(Job job, AutodetectParams autodetectParams, ExecutorService executorService, - Runnable onProcessCrash); + Consumer onProcessCrash); } diff --git a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/process/autodetect/AutodetectProcessManager.java b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/process/autodetect/AutodetectProcessManager.java index 1d8f4f27360..6b8eada7406 100644 --- a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/process/autodetect/AutodetectProcessManager.java +++ b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/process/autodetect/AutodetectProcessManager.java @@ -475,14 +475,14 @@ public class AutodetectProcessManager implements ClusterStateListener { .kill(); processByAllocation.remove(jobTask.getAllocationId()); } finally { - setJobState(jobTask, JobState.FAILED, e2 -> closeHandler.accept(e1, true)); + setJobState(jobTask, JobState.FAILED, e1.getMessage(), e2 -> closeHandler.accept(e1, true)); } } } }); }, e1 -> { logger.warn("Failed to gather information required to open job [" + jobId + "]", e1); - setJobState(jobTask, JobState.FAILED, e2 -> closeHandler.accept(e1, true)); + setJobState(jobTask, JobState.FAILED, e1.getMessage(), e2 -> closeHandler.accept(e1, true)); }); }, e -> closeHandler.accept(e, true) @@ -601,8 +601,8 @@ public class AutodetectProcessManager implements ClusterStateListener { auditor.info(jobId, msg); } - private Runnable onProcessCrash(JobTask jobTask) { - return () -> { + private Consumer onProcessCrash(JobTask jobTask) { + return (reason) -> { ProcessContext processContext = processByAllocation.remove(jobTask.getAllocationId()); if (processContext != null) { AutodetectCommunicator communicator = processContext.getAutodetectCommunicator(); @@ -610,7 +610,7 @@ public class AutodetectProcessManager implements ClusterStateListener { communicator.destroyCategorizationAnalyzer(); } } - setJobState(jobTask, JobState.FAILED); + setJobState(jobTask, JobState.FAILED, reason); try { removeTmpStorage(jobTask.getJobId()); } catch (IOException e) { @@ -666,7 +666,7 @@ public class AutodetectProcessManager implements ClusterStateListener { throw e; } logger.warn("[" + jobId + "] Exception closing autodetect process", e); - setJobState(jobTask, JobState.FAILED); + setJobState(jobTask, JobState.FAILED, e.getMessage()); throw ExceptionsHelper.serverError("Exception closing autodetect process", e); } finally { // to ensure the contract that multiple simultaneous close calls for the same job wait until @@ -720,8 +720,8 @@ public class AutodetectProcessManager implements ClusterStateListener { return Optional.of(Duration.between(communicator.getProcessStartTime(), ZonedDateTime.now())); } - void setJobState(JobTask jobTask, JobState state) { - JobTaskState jobTaskState = new JobTaskState(state, jobTask.getAllocationId()); + void setJobState(JobTask jobTask, JobState state, String reason) { + JobTaskState jobTaskState = new JobTaskState(state, jobTask.getAllocationId(), reason); jobTask.updatePersistentTaskState(jobTaskState, new ActionListener>() { @Override public void onResponse(PersistentTask persistentTask) { @@ -735,27 +735,31 @@ public class AutodetectProcessManager implements ClusterStateListener { }); } - void setJobState(JobTask jobTask, JobState state, CheckedConsumer handler) { - JobTaskState jobTaskState = new JobTaskState(state, jobTask.getAllocationId()); - jobTask.updatePersistentTaskState(jobTaskState, new ActionListener>() { - @Override - public void onResponse(PersistentTask persistentTask) { - try { - handler.accept(null); - } catch (IOException e1) { - logger.warn("Error while delegating response", e1); - } - } + void setJobState(JobTask jobTask, JobState state) { + setJobState(jobTask, state, null); + } - @Override - public void onFailure(Exception e) { - try { - handler.accept(e); - } catch (IOException e1) { - logger.warn("Error while delegating exception [" + e.getMessage() + "]", e1); - } - } - }); + void setJobState(JobTask jobTask, JobState state, String reason, CheckedConsumer handler) { + JobTaskState jobTaskState = new JobTaskState(state, jobTask.getAllocationId(), reason); + jobTask.updatePersistentTaskState(jobTaskState, new ActionListener>() { + @Override + public void onResponse(PersistentTask persistentTask) { + try { + handler.accept(null); + } catch (IOException e1) { + logger.warn("Error while delegating response", e1); + } + } + + @Override + public void onFailure(Exception e) { + try { + handler.accept(e); + } catch (IOException e1) { + logger.warn("Error while delegating exception [" + e.getMessage() + "]", e1); + } + } + }); } public Optional> getStatistics(JobTask jobTask) { diff --git a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/process/autodetect/NativeAutodetectProcess.java b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/process/autodetect/NativeAutodetectProcess.java index 69ed0d66c86..96d5a740975 100644 --- a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/process/autodetect/NativeAutodetectProcess.java +++ b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/process/autodetect/NativeAutodetectProcess.java @@ -28,6 +28,7 @@ import java.io.OutputStream; import java.nio.file.Path; import java.util.Iterator; import java.util.List; +import java.util.function.Consumer; /** * Autodetect process using native code. @@ -42,7 +43,7 @@ class NativeAutodetectProcess extends AbstractNativeProcess implements Autodetec NativeAutodetectProcess(String jobId, InputStream logStream, OutputStream processInStream, InputStream processOutStream, OutputStream processRestoreStream, int numberOfFields, List filesToDelete, - AutodetectResultsParser resultsParser, Runnable onProcessCrash) { + AutodetectResultsParser resultsParser, Consumer onProcessCrash) { super(jobId, logStream, processInStream, processOutStream, processRestoreStream, numberOfFields, filesToDelete, onProcessCrash); this.resultsParser = resultsParser; } diff --git a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/process/autodetect/NativeAutodetectProcessFactory.java b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/process/autodetect/NativeAutodetectProcessFactory.java index 3185ebc6f1c..27bf1dd6753 100644 --- a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/process/autodetect/NativeAutodetectProcessFactory.java +++ b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/process/autodetect/NativeAutodetectProcessFactory.java @@ -30,6 +30,7 @@ import java.util.ArrayList; import java.util.List; import java.util.Objects; import java.util.concurrent.ExecutorService; +import java.util.function.Consumer; public class NativeAutodetectProcessFactory implements AutodetectProcessFactory { @@ -56,7 +57,7 @@ public class NativeAutodetectProcessFactory implements AutodetectProcessFactory public AutodetectProcess createAutodetectProcess(Job job, AutodetectParams params, ExecutorService executorService, - Runnable onProcessCrash) { + Consumer onProcessCrash) { List filesToDelete = new ArrayList<>(); ProcessPipes processPipes = new ProcessPipes(env, NAMED_PIPE_HELPER, AutodetectBuilder.AUTODETECT, job.getId(), true, false, true, true, params.modelSnapshot() != null, diff --git a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/process/normalizer/NativeNormalizerProcess.java b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/process/normalizer/NativeNormalizerProcess.java index 6b67ffa6acb..ec22d35f168 100644 --- a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/process/normalizer/NativeNormalizerProcess.java +++ b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/process/normalizer/NativeNormalizerProcess.java @@ -20,7 +20,7 @@ class NativeNormalizerProcess extends AbstractNativeProcess implements Normalize private static final String NAME = "normalizer"; NativeNormalizerProcess(String jobId, InputStream logStream, OutputStream processInStream, InputStream processOutStream) { - super(jobId, logStream, processInStream, processOutStream, null, 0, Collections.emptyList(), () -> {}); + super(jobId, logStream, processInStream, processOutStream, null, 0, Collections.emptyList(), (ignore) -> {}); } @Override diff --git a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/process/AbstractNativeProcess.java b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/process/AbstractNativeProcess.java index b84bfdd38e1..25e671a6de1 100644 --- a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/process/AbstractNativeProcess.java +++ b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/process/AbstractNativeProcess.java @@ -23,12 +23,14 @@ import java.nio.file.Path; import java.time.Duration; import java.time.ZonedDateTime; import java.util.List; +import java.util.Locale; import java.util.Objects; import java.util.concurrent.ExecutionException; import java.util.concurrent.ExecutorService; import java.util.concurrent.Future; import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeoutException; +import java.util.function.Consumer; /** * Abstract class for implementing a native process. @@ -48,7 +50,7 @@ public abstract class AbstractNativeProcess implements NativeProcess { private final ZonedDateTime startTime; private final int numberOfFields; private final List filesToDelete; - private final Runnable onProcessCrash; + private final Consumer onProcessCrash; private volatile Future logTailFuture; private volatile Future stateProcessorFuture; private volatile boolean processCloseInitiated; @@ -57,7 +59,7 @@ public abstract class AbstractNativeProcess implements NativeProcess { protected AbstractNativeProcess(String jobId, InputStream logStream, OutputStream processInStream, InputStream processOutStream, OutputStream processRestoreStream, int numberOfFields, List filesToDelete, - Runnable onProcessCrash) { + Consumer onProcessCrash) { this.jobId = jobId; cppLogHandler = new CppLogMessageHandler(jobId, logStream); this.processInStream = new BufferedOutputStream(processInStream); @@ -90,8 +92,9 @@ public abstract class AbstractNativeProcess implements NativeProcess { // by a user or other process (e.g. the Linux OOM killer) String errors = cppLogHandler.getErrors(); - LOGGER.error("[{}] {} process stopped unexpectedly: {}", jobId, getName(), errors); - onProcessCrash.run(); + String fullError = String.format(Locale.ROOT, "[%s] %s process stopped unexpectedly: %s", jobId, getName(), errors); + LOGGER.error(fullError); + onProcessCrash.accept(fullError); } } }); diff --git a/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/action/TransportOpenJobActionTests.java b/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/action/TransportOpenJobActionTests.java index b23e0426090..9b7673338f6 100644 --- a/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/action/TransportOpenJobActionTests.java +++ b/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/action/TransportOpenJobActionTests.java @@ -552,7 +552,7 @@ public class TransportOpenJobActionTests extends ESTestCase { new Assignment(nodeId, "test assignment")); if (jobState != null) { builder.updateTaskState(MlTasks.jobTaskId(jobId), - new JobTaskState(jobState, builder.getLastAllocationId() - (isStale ? 1 : 0))); + new JobTaskState(jobState, builder.getLastAllocationId() - (isStale ? 1 : 0), null)); } } diff --git a/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/datafeed/DatafeedNodeSelectorTests.java b/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/datafeed/DatafeedNodeSelectorTests.java index 4b81cbb2dd6..39f3a3f4889 100644 --- a/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/datafeed/DatafeedNodeSelectorTests.java +++ b/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/datafeed/DatafeedNodeSelectorTests.java @@ -240,7 +240,7 @@ public class DatafeedNodeSelectorTests extends ESTestCase { PersistentTasksCustomMetaData.Builder tasksBuilder = PersistentTasksCustomMetaData.builder(); addJobTask(job.getId(), nodeId, JobState.OPENED, tasksBuilder); // Set to lower allocationId, so job task is stale: - tasksBuilder.updateTaskState(MlTasks.jobTaskId(job.getId()), new JobTaskState(JobState.OPENED, 0)); + tasksBuilder.updateTaskState(MlTasks.jobTaskId(job.getId()), new JobTaskState(JobState.OPENED, 0, null)); tasks = tasksBuilder.build(); givenClusterState("foo", 1, 0); diff --git a/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/job/config/JobTaskStateTests.java b/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/job/config/JobTaskStateTests.java index 26560f1034f..7048c4b5d2d 100644 --- a/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/job/config/JobTaskStateTests.java +++ b/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/job/config/JobTaskStateTests.java @@ -15,7 +15,7 @@ public class JobTaskStateTests extends AbstractSerializingTestCase @Override protected JobTaskState createTestInstance() { - return new JobTaskState(randomFrom(JobState.values()), randomLong()); + return new JobTaskState(randomFrom(JobState.values()), randomLong(), randomAlphaOfLength(10)); } @Override diff --git a/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/job/process/autodetect/AutodetectProcessManagerTests.java b/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/job/process/autodetect/AutodetectProcessManagerTests.java index 6b4fd270b1b..14b6d08514f 100644 --- a/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/job/process/autodetect/AutodetectProcessManagerTests.java +++ b/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/job/process/autodetect/AutodetectProcessManagerTests.java @@ -204,7 +204,7 @@ public class AutodetectProcessManagerTests extends ESTestCase { manager.openJob(jobTask, clusterState, (e, b) -> {}); assertEquals(1, manager.numberOfOpenJobs()); assertTrue(manager.jobHasActiveAutodetectProcess(jobTask)); - verify(jobTask).updatePersistentTaskState(eq(new JobTaskState(JobState.OPENED, 1L)), any()); + verify(jobTask).updatePersistentTaskState(eq(new JobTaskState(JobState.OPENED, 1L, null)), any()); } @@ -266,10 +266,10 @@ public class AutodetectProcessManagerTests extends ESTestCase { doReturn(executorService).when(manager).createAutodetectExecutorService(any()); doAnswer(invocationOnMock -> { - CheckedConsumer consumer = (CheckedConsumer) invocationOnMock.getArguments()[2]; + CheckedConsumer consumer = (CheckedConsumer) invocationOnMock.getArguments()[3]; consumer.accept(null); return null; - }).when(manager).setJobState(any(), eq(JobState.FAILED), any()); + }).when(manager).setJobState(any(), eq(JobState.FAILED), any(), any()); JobTask jobTask = mock(JobTask.class); when(jobTask.getJobId()).thenReturn("foo"); @@ -512,7 +512,7 @@ public class AutodetectProcessManagerTests extends ESTestCase { expectThrows(ElasticsearchException.class, () -> manager.closeJob(jobTask, false, null)); assertEquals(0, manager.numberOfOpenJobs()); - verify(manager).setJobState(any(), eq(JobState.FAILED)); + verify(manager).setJobState(any(), eq(JobState.FAILED), any()); } public void testWriteUpdateProcessMessage() { diff --git a/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/job/process/autodetect/NativeAutodetectProcessTests.java b/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/job/process/autodetect/NativeAutodetectProcessTests.java index 8542061c761..3f1275142b9 100644 --- a/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/job/process/autodetect/NativeAutodetectProcessTests.java +++ b/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/job/process/autodetect/NativeAutodetectProcessTests.java @@ -29,6 +29,7 @@ import java.util.Collections; import java.util.Optional; import java.util.concurrent.ExecutorService; import java.util.concurrent.Future; +import java.util.function.Consumer; import static org.hamcrest.Matchers.equalTo; import static org.hamcrest.Matchers.greaterThan; @@ -58,7 +59,7 @@ public class NativeAutodetectProcessTests extends ESTestCase { try (NativeAutodetectProcess process = new NativeAutodetectProcess("foo", logStream, mock(OutputStream.class), outputStream, mock(OutputStream.class), NUMBER_FIELDS, null, - new AutodetectResultsParser(), mock(Runnable.class))) { + new AutodetectResultsParser(), mock(Consumer.class))) { process.start(executorService, mock(AutodetectStateProcessor.class), mock(InputStream.class)); ZonedDateTime startTime = process.getProcessStartTime(); @@ -80,7 +81,7 @@ public class NativeAutodetectProcessTests extends ESTestCase { ByteArrayOutputStream bos = new ByteArrayOutputStream(1024); try (NativeAutodetectProcess process = new NativeAutodetectProcess("foo", logStream, bos, outputStream, mock(OutputStream.class), NUMBER_FIELDS, Collections.emptyList(), - new AutodetectResultsParser(), mock(Runnable.class))) { + new AutodetectResultsParser(), mock(Consumer.class))) { process.start(executorService, mock(AutodetectStateProcessor.class), mock(InputStream.class)); process.writeRecord(record); @@ -114,7 +115,7 @@ public class NativeAutodetectProcessTests extends ESTestCase { ByteArrayOutputStream bos = new ByteArrayOutputStream(AutodetectControlMsgWriter.FLUSH_SPACES_LENGTH + 1024); try (NativeAutodetectProcess process = new NativeAutodetectProcess("foo", logStream, bos, outputStream, mock(OutputStream.class), NUMBER_FIELDS, Collections.emptyList(), - new AutodetectResultsParser(), mock(Runnable.class))) { + new AutodetectResultsParser(), mock(Consumer.class))) { process.start(executorService, mock(AutodetectStateProcessor.class), mock(InputStream.class)); FlushJobParams params = FlushJobParams.builder().build(); @@ -147,7 +148,7 @@ public class NativeAutodetectProcessTests extends ESTestCase { try (NativeAutodetectProcess process = new NativeAutodetectProcess("foo", logStream, processInStream, processOutStream, mock(OutputStream.class), NUMBER_FIELDS, Collections.emptyList(), - new AutodetectResultsParser(), mock(Runnable.class))) { + new AutodetectResultsParser(), mock(Consumer.class))) { process.consumeAndCloseOutputStream(); assertThat(processOutStream.available(), equalTo(0)); @@ -162,7 +163,7 @@ public class NativeAutodetectProcessTests extends ESTestCase { ByteArrayOutputStream bos = new ByteArrayOutputStream(1024); try (NativeAutodetectProcess process = new NativeAutodetectProcess("foo", logStream, bos, outputStream, mock(OutputStream.class), NUMBER_FIELDS, Collections.emptyList(), - new AutodetectResultsParser(), mock(Runnable.class))) { + new AutodetectResultsParser(), mock(Consumer.class))) { process.start(executorService, mock(AutodetectStateProcessor.class), mock(InputStream.class)); writeFunction.accept(process);