From 3986a2a06c61b44dcbaaff5d450fb954431840f2 Mon Sep 17 00:00:00 2001 From: Martijn van Groningen Date: Tue, 4 Apr 2017 21:31:17 +0200 Subject: [PATCH] [ML] Remove PersistentTask#isCurrentStatus() usages Original commit: elastic/x-pack-elasticsearch@efe7e1e770f3a4b3ab72e92e2005fd3c0ff28a5d --- .../xpack/ml/MachineLearning.java | 6 +- .../elasticsearch/xpack/ml/MlMetadata.java | 5 +- .../xpack/ml/action/OpenJobAction.java | 10 +- .../xpack/ml/action/StartDatafeedAction.java | 7 +- .../xpack/ml/job/config/JobState.java | 34 +----- .../xpack/ml/job/config/JobTaskStatus.java | 106 ++++++++++++++++++ .../autodetect/AutodetectProcessManager.java | 17 +-- .../persistent/AllocatedPersistentTask.java | 5 +- .../xpack/ml/action/DatafeedJobsIT.java | 3 +- .../xpack/ml/action/OpenJobActionTests.java | 3 +- .../integration/BasicDistributedJobsIT.java | 22 +++- .../xpack/ml/integration/TooManyJobsIT.java | 3 +- .../ml/job/config/JobTaskStatusTests.java | 28 +++++ .../AutodetectProcessManagerTests.java | 4 +- 14 files changed, 194 insertions(+), 59 deletions(-) create mode 100644 plugin/src/main/java/org/elasticsearch/xpack/ml/job/config/JobTaskStatus.java create mode 100644 plugin/src/test/java/org/elasticsearch/xpack/ml/job/config/JobTaskStatusTests.java diff --git a/plugin/src/main/java/org/elasticsearch/xpack/ml/MachineLearning.java b/plugin/src/main/java/org/elasticsearch/xpack/ml/MachineLearning.java index 2c160593fa6..85d32c6edd1 100644 --- a/plugin/src/main/java/org/elasticsearch/xpack/ml/MachineLearning.java +++ b/plugin/src/main/java/org/elasticsearch/xpack/ml/MachineLearning.java @@ -74,7 +74,7 @@ import org.elasticsearch.xpack.ml.datafeed.DatafeedManager; import org.elasticsearch.xpack.ml.datafeed.DatafeedState; import org.elasticsearch.xpack.ml.job.JobManager; import org.elasticsearch.xpack.ml.job.UpdateJobProcessNotifier; -import org.elasticsearch.xpack.ml.job.config.JobState; +import org.elasticsearch.xpack.ml.job.config.JobTaskStatus; import org.elasticsearch.xpack.ml.job.persistence.JobDataCountsPersister; import org.elasticsearch.xpack.ml.job.persistence.JobProvider; import org.elasticsearch.xpack.ml.job.persistence.JobResultsPersister; @@ -220,7 +220,7 @@ public class MachineLearning implements ActionPlugin { // Task statuses new NamedWriteableRegistry.Entry(Task.Status.class, PersistentTasksNodeService.Status.NAME, PersistentTasksNodeService.Status::new), - new NamedWriteableRegistry.Entry(Task.Status.class, JobState.NAME, JobState::fromStream), + new NamedWriteableRegistry.Entry(Task.Status.class, JobTaskStatus.NAME, JobTaskStatus::new), new NamedWriteableRegistry.Entry(Task.Status.class, DatafeedState.NAME, DatafeedState::fromStream) ); } @@ -241,7 +241,7 @@ public class MachineLearning implements ActionPlugin { // Task statuses new NamedXContentRegistry.Entry(Task.Status.class, new ParseField(DatafeedState.NAME), DatafeedState::fromXContent), - new NamedXContentRegistry.Entry(Task.Status.class, new ParseField(JobState.NAME), JobState::fromXContent) + new NamedXContentRegistry.Entry(Task.Status.class, new ParseField(JobTaskStatus.NAME), JobTaskStatus::fromXContent) ); } diff --git a/plugin/src/main/java/org/elasticsearch/xpack/ml/MlMetadata.java b/plugin/src/main/java/org/elasticsearch/xpack/ml/MlMetadata.java index 436b0e9f9bc..920cdcda09f 100644 --- a/plugin/src/main/java/org/elasticsearch/xpack/ml/MlMetadata.java +++ b/plugin/src/main/java/org/elasticsearch/xpack/ml/MlMetadata.java @@ -30,6 +30,7 @@ import org.elasticsearch.xpack.ml.datafeed.DatafeedState; import org.elasticsearch.xpack.ml.datafeed.DatafeedUpdate; import org.elasticsearch.xpack.ml.job.config.Job; import org.elasticsearch.xpack.ml.job.config.JobState; +import org.elasticsearch.xpack.ml.job.config.JobTaskStatus; import org.elasticsearch.xpack.ml.job.messages.Messages; import org.elasticsearch.xpack.ml.utils.ExceptionsHelper; import org.elasticsearch.xpack.persistent.PersistentTasksCustomMetaData; @@ -407,9 +408,9 @@ public class MlMetadata implements MetaData.Custom { public static JobState getJobState(String jobId, @Nullable PersistentTasksCustomMetaData tasks) { PersistentTask task = getJobTask(jobId, tasks); if (task != null && task.getStatus() != null) { - JobState jobTaskState = (JobState) task.getStatus(); + JobTaskStatus jobTaskState = (JobTaskStatus) task.getStatus(); if (jobTaskState != null) { - return jobTaskState; + return jobTaskState.getState(); } } // If we haven't opened a job than there will be no persistent task, which is the same as if the job was closed diff --git a/plugin/src/main/java/org/elasticsearch/xpack/ml/action/OpenJobAction.java b/plugin/src/main/java/org/elasticsearch/xpack/ml/action/OpenJobAction.java index be6139e965c..44d5aef44be 100644 --- a/plugin/src/main/java/org/elasticsearch/xpack/ml/action/OpenJobAction.java +++ b/plugin/src/main/java/org/elasticsearch/xpack/ml/action/OpenJobAction.java @@ -44,7 +44,7 @@ import org.elasticsearch.xpack.XPackPlugin; import org.elasticsearch.xpack.ml.MachineLearning; import org.elasticsearch.xpack.ml.MlMetadata; import org.elasticsearch.xpack.ml.job.config.Job; -import org.elasticsearch.xpack.ml.job.config.JobState; +import org.elasticsearch.xpack.ml.job.config.JobTaskStatus; import org.elasticsearch.xpack.ml.job.persistence.AnomalyDetectorsIndex; import org.elasticsearch.xpack.ml.job.process.autodetect.AutodetectProcessManager; import org.elasticsearch.xpack.ml.notifications.Auditor; @@ -348,11 +348,11 @@ public class OpenJobAction extends Action PARSER = - new ConstructingObjectParser<>(NAME, args -> fromString((String) args[0])); - - static { - PARSER.declareString(constructorArg(), new ParseField("state")); - } - public static JobState fromString(String name) { return valueOf(name.trim().toUpperCase(Locale.ROOT)); } @@ -49,11 +36,6 @@ public enum JobState implements Task.Status { return values()[ordinal]; } - @Override - public String getWriteableName() { - return NAME; - } - @Override public void writeTo(StreamOutput out) throws IOException { out.writeVInt(ordinal()); @@ -61,22 +43,16 @@ public enum JobState implements Task.Status { @Override public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException { - builder.startObject(); - builder.field("state", name().toLowerCase(Locale.ROOT)); - builder.endObject(); + builder.value(name().toLowerCase(Locale.ROOT)); return builder; } @Override public boolean isFragment() { - return false; + return true; } - public static JobState fromXContent(XContentParser parser) throws IOException { - return PARSER.parse(parser, null); - } - /** * @return {@code true} if state matches any of the given {@code candidates} */ diff --git a/plugin/src/main/java/org/elasticsearch/xpack/ml/job/config/JobTaskStatus.java b/plugin/src/main/java/org/elasticsearch/xpack/ml/job/config/JobTaskStatus.java new file mode 100644 index 00000000000..481a909468a --- /dev/null +++ b/plugin/src/main/java/org/elasticsearch/xpack/ml/job/config/JobTaskStatus.java @@ -0,0 +1,106 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License; + * you may not use this file except in compliance with the Elastic License. + */ +package org.elasticsearch.xpack.ml.job.config; + +import org.elasticsearch.common.ParseField; +import org.elasticsearch.common.io.stream.StreamInput; +import org.elasticsearch.common.io.stream.StreamOutput; +import org.elasticsearch.common.xcontent.ConstructingObjectParser; +import org.elasticsearch.common.xcontent.ObjectParser; +import org.elasticsearch.common.xcontent.XContentBuilder; +import org.elasticsearch.common.xcontent.XContentParser; +import org.elasticsearch.tasks.Task; +import org.elasticsearch.xpack.persistent.PersistentTasksCustomMetaData.PersistentTask; + +import java.io.IOException; +import java.util.Objects; + +import static org.elasticsearch.common.xcontent.ConstructingObjectParser.constructorArg; + +public class JobTaskStatus implements Task.Status { + + public static final String NAME = "JobState"; + + private static ParseField STATE = new ParseField("state"); + private static ParseField ALLOCATION_ID = new ParseField("allocation_id"); + + private static final ConstructingObjectParser PARSER = + new ConstructingObjectParser<>(NAME, + args -> new JobTaskStatus((JobState) args[0], (Long) args[1])); + + static { + PARSER.declareField(constructorArg(), p -> { + if (p.currentToken() == XContentParser.Token.VALUE_STRING) { + return JobState.fromString(p.text()); + } + throw new IllegalArgumentException("Unsupported token [" + p.currentToken() + "]"); + }, STATE, ObjectParser.ValueType.STRING); + PARSER.declareLong(constructorArg(), ALLOCATION_ID); + } + + public static JobTaskStatus fromXContent(XContentParser parser) { + try { + return PARSER.parse(parser, null); + } catch (IOException e) { + throw new RuntimeException(e); + } + } + + private final JobState state; + private final long allocationId; + + public JobTaskStatus(JobState state, long allocationId) { + this.state = Objects.requireNonNull(state); + this.allocationId = allocationId; + } + + public JobTaskStatus(StreamInput in) throws IOException { + state = JobState.fromStream(in); + allocationId = in.readLong(); + } + + public JobState getState() { + return state; + } + + public boolean staleStatus(PersistentTask task) { + return allocationId != task.getAllocationId(); + } + + @Override + public String getWriteableName() { + return NAME; + } + + @Override + public void writeTo(StreamOutput out) throws IOException { + state.writeTo(out); + out.writeLong(allocationId); + } + + @Override + public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException { + builder.startObject(); + builder.field(STATE.getPreferredName(), state, params); + builder.field(ALLOCATION_ID.getPreferredName(), allocationId); + builder.endObject(); + return builder; + } + + @Override + public boolean equals(Object o) { + if (this == o) return true; + if (o == null || getClass() != o.getClass()) return false; + JobTaskStatus that = (JobTaskStatus) o; + return state == that.state && + Objects.equals(allocationId, that.allocationId); + } + + @Override + public int hashCode() { + return Objects.hash(state, allocationId); + } +} diff --git a/plugin/src/main/java/org/elasticsearch/xpack/ml/job/process/autodetect/AutodetectProcessManager.java b/plugin/src/main/java/org/elasticsearch/xpack/ml/job/process/autodetect/AutodetectProcessManager.java index a8c158039b9..b4627847ee4 100644 --- a/plugin/src/main/java/org/elasticsearch/xpack/ml/job/process/autodetect/AutodetectProcessManager.java +++ b/plugin/src/main/java/org/elasticsearch/xpack/ml/job/process/autodetect/AutodetectProcessManager.java @@ -27,6 +27,7 @@ import org.elasticsearch.xpack.ml.action.OpenJobAction.JobTask; import org.elasticsearch.xpack.ml.job.JobManager; import org.elasticsearch.xpack.ml.job.config.Job; import org.elasticsearch.xpack.ml.job.config.JobState; +import org.elasticsearch.xpack.ml.job.config.JobTaskStatus; import org.elasticsearch.xpack.ml.job.config.JobUpdate; import org.elasticsearch.xpack.ml.job.config.ModelPlotConfig; import org.elasticsearch.xpack.ml.job.persistence.JobDataCountsPersister; @@ -98,10 +99,10 @@ public class AutodetectProcessManager extends AbstractComponent { private final Auditor auditor; public AutodetectProcessManager(Settings settings, Client client, ThreadPool threadPool, - JobManager jobManager, JobProvider jobProvider, JobResultsPersister jobResultsPersister, - JobDataCountsPersister jobDataCountsPersister, - AutodetectProcessFactory autodetectProcessFactory, NormalizerFactory normalizerFactory, - NamedXContentRegistry xContentRegistry, Auditor auditor) { + JobManager jobManager, JobProvider jobProvider, JobResultsPersister jobResultsPersister, + JobDataCountsPersister jobDataCountsPersister, + AutodetectProcessFactory autodetectProcessFactory, NormalizerFactory normalizerFactory, + NamedXContentRegistry xContentRegistry, Auditor auditor) { super(settings); this.client = client; this.threadPool = threadPool; @@ -363,7 +364,8 @@ public class AutodetectProcessManager extends AbstractComponent { } private void setJobState(JobTask jobTask, JobState state) { - jobTask.updatePersistentStatus(state, new ActionListener>() { + JobTaskStatus taskStatus = new JobTaskStatus(state, jobTask.getAllocationId()); + jobTask.updatePersistentStatus(taskStatus, new ActionListener>() { @Override public void onResponse(PersistentTask persistentTask) { logger.info("Successfully set job state to [{}] for job [{}]", state, jobTask.getJobId()); @@ -376,8 +378,9 @@ public class AutodetectProcessManager extends AbstractComponent { }); } - public void setJobState(JobTask jobTask, JobState state, CheckedConsumer handler) { - jobTask.updatePersistentStatus(state, new ActionListener>() { + void setJobState(JobTask jobTask, JobState state, CheckedConsumer handler) { + JobTaskStatus taskStatus = new JobTaskStatus(state, jobTask.getAllocationId()); + jobTask.updatePersistentStatus(taskStatus, new ActionListener>() { @Override public void onResponse(PersistentTask persistentTask) { try { diff --git a/plugin/src/main/java/org/elasticsearch/xpack/persistent/AllocatedPersistentTask.java b/plugin/src/main/java/org/elasticsearch/xpack/persistent/AllocatedPersistentTask.java index 86c3e5ccae6..57c3ed6aacb 100644 --- a/plugin/src/main/java/org/elasticsearch/xpack/persistent/AllocatedPersistentTask.java +++ b/plugin/src/main/java/org/elasticsearch/xpack/persistent/AllocatedPersistentTask.java @@ -6,7 +6,6 @@ package org.elasticsearch.xpack.persistent; import org.elasticsearch.action.ActionListener; -import org.elasticsearch.client.Client; import org.elasticsearch.common.Nullable; import org.elasticsearch.tasks.CancellableTask; import org.elasticsearch.tasks.Task; @@ -103,6 +102,10 @@ public class AllocatedPersistentTask extends CancellableTask { return state.get(); } + public long getAllocationId() { + return allocationId; + } + public enum State { STARTED, // the task is currently running CANCELLED, // the task is cancelled diff --git a/plugin/src/test/java/org/elasticsearch/xpack/ml/action/DatafeedJobsIT.java b/plugin/src/test/java/org/elasticsearch/xpack/ml/action/DatafeedJobsIT.java index 1fdc322fca4..18bd4d5ab0d 100644 --- a/plugin/src/test/java/org/elasticsearch/xpack/ml/action/DatafeedJobsIT.java +++ b/plugin/src/test/java/org/elasticsearch/xpack/ml/action/DatafeedJobsIT.java @@ -25,6 +25,7 @@ import org.elasticsearch.xpack.ml.datafeed.DatafeedConfig; import org.elasticsearch.xpack.ml.datafeed.DatafeedState; import org.elasticsearch.xpack.ml.job.config.Job; import org.elasticsearch.xpack.ml.job.config.JobState; +import org.elasticsearch.xpack.ml.job.config.JobTaskStatus; import org.elasticsearch.xpack.ml.job.process.autodetect.state.DataCounts; import org.elasticsearch.xpack.persistent.PersistentTaskRequest; import org.elasticsearch.xpack.persistent.PersistentTasksCustomMetaData; @@ -78,7 +79,7 @@ public class DatafeedJobsIT extends SecurityIntegTestCase { entries.add(new NamedWriteableRegistry.Entry(PersistentTaskRequest.class, OpenJobAction.NAME, OpenJobAction.Request::new)); entries.add(new NamedWriteableRegistry.Entry(Task.Status.class, PersistentTasksNodeService.Status.NAME, PersistentTasksNodeService.Status::new)); - entries.add(new NamedWriteableRegistry.Entry(Task.Status.class, JobState.NAME, JobState::fromStream)); + entries.add(new NamedWriteableRegistry.Entry(Task.Status.class, JobTaskStatus.NAME, JobTaskStatus::new)); entries.add(new NamedWriteableRegistry.Entry(Task.Status.class, DatafeedState.NAME, DatafeedState::fromStream)); final NamedWriteableRegistry namedWriteableRegistry = new NamedWriteableRegistry(entries); ClusterState masterClusterState = client().admin().cluster().prepareState().all().get().getState(); diff --git a/plugin/src/test/java/org/elasticsearch/xpack/ml/action/OpenJobActionTests.java b/plugin/src/test/java/org/elasticsearch/xpack/ml/action/OpenJobActionTests.java index dc8b6df91b4..42d0f7d5b02 100644 --- a/plugin/src/test/java/org/elasticsearch/xpack/ml/action/OpenJobActionTests.java +++ b/plugin/src/test/java/org/elasticsearch/xpack/ml/action/OpenJobActionTests.java @@ -29,6 +29,7 @@ import org.elasticsearch.test.ESTestCase; import org.elasticsearch.xpack.ml.MlMetadata; import org.elasticsearch.xpack.ml.job.config.Job; import org.elasticsearch.xpack.ml.job.config.JobState; +import org.elasticsearch.xpack.ml.job.config.JobTaskStatus; import org.elasticsearch.xpack.ml.job.persistence.AnomalyDetectorsIndex; import org.elasticsearch.xpack.ml.notifications.Auditor; import org.elasticsearch.xpack.ml.support.BaseMlIntegTestCase; @@ -283,7 +284,7 @@ public class OpenJobActionTests extends ESTestCase { PersistentTask task = new PersistentTask<>(id, OpenJobAction.NAME, new OpenJobAction.Request(jobId), new Assignment(nodeId, "test assignment")); if (jobState != null) { - task = new PersistentTask<>(task, jobState); + task = new PersistentTask<>(task, new JobTaskStatus(jobState, 0L)); } return task; } diff --git a/plugin/src/test/java/org/elasticsearch/xpack/ml/integration/BasicDistributedJobsIT.java b/plugin/src/test/java/org/elasticsearch/xpack/ml/integration/BasicDistributedJobsIT.java index 4967b006a8b..e6af6d3ec49 100644 --- a/plugin/src/test/java/org/elasticsearch/xpack/ml/integration/BasicDistributedJobsIT.java +++ b/plugin/src/test/java/org/elasticsearch/xpack/ml/integration/BasicDistributedJobsIT.java @@ -27,6 +27,7 @@ import org.elasticsearch.xpack.ml.datafeed.DatafeedConfig; import org.elasticsearch.xpack.ml.datafeed.DatafeedState; import org.elasticsearch.xpack.ml.job.config.Job; import org.elasticsearch.xpack.ml.job.config.JobState; +import org.elasticsearch.xpack.ml.job.config.JobTaskStatus; import org.elasticsearch.xpack.ml.support.BaseMlIntegTestCase; import org.elasticsearch.xpack.persistent.PersistentTasksCustomMetaData; import org.elasticsearch.xpack.persistent.PersistentTasksCustomMetaData.PersistentTask; @@ -175,7 +176,9 @@ public class BasicDistributedJobsIT extends BaseMlIntegTestCase { Map expectedNodeAttr = new HashMap<>(); expectedNodeAttr.put(MAX_RUNNING_JOBS_PER_NODE.getKey(), "10"); assertEquals(expectedNodeAttr, node.getAttributes()); - assertEquals(JobState.OPENED, task.getStatus()); + JobTaskStatus jobTaskStatus = (JobTaskStatus) task.getStatus(); + assertNotNull(jobTaskStatus); + assertEquals(JobState.OPENED, jobTaskStatus.getState()); }); logger.info("stop the only running ml node"); @@ -226,8 +229,9 @@ public class BasicDistributedJobsIT extends BaseMlIntegTestCase { for (DiscoveryNode node : event.state().nodes()) { Collection> foundTasks = tasks.findTasks(OpenJobAction.NAME, task -> { + JobTaskStatus jobTaskState = (JobTaskStatus) task.getStatus(); return node.getId().equals(task.getExecutorNode()) && - (task.getStatus() == null || task.isCurrentStatus() == false); + (jobTaskState == null || jobTaskState.staleStatus(task)); }); int count = foundTasks.size(); if (count > maxConcurrentJobAllocations) { @@ -254,7 +258,9 @@ public class BasicDistributedJobsIT extends BaseMlIntegTestCase { assertEquals(numJobs, tasks.taskMap().size()); for (PersistentTask task : tasks.taskMap().values()) { assertNotNull(task.getExecutorNode()); - assertEquals(JobState.OPENED, task.getStatus()); + JobTaskStatus jobTaskStatus = (JobTaskStatus) task.getStatus(); + assertNotNull(jobTaskStatus); + assertEquals(JobState.OPENED, jobTaskStatus.getState()); } }); @@ -294,7 +300,9 @@ public class BasicDistributedJobsIT extends BaseMlIntegTestCase { assertEquals(numJobs, tasks.taskMap().size()); for (PersistentTask task : tasks.taskMap().values()) { assertNotNull(task.getExecutorNode()); - assertEquals(JobState.OPENED, task.getStatus()); + JobTaskStatus jobTaskStatus = (JobTaskStatus) task.getStatus(); + assertNotNull(jobTaskStatus); + assertEquals(JobState.OPENED, jobTaskStatus.getState()); } }, 30, TimeUnit.SECONDS); @@ -367,16 +375,18 @@ public class BasicDistributedJobsIT extends BaseMlIntegTestCase { if (hasExecutorNode) { assertNotNull(task.getExecutorNode()); - assertTrue(task.isCurrentStatus()); assertFalse(task.needsReassignment(clusterState.nodes())); DiscoveryNode node = clusterState.nodes().resolveNode(task.getExecutorNode()); Map expectedNodeAttr = new HashMap<>(); expectedNodeAttr.put(MAX_RUNNING_JOBS_PER_NODE.getKey(), "10"); assertEquals(expectedNodeAttr, node.getAttributes()); + + JobTaskStatus jobTaskStatus = (JobTaskStatus) task.getStatus(); + assertNotNull(jobTaskStatus); + assertEquals(expectedState, jobTaskStatus.getState()); } else { assertNull(task.getExecutorNode()); } - assertEquals(expectedState, task.getStatus()); } } diff --git a/plugin/src/test/java/org/elasticsearch/xpack/ml/integration/TooManyJobsIT.java b/plugin/src/test/java/org/elasticsearch/xpack/ml/integration/TooManyJobsIT.java index 8c49b98bb1f..2c74ee205f7 100644 --- a/plugin/src/test/java/org/elasticsearch/xpack/ml/integration/TooManyJobsIT.java +++ b/plugin/src/test/java/org/elasticsearch/xpack/ml/integration/TooManyJobsIT.java @@ -15,6 +15,7 @@ import org.elasticsearch.xpack.ml.action.OpenJobAction; import org.elasticsearch.xpack.ml.action.PutJobAction; import org.elasticsearch.xpack.ml.job.config.Job; import org.elasticsearch.xpack.ml.job.config.JobState; +import org.elasticsearch.xpack.ml.job.config.JobTaskStatus; import org.elasticsearch.xpack.ml.job.process.autodetect.AutodetectProcessManager; import org.elasticsearch.xpack.ml.support.BaseMlIntegTestCase; import org.elasticsearch.xpack.persistent.PersistentTasksCustomMetaData; @@ -53,7 +54,7 @@ public class TooManyJobsIT extends BaseMlIntegTestCase { assertEquals(1, tasks.taskMap().size()); // now just double check that the first job is still opened: PersistentTasksCustomMetaData.PersistentTask task = tasks.taskMap().values().iterator().next(); - assertEquals(JobState.OPENED, task.getStatus()); + assertEquals(JobState.OPENED, ((JobTaskStatus) task.getStatus()).getState()); OpenJobAction.Request openJobRequest = (OpenJobAction.Request) task.getRequest(); assertEquals("1", openJobRequest.getJobId()); } diff --git a/plugin/src/test/java/org/elasticsearch/xpack/ml/job/config/JobTaskStatusTests.java b/plugin/src/test/java/org/elasticsearch/xpack/ml/job/config/JobTaskStatusTests.java new file mode 100644 index 00000000000..355870e0d2d --- /dev/null +++ b/plugin/src/test/java/org/elasticsearch/xpack/ml/job/config/JobTaskStatusTests.java @@ -0,0 +1,28 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License; + * you may not use this file except in compliance with the Elastic License. + */ +package org.elasticsearch.xpack.ml.job.config; + +import org.elasticsearch.common.io.stream.Writeable; +import org.elasticsearch.common.xcontent.XContentParser; +import org.elasticsearch.xpack.ml.support.AbstractSerializingTestCase; + +public class JobTaskStatusTests extends AbstractSerializingTestCase { + + @Override + protected JobTaskStatus createTestInstance() { + return new JobTaskStatus(randomFrom(JobState.values()), randomLong()); + } + + @Override + protected Writeable.Reader instanceReader() { + return JobTaskStatus::new; + } + + @Override + protected JobTaskStatus parseInstance(XContentParser parser) { + return JobTaskStatus.fromXContent(parser); + } +} diff --git a/plugin/src/test/java/org/elasticsearch/xpack/ml/job/process/autodetect/AutodetectProcessManagerTests.java b/plugin/src/test/java/org/elasticsearch/xpack/ml/job/process/autodetect/AutodetectProcessManagerTests.java index e6f318e08fa..75b75721ec7 100644 --- a/plugin/src/test/java/org/elasticsearch/xpack/ml/job/process/autodetect/AutodetectProcessManagerTests.java +++ b/plugin/src/test/java/org/elasticsearch/xpack/ml/job/process/autodetect/AutodetectProcessManagerTests.java @@ -23,6 +23,7 @@ import org.elasticsearch.xpack.ml.job.config.DetectionRule; import org.elasticsearch.xpack.ml.job.config.Detector; import org.elasticsearch.xpack.ml.job.config.Job; import org.elasticsearch.xpack.ml.job.config.JobState; +import org.elasticsearch.xpack.ml.job.config.JobTaskStatus; import org.elasticsearch.xpack.ml.job.config.JobUpdate; import org.elasticsearch.xpack.ml.job.config.MlFilter; import org.elasticsearch.xpack.ml.job.config.ModelPlotConfig; @@ -121,10 +122,11 @@ public class AutodetectProcessManagerTests extends ESTestCase { AutodetectProcessManager manager = createManager(communicator, client); JobTask jobTask = mock(JobTask.class); + when(jobTask.getAllocationId()).thenReturn(1L); manager.openJob("foo", jobTask, false, e -> {}); assertEquals(1, manager.numberOfOpenJobs()); assertTrue(manager.jobHasActiveAutodetectProcess("foo")); - verify(jobTask).updatePersistentStatus(eq(JobState.OPENED), any()); + verify(jobTask).updatePersistentStatus(eq(new JobTaskStatus(JobState.OPENED, 1L)), any()); } public void testOpenJob_exceedMaxNumJobs() {