[ML] Remove PersistentTask#isCurrentStatus() usages
Original commit: elastic/x-pack-elasticsearch@efe7e1e770
This commit is contained in:
parent
e2a30331ba
commit
3986a2a06c
|
@ -74,7 +74,7 @@ import org.elasticsearch.xpack.ml.datafeed.DatafeedManager;
|
|||
import org.elasticsearch.xpack.ml.datafeed.DatafeedState;
|
||||
import org.elasticsearch.xpack.ml.job.JobManager;
|
||||
import org.elasticsearch.xpack.ml.job.UpdateJobProcessNotifier;
|
||||
import org.elasticsearch.xpack.ml.job.config.JobState;
|
||||
import org.elasticsearch.xpack.ml.job.config.JobTaskStatus;
|
||||
import org.elasticsearch.xpack.ml.job.persistence.JobDataCountsPersister;
|
||||
import org.elasticsearch.xpack.ml.job.persistence.JobProvider;
|
||||
import org.elasticsearch.xpack.ml.job.persistence.JobResultsPersister;
|
||||
|
@ -220,7 +220,7 @@ public class MachineLearning implements ActionPlugin {
|
|||
// Task statuses
|
||||
new NamedWriteableRegistry.Entry(Task.Status.class, PersistentTasksNodeService.Status.NAME,
|
||||
PersistentTasksNodeService.Status::new),
|
||||
new NamedWriteableRegistry.Entry(Task.Status.class, JobState.NAME, JobState::fromStream),
|
||||
new NamedWriteableRegistry.Entry(Task.Status.class, JobTaskStatus.NAME, JobTaskStatus::new),
|
||||
new NamedWriteableRegistry.Entry(Task.Status.class, DatafeedState.NAME, DatafeedState::fromStream)
|
||||
);
|
||||
}
|
||||
|
@ -241,7 +241,7 @@ public class MachineLearning implements ActionPlugin {
|
|||
|
||||
// Task statuses
|
||||
new NamedXContentRegistry.Entry(Task.Status.class, new ParseField(DatafeedState.NAME), DatafeedState::fromXContent),
|
||||
new NamedXContentRegistry.Entry(Task.Status.class, new ParseField(JobState.NAME), JobState::fromXContent)
|
||||
new NamedXContentRegistry.Entry(Task.Status.class, new ParseField(JobTaskStatus.NAME), JobTaskStatus::fromXContent)
|
||||
);
|
||||
}
|
||||
|
||||
|
|
|
@ -30,6 +30,7 @@ import org.elasticsearch.xpack.ml.datafeed.DatafeedState;
|
|||
import org.elasticsearch.xpack.ml.datafeed.DatafeedUpdate;
|
||||
import org.elasticsearch.xpack.ml.job.config.Job;
|
||||
import org.elasticsearch.xpack.ml.job.config.JobState;
|
||||
import org.elasticsearch.xpack.ml.job.config.JobTaskStatus;
|
||||
import org.elasticsearch.xpack.ml.job.messages.Messages;
|
||||
import org.elasticsearch.xpack.ml.utils.ExceptionsHelper;
|
||||
import org.elasticsearch.xpack.persistent.PersistentTasksCustomMetaData;
|
||||
|
@ -407,9 +408,9 @@ public class MlMetadata implements MetaData.Custom {
|
|||
public static JobState getJobState(String jobId, @Nullable PersistentTasksCustomMetaData tasks) {
|
||||
PersistentTask<?> task = getJobTask(jobId, tasks);
|
||||
if (task != null && task.getStatus() != null) {
|
||||
JobState jobTaskState = (JobState) task.getStatus();
|
||||
JobTaskStatus jobTaskState = (JobTaskStatus) task.getStatus();
|
||||
if (jobTaskState != null) {
|
||||
return jobTaskState;
|
||||
return jobTaskState.getState();
|
||||
}
|
||||
}
|
||||
// If we haven't opened a job than there will be no persistent task, which is the same as if the job was closed
|
||||
|
|
|
@ -44,7 +44,7 @@ import org.elasticsearch.xpack.XPackPlugin;
|
|||
import org.elasticsearch.xpack.ml.MachineLearning;
|
||||
import org.elasticsearch.xpack.ml.MlMetadata;
|
||||
import org.elasticsearch.xpack.ml.job.config.Job;
|
||||
import org.elasticsearch.xpack.ml.job.config.JobState;
|
||||
import org.elasticsearch.xpack.ml.job.config.JobTaskStatus;
|
||||
import org.elasticsearch.xpack.ml.job.persistence.AnomalyDetectorsIndex;
|
||||
import org.elasticsearch.xpack.ml.job.process.autodetect.AutodetectProcessManager;
|
||||
import org.elasticsearch.xpack.ml.notifications.Auditor;
|
||||
|
@ -348,11 +348,11 @@ public class OpenJobAction extends Action<OpenJobAction.Request, OpenJobAction.R
|
|||
if (persistentTask == null) {
|
||||
return false;
|
||||
}
|
||||
JobState jobState = (JobState) persistentTask.getStatus();
|
||||
JobTaskStatus jobState = (JobTaskStatus) persistentTask.getStatus();
|
||||
if (jobState == null) {
|
||||
return false;
|
||||
}
|
||||
switch (jobState) {
|
||||
switch (jobState.getState()) {
|
||||
case OPENED:
|
||||
opened = true;
|
||||
return true;
|
||||
|
@ -504,9 +504,9 @@ public class OpenJobAction extends Action<OpenJobAction.Request, OpenJobAction.R
|
|||
if (node.getId().equals(task.getExecutorNode()) == false) {
|
||||
return false;
|
||||
}
|
||||
JobState jobTaskState = (JobState) task.getStatus();
|
||||
JobTaskStatus jobTaskState = (JobTaskStatus) task.getStatus();
|
||||
return jobTaskState == null || // executor node didn't have the chance to set job status to OPENING
|
||||
task.isCurrentStatus() == false; // previous executor node failed and
|
||||
jobTaskState.staleStatus(task); // previous executor node failed and
|
||||
// current executor node didn't have the chance to set job status to OPENING
|
||||
}).size();
|
||||
} else {
|
||||
|
|
|
@ -51,6 +51,7 @@ import org.elasticsearch.xpack.ml.datafeed.DatafeedManager;
|
|||
import org.elasticsearch.xpack.ml.datafeed.DatafeedState;
|
||||
import org.elasticsearch.xpack.ml.job.config.Job;
|
||||
import org.elasticsearch.xpack.ml.job.config.JobState;
|
||||
import org.elasticsearch.xpack.ml.job.config.JobTaskStatus;
|
||||
import org.elasticsearch.xpack.ml.job.messages.Messages;
|
||||
import org.elasticsearch.xpack.ml.notifications.Auditor;
|
||||
import org.elasticsearch.xpack.ml.utils.ExceptionsHelper;
|
||||
|
@ -513,10 +514,12 @@ public class StartDatafeedAction
|
|||
logger.debug(reason);
|
||||
return new Assignment(null, reason);
|
||||
}
|
||||
if (jobTask.getStatus() != JobState.OPENED) {
|
||||
JobTaskStatus taskStatus = (JobTaskStatus) jobTask.getStatus();
|
||||
if (taskStatus == null || taskStatus.getState() != JobState.OPENED) {
|
||||
// lets try again later when the job has been opened:
|
||||
String taskStatusAsString = taskStatus == null ? "null" : taskStatus.getState().toString();
|
||||
String reason = "cannot start datafeed [" + datafeed.getId() + "], because job's [" + datafeed.getJobId() +
|
||||
"] state is [" + jobTask.getStatus() + "] while state [" + JobState.OPENED + "] is required";
|
||||
"] state is [" + taskStatusAsString + "] while state [" + JobState.OPENED + "] is required";
|
||||
logger.debug(reason);
|
||||
return new Assignment(null, reason);
|
||||
}
|
||||
|
|
|
@ -5,38 +5,25 @@
|
|||
*/
|
||||
package org.elasticsearch.xpack.ml.job.config;
|
||||
|
||||
import org.elasticsearch.common.ParseField;
|
||||
import org.elasticsearch.common.io.stream.StreamInput;
|
||||
import org.elasticsearch.common.io.stream.StreamOutput;
|
||||
import org.elasticsearch.common.xcontent.ConstructingObjectParser;
|
||||
import org.elasticsearch.common.io.stream.Writeable;
|
||||
import org.elasticsearch.common.xcontent.ToXContent;
|
||||
import org.elasticsearch.common.xcontent.XContentBuilder;
|
||||
import org.elasticsearch.common.xcontent.XContentParser;
|
||||
import org.elasticsearch.tasks.Task;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.util.Arrays;
|
||||
import java.util.Locale;
|
||||
|
||||
import static org.elasticsearch.common.xcontent.ConstructingObjectParser.constructorArg;
|
||||
|
||||
/**
|
||||
* Jobs whether running or complete are in one of these states.
|
||||
* When a job is created it is initialised in to the state closed
|
||||
* i.e. it is not running.
|
||||
*/
|
||||
public enum JobState implements Task.Status {
|
||||
public enum JobState implements ToXContent, Writeable {
|
||||
|
||||
CLOSED, OPENED, FAILED;
|
||||
|
||||
public static final String NAME = "JobState";
|
||||
|
||||
private static final ConstructingObjectParser<JobState, Void> PARSER =
|
||||
new ConstructingObjectParser<>(NAME, args -> fromString((String) args[0]));
|
||||
|
||||
static {
|
||||
PARSER.declareString(constructorArg(), new ParseField("state"));
|
||||
}
|
||||
|
||||
public static JobState fromString(String name) {
|
||||
return valueOf(name.trim().toUpperCase(Locale.ROOT));
|
||||
}
|
||||
|
@ -49,11 +36,6 @@ public enum JobState implements Task.Status {
|
|||
return values()[ordinal];
|
||||
}
|
||||
|
||||
@Override
|
||||
public String getWriteableName() {
|
||||
return NAME;
|
||||
}
|
||||
|
||||
@Override
|
||||
public void writeTo(StreamOutput out) throws IOException {
|
||||
out.writeVInt(ordinal());
|
||||
|
@ -61,22 +43,16 @@ public enum JobState implements Task.Status {
|
|||
|
||||
@Override
|
||||
public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException {
|
||||
builder.startObject();
|
||||
builder.field("state", name().toLowerCase(Locale.ROOT));
|
||||
builder.endObject();
|
||||
builder.value(name().toLowerCase(Locale.ROOT));
|
||||
return builder;
|
||||
}
|
||||
|
||||
@Override
|
||||
public boolean isFragment() {
|
||||
return false;
|
||||
return true;
|
||||
}
|
||||
|
||||
|
||||
public static JobState fromXContent(XContentParser parser) throws IOException {
|
||||
return PARSER.parse(parser, null);
|
||||
}
|
||||
|
||||
/**
|
||||
* @return {@code true} if state matches any of the given {@code candidates}
|
||||
*/
|
||||
|
|
|
@ -0,0 +1,106 @@
|
|||
/*
|
||||
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
|
||||
* or more contributor license agreements. Licensed under the Elastic License;
|
||||
* you may not use this file except in compliance with the Elastic License.
|
||||
*/
|
||||
package org.elasticsearch.xpack.ml.job.config;
|
||||
|
||||
import org.elasticsearch.common.ParseField;
|
||||
import org.elasticsearch.common.io.stream.StreamInput;
|
||||
import org.elasticsearch.common.io.stream.StreamOutput;
|
||||
import org.elasticsearch.common.xcontent.ConstructingObjectParser;
|
||||
import org.elasticsearch.common.xcontent.ObjectParser;
|
||||
import org.elasticsearch.common.xcontent.XContentBuilder;
|
||||
import org.elasticsearch.common.xcontent.XContentParser;
|
||||
import org.elasticsearch.tasks.Task;
|
||||
import org.elasticsearch.xpack.persistent.PersistentTasksCustomMetaData.PersistentTask;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.util.Objects;
|
||||
|
||||
import static org.elasticsearch.common.xcontent.ConstructingObjectParser.constructorArg;
|
||||
|
||||
public class JobTaskStatus implements Task.Status {
|
||||
|
||||
public static final String NAME = "JobState";
|
||||
|
||||
private static ParseField STATE = new ParseField("state");
|
||||
private static ParseField ALLOCATION_ID = new ParseField("allocation_id");
|
||||
|
||||
private static final ConstructingObjectParser<JobTaskStatus, Void> PARSER =
|
||||
new ConstructingObjectParser<>(NAME,
|
||||
args -> new JobTaskStatus((JobState) args[0], (Long) args[1]));
|
||||
|
||||
static {
|
||||
PARSER.declareField(constructorArg(), p -> {
|
||||
if (p.currentToken() == XContentParser.Token.VALUE_STRING) {
|
||||
return JobState.fromString(p.text());
|
||||
}
|
||||
throw new IllegalArgumentException("Unsupported token [" + p.currentToken() + "]");
|
||||
}, STATE, ObjectParser.ValueType.STRING);
|
||||
PARSER.declareLong(constructorArg(), ALLOCATION_ID);
|
||||
}
|
||||
|
||||
public static JobTaskStatus fromXContent(XContentParser parser) {
|
||||
try {
|
||||
return PARSER.parse(parser, null);
|
||||
} catch (IOException e) {
|
||||
throw new RuntimeException(e);
|
||||
}
|
||||
}
|
||||
|
||||
private final JobState state;
|
||||
private final long allocationId;
|
||||
|
||||
public JobTaskStatus(JobState state, long allocationId) {
|
||||
this.state = Objects.requireNonNull(state);
|
||||
this.allocationId = allocationId;
|
||||
}
|
||||
|
||||
public JobTaskStatus(StreamInput in) throws IOException {
|
||||
state = JobState.fromStream(in);
|
||||
allocationId = in.readLong();
|
||||
}
|
||||
|
||||
public JobState getState() {
|
||||
return state;
|
||||
}
|
||||
|
||||
public boolean staleStatus(PersistentTask<?> task) {
|
||||
return allocationId != task.getAllocationId();
|
||||
}
|
||||
|
||||
@Override
|
||||
public String getWriteableName() {
|
||||
return NAME;
|
||||
}
|
||||
|
||||
@Override
|
||||
public void writeTo(StreamOutput out) throws IOException {
|
||||
state.writeTo(out);
|
||||
out.writeLong(allocationId);
|
||||
}
|
||||
|
||||
@Override
|
||||
public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException {
|
||||
builder.startObject();
|
||||
builder.field(STATE.getPreferredName(), state, params);
|
||||
builder.field(ALLOCATION_ID.getPreferredName(), allocationId);
|
||||
builder.endObject();
|
||||
return builder;
|
||||
}
|
||||
|
||||
@Override
|
||||
public boolean equals(Object o) {
|
||||
if (this == o) return true;
|
||||
if (o == null || getClass() != o.getClass()) return false;
|
||||
JobTaskStatus that = (JobTaskStatus) o;
|
||||
return state == that.state &&
|
||||
Objects.equals(allocationId, that.allocationId);
|
||||
}
|
||||
|
||||
@Override
|
||||
public int hashCode() {
|
||||
return Objects.hash(state, allocationId);
|
||||
}
|
||||
}
|
|
@ -27,6 +27,7 @@ import org.elasticsearch.xpack.ml.action.OpenJobAction.JobTask;
|
|||
import org.elasticsearch.xpack.ml.job.JobManager;
|
||||
import org.elasticsearch.xpack.ml.job.config.Job;
|
||||
import org.elasticsearch.xpack.ml.job.config.JobState;
|
||||
import org.elasticsearch.xpack.ml.job.config.JobTaskStatus;
|
||||
import org.elasticsearch.xpack.ml.job.config.JobUpdate;
|
||||
import org.elasticsearch.xpack.ml.job.config.ModelPlotConfig;
|
||||
import org.elasticsearch.xpack.ml.job.persistence.JobDataCountsPersister;
|
||||
|
@ -98,10 +99,10 @@ public class AutodetectProcessManager extends AbstractComponent {
|
|||
private final Auditor auditor;
|
||||
|
||||
public AutodetectProcessManager(Settings settings, Client client, ThreadPool threadPool,
|
||||
JobManager jobManager, JobProvider jobProvider, JobResultsPersister jobResultsPersister,
|
||||
JobDataCountsPersister jobDataCountsPersister,
|
||||
AutodetectProcessFactory autodetectProcessFactory, NormalizerFactory normalizerFactory,
|
||||
NamedXContentRegistry xContentRegistry, Auditor auditor) {
|
||||
JobManager jobManager, JobProvider jobProvider, JobResultsPersister jobResultsPersister,
|
||||
JobDataCountsPersister jobDataCountsPersister,
|
||||
AutodetectProcessFactory autodetectProcessFactory, NormalizerFactory normalizerFactory,
|
||||
NamedXContentRegistry xContentRegistry, Auditor auditor) {
|
||||
super(settings);
|
||||
this.client = client;
|
||||
this.threadPool = threadPool;
|
||||
|
@ -363,7 +364,8 @@ public class AutodetectProcessManager extends AbstractComponent {
|
|||
}
|
||||
|
||||
private void setJobState(JobTask jobTask, JobState state) {
|
||||
jobTask.updatePersistentStatus(state, new ActionListener<PersistentTask<?>>() {
|
||||
JobTaskStatus taskStatus = new JobTaskStatus(state, jobTask.getAllocationId());
|
||||
jobTask.updatePersistentStatus(taskStatus, new ActionListener<PersistentTask<?>>() {
|
||||
@Override
|
||||
public void onResponse(PersistentTask<?> persistentTask) {
|
||||
logger.info("Successfully set job state to [{}] for job [{}]", state, jobTask.getJobId());
|
||||
|
@ -376,8 +378,9 @@ public class AutodetectProcessManager extends AbstractComponent {
|
|||
});
|
||||
}
|
||||
|
||||
public void setJobState(JobTask jobTask, JobState state, CheckedConsumer<Exception, IOException> handler) {
|
||||
jobTask.updatePersistentStatus(state, new ActionListener<PersistentTask<?>>() {
|
||||
void setJobState(JobTask jobTask, JobState state, CheckedConsumer<Exception, IOException> handler) {
|
||||
JobTaskStatus taskStatus = new JobTaskStatus(state, jobTask.getAllocationId());
|
||||
jobTask.updatePersistentStatus(taskStatus, new ActionListener<PersistentTask<?>>() {
|
||||
@Override
|
||||
public void onResponse(PersistentTask<?> persistentTask) {
|
||||
try {
|
||||
|
|
|
@ -6,7 +6,6 @@
|
|||
package org.elasticsearch.xpack.persistent;
|
||||
|
||||
import org.elasticsearch.action.ActionListener;
|
||||
import org.elasticsearch.client.Client;
|
||||
import org.elasticsearch.common.Nullable;
|
||||
import org.elasticsearch.tasks.CancellableTask;
|
||||
import org.elasticsearch.tasks.Task;
|
||||
|
@ -103,6 +102,10 @@ public class AllocatedPersistentTask extends CancellableTask {
|
|||
return state.get();
|
||||
}
|
||||
|
||||
public long getAllocationId() {
|
||||
return allocationId;
|
||||
}
|
||||
|
||||
public enum State {
|
||||
STARTED, // the task is currently running
|
||||
CANCELLED, // the task is cancelled
|
||||
|
|
|
@ -25,6 +25,7 @@ import org.elasticsearch.xpack.ml.datafeed.DatafeedConfig;
|
|||
import org.elasticsearch.xpack.ml.datafeed.DatafeedState;
|
||||
import org.elasticsearch.xpack.ml.job.config.Job;
|
||||
import org.elasticsearch.xpack.ml.job.config.JobState;
|
||||
import org.elasticsearch.xpack.ml.job.config.JobTaskStatus;
|
||||
import org.elasticsearch.xpack.ml.job.process.autodetect.state.DataCounts;
|
||||
import org.elasticsearch.xpack.persistent.PersistentTaskRequest;
|
||||
import org.elasticsearch.xpack.persistent.PersistentTasksCustomMetaData;
|
||||
|
@ -78,7 +79,7 @@ public class DatafeedJobsIT extends SecurityIntegTestCase {
|
|||
entries.add(new NamedWriteableRegistry.Entry(PersistentTaskRequest.class, OpenJobAction.NAME, OpenJobAction.Request::new));
|
||||
entries.add(new NamedWriteableRegistry.Entry(Task.Status.class, PersistentTasksNodeService.Status.NAME,
|
||||
PersistentTasksNodeService.Status::new));
|
||||
entries.add(new NamedWriteableRegistry.Entry(Task.Status.class, JobState.NAME, JobState::fromStream));
|
||||
entries.add(new NamedWriteableRegistry.Entry(Task.Status.class, JobTaskStatus.NAME, JobTaskStatus::new));
|
||||
entries.add(new NamedWriteableRegistry.Entry(Task.Status.class, DatafeedState.NAME, DatafeedState::fromStream));
|
||||
final NamedWriteableRegistry namedWriteableRegistry = new NamedWriteableRegistry(entries);
|
||||
ClusterState masterClusterState = client().admin().cluster().prepareState().all().get().getState();
|
||||
|
|
|
@ -29,6 +29,7 @@ import org.elasticsearch.test.ESTestCase;
|
|||
import org.elasticsearch.xpack.ml.MlMetadata;
|
||||
import org.elasticsearch.xpack.ml.job.config.Job;
|
||||
import org.elasticsearch.xpack.ml.job.config.JobState;
|
||||
import org.elasticsearch.xpack.ml.job.config.JobTaskStatus;
|
||||
import org.elasticsearch.xpack.ml.job.persistence.AnomalyDetectorsIndex;
|
||||
import org.elasticsearch.xpack.ml.notifications.Auditor;
|
||||
import org.elasticsearch.xpack.ml.support.BaseMlIntegTestCase;
|
||||
|
@ -283,7 +284,7 @@ public class OpenJobActionTests extends ESTestCase {
|
|||
PersistentTask<OpenJobAction.Request> task =
|
||||
new PersistentTask<>(id, OpenJobAction.NAME, new OpenJobAction.Request(jobId), new Assignment(nodeId, "test assignment"));
|
||||
if (jobState != null) {
|
||||
task = new PersistentTask<>(task, jobState);
|
||||
task = new PersistentTask<>(task, new JobTaskStatus(jobState, 0L));
|
||||
}
|
||||
return task;
|
||||
}
|
||||
|
|
|
@ -27,6 +27,7 @@ import org.elasticsearch.xpack.ml.datafeed.DatafeedConfig;
|
|||
import org.elasticsearch.xpack.ml.datafeed.DatafeedState;
|
||||
import org.elasticsearch.xpack.ml.job.config.Job;
|
||||
import org.elasticsearch.xpack.ml.job.config.JobState;
|
||||
import org.elasticsearch.xpack.ml.job.config.JobTaskStatus;
|
||||
import org.elasticsearch.xpack.ml.support.BaseMlIntegTestCase;
|
||||
import org.elasticsearch.xpack.persistent.PersistentTasksCustomMetaData;
|
||||
import org.elasticsearch.xpack.persistent.PersistentTasksCustomMetaData.PersistentTask;
|
||||
|
@ -175,7 +176,9 @@ public class BasicDistributedJobsIT extends BaseMlIntegTestCase {
|
|||
Map<String, String> expectedNodeAttr = new HashMap<>();
|
||||
expectedNodeAttr.put(MAX_RUNNING_JOBS_PER_NODE.getKey(), "10");
|
||||
assertEquals(expectedNodeAttr, node.getAttributes());
|
||||
assertEquals(JobState.OPENED, task.getStatus());
|
||||
JobTaskStatus jobTaskStatus = (JobTaskStatus) task.getStatus();
|
||||
assertNotNull(jobTaskStatus);
|
||||
assertEquals(JobState.OPENED, jobTaskStatus.getState());
|
||||
});
|
||||
|
||||
logger.info("stop the only running ml node");
|
||||
|
@ -226,8 +229,9 @@ public class BasicDistributedJobsIT extends BaseMlIntegTestCase {
|
|||
|
||||
for (DiscoveryNode node : event.state().nodes()) {
|
||||
Collection<PersistentTask<?>> foundTasks = tasks.findTasks(OpenJobAction.NAME, task -> {
|
||||
JobTaskStatus jobTaskState = (JobTaskStatus) task.getStatus();
|
||||
return node.getId().equals(task.getExecutorNode()) &&
|
||||
(task.getStatus() == null || task.isCurrentStatus() == false);
|
||||
(jobTaskState == null || jobTaskState.staleStatus(task));
|
||||
});
|
||||
int count = foundTasks.size();
|
||||
if (count > maxConcurrentJobAllocations) {
|
||||
|
@ -254,7 +258,9 @@ public class BasicDistributedJobsIT extends BaseMlIntegTestCase {
|
|||
assertEquals(numJobs, tasks.taskMap().size());
|
||||
for (PersistentTask<?> task : tasks.taskMap().values()) {
|
||||
assertNotNull(task.getExecutorNode());
|
||||
assertEquals(JobState.OPENED, task.getStatus());
|
||||
JobTaskStatus jobTaskStatus = (JobTaskStatus) task.getStatus();
|
||||
assertNotNull(jobTaskStatus);
|
||||
assertEquals(JobState.OPENED, jobTaskStatus.getState());
|
||||
}
|
||||
});
|
||||
|
||||
|
@ -294,7 +300,9 @@ public class BasicDistributedJobsIT extends BaseMlIntegTestCase {
|
|||
assertEquals(numJobs, tasks.taskMap().size());
|
||||
for (PersistentTask<?> task : tasks.taskMap().values()) {
|
||||
assertNotNull(task.getExecutorNode());
|
||||
assertEquals(JobState.OPENED, task.getStatus());
|
||||
JobTaskStatus jobTaskStatus = (JobTaskStatus) task.getStatus();
|
||||
assertNotNull(jobTaskStatus);
|
||||
assertEquals(JobState.OPENED, jobTaskStatus.getState());
|
||||
}
|
||||
}, 30, TimeUnit.SECONDS);
|
||||
|
||||
|
@ -367,16 +375,18 @@ public class BasicDistributedJobsIT extends BaseMlIntegTestCase {
|
|||
|
||||
if (hasExecutorNode) {
|
||||
assertNotNull(task.getExecutorNode());
|
||||
assertTrue(task.isCurrentStatus());
|
||||
assertFalse(task.needsReassignment(clusterState.nodes()));
|
||||
DiscoveryNode node = clusterState.nodes().resolveNode(task.getExecutorNode());
|
||||
Map<String, String> expectedNodeAttr = new HashMap<>();
|
||||
expectedNodeAttr.put(MAX_RUNNING_JOBS_PER_NODE.getKey(), "10");
|
||||
assertEquals(expectedNodeAttr, node.getAttributes());
|
||||
|
||||
JobTaskStatus jobTaskStatus = (JobTaskStatus) task.getStatus();
|
||||
assertNotNull(jobTaskStatus);
|
||||
assertEquals(expectedState, jobTaskStatus.getState());
|
||||
} else {
|
||||
assertNull(task.getExecutorNode());
|
||||
}
|
||||
assertEquals(expectedState, task.getStatus());
|
||||
}
|
||||
|
||||
}
|
||||
|
|
|
@ -15,6 +15,7 @@ import org.elasticsearch.xpack.ml.action.OpenJobAction;
|
|||
import org.elasticsearch.xpack.ml.action.PutJobAction;
|
||||
import org.elasticsearch.xpack.ml.job.config.Job;
|
||||
import org.elasticsearch.xpack.ml.job.config.JobState;
|
||||
import org.elasticsearch.xpack.ml.job.config.JobTaskStatus;
|
||||
import org.elasticsearch.xpack.ml.job.process.autodetect.AutodetectProcessManager;
|
||||
import org.elasticsearch.xpack.ml.support.BaseMlIntegTestCase;
|
||||
import org.elasticsearch.xpack.persistent.PersistentTasksCustomMetaData;
|
||||
|
@ -53,7 +54,7 @@ public class TooManyJobsIT extends BaseMlIntegTestCase {
|
|||
assertEquals(1, tasks.taskMap().size());
|
||||
// now just double check that the first job is still opened:
|
||||
PersistentTasksCustomMetaData.PersistentTask task = tasks.taskMap().values().iterator().next();
|
||||
assertEquals(JobState.OPENED, task.getStatus());
|
||||
assertEquals(JobState.OPENED, ((JobTaskStatus) task.getStatus()).getState());
|
||||
OpenJobAction.Request openJobRequest = (OpenJobAction.Request) task.getRequest();
|
||||
assertEquals("1", openJobRequest.getJobId());
|
||||
}
|
||||
|
|
|
@ -0,0 +1,28 @@
|
|||
/*
|
||||
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
|
||||
* or more contributor license agreements. Licensed under the Elastic License;
|
||||
* you may not use this file except in compliance with the Elastic License.
|
||||
*/
|
||||
package org.elasticsearch.xpack.ml.job.config;
|
||||
|
||||
import org.elasticsearch.common.io.stream.Writeable;
|
||||
import org.elasticsearch.common.xcontent.XContentParser;
|
||||
import org.elasticsearch.xpack.ml.support.AbstractSerializingTestCase;
|
||||
|
||||
public class JobTaskStatusTests extends AbstractSerializingTestCase<JobTaskStatus> {
|
||||
|
||||
@Override
|
||||
protected JobTaskStatus createTestInstance() {
|
||||
return new JobTaskStatus(randomFrom(JobState.values()), randomLong());
|
||||
}
|
||||
|
||||
@Override
|
||||
protected Writeable.Reader<JobTaskStatus> instanceReader() {
|
||||
return JobTaskStatus::new;
|
||||
}
|
||||
|
||||
@Override
|
||||
protected JobTaskStatus parseInstance(XContentParser parser) {
|
||||
return JobTaskStatus.fromXContent(parser);
|
||||
}
|
||||
}
|
|
@ -23,6 +23,7 @@ import org.elasticsearch.xpack.ml.job.config.DetectionRule;
|
|||
import org.elasticsearch.xpack.ml.job.config.Detector;
|
||||
import org.elasticsearch.xpack.ml.job.config.Job;
|
||||
import org.elasticsearch.xpack.ml.job.config.JobState;
|
||||
import org.elasticsearch.xpack.ml.job.config.JobTaskStatus;
|
||||
import org.elasticsearch.xpack.ml.job.config.JobUpdate;
|
||||
import org.elasticsearch.xpack.ml.job.config.MlFilter;
|
||||
import org.elasticsearch.xpack.ml.job.config.ModelPlotConfig;
|
||||
|
@ -121,10 +122,11 @@ public class AutodetectProcessManagerTests extends ESTestCase {
|
|||
AutodetectProcessManager manager = createManager(communicator, client);
|
||||
|
||||
JobTask jobTask = mock(JobTask.class);
|
||||
when(jobTask.getAllocationId()).thenReturn(1L);
|
||||
manager.openJob("foo", jobTask, false, e -> {});
|
||||
assertEquals(1, manager.numberOfOpenJobs());
|
||||
assertTrue(manager.jobHasActiveAutodetectProcess("foo"));
|
||||
verify(jobTask).updatePersistentStatus(eq(JobState.OPENED), any());
|
||||
verify(jobTask).updatePersistentStatus(eq(new JobTaskStatus(JobState.OPENED, 1L)), any());
|
||||
}
|
||||
|
||||
public void testOpenJob_exceedMaxNumJobs() {
|
||||
|
|
Loading…
Reference in New Issue