Rename {Job|Datafeed}Status to {Job|Datafeed}State (elastic/elasticsearch#856)

This is more consistent with elasticsearch where an index
has state [open, close], etc.

Original commit: elastic/x-pack-elasticsearch@30bf720c3e
This commit is contained in:
Dimitris Athanasiou 2017-02-03 10:43:05 +00:00 committed by GitHub
parent b940dbf6d9
commit ca4badeb46
40 changed files with 329 additions and 333 deletions

View File

@ -59,7 +59,7 @@ import org.elasticsearch.xpack.ml.action.PutJobAction;
import org.elasticsearch.xpack.ml.action.RevertModelSnapshotAction;
import org.elasticsearch.xpack.ml.action.StartDatafeedAction;
import org.elasticsearch.xpack.ml.action.StopDatafeedAction;
import org.elasticsearch.xpack.ml.action.UpdateJobStatusAction;
import org.elasticsearch.xpack.ml.action.UpdateJobStateAction;
import org.elasticsearch.xpack.ml.action.UpdateModelSnapshotAction;
import org.elasticsearch.xpack.ml.action.ValidateDetectorAction;
import org.elasticsearch.xpack.ml.action.ValidateJobConfigAction;
@ -298,7 +298,7 @@ public class MlPlugin extends Plugin implements ActionPlugin {
new ActionHandler<>(DeleteJobAction.INSTANCE, DeleteJobAction.TransportAction.class),
new ActionHandler<>(OpenJobAction.INSTANCE, OpenJobAction.TransportAction.class),
new ActionHandler<>(InternalOpenJobAction.INSTANCE, InternalOpenJobAction.TransportAction.class),
new ActionHandler<>(UpdateJobStatusAction.INSTANCE, UpdateJobStatusAction.TransportAction.class),
new ActionHandler<>(UpdateJobStateAction.INSTANCE, UpdateJobStateAction.TransportAction.class),
new ActionHandler<>(GetFiltersAction.INSTANCE, GetFiltersAction.TransportAction.class),
new ActionHandler<>(PutFilterAction.INSTANCE, PutFilterAction.TransportAction.class),
new ActionHandler<>(DeleteFilterAction.INSTANCE, DeleteFilterAction.TransportAction.class),

View File

@ -34,11 +34,11 @@ import org.elasticsearch.tasks.TaskInfo;
import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.transport.TransportService;
import org.elasticsearch.xpack.ml.job.config.Job;
import org.elasticsearch.xpack.ml.job.config.JobStatus;
import org.elasticsearch.xpack.ml.job.config.JobState;
import org.elasticsearch.xpack.ml.job.metadata.Allocation;
import org.elasticsearch.xpack.ml.job.metadata.MlMetadata;
import org.elasticsearch.xpack.ml.utils.ExceptionsHelper;
import org.elasticsearch.xpack.ml.utils.JobStatusObserver;
import org.elasticsearch.xpack.ml.utils.JobStateObserver;
import java.io.IOException;
import java.util.Objects;
@ -182,7 +182,7 @@ public class CloseJobAction extends Action<CloseJobAction.Request, CloseJobActio
public static class TransportAction extends HandledTransportAction<Request, Response> {
private final ClusterService clusterService;
private final JobStatusObserver jobStatusObserver;
private final JobStateObserver jobStateObserver;
private final TransportListTasksAction listTasksAction;
private final TransportCancelTasksAction cancelTasksAction;
@ -193,7 +193,7 @@ public class CloseJobAction extends Action<CloseJobAction.Request, CloseJobActio
TransportListTasksAction listTasksAction) {
super(settings, CloseJobAction.NAME, threadPool, transportService, actionFilters, indexNameExpressionResolver, Request::new);
this.clusterService = clusterService;
this.jobStatusObserver = new JobStatusObserver(threadPool, clusterService);
this.jobStateObserver = new JobStateObserver(threadPool, clusterService);
this.cancelTasksAction = cancelTasksAction;
this.listTasksAction = listTasksAction;
}
@ -214,7 +214,7 @@ public class CloseJobAction extends Action<CloseJobAction.Request, CloseJobActio
cancelTasksRequest.setTaskId(taskInfo.getTaskId());
cancelTasksAction.execute(cancelTasksRequest, ActionListener.wrap(
cancelTasksResponse -> {
jobStatusObserver.waitForStatus(request.jobId, request.closeTimeout, JobStatus.CLOSED,
jobStateObserver.waitForState(request.jobId, request.closeTimeout, JobState.CLOSED,
e -> {
if (e != null) {
listener.onFailure(e);
@ -239,9 +239,9 @@ public class CloseJobAction extends Action<CloseJobAction.Request, CloseJobActio
throw ExceptionsHelper.missingJobException(jobId);
}
if (allocation.getStatus() != JobStatus.OPENED) {
throw new ElasticsearchStatusException("job not opened, expected job status [{}], but got [{}]",
RestStatus.CONFLICT, JobStatus.OPENED, allocation.getStatus());
if (allocation.getState() != JobState.OPENED) {
throw new ElasticsearchStatusException("job not opened, expected job state [{}], but got [{}]",
RestStatus.CONFLICT, JobState.OPENED, allocation.getState());
}
}
}

View File

@ -32,7 +32,7 @@ import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.transport.TransportService;
import org.elasticsearch.xpack.ml.action.util.QueryPage;
import org.elasticsearch.xpack.ml.datafeed.DatafeedConfig;
import org.elasticsearch.xpack.ml.datafeed.DatafeedStatus;
import org.elasticsearch.xpack.ml.datafeed.DatafeedState;
import org.elasticsearch.xpack.ml.job.metadata.MlMetadata;
import org.elasticsearch.xpack.ml.utils.ExceptionsHelper;
import org.elasticsearch.xpack.persistent.PersistentTasksInProgress;
@ -54,7 +54,7 @@ public class GetDatafeedsStatsAction extends Action<GetDatafeedsStatsAction.Requ
public static final String NAME = "cluster:admin/ml/datafeeds/stats/get";
public static final String ALL = "_all";
private static final String STATUS = "status";
private static final String STATE = "state";
private GetDatafeedsStatsAction() {
super(NAME);
@ -131,31 +131,31 @@ public class GetDatafeedsStatsAction extends Action<GetDatafeedsStatsAction.Requ
public static class DatafeedStats implements ToXContent, Writeable {
private final String datafeedId;
private final DatafeedStatus datafeedStatus;
private final DatafeedState datafeedState;
DatafeedStats(String datafeedId, DatafeedStatus datafeedStatus) {
DatafeedStats(String datafeedId, DatafeedState datafeedState) {
this.datafeedId = Objects.requireNonNull(datafeedId);
this.datafeedStatus = Objects.requireNonNull(datafeedStatus);
this.datafeedState = Objects.requireNonNull(datafeedState);
}
DatafeedStats(StreamInput in) throws IOException {
datafeedId = in.readString();
datafeedStatus = DatafeedStatus.fromStream(in);
datafeedState = DatafeedState.fromStream(in);
}
public String getDatafeedId() {
return datafeedId;
}
public DatafeedStatus getDatafeedStatus() {
return datafeedStatus;
public DatafeedState getDatafeedState() {
return datafeedState;
}
@Override
public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException {
builder.startObject();
builder.field(DatafeedConfig.ID.getPreferredName(), datafeedId);
builder.field(STATUS, datafeedStatus);
builder.field(STATE, datafeedState);
builder.endObject();
return builder;
@ -164,12 +164,12 @@ public class GetDatafeedsStatsAction extends Action<GetDatafeedsStatsAction.Requ
@Override
public void writeTo(StreamOutput out) throws IOException {
out.writeString(datafeedId);
datafeedStatus.writeTo(out);
datafeedState.writeTo(out);
}
@Override
public int hashCode() {
return Objects.hash(datafeedId, datafeedStatus);
return Objects.hash(datafeedId, datafeedState);
}
@Override
@ -181,7 +181,7 @@ public class GetDatafeedsStatsAction extends Action<GetDatafeedsStatsAction.Requ
return false;
}
GetDatafeedsStatsAction.Response.DatafeedStats other = (GetDatafeedsStatsAction.Response.DatafeedStats) obj;
return Objects.equals(datafeedId, other.datafeedId) && Objects.equals(this.datafeedStatus, other.datafeedStatus);
return Objects.equals(datafeedId, other.datafeedId) && Objects.equals(this.datafeedState, other.datafeedState);
}
}
@ -264,14 +264,14 @@ public class GetDatafeedsStatsAction extends Action<GetDatafeedsStatsAction.Requ
protected void masterOperation(Request request, ClusterState state, ActionListener<Response> listener) throws Exception {
logger.debug("Get stats for datafeed '{}'", request.getDatafeedId());
Map<String, DatafeedStatus> statuses = new HashMap<>();
Map<String, DatafeedState> states = new HashMap<>();
PersistentTasksInProgress tasksInProgress = state.custom(PersistentTasksInProgress.TYPE);
if (tasksInProgress != null) {
Predicate<PersistentTaskInProgress<?>> predicate = ALL.equals(request.getDatafeedId()) ? p -> true :
p -> request.getDatafeedId().equals(((StartDatafeedAction.Request) p.getRequest()).getDatafeedId());
for (PersistentTaskInProgress<?> taskInProgress : tasksInProgress.findEntries(StartDatafeedAction.NAME, predicate)) {
StartDatafeedAction.Request storedRequest = (StartDatafeedAction.Request) taskInProgress.getRequest();
statuses.put(storedRequest.getDatafeedId(), DatafeedStatus.STARTED);
states.put(storedRequest.getDatafeedId(), DatafeedState.STARTED);
}
}
@ -280,16 +280,16 @@ public class GetDatafeedsStatsAction extends Action<GetDatafeedsStatsAction.Requ
if (ALL.equals(request.getDatafeedId())) {
Collection<DatafeedConfig> datafeeds = mlMetadata.getDatafeeds().values();
for (DatafeedConfig datafeed : datafeeds) {
DatafeedStatus status = statuses.getOrDefault(datafeed.getId(), DatafeedStatus.STOPPED);
stats.add(new Response.DatafeedStats(datafeed.getId(), status));
DatafeedState datafeedState = states.getOrDefault(datafeed.getId(), DatafeedState.STOPPED);
stats.add(new Response.DatafeedStats(datafeed.getId(), datafeedState));
}
} else {
DatafeedConfig datafeed = mlMetadata.getDatafeed(request.getDatafeedId());
if (datafeed == null) {
throw ExceptionsHelper.missingDatafeedException(request.getDatafeedId());
}
DatafeedStatus status = statuses.getOrDefault(datafeed.getId(), DatafeedStatus.STOPPED);
stats.add(new Response.DatafeedStats(datafeed.getId(), status));
DatafeedState datafeedState = states.getOrDefault(datafeed.getId(), DatafeedState.STOPPED);
stats.add(new Response.DatafeedStats(datafeed.getId(), datafeedState));
}
QueryPage<Response.DatafeedStats> statsPage = new QueryPage<>(stats, stats.size(), DatafeedConfig.RESULTS_FIELD);
listener.onResponse(new Response(statsPage));

View File

@ -35,7 +35,7 @@ import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.transport.TransportService;
import org.elasticsearch.xpack.ml.action.util.QueryPage;
import org.elasticsearch.xpack.ml.job.config.Job;
import org.elasticsearch.xpack.ml.job.config.JobStatus;
import org.elasticsearch.xpack.ml.job.config.JobState;
import org.elasticsearch.xpack.ml.job.metadata.MlMetadata;
import org.elasticsearch.xpack.ml.job.persistence.JobProvider;
import org.elasticsearch.xpack.ml.job.process.autodetect.AutodetectProcessManager;
@ -61,7 +61,7 @@ public class GetJobsStatsAction extends Action<GetJobsStatsAction.Request, GetJo
private static final String DATA_COUNTS = "data_counts";
private static final String MODEL_SIZE_STATS = "model_size_stats";
private static final String STATUS = "status";
private static final String STATE = "state";
private GetJobsStatsAction() {
super(NAME);
@ -151,20 +151,20 @@ public class GetJobsStatsAction extends Action<GetJobsStatsAction.Request, GetJo
private DataCounts dataCounts;
@Nullable
private ModelSizeStats modelSizeStats;
private JobStatus status;
private JobState state;
JobStats(String jobId, DataCounts dataCounts, @Nullable ModelSizeStats modelSizeStats, JobStatus status) {
JobStats(String jobId, DataCounts dataCounts, @Nullable ModelSizeStats modelSizeStats, JobState state) {
this.jobId = Objects.requireNonNull(jobId);
this.dataCounts = Objects.requireNonNull(dataCounts);
this.modelSizeStats = modelSizeStats;
this.status = Objects.requireNonNull(status);
this.state = Objects.requireNonNull(state);
}
JobStats(StreamInput in) throws IOException {
jobId = in.readString();
dataCounts = new DataCounts(in);
modelSizeStats = in.readOptionalWriteable(ModelSizeStats::new);
status = JobStatus.fromStream(in);
state = JobState.fromStream(in);
}
public String getJobid() {
@ -179,8 +179,8 @@ public class GetJobsStatsAction extends Action<GetJobsStatsAction.Request, GetJo
return modelSizeStats;
}
public JobStatus getStatus() {
return status;
public JobState getState() {
return state;
}
@Override
@ -191,7 +191,7 @@ public class GetJobsStatsAction extends Action<GetJobsStatsAction.Request, GetJo
if (modelSizeStats != null) {
builder.field(MODEL_SIZE_STATS, modelSizeStats);
}
builder.field(STATUS, status);
builder.field(STATE, state);
builder.endObject();
return builder;
@ -202,12 +202,12 @@ public class GetJobsStatsAction extends Action<GetJobsStatsAction.Request, GetJo
out.writeString(jobId);
dataCounts.writeTo(out);
out.writeOptionalWriteable(modelSizeStats);
status.writeTo(out);
state.writeTo(out);
}
@Override
public int hashCode() {
return Objects.hash(jobId, dataCounts, modelSizeStats, status);
return Objects.hash(jobId, dataCounts, modelSizeStats, state);
}
@Override
@ -222,7 +222,7 @@ public class GetJobsStatsAction extends Action<GetJobsStatsAction.Request, GetJo
return Objects.equals(jobId, other.jobId)
&& Objects.equals(this.dataCounts, other.dataCounts)
&& Objects.equals(this.modelSizeStats, other.modelSizeStats)
&& Objects.equals(this.status, other.status);
&& Objects.equals(this.state, other.state);
}
}
@ -348,8 +348,8 @@ public class GetJobsStatsAction extends Action<GetJobsStatsAction.Request, GetJo
MlMetadata mlMetadata = clusterService.state().metaData().custom(MlMetadata.TYPE);
Optional<Tuple<DataCounts, ModelSizeStats>> stats = processManager.getStatistics(request.getJobId());
if (stats.isPresent()) {
JobStatus jobStatus = mlMetadata.getAllocations().get(request.jobId).getStatus();
Response.JobStats jobStats = new Response.JobStats(request.jobId, stats.get().v1(), stats.get().v2(), jobStatus);
JobState jobState = mlMetadata.getAllocations().get(request.jobId).getState();
Response.JobStats jobStats = new Response.JobStats(request.jobId, stats.get().v1(), stats.get().v2(), jobState);
listener.onResponse(new QueryPage<>(Collections.singletonList(jobStats), 1, Job.RESULTS_FIELD));
} else {
listener.onResponse(new QueryPage<>(Collections.emptyList(), 0, Job.RESULTS_FIELD));
@ -372,8 +372,8 @@ public class GetJobsStatsAction extends Action<GetJobsStatsAction.Request, GetJo
int slot = i;
String jobId = jobIds.get(i);
gatherDataCountsAndModelSizeStats(jobId, (dataCounts, modelSizeStats) -> {
JobStatus jobStatus = mlMetadata.getAllocations().get(jobId).getStatus();
jobStats.set(slot, new Response.JobStats(jobId, dataCounts, modelSizeStats, jobStatus));
JobState jobState = mlMetadata.getAllocations().get(jobId).getState();
jobStats.set(slot, new Response.JobStats(jobId, dataCounts, modelSizeStats, jobState));
if (counter.decrementAndGet() == 0) {
List<Response.JobStats> results = response.getResponse().results();
results.addAll(jobStats.asList().stream()

View File

@ -20,7 +20,7 @@ import org.elasticsearch.tasks.Task;
import org.elasticsearch.tasks.TaskId;
import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.transport.TransportService;
import org.elasticsearch.xpack.ml.job.config.JobStatus;
import org.elasticsearch.xpack.ml.job.config.JobState;
import org.elasticsearch.xpack.ml.job.process.autodetect.AutodetectProcessManager;
public class InternalOpenJobAction extends Action<InternalOpenJobAction.Request, InternalOpenJobAction.Response,
@ -114,7 +114,7 @@ public class InternalOpenJobAction extends Action<InternalOpenJobAction.Request,
@Override
protected void doExecute(Task task, Request request, ActionListener<Response> listener) {
JobTask jobTask = (JobTask) task;
autodetectProcessManager.setJobStatus(request.getJobId(), JobStatus.OPENING, aVoid -> {
autodetectProcessManager.setJobState(request.getJobId(), JobState.OPENING, aVoid -> {
jobTask.cancelHandler = () -> autodetectProcessManager.closeJob(request.getJobId());
autodetectProcessManager.openJob(request.getJobId(), request.isIgnoreDowntime(), e -> {
if (e == null) {

View File

@ -28,10 +28,10 @@ import org.elasticsearch.tasks.LoggingTaskListener;
import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.transport.TransportService;
import org.elasticsearch.xpack.ml.job.config.Job;
import org.elasticsearch.xpack.ml.job.config.JobStatus;
import org.elasticsearch.xpack.ml.job.config.JobState;
import org.elasticsearch.xpack.ml.job.metadata.MlMetadata;
import org.elasticsearch.xpack.ml.utils.ExceptionsHelper;
import org.elasticsearch.xpack.ml.utils.JobStatusObserver;
import org.elasticsearch.xpack.ml.utils.JobStateObserver;
import java.io.IOException;
import java.util.Objects;
@ -191,7 +191,7 @@ public class OpenJobAction extends Action<OpenJobAction.Request, OpenJobAction.R
public static class TransportAction extends HandledTransportAction<Request, Response> {
private final JobStatusObserver observer;
private final JobStateObserver observer;
private final ClusterService clusterService;
private final InternalOpenJobAction.TransportAction internalOpenJobAction;
@ -201,7 +201,7 @@ public class OpenJobAction extends Action<OpenJobAction.Request, OpenJobAction.R
ClusterService clusterService, InternalOpenJobAction.TransportAction internalOpenJobAction) {
super(settings, OpenJobAction.NAME, threadPool, transportService, actionFilters, indexNameExpressionResolver, Request::new);
this.clusterService = clusterService;
this.observer = new JobStatusObserver(threadPool, clusterService);
this.observer = new JobStateObserver(threadPool, clusterService);
this.internalOpenJobAction = internalOpenJobAction;
}
@ -216,7 +216,7 @@ public class OpenJobAction extends Action<OpenJobAction.Request, OpenJobAction.R
InternalOpenJobAction.Request internalRequest = new InternalOpenJobAction.Request(request.jobId);
internalRequest.setIgnoreDowntime(internalRequest.isIgnoreDowntime());
internalOpenJobAction.execute(internalRequest, LoggingTaskListener.instance());
observer.waitForStatus(request.getJobId(), request.openTimeout, JobStatus.OPENED, e -> {
observer.waitForState(request.getJobId(), request.openTimeout, JobState.OPENED, e -> {
if (e != null) {
listener.onFailure(e);
} else {
@ -226,12 +226,12 @@ public class OpenJobAction extends Action<OpenJobAction.Request, OpenJobAction.R
}
/**
* Fail fast before trying to update the job status on master node if the job doesn't exist or its status
* Fail fast before trying to update the job state on master node if the job doesn't exist or its state
* is not what it should be.
*/
public static void validate(MlMetadata mlMetadata, String jobId) {
MlMetadata.Builder builder = new MlMetadata.Builder(mlMetadata);
builder.updateStatus(jobId, JobStatus.OPENING, null);
builder.updateState(jobId, JobState.OPENING, null);
}
}
}

View File

@ -38,7 +38,7 @@ import org.elasticsearch.transport.TransportService;
import org.elasticsearch.xpack.ml.action.util.QueryPage;
import org.elasticsearch.xpack.ml.job.JobManager;
import org.elasticsearch.xpack.ml.job.config.Job;
import org.elasticsearch.xpack.ml.job.config.JobStatus;
import org.elasticsearch.xpack.ml.job.config.JobState;
import org.elasticsearch.xpack.ml.job.messages.Messages;
import org.elasticsearch.xpack.ml.job.metadata.Allocation;
import org.elasticsearch.xpack.ml.job.persistence.JobDataCountsPersister;
@ -288,7 +288,7 @@ extends Action<RevertModelSnapshotAction.Request, RevertModelSnapshotAction.Resp
QueryPage<Job> job = jobManager.getJob(request.getJobId(), clusterService.state());
Allocation allocation = jobManager.getJobAllocation(request.getJobId());
if (job.count() > 0 && allocation.getStatus().equals(JobStatus.CLOSED) == false) {
if (job.count() > 0 && allocation.getState().equals(JobState.CLOSED) == false) {
throw ExceptionsHelper.conflictStatusException(Messages.getMessage(Messages.REST_JOB_NOT_CLOSED_REVERT));
}

View File

@ -32,9 +32,9 @@ import org.elasticsearch.transport.TransportService;
import org.elasticsearch.xpack.ml.datafeed.DatafeedConfig;
import org.elasticsearch.xpack.ml.datafeed.DatafeedJobRunner;
import org.elasticsearch.xpack.ml.datafeed.DatafeedJobValidator;
import org.elasticsearch.xpack.ml.datafeed.DatafeedStatus;
import org.elasticsearch.xpack.ml.datafeed.DatafeedState;
import org.elasticsearch.xpack.ml.job.config.Job;
import org.elasticsearch.xpack.ml.job.config.JobStatus;
import org.elasticsearch.xpack.ml.job.config.JobState;
import org.elasticsearch.xpack.ml.job.metadata.Allocation;
import org.elasticsearch.xpack.ml.job.metadata.MlMetadata;
import org.elasticsearch.xpack.ml.utils.ExceptionsHelper;
@ -253,8 +253,8 @@ public class StartDatafeedAction
return storedRequest.getDatafeedId().equals(request.getDatafeedId());
};
if (persistentTasksInProgress.entriesExist(NAME, predicate)) {
throw new ElasticsearchStatusException("datafeed already started, expected datafeed status [{}], but got [{}]",
RestStatus.CONFLICT, DatafeedStatus.STOPPED, DatafeedStatus.STARTED);
throw new ElasticsearchStatusException("datafeed already started, expected datafeed state [{}], but got [{}]",
RestStatus.CONFLICT, DatafeedState.STOPPED, DatafeedState.STARTED);
}
}
@ -284,9 +284,9 @@ public class StartDatafeedAction
throw ExceptionsHelper.missingJobException(datafeed.getJobId());
}
Allocation allocation = mlMetadata.getAllocations().get(datafeed.getJobId());
if (allocation.getStatus() != JobStatus.OPENED) {
throw new ElasticsearchStatusException("cannot start datafeed, expected job status [{}], but got [{}]",
RestStatus.CONFLICT, JobStatus.OPENED, allocation.getStatus());
if (allocation.getState() != JobState.OPENED) {
throw new ElasticsearchStatusException("cannot start datafeed, expected job state [{}], but got [{}]",
RestStatus.CONFLICT, JobState.OPENED, allocation.getState());
}
DatafeedJobValidator.validate(datafeed, job);
}

View File

@ -28,7 +28,7 @@ import org.elasticsearch.rest.RestStatus;
import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.transport.TransportService;
import org.elasticsearch.xpack.ml.datafeed.DatafeedConfig;
import org.elasticsearch.xpack.ml.datafeed.DatafeedStatus;
import org.elasticsearch.xpack.ml.datafeed.DatafeedState;
import org.elasticsearch.xpack.ml.job.messages.Messages;
import org.elasticsearch.xpack.ml.job.metadata.MlMetadata;
import org.elasticsearch.xpack.ml.utils.ExceptionsHelper;
@ -158,8 +158,8 @@ public class StopDatafeedAction
}
}
}
listener.onFailure(new ElasticsearchStatusException("datafeed already stopped, expected datafeed status [{}], but got [{}]",
RestStatus.CONFLICT, DatafeedStatus.STARTED, DatafeedStatus.STOPPED));
listener.onFailure(new ElasticsearchStatusException("datafeed already stopped, expected datafeed state [{}], but got [{}]",
RestStatus.CONFLICT, DatafeedState.STARTED, DatafeedState.STOPPED));
}
@Override

View File

@ -24,7 +24,7 @@ import org.elasticsearch.tasks.Task;
import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.transport.TransportService;
import org.elasticsearch.xpack.ml.job.config.Job;
import org.elasticsearch.xpack.ml.job.config.JobStatus;
import org.elasticsearch.xpack.ml.job.config.JobState;
import org.elasticsearch.xpack.ml.job.process.autodetect.AutodetectProcessManager;
import org.elasticsearch.xpack.ml.job.JobManager;
import org.elasticsearch.xpack.ml.job.metadata.Allocation;
@ -78,9 +78,9 @@ public abstract class TransportJobTaskAction<OperationTask extends Task, Request
String jobId = jobIdFromRequest.apply(request);
jobManager.getJobOrThrowIfUnknown(jobId);
Allocation allocation = jobManager.getJobAllocation(jobId);
if (allocation.getStatus() != JobStatus.OPENED) {
throw new ElasticsearchStatusException("job [" + jobId + "] status is [" + allocation.getStatus() +
"], but must be [" + JobStatus.OPENED + "] to perform requested action", RestStatus.CONFLICT);
if (allocation.getState() != JobState.OPENED) {
throw new ElasticsearchStatusException("job [" + jobId + "] state is [" + allocation.getState() +
"], but must be [" + JobState.OPENED + "] to perform requested action", RestStatus.CONFLICT);
} else {
throw new IllegalStateException("No errors or response");
}

View File

@ -25,22 +25,22 @@ import org.elasticsearch.common.io.stream.StreamOutput;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.transport.TransportService;
import org.elasticsearch.xpack.ml.job.config.Job;
import org.elasticsearch.xpack.ml.job.config.JobStatus;
import org.elasticsearch.xpack.ml.job.JobManager;
import org.elasticsearch.xpack.ml.job.config.Job;
import org.elasticsearch.xpack.ml.job.config.JobState;
import org.elasticsearch.xpack.ml.job.metadata.Allocation;
import org.elasticsearch.xpack.ml.utils.ExceptionsHelper;
import java.io.IOException;
import java.util.Objects;
public class UpdateJobStatusAction
extends Action<UpdateJobStatusAction.Request, UpdateJobStatusAction.Response, UpdateJobStatusAction.RequestBuilder> {
public class UpdateJobStateAction
extends Action<UpdateJobStateAction.Request, UpdateJobStateAction.Response, UpdateJobStateAction.RequestBuilder> {
public static final UpdateJobStatusAction INSTANCE = new UpdateJobStatusAction();
public static final String NAME = "cluster:admin/ml/anomaly_detectors/status/update";
public static final UpdateJobStateAction INSTANCE = new UpdateJobStateAction();
public static final String NAME = "cluster:admin/ml/anomaly_detectors/state/update";
private UpdateJobStatusAction() {
private UpdateJobStateAction() {
super(NAME);
}
@ -57,12 +57,12 @@ public class UpdateJobStatusAction
public static class Request extends AcknowledgedRequest<Request> {
private String jobId;
private JobStatus status;
private JobState state;
private String reason;
public Request(String jobId, JobStatus status) {
public Request(String jobId, JobState state) {
this.jobId = ExceptionsHelper.requireNonNull(jobId, Job.ID.getPreferredName());
this.status = ExceptionsHelper.requireNonNull(status, Allocation.STATUS.getPreferredName());
this.state = ExceptionsHelper.requireNonNull(state, Allocation.STATE.getPreferredName());
}
Request() {}
@ -75,12 +75,12 @@ public class UpdateJobStatusAction
this.jobId = jobId;
}
public JobStatus getStatus() {
return status;
public JobState getState() {
return state;
}
public void setStatus(JobStatus status) {
this.status = status;
public void setState(JobState state) {
this.state = state;
}
public String getReason() {
@ -100,7 +100,7 @@ public class UpdateJobStatusAction
public void readFrom(StreamInput in) throws IOException {
super.readFrom(in);
jobId = in.readString();
status = JobStatus.fromStream(in);
state = JobState.fromStream(in);
reason = in.readOptionalString();
}
@ -108,13 +108,13 @@ public class UpdateJobStatusAction
public void writeTo(StreamOutput out) throws IOException {
super.writeTo(out);
out.writeString(jobId);
status.writeTo(out);
state.writeTo(out);
out.writeOptionalString(reason);
}
@Override
public int hashCode() {
return Objects.hash(jobId, status);
return Objects.hash(jobId, state);
}
@Override
@ -125,14 +125,14 @@ public class UpdateJobStatusAction
if (obj == null || obj.getClass() != getClass()) {
return false;
}
UpdateJobStatusAction.Request other = (UpdateJobStatusAction.Request) obj;
return Objects.equals(jobId, other.jobId) && Objects.equals(status, other.status);
UpdateJobStateAction.Request other = (UpdateJobStateAction.Request) obj;
return Objects.equals(jobId, other.jobId) && Objects.equals(state, other.state);
}
}
static class RequestBuilder extends MasterNodeOperationRequestBuilder<Request, Response, RequestBuilder> {
public RequestBuilder(ElasticsearchClient client, UpdateJobStatusAction action) {
public RequestBuilder(ElasticsearchClient client, UpdateJobStateAction action) {
super(client, action, new Request());
}
}
@ -166,7 +166,7 @@ public class UpdateJobStatusAction
public TransportAction(Settings settings, TransportService transportService, ClusterService clusterService,
ThreadPool threadPool, ActionFilters actionFilters, IndexNameExpressionResolver indexNameExpressionResolver,
JobManager jobManager) {
super(settings, UpdateJobStatusAction.NAME, transportService, clusterService, threadPool, actionFilters,
super(settings, UpdateJobStateAction.NAME, transportService, clusterService, threadPool, actionFilters,
indexNameExpressionResolver, Request::new);
this.jobManager = jobManager;
}
@ -183,7 +183,7 @@ public class UpdateJobStatusAction
@Override
protected void masterOperation(Request request, ClusterState state, ActionListener<Response> listener) throws Exception {
jobManager.setJobStatus(request, listener);
jobManager.setJobState(request, listener);
}
@Override

View File

@ -75,7 +75,7 @@ public class DatafeedJobRunner extends AbstractComponent {
}, handler);
}
// Important: Holder must be created and assigned to DatafeedTask before setting status to started,
// Important: Holder must be created and assigned to DatafeedTask before setting state to started,
// otherwise if a stop datafeed call is made immediately after the start datafeed call we could cancel
// the DatafeedTask without stopping datafeed, which causes the datafeed to keep on running.
private void innerRun(Holder holder, long startTime, Long endTime) {

View File

@ -12,18 +12,18 @@ import org.elasticsearch.common.io.stream.Writeable;
import java.io.IOException;
import java.util.Locale;
public enum DatafeedStatus implements Writeable {
public enum DatafeedState implements Writeable {
STARTED, STOPPED;
public static DatafeedStatus fromString(String name) {
public static DatafeedState fromString(String name) {
return valueOf(name.trim().toUpperCase(Locale.ROOT));
}
public static DatafeedStatus fromStream(StreamInput in) throws IOException {
public static DatafeedState fromStream(StreamInput in) throws IOException {
int ordinal = in.readVInt();
if (ordinal < 0 || ordinal >= values().length) {
throw new IOException("Unknown public enum DatafeedStatus {\n ordinal [" + ordinal + "]");
throw new IOException("Unknown public enum DatafeedState ordinal [" + ordinal + "]");
}
return values()[ordinal];
}

View File

@ -21,11 +21,11 @@ import org.elasticsearch.index.IndexNotFoundException;
import org.elasticsearch.xpack.ml.action.DeleteJobAction;
import org.elasticsearch.xpack.ml.action.PutJobAction;
import org.elasticsearch.xpack.ml.action.RevertModelSnapshotAction;
import org.elasticsearch.xpack.ml.action.UpdateJobStatusAction;
import org.elasticsearch.xpack.ml.action.UpdateJobStateAction;
import org.elasticsearch.xpack.ml.action.util.QueryPage;
import org.elasticsearch.xpack.ml.job.config.IgnoreDowntime;
import org.elasticsearch.xpack.ml.job.config.Job;
import org.elasticsearch.xpack.ml.job.config.JobStatus;
import org.elasticsearch.xpack.ml.job.config.JobState;
import org.elasticsearch.xpack.ml.job.messages.Messages;
import org.elasticsearch.xpack.ml.job.metadata.Allocation;
import org.elasticsearch.xpack.ml.job.metadata.MlMetadata;
@ -214,9 +214,9 @@ public class JobManager extends AbstractComponent {
String indexName = AnomalyDetectorsIndex.jobResultsIndexName(jobId);
logger.debug("Deleting job '" + jobId + "'");
// Step 3. Listen for the Cluster State status change
// Chain acknowledged status onto original actionListener
CheckedConsumer<Boolean, Exception> deleteStatusConsumer = jobDeleted -> {
// Step 3. Listen for the Cluster State job state change
// Chain acknowledged state onto original actionListener
CheckedConsumer<Boolean, Exception> deleteStateConsumer = jobDeleted -> {
if (jobDeleted) {
logger.info("Job [" + jobId + "] deleted.");
actionListener.onResponse(new DeleteJobAction.Response(true));
@ -228,10 +228,10 @@ public class JobManager extends AbstractComponent {
// Step 2. Listen for the Deleted Index response
// If successful, delete from cluster state and chain onto deleteStatusListener
// If successful, delete from cluster state and chain onto deleteStateListener
CheckedConsumer<Boolean, Exception> deleteIndexConsumer = response -> {
clusterService.submitStateUpdateTask("delete-job-" + jobId,
new AckedClusterStateUpdateTask<Boolean>(request, ActionListener.wrap(deleteStatusConsumer, actionListener::onFailure)) {
new AckedClusterStateUpdateTask<Boolean>(request, ActionListener.wrap(deleteStateConsumer, actionListener::onFailure)) {
@Override
protected Boolean newResponse(boolean acknowledged) {
@ -249,12 +249,12 @@ public class JobManager extends AbstractComponent {
// Step 1. Update the CS to DELETING
// If successful, attempt to delete the physical index and chain
// onto deleteIndexConsumer
CheckedConsumer<UpdateJobStatusAction.Response, Exception> updateConsumer = response -> {
// Sucessfully updated the status to DELETING, begin actually deleting
CheckedConsumer<UpdateJobStateAction.Response, Exception> updateConsumer = response -> {
// Sucessfully updated the state to DELETING, begin actually deleting
if (response.isAcknowledged()) {
logger.info("Job [" + jobId + "] set to [" + JobStatus.DELETING + "]");
logger.info("Job [" + jobId + "] set to [" + JobState.DELETING + "]");
} else {
logger.warn("Job [" + jobId + "] change to [" + JobStatus.DELETING + "] was not acknowledged.");
logger.warn("Job [" + jobId + "] change to [" + JobState.DELETING + "] was not acknowledged.");
}
DeleteIndexRequest deleteIndexRequest = new DeleteIndexRequest(indexName);
@ -282,8 +282,8 @@ public class JobManager extends AbstractComponent {
}));
};
UpdateJobStatusAction.Request updateStatusListener = new UpdateJobStatusAction.Request(jobId, JobStatus.DELETING);
setJobStatus(updateStatusListener, ActionListener.wrap(updateConsumer, actionListener::onFailure));
UpdateJobStateAction.Request updateStateListener = new UpdateJobStateAction.Request(jobId, JobState.DELETING);
setJobState(updateStateListener, ActionListener.wrap(updateConsumer, actionListener::onFailure));
}
@ -339,22 +339,22 @@ public class JobManager extends AbstractComponent {
});
}
public void setJobStatus(UpdateJobStatusAction.Request request, ActionListener<UpdateJobStatusAction.Response> actionListener) {
clusterService.submitStateUpdateTask("set-job-status-" + request.getStatus() + "-" + request.getJobId(),
new AckedClusterStateUpdateTask<UpdateJobStatusAction.Response>(request, actionListener) {
public void setJobState(UpdateJobStateAction.Request request, ActionListener<UpdateJobStateAction.Response> actionListener) {
clusterService.submitStateUpdateTask("set-job-state-" + request.getState() + "-" + request.getJobId(),
new AckedClusterStateUpdateTask<UpdateJobStateAction.Response>(request, actionListener) {
@Override
public ClusterState execute(ClusterState currentState) throws Exception {
MlMetadata.Builder builder = new MlMetadata.Builder(currentState.metaData().custom(MlMetadata.TYPE));
builder.updateStatus(request.getJobId(), request.getStatus(), request.getReason());
builder.updateState(request.getJobId(), request.getState(), request.getReason());
return ClusterState.builder(currentState)
.metaData(MetaData.builder(currentState.metaData()).putCustom(MlMetadata.TYPE, builder.build()))
.build();
}
@Override
protected UpdateJobStatusAction.Response newResponse(boolean acknowledged) {
return new UpdateJobStatusAction.Response(acknowledged);
protected UpdateJobStateAction.Response newResponse(boolean acknowledged) {
return new UpdateJobStateAction.Response(acknowledged);
}
});
}

View File

@ -31,8 +31,8 @@ import java.util.TreeSet;
/**
* This class represents a configured and created Job. The creation time is set
* to the time the object was constructed, Status is set to
* {@link JobStatus#OPENING} and the finished time and last data time fields are
* to the time the object was constructed, state is set to
* {@link JobState#OPENING} and the finished time and last data time fields are
* {@code null} until the job has seen some data or it is finished respectively.
* If the job was created to read data from a list of files FileUrls will be a
* non-empty list else the expects data to be streamed to it.

View File

@ -15,21 +15,21 @@ import java.util.Locale;
/**
* Jobs whether running or complete are in one of these states.
* When a job is created it is initialised in to the status closed
* When a job is created it is initialised in to the state closed
* i.e. it is not running.
*/
public enum JobStatus implements Writeable {
public enum JobState implements Writeable {
CLOSING, CLOSED, OPENING, OPENED, FAILED, DELETING;
public static JobStatus fromString(String name) {
public static JobState fromString(String name) {
return valueOf(name.trim().toUpperCase(Locale.ROOT));
}
public static JobStatus fromStream(StreamInput in) throws IOException {
public static JobState fromStream(StreamInput in) throws IOException {
int ordinal = in.readVInt();
if (ordinal < 0 || ordinal >= values().length) {
throw new IOException("Unknown public enum JobStatus {\n ordinal [" + ordinal + "]");
throw new IOException("Unknown public enum JobState {\n ordinal [" + ordinal + "]");
}
return values()[ordinal];
}
@ -40,9 +40,9 @@ public enum JobStatus implements Writeable {
}
/**
* @return {@code true} if status matches any of the given {@code candidates}
* @return {@code true} if state matches any of the given {@code candidates}
*/
public boolean isAnyOf(JobStatus... candidates) {
public boolean isAnyOf(JobState... candidates) {
return Arrays.stream(candidates).anyMatch(candidate -> this == candidate);
}
}

View File

@ -14,7 +14,7 @@ import org.elasticsearch.common.xcontent.ObjectParser;
import org.elasticsearch.common.xcontent.ToXContent;
import org.elasticsearch.common.xcontent.XContentBuilder;
import org.elasticsearch.xpack.ml.job.config.Job;
import org.elasticsearch.xpack.ml.job.config.JobStatus;
import org.elasticsearch.xpack.ml.job.config.JobState;
import java.io.IOException;
import java.util.Objects;
@ -24,8 +24,8 @@ public class Allocation extends AbstractDiffable<Allocation> implements ToXConte
private static final ParseField NODE_ID_FIELD = new ParseField("node_id");
private static final ParseField JOB_ID_FIELD = new ParseField("job_id");
private static final ParseField IGNORE_DOWNTIME_FIELD = new ParseField("ignore_downtime");
public static final ParseField STATUS = new ParseField("status");
public static final ParseField STATUS_REASON = new ParseField("status_reason");
public static final ParseField STATE = new ParseField("state");
public static final ParseField STATE_REASON = new ParseField("state_reason");
static final ObjectParser<Builder, Void> PARSER = new ObjectParser<>("allocation", Builder::new);
@ -33,30 +33,30 @@ public class Allocation extends AbstractDiffable<Allocation> implements ToXConte
PARSER.declareString(Builder::setNodeId, NODE_ID_FIELD);
PARSER.declareString(Builder::setJobId, JOB_ID_FIELD);
PARSER.declareBoolean(Builder::setIgnoreDowntime, IGNORE_DOWNTIME_FIELD);
PARSER.declareField(Builder::setStatus, (p, c) -> JobStatus.fromString(p.text()), STATUS, ObjectParser.ValueType.STRING);
PARSER.declareString(Builder::setStatusReason, STATUS_REASON);
PARSER.declareField(Builder::setState, (p, c) -> JobState.fromString(p.text()), STATE, ObjectParser.ValueType.STRING);
PARSER.declareString(Builder::setStateReason, STATE_REASON);
}
private final String nodeId;
private final String jobId;
private final boolean ignoreDowntime;
private final JobStatus status;
private final String statusReason;
private final JobState state;
private final String stateReason;
public Allocation(String nodeId, String jobId, boolean ignoreDowntime, JobStatus status, String statusReason) {
public Allocation(String nodeId, String jobId, boolean ignoreDowntime, JobState state, String stateReason) {
this.nodeId = nodeId;
this.jobId = jobId;
this.ignoreDowntime = ignoreDowntime;
this.status = status;
this.statusReason = statusReason;
this.state = state;
this.stateReason = stateReason;
}
public Allocation(StreamInput in) throws IOException {
this.nodeId = in.readOptionalString();
this.jobId = in.readString();
this.ignoreDowntime = in.readBoolean();
this.status = JobStatus.fromStream(in);
this.statusReason = in.readOptionalString();
this.state = JobState.fromStream(in);
this.stateReason = in.readOptionalString();
}
public String getNodeId() {
@ -70,18 +70,18 @@ public class Allocation extends AbstractDiffable<Allocation> implements ToXConte
/**
* @return Whether to ignore downtime at startup.
*
* When the job status is set to STARTED, to ignoreDowntime will be set to false.
* When the job state is set to STARTED, to ignoreDowntime will be set to false.
*/
public boolean isIgnoreDowntime() {
return ignoreDowntime;
}
public JobStatus getStatus() {
return status;
public JobState getState() {
return state;
}
public String getStatusReason() {
return statusReason;
public String getStateReason() {
return stateReason;
}
@Override
@ -89,8 +89,8 @@ public class Allocation extends AbstractDiffable<Allocation> implements ToXConte
out.writeOptionalString(nodeId);
out.writeString(jobId);
out.writeBoolean(ignoreDowntime);
status.writeTo(out);
out.writeOptionalString(statusReason);
state.writeTo(out);
out.writeOptionalString(stateReason);
}
@Override
@ -101,9 +101,9 @@ public class Allocation extends AbstractDiffable<Allocation> implements ToXConte
}
builder.field(JOB_ID_FIELD.getPreferredName(), jobId);
builder.field(IGNORE_DOWNTIME_FIELD.getPreferredName(), ignoreDowntime);
builder.field(STATUS.getPreferredName(), status);
if (statusReason != null) {
builder.field(STATUS_REASON.getPreferredName(), statusReason);
builder.field(STATE.getPreferredName(), state);
if (stateReason != null) {
builder.field(STATE_REASON.getPreferredName(), stateReason);
}
builder.endObject();
return builder;
@ -117,13 +117,13 @@ public class Allocation extends AbstractDiffable<Allocation> implements ToXConte
return Objects.equals(nodeId, that.nodeId) &&
Objects.equals(jobId, that.jobId) &&
Objects.equals(ignoreDowntime, that.ignoreDowntime) &&
Objects.equals(status, that.status) &&
Objects.equals(statusReason, that.statusReason);
Objects.equals(state, that.state) &&
Objects.equals(stateReason, that.stateReason);
}
@Override
public int hashCode() {
return Objects.hash(nodeId, jobId, ignoreDowntime, status, statusReason);
return Objects.hash(nodeId, jobId, ignoreDowntime, state, stateReason);
}
// Class already extends from AbstractDiffable, so copied from ToXContentToBytes#toString()
@ -137,8 +137,8 @@ public class Allocation extends AbstractDiffable<Allocation> implements ToXConte
private String nodeId;
private String jobId;
private boolean ignoreDowntime;
private JobStatus status;
private String statusReason;
private JobState state;
private String stateReason;
public Builder() {
}
@ -151,8 +151,8 @@ public class Allocation extends AbstractDiffable<Allocation> implements ToXConte
this.nodeId = allocation.nodeId;
this.jobId = allocation.jobId;
this.ignoreDowntime = allocation.ignoreDowntime;
this.status = allocation.status;
this.statusReason = allocation.statusReason;
this.state = allocation.state;
this.stateReason = allocation.stateReason;
}
public void setNodeId(String nodeId) {
@ -168,19 +168,19 @@ public class Allocation extends AbstractDiffable<Allocation> implements ToXConte
}
@SuppressWarnings("incomplete-switch")
public void setStatus(JobStatus newStatus) {
if (this.status != null) {
switch (newStatus) {
public void setState(JobState newState) {
if (this.state != null) {
switch (newState) {
case CLOSING:
if (this.status != JobStatus.OPENED) {
throw new IllegalArgumentException("[" + jobId + "] expected status [" + JobStatus.OPENED
+ "], but got [" + status +"]");
if (this.state != JobState.OPENED) {
throw new IllegalArgumentException("[" + jobId + "] expected state [" + JobState.OPENED
+ "], but got [" + state +"]");
}
break;
case OPENING:
if (this.status.isAnyOf(JobStatus.CLOSED, JobStatus.FAILED) == false) {
throw new IllegalArgumentException("[" + jobId + "] expected status [" + JobStatus.CLOSED
+ "] or [" + JobStatus.FAILED + "], but got [" + status +"]");
if (this.state.isAnyOf(JobState.CLOSED, JobState.FAILED) == false) {
throw new IllegalArgumentException("[" + jobId + "] expected state [" + JobState.CLOSED
+ "] or [" + JobState.FAILED + "], but got [" + state +"]");
}
break;
case OPENED:
@ -189,15 +189,15 @@ public class Allocation extends AbstractDiffable<Allocation> implements ToXConte
}
}
this.status = newStatus;
this.state = newState;
}
public void setStatusReason(String statusReason) {
this.statusReason = statusReason;
public void setStateReason(String stateReason) {
this.stateReason = stateReason;
}
public Allocation build() {
return new Allocation(nodeId, jobId, ignoreDowntime, status, statusReason);
return new Allocation(nodeId, jobId, ignoreDowntime, state, stateReason);
}
}

View File

@ -26,9 +26,9 @@ import org.elasticsearch.rest.RestStatus;
import org.elasticsearch.xpack.ml.action.StartDatafeedAction;
import org.elasticsearch.xpack.ml.datafeed.DatafeedConfig;
import org.elasticsearch.xpack.ml.datafeed.DatafeedJobValidator;
import org.elasticsearch.xpack.ml.datafeed.DatafeedStatus;
import org.elasticsearch.xpack.ml.datafeed.DatafeedState;
import org.elasticsearch.xpack.ml.job.config.Job;
import org.elasticsearch.xpack.ml.job.config.JobStatus;
import org.elasticsearch.xpack.ml.job.config.JobState;
import org.elasticsearch.xpack.ml.job.messages.Messages;
import org.elasticsearch.xpack.ml.utils.ExceptionsHelper;
import org.elasticsearch.xpack.persistent.PersistentTasksInProgress;
@ -265,7 +265,7 @@ public class MlMetadata implements MetaData.Custom {
Allocation allocation = allocations.get(job.getId());
if (allocation == null) {
Allocation.Builder builder = new Allocation.Builder(job);
builder.setStatus(JobStatus.CLOSED);
builder.setState(JobState.CLOSED);
allocations.put(job.getId(), builder.build());
}
return this;
@ -286,9 +286,9 @@ public class MlMetadata implements MetaData.Custom {
Allocation previousAllocation = this.allocations.remove(jobId);
if (previousAllocation != null) {
if (!previousAllocation.getStatus().equals(JobStatus.DELETING)) {
if (!previousAllocation.getState().equals(JobState.DELETING)) {
throw ExceptionsHelper.conflictStatusException("Cannot delete job [" + jobId + "] because it is in ["
+ previousAllocation.getStatus() + "] state. Must be in [" + JobStatus.DELETING + "] state.");
+ previousAllocation.getState() + "] state. Must be in [" + JobState.DELETING + "] state.");
}
} else {
throw new ResourceNotFoundException("No Cluster State found for job [" + jobId + "]");
@ -329,7 +329,7 @@ public class MlMetadata implements MetaData.Custom {
};
if (persistentTasksInProgress.entriesExist(StartDatafeedAction.NAME, predicate)) {
String msg = Messages.getMessage(Messages.DATAFEED_CANNOT_DELETE_IN_CURRENT_STATE, datafeedId,
DatafeedStatus.STARTED);
DatafeedState.STARTED);
throw ExceptionsHelper.conflictStatusException(msg);
}
}
@ -379,18 +379,18 @@ public class MlMetadata implements MetaData.Custom {
return this;
}
public Builder updateStatus(String jobId, JobStatus jobStatus, @Nullable String reason) {
public Builder updateState(String jobId, JobState jobState, @Nullable String reason) {
if (jobs.containsKey(jobId) == false) {
throw ExceptionsHelper.missingJobException(jobId);
}
Allocation previous = allocations.get(jobId);
if (previous == null) {
throw new IllegalStateException("[" + jobId + "] no allocation exist to update the status to [" + jobStatus + "]");
throw new IllegalStateException("[" + jobId + "] no allocation exist to update the state to [" + jobState + "]");
}
// Cannot update the status to DELETING if there are datafeeds attached
if (jobStatus.equals(JobStatus.DELETING)) {
// Cannot update the state to DELETING if there are datafeeds attached
if (jobState.equals(JobState.DELETING)) {
Optional<DatafeedConfig> datafeed = getDatafeedByJobId(jobId);
if (datafeed.isPresent()) {
throw ExceptionsHelper.conflictStatusException("Cannot delete job [" + jobId + "] while datafeed ["
@ -398,22 +398,22 @@ public class MlMetadata implements MetaData.Custom {
}
}
if (previous.getStatus().equals(JobStatus.DELETING)) {
if (previous.getState().equals(JobState.DELETING)) {
// If we're already Deleting there's nothing to do
if (jobStatus.equals(JobStatus.DELETING)) {
if (jobState.equals(JobState.DELETING)) {
return this;
}
// Once a job goes into Deleting, it cannot be changed
throw new ElasticsearchStatusException("Cannot change status of job [" + jobId + "] to [" + jobStatus + "] because " +
"it is currently in [" + JobStatus.DELETING + "] status.", RestStatus.CONFLICT);
throw new ElasticsearchStatusException("Cannot change state of job [" + jobId + "] to [" + jobState + "] because " +
"it is currently in [" + JobState.DELETING + "] state.", RestStatus.CONFLICT);
}
Allocation.Builder builder = new Allocation.Builder(previous);
builder.setStatus(jobStatus);
builder.setState(jobState);
if (reason != null) {
builder.setStatusReason(reason);
builder.setStateReason(reason);
}
if (previous.getStatus() != jobStatus && jobStatus == JobStatus.CLOSED) {
if (previous.getState() != jobState && jobState == JobState.CLOSED) {
Job.Builder job = new Job.Builder(this.jobs.get(jobId));
job.setFinishedTime(new Date());
this.jobs.put(job.getId(), job.build());

View File

@ -252,8 +252,7 @@ public class DataCountsReporter extends AbstractComponent implements Closeable {
/**
* Report the the status now regardless of whether or
* not we are at a reporting boundary.
* Report the counts now regardless of whether or not we are at a reporting boundary.
*/
public void finishReporting() {
dataCountsPersister.persistDataCounts(jobId, runningTotalStats(), new LoggingActionListener());

View File

@ -16,10 +16,10 @@ import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.rest.RestStatus;
import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.xpack.ml.MlPlugin;
import org.elasticsearch.xpack.ml.action.UpdateJobStatusAction;
import org.elasticsearch.xpack.ml.action.UpdateJobStateAction;
import org.elasticsearch.xpack.ml.job.JobManager;
import org.elasticsearch.xpack.ml.job.config.Job;
import org.elasticsearch.xpack.ml.job.config.JobStatus;
import org.elasticsearch.xpack.ml.job.config.JobState;
import org.elasticsearch.xpack.ml.job.config.MlFilter;
import org.elasticsearch.xpack.ml.job.metadata.Allocation;
import org.elasticsearch.xpack.ml.job.persistence.JobDataCountsPersister;
@ -121,14 +121,14 @@ public class AutodetectProcessManager extends AbstractComponent {
*/
public DataCounts processData(String jobId, InputStream input, DataLoadParams params) {
Allocation allocation = jobManager.getJobAllocation(jobId);
if (allocation.getStatus() != JobStatus.OPENED) {
throw new IllegalArgumentException("job [" + jobId + "] status is [" + allocation.getStatus() + "], but must be ["
+ JobStatus.OPENED + "] for processing data");
if (allocation.getState() != JobState.OPENED) {
throw new IllegalArgumentException("job [" + jobId + "] state is [" + allocation.getState() + "], but must be ["
+ JobState.OPENED + "] for processing data");
}
AutodetectCommunicator communicator = autoDetectCommunicatorByJob.get(jobId);
if (communicator == null) {
throw new IllegalStateException("job [" + jobId + "] with status [" + allocation.getStatus() + "] hasn't been started");
throw new IllegalStateException("job [" + jobId + "] with state [" + allocation.getState() + "] hasn't been started");
}
try {
return communicator.writeToJob(input, params);
@ -193,7 +193,7 @@ public class AutodetectProcessManager extends AbstractComponent {
logger.error(msg);
throw ExceptionsHelper.serverError(msg, ioe);
}
setJobStatus(jobId, JobStatus.OPENED);
setJobState(jobId, JobState.OPENED);
return communicator;
});
}, handler);
@ -290,7 +290,7 @@ public class AutodetectProcessManager extends AbstractComponent {
try {
communicator.close();
setJobStatus(jobId, JobStatus.CLOSED);
setJobState(jobId, JobState.CLOSED);
} catch (Exception e) {
logger.warn("Exception closing stopped process input stream", e);
throw ExceptionsHelper.serverError("Exception closing stopped process input stream", e);
@ -313,28 +313,28 @@ public class AutodetectProcessManager extends AbstractComponent {
return Duration.between(communicator.getProcessStartTime(), ZonedDateTime.now());
}
private void setJobStatus(String jobId, JobStatus status) {
UpdateJobStatusAction.Request request = new UpdateJobStatusAction.Request(jobId, status);
client.execute(UpdateJobStatusAction.INSTANCE, request, new ActionListener<UpdateJobStatusAction.Response>() {
private void setJobState(String jobId, JobState state) {
UpdateJobStateAction.Request request = new UpdateJobStateAction.Request(jobId, state);
client.execute(UpdateJobStateAction.INSTANCE, request, new ActionListener<UpdateJobStateAction.Response>() {
@Override
public void onResponse(UpdateJobStatusAction.Response response) {
public void onResponse(UpdateJobStateAction.Response response) {
if (response.isAcknowledged()) {
logger.info("Successfully set job status to [{}] for job [{}]", status, jobId);
logger.info("Successfully set job state to [{}] for job [{}]", state, jobId);
} else {
logger.info("Changing job status to [{}] for job [{}] wasn't acked", status, jobId);
logger.info("Changing job state to [{}] for job [{}] wasn't acked", state, jobId);
}
}
@Override
public void onFailure(Exception e) {
logger.error("Could not set job status to [" + status + "] for job [" + jobId +"]", e);
logger.error("Could not set job state to [" + state + "] for job [" + jobId +"]", e);
}
});
}
public void setJobStatus(String jobId, JobStatus status, Consumer<Void> handler, Consumer<Exception> errorHandler) {
UpdateJobStatusAction.Request request = new UpdateJobStatusAction.Request(jobId, status);
client.execute(UpdateJobStatusAction.INSTANCE, request, ActionListener.wrap(r -> handler.accept(null), errorHandler));
public void setJobState(String jobId, JobState state, Consumer<Void> handler, Consumer<Exception> errorHandler) {
UpdateJobStateAction.Request request = new UpdateJobStateAction.Request(jobId, state);
client.execute(UpdateJobStateAction.INSTANCE, request, ActionListener.wrap(r -> handler.accept(null), errorHandler));
}
public Optional<Tuple<DataCounts, ModelSizeStats>> getStatistics(String jobId) {

View File

@ -12,29 +12,29 @@ import org.elasticsearch.cluster.service.ClusterService;
import org.elasticsearch.common.logging.Loggers;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.xpack.ml.job.config.JobStatus;
import org.elasticsearch.xpack.ml.job.config.JobState;
import org.elasticsearch.xpack.ml.job.metadata.Allocation;
import org.elasticsearch.xpack.ml.job.metadata.MlMetadata;
import java.util.function.Consumer;
import java.util.function.Predicate;
public class JobStatusObserver {
public class JobStateObserver {
private static final Logger LOGGER = Loggers.getLogger(JobStatusObserver.class);
private static final Logger LOGGER = Loggers.getLogger(JobStateObserver.class);
private final ThreadPool threadPool;
private final ClusterService clusterService;
public JobStatusObserver(ThreadPool threadPool, ClusterService clusterService) {
public JobStateObserver(ThreadPool threadPool, ClusterService clusterService) {
this.threadPool = threadPool;
this.clusterService = clusterService;
}
public void waitForStatus(String jobId, TimeValue waitTimeout, JobStatus expectedStatus, Consumer<Exception> handler) {
public void waitForState(String jobId, TimeValue waitTimeout, JobState expectedState, Consumer<Exception> handler) {
ClusterStateObserver observer =
new ClusterStateObserver(clusterService, LOGGER, threadPool.getThreadContext());
JobStatusPredicate jobStatusPredicate = new JobStatusPredicate(jobId, expectedStatus);
JobStatePredicate jobStatePredicate = new JobStatePredicate(jobId, expectedState);
observer.waitForNextChange(new ClusterStateObserver.Listener() {
@Override
public void onNewClusterState(ClusterState state) {
@ -43,32 +43,32 @@ public class JobStatusObserver {
@Override
public void onClusterServiceClose() {
Exception e = new IllegalArgumentException("Cluster service closed while waiting for job status to change to ["
+ expectedStatus + "]");
Exception e = new IllegalArgumentException("Cluster service closed while waiting for job state to change to ["
+ expectedState + "]");
handler.accept(new IllegalStateException(e));
}
@Override
public void onTimeout(TimeValue timeout) {
if (jobStatusPredicate.test(clusterService.state())) {
if (jobStatePredicate.test(clusterService.state())) {
handler.accept(null);
} else {
Exception e = new IllegalArgumentException("Timeout expired while waiting for job status to change to ["
+ expectedStatus + "]");
Exception e = new IllegalArgumentException("Timeout expired while waiting for job state to change to ["
+ expectedState + "]");
handler.accept(e);
}
}
}, jobStatusPredicate, waitTimeout);
}, jobStatePredicate, waitTimeout);
}
private static class JobStatusPredicate implements Predicate<ClusterState> {
private static class JobStatePredicate implements Predicate<ClusterState> {
private final String jobId;
private final JobStatus expectedStatus;
private final JobState expectedState;
JobStatusPredicate(String jobId, JobStatus expectedStatus) {
JobStatePredicate(String jobId, JobState expectedState) {
this.jobId = jobId;
this.expectedStatus = expectedStatus;
this.expectedState = expectedState;
}
@Override
@ -77,7 +77,7 @@ public class JobStatusObserver {
if (metadata != null) {
Allocation allocation = metadata.getAllocations().get(jobId);
if (allocation != null) {
return allocation.getStatus() == expectedStatus;
return allocation.getState() == expectedState;
}
}
return false;

View File

@ -23,12 +23,12 @@ import org.elasticsearch.rest.RestStatus;
import org.elasticsearch.test.ESIntegTestCase;
import org.elasticsearch.xpack.ml.MlPlugin;
import org.elasticsearch.xpack.ml.datafeed.DatafeedConfig;
import org.elasticsearch.xpack.ml.datafeed.DatafeedStatus;
import org.elasticsearch.xpack.ml.datafeed.DatafeedState;
import org.elasticsearch.xpack.ml.job.config.AnalysisConfig;
import org.elasticsearch.xpack.ml.job.config.DataDescription;
import org.elasticsearch.xpack.ml.job.config.Detector;
import org.elasticsearch.xpack.ml.job.config.Job;
import org.elasticsearch.xpack.ml.job.config.JobStatus;
import org.elasticsearch.xpack.ml.job.config.JobState;
import org.elasticsearch.xpack.ml.job.metadata.MlMetadata;
import org.elasticsearch.xpack.ml.job.persistence.AnomalyDetectorsIndex;
import org.elasticsearch.xpack.ml.job.process.autodetect.state.DataCounts;
@ -90,7 +90,7 @@ public class DatafeedJobsIT extends ESIntegTestCase {
assertBusy(() -> {
GetJobsStatsAction.Response statsResponse =
client().execute(GetJobsStatsAction.INSTANCE, new GetJobsStatsAction.Request(job.getId())).actionGet();
assertEquals(statsResponse.getResponse().results().get(0).getStatus(), JobStatus.OPENED);
assertEquals(statsResponse.getResponse().results().get(0).getState(), JobState.OPENED);
});
DatafeedConfig datafeedConfig = createDatafeed(job.getId() + "-datafeed", job.getId(), Collections.singletonList("data-*"));
@ -109,7 +109,7 @@ public class DatafeedJobsIT extends ESIntegTestCase {
GetDatafeedsStatsAction.Request request = new GetDatafeedsStatsAction.Request(datafeedConfig.getId());
GetDatafeedsStatsAction.Response response = client().execute(GetDatafeedsStatsAction.INSTANCE, request).actionGet();
assertThat(response.getResponse().results().get(0).getDatafeedStatus(), equalTo(DatafeedStatus.STOPPED));
assertThat(response.getResponse().results().get(0).getDatafeedState(), equalTo(DatafeedState.STOPPED));
});
}
@ -130,7 +130,7 @@ public class DatafeedJobsIT extends ESIntegTestCase {
assertBusy(() -> {
GetJobsStatsAction.Response statsResponse =
client().execute(GetJobsStatsAction.INSTANCE, new GetJobsStatsAction.Request(job.getId())).actionGet();
assertEquals(statsResponse.getResponse().results().get(0).getStatus(), JobStatus.OPENED);
assertEquals(statsResponse.getResponse().results().get(0).getState(), JobState.OPENED);
});
DatafeedConfig datafeedConfig = createDatafeed(job.getId() + "-datafeed", job.getId(), Collections.singletonList("data"));
@ -171,7 +171,7 @@ public class DatafeedJobsIT extends ESIntegTestCase {
assertBusy(() -> {
GetDatafeedsStatsAction.Request request = new GetDatafeedsStatsAction.Request(datafeedConfig.getId());
GetDatafeedsStatsAction.Response response = client().execute(GetDatafeedsStatsAction.INSTANCE, request).actionGet();
assertThat(response.getResponse().results().get(0).getDatafeedStatus(), equalTo(DatafeedStatus.STOPPED));
assertThat(response.getResponse().results().get(0).getDatafeedState(), equalTo(DatafeedState.STOPPED));
});
}
@ -255,7 +255,7 @@ public class DatafeedJobsIT extends ESIntegTestCase {
try {
GetDatafeedsStatsAction.Request request = new GetDatafeedsStatsAction.Request(datafeedId);
GetDatafeedsStatsAction.Response r = client.execute(GetDatafeedsStatsAction.INSTANCE, request).get();
assertThat(r.getResponse().results().get(0).getDatafeedStatus(), equalTo(DatafeedStatus.STOPPED));
assertThat(r.getResponse().results().get(0).getDatafeedState(), equalTo(DatafeedState.STOPPED));
} catch (InterruptedException | ExecutionException e) {
throw new RuntimeException(e);
}

View File

@ -8,7 +8,7 @@ package org.elasticsearch.xpack.ml.action;
import org.elasticsearch.xpack.ml.action.GetDatafeedsStatsAction.Response;
import org.elasticsearch.xpack.ml.action.util.QueryPage;
import org.elasticsearch.xpack.ml.datafeed.DatafeedConfig;
import org.elasticsearch.xpack.ml.datafeed.DatafeedStatus;
import org.elasticsearch.xpack.ml.datafeed.DatafeedState;
import org.elasticsearch.xpack.ml.support.AbstractStreamableTestCase;
import java.util.ArrayList;
@ -24,9 +24,9 @@ public class GetDatafeedStatsActionResponseTests extends AbstractStreamableTestC
List<Response.DatafeedStats> datafeedStatsList = new ArrayList<>(listSize);
for (int j = 0; j < listSize; j++) {
String datafeedId = randomAsciiOfLength(10);
DatafeedStatus datafeedStatus = randomFrom(DatafeedStatus.values());
DatafeedState datafeedState = randomFrom(DatafeedState.values());
Response.DatafeedStats datafeedStats = new Response.DatafeedStats(datafeedId, datafeedStatus);
Response.DatafeedStats datafeedStats = new Response.DatafeedStats(datafeedId, datafeedState);
datafeedStatsList.add(datafeedStats);
}

View File

@ -8,7 +8,7 @@ package org.elasticsearch.xpack.ml.action;
import org.elasticsearch.xpack.ml.action.GetJobsStatsAction.Response;
import org.elasticsearch.xpack.ml.job.process.autodetect.state.DataCounts;
import org.elasticsearch.xpack.ml.job.config.Job;
import org.elasticsearch.xpack.ml.job.config.JobStatus;
import org.elasticsearch.xpack.ml.job.config.JobState;
import org.elasticsearch.xpack.ml.job.process.autodetect.state.ModelSizeStats;
import org.elasticsearch.xpack.ml.action.util.QueryPage;
import org.elasticsearch.xpack.ml.support.AbstractStreamableTestCase;
@ -38,9 +38,9 @@ public class GetJobStatsActionResponseTests extends AbstractStreamableTestCase<R
if (randomBoolean()) {
sizeStats = new ModelSizeStats.Builder("foo").build();
}
JobStatus jobStatus = randomFrom(EnumSet.allOf(JobStatus.class));
JobState jobState = randomFrom(EnumSet.allOf(JobState.class));
Response.JobStats jobStats = new Response.JobStats(jobId, dataCounts, sizeStats, jobStatus);
Response.JobStats jobStats = new Response.JobStats(jobId, dataCounts, sizeStats, jobState);
jobStatsList.add(jobStats);
}

View File

@ -6,7 +6,7 @@
package org.elasticsearch.xpack.ml.action;
import org.elasticsearch.test.ESTestCase;
import org.elasticsearch.xpack.ml.job.config.JobStatus;
import org.elasticsearch.xpack.ml.job.config.JobState;
import org.elasticsearch.xpack.ml.job.process.autodetect.state.DataCounts;
import java.util.Arrays;
@ -23,7 +23,7 @@ public class GetJobsStatsActionTests extends ESTestCase {
assertEquals("id1", result.get(0));
result = determineJobIdsWithoutLiveStats(Collections.singletonList("id1"), Collections.singletonList(
new GetJobsStatsAction.Response.JobStats("id1", new DataCounts("id1"), null, JobStatus.CLOSED)));
new GetJobsStatsAction.Response.JobStats("id1", new DataCounts("id1"), null, JobState.CLOSED)));
assertEquals(0, result.size());
result = determineJobIdsWithoutLiveStats(
@ -35,23 +35,23 @@ public class GetJobsStatsActionTests extends ESTestCase {
result = determineJobIdsWithoutLiveStats(
Arrays.asList("id1", "id2", "id3"),
Collections.singletonList(new GetJobsStatsAction.Response.JobStats("id1", new DataCounts("id1"), null, JobStatus.CLOSED))
Collections.singletonList(new GetJobsStatsAction.Response.JobStats("id1", new DataCounts("id1"), null, JobState.CLOSED))
);
assertEquals(2, result.size());
assertEquals("id2", result.get(0));
assertEquals("id3", result.get(1));
result = determineJobIdsWithoutLiveStats(Arrays.asList("id1", "id2", "id3"), Arrays.asList(
new GetJobsStatsAction.Response.JobStats("id1", new DataCounts("id1"), null, JobStatus.CLOSED),
new GetJobsStatsAction.Response.JobStats("id3", new DataCounts("id3"), null, JobStatus.CLOSED)
new GetJobsStatsAction.Response.JobStats("id1", new DataCounts("id1"), null, JobState.CLOSED),
new GetJobsStatsAction.Response.JobStats("id3", new DataCounts("id3"), null, JobState.CLOSED)
));
assertEquals(1, result.size());
assertEquals("id2", result.get(0));
result = determineJobIdsWithoutLiveStats(Arrays.asList("id1", "id2", "id3"),
Arrays.asList(new GetJobsStatsAction.Response.JobStats("id1", new DataCounts("id1"), null, JobStatus.CLOSED),
new GetJobsStatsAction.Response.JobStats("id2", new DataCounts("id2"), null, JobStatus.CLOSED),
new GetJobsStatsAction.Response.JobStats("id3", new DataCounts("id3"), null, JobStatus.CLOSED)));
Arrays.asList(new GetJobsStatsAction.Response.JobStats("id1", new DataCounts("id1"), null, JobState.CLOSED),
new GetJobsStatsAction.Response.JobStats("id2", new DataCounts("id2"), null, JobState.CLOSED),
new GetJobsStatsAction.Response.JobStats("id3", new DataCounts("id3"), null, JobState.CLOSED)));
assertEquals(0, result.size());
}

View File

@ -53,7 +53,7 @@ public class StartDatafeedActionRequestTests extends AbstractStreamableXContentT
.build();
e = expectThrows(ElasticsearchStatusException.class,
() -> StartDatafeedAction.validate("foo-datafeed", mlMetadata2));
assertThat(e.getMessage(), equalTo("cannot start datafeed, expected job status [OPENED], but got [CLOSED]"));
assertThat(e.getMessage(), equalTo("cannot start datafeed, expected job state [OPENED], but got [CLOSED]"));
}
}

View File

@ -5,15 +5,15 @@
*/
package org.elasticsearch.xpack.ml.action;
import org.elasticsearch.xpack.ml.action.UpdateJobStatusAction.Request;
import org.elasticsearch.xpack.ml.job.config.JobStatus;
import org.elasticsearch.xpack.ml.action.UpdateJobStateAction.Request;
import org.elasticsearch.xpack.ml.job.config.JobState;
import org.elasticsearch.xpack.ml.support.AbstractStreamableTestCase;
public class UpdateJobStatusRequestTests extends AbstractStreamableTestCase<Request> {
@Override
protected Request createTestInstance() {
return new Request(randomAsciiOfLengthBetween(1, 20), randomFrom(JobStatus.values()));
return new Request(randomAsciiOfLengthBetween(1, 20), randomFrom(JobState.values()));
}
@Override

View File

@ -26,7 +26,7 @@ import org.elasticsearch.xpack.ml.job.config.AnalysisConfig;
import org.elasticsearch.xpack.ml.job.config.DataDescription;
import org.elasticsearch.xpack.ml.job.config.Detector;
import org.elasticsearch.xpack.ml.job.config.Job;
import org.elasticsearch.xpack.ml.job.config.JobStatus;
import org.elasticsearch.xpack.ml.job.config.JobState;
import org.elasticsearch.xpack.ml.job.metadata.MlMetadata;
import org.elasticsearch.xpack.ml.job.persistence.JobProvider;
import org.elasticsearch.xpack.ml.job.process.autodetect.state.DataCounts;
@ -116,7 +116,7 @@ public class DatafeedJobRunnerTests extends ESTestCase {
MlMetadata mlMetadata = new MlMetadata.Builder()
.putJob(job, false)
.putDatafeed(datafeedConfig)
.updateStatus("foo", JobStatus.OPENED, null)
.updateState("foo", JobState.OPENED, null)
.build();
when(clusterService.state()).thenReturn(ClusterState.builder(new ClusterName("_name"))
.metaData(MetaData.builder().putCustom(MlMetadata.TYPE, mlMetadata))
@ -155,7 +155,7 @@ public class DatafeedJobRunnerTests extends ESTestCase {
MlMetadata mlMetadata = new MlMetadata.Builder()
.putJob(job, false)
.putDatafeed(datafeedConfig)
.updateStatus("foo", JobStatus.OPENED, null)
.updateState("foo", JobState.OPENED, null)
.build();
when(clusterService.state()).thenReturn(ClusterState.builder(new ClusterName("_name"))
.metaData(MetaData.builder().putCustom(MlMetadata.TYPE, mlMetadata))
@ -184,7 +184,7 @@ public class DatafeedJobRunnerTests extends ESTestCase {
MlMetadata mlMetadata = new MlMetadata.Builder()
.putJob(job, false)
.putDatafeed(datafeedConfig)
.updateStatus("foo", JobStatus.OPENED, null)
.updateState("foo", JobState.OPENED, null)
.build();
when(clusterService.state()).thenReturn(ClusterState.builder(new ClusterName("_name"))
.metaData(MetaData.builder().putCustom(MlMetadata.TYPE, mlMetadata))

View File

@ -10,13 +10,13 @@ import org.elasticsearch.test.ESTestCase;
public class DatafeedStatusTests extends ESTestCase {
public void testForString() {
assertEquals(DatafeedStatus.fromString("started"), DatafeedStatus.STARTED);
assertEquals(DatafeedStatus.fromString("stopped"), DatafeedStatus.STOPPED);
assertEquals(DatafeedState.fromString("started"), DatafeedState.STARTED);
assertEquals(DatafeedState.fromString("stopped"), DatafeedState.STOPPED);
}
public void testValidOrdinals() {
assertEquals(0, DatafeedStatus.STARTED.ordinal());
assertEquals(1, DatafeedStatus.STOPPED.ordinal());
assertEquals(0, DatafeedState.STARTED.ordinal());
assertEquals(1, DatafeedState.STOPPED.ordinal());
}
}

View File

@ -329,7 +329,7 @@ public class DatafeedJobIT extends ESRestTestCase {
try {
Response datafeedStatsResponse = client().performRequest("get",
MlPlugin.BASE_PATH + "datafeeds/" + datafeedId + "/_stats");
assertThat(responseEntityToString(datafeedStatsResponse), containsString("\"status\":\"STOPPED\""));
assertThat(responseEntityToString(datafeedStatsResponse), containsString("\"state\":\"STOPPED\""));
} catch (Exception e) {
throw new RuntimeException(e);
}

View File

@ -24,7 +24,7 @@ import org.elasticsearch.xpack.ml.job.config.AnalysisConfig;
import org.elasticsearch.xpack.ml.job.config.DataDescription;
import org.elasticsearch.xpack.ml.job.config.Detector;
import org.elasticsearch.xpack.ml.job.config.Job;
import org.elasticsearch.xpack.ml.job.config.JobStatus;
import org.elasticsearch.xpack.ml.job.config.JobState;
import org.elasticsearch.xpack.ml.job.metadata.MlMetadata;
import org.elasticsearch.xpack.ml.job.process.autodetect.AutodetectProcessManager;
import org.elasticsearch.xpack.persistent.PersistentActionRequest;
@ -75,7 +75,7 @@ public class TooManyJobsIT extends ESIntegTestCase {
assertBusy(() -> {
GetJobsStatsAction.Response statsResponse =
client().execute(GetJobsStatsAction.INSTANCE, new GetJobsStatsAction.Request(job.getId())).actionGet();
assertEquals(statsResponse.getResponse().results().get(0).getStatus(), JobStatus.OPENED);
assertEquals(statsResponse.getResponse().results().get(0).getState(), JobState.OPENED);
});
logger.info("Opened {}th job", i);
} catch (Exception e) {
@ -84,7 +84,7 @@ public class TooManyJobsIT extends ESIntegTestCase {
logger.warn("Unexpected cause", e);
}
assertEquals(IllegalArgumentException.class, cause.getClass());
assertEquals("Timeout expired while waiting for job status to change to [OPENED]", cause.getMessage());
assertEquals("Timeout expired while waiting for job state to change to [OPENED]", cause.getMessage());
logger.info("good news everybody --> reached maximum number of allowed opened jobs, after trying to open the {}th job", i);
// now manually clean things up and see if we can succeed to run one new job
@ -97,7 +97,7 @@ public class TooManyJobsIT extends ESIntegTestCase {
assertBusy(() -> {
GetJobsStatsAction.Response statsResponse =
client().execute(GetJobsStatsAction.INSTANCE, new GetJobsStatsAction.Request(job.getId())).actionGet();
assertEquals(statsResponse.getResponse().results().get(0).getStatus(), JobStatus.OPENED);
assertEquals(statsResponse.getResponse().results().get(0).getState(), JobState.OPENED);
});
return;
}

View File

@ -10,30 +10,30 @@ import org.elasticsearch.test.ESTestCase;
public class JobStatusTests extends ESTestCase {
public void testForString() {
assertEquals(JobStatus.fromString("closed"), JobStatus.CLOSED);
assertEquals(JobStatus.fromString("closing"), JobStatus.CLOSING);
assertEquals(JobStatus.fromString("failed"), JobStatus.FAILED);
assertEquals(JobStatus.fromString("opening"), JobStatus.OPENING);
assertEquals(JobStatus.fromString("opened"), JobStatus.OPENED);
assertEquals(JobState.fromString("closed"), JobState.CLOSED);
assertEquals(JobState.fromString("closing"), JobState.CLOSING);
assertEquals(JobState.fromString("failed"), JobState.FAILED);
assertEquals(JobState.fromString("opening"), JobState.OPENING);
assertEquals(JobState.fromString("opened"), JobState.OPENED);
}
public void testValidOrdinals() {
assertEquals(0, JobStatus.CLOSING.ordinal());
assertEquals(1, JobStatus.CLOSED.ordinal());
assertEquals(2, JobStatus.OPENING.ordinal());
assertEquals(3, JobStatus.OPENED.ordinal());
assertEquals(4, JobStatus.FAILED.ordinal());
assertEquals(0, JobState.CLOSING.ordinal());
assertEquals(1, JobState.CLOSED.ordinal());
assertEquals(2, JobState.OPENING.ordinal());
assertEquals(3, JobState.OPENED.ordinal());
assertEquals(4, JobState.FAILED.ordinal());
}
public void testIsAnyOf() {
assertFalse(JobStatus.OPENED.isAnyOf());
assertFalse(JobStatus.OPENED.isAnyOf(JobStatus.CLOSED, JobStatus.CLOSING, JobStatus.FAILED,
JobStatus.OPENING));
assertFalse(JobStatus.CLOSED.isAnyOf(JobStatus.CLOSING, JobStatus.FAILED, JobStatus.OPENING, JobStatus.OPENED));
assertFalse(JobState.OPENED.isAnyOf());
assertFalse(JobState.OPENED.isAnyOf(JobState.CLOSED, JobState.CLOSING, JobState.FAILED,
JobState.OPENING));
assertFalse(JobState.CLOSED.isAnyOf(JobState.CLOSING, JobState.FAILED, JobState.OPENING, JobState.OPENED));
assertTrue(JobStatus.OPENED.isAnyOf(JobStatus.OPENED));
assertTrue(JobStatus.OPENED.isAnyOf(JobStatus.OPENED, JobStatus.CLOSED));
assertTrue(JobStatus.CLOSING.isAnyOf(JobStatus.CLOSED, JobStatus.CLOSING));
assertTrue(JobStatus.CLOSED.isAnyOf(JobStatus.CLOSED, JobStatus.CLOSING));
assertTrue(JobState.OPENED.isAnyOf(JobState.OPENED));
assertTrue(JobState.OPENED.isAnyOf(JobState.OPENED, JobState.CLOSED));
assertTrue(JobState.CLOSING.isAnyOf(JobState.CLOSED, JobState.CLOSING));
assertTrue(JobState.CLOSED.isAnyOf(JobState.CLOSED, JobState.CLOSING));
}
}

View File

@ -7,7 +7,7 @@ package org.elasticsearch.xpack.ml.job.metadata;
import org.elasticsearch.common.io.stream.Writeable;
import org.elasticsearch.common.xcontent.XContentParser;
import org.elasticsearch.xpack.ml.job.config.JobStatus;
import org.elasticsearch.xpack.ml.job.config.JobState;
import org.elasticsearch.xpack.ml.support.AbstractSerializingTestCase;
public class AllocationTests extends AbstractSerializingTestCase<Allocation> {
@ -17,9 +17,9 @@ public class AllocationTests extends AbstractSerializingTestCase<Allocation> {
String nodeId = randomBoolean() ? randomAsciiOfLength(10) : null;
String jobId = randomAsciiOfLength(10);
boolean ignoreDowntime = randomBoolean();
JobStatus jobStatus = randomFrom(JobStatus.values());
String statusReason = randomBoolean() ? randomAsciiOfLength(10) : null;
return new Allocation(nodeId, jobId, ignoreDowntime, jobStatus, statusReason);
JobState jobState = randomFrom(JobState.values());
String stateReason = randomBoolean() ? randomAsciiOfLength(10) : null;
return new Allocation(nodeId, jobId, ignoreDowntime, jobState, stateReason);
}
@Override
@ -33,12 +33,11 @@ public class AllocationTests extends AbstractSerializingTestCase<Allocation> {
}
public void testUnsetIgnoreDownTime() {
Allocation allocation = new Allocation("_node_id", "_job_id", true, JobStatus.OPENING, null);
Allocation allocation = new Allocation("_node_id", "_job_id", true, JobState.OPENING, null);
assertTrue(allocation.isIgnoreDowntime());
Allocation.Builder builder = new Allocation.Builder(allocation);
builder.setStatus(JobStatus.OPENED);
builder.setState(JobState.OPENED);
allocation = builder.build();
assertFalse(allocation.isIgnoreDowntime());
}
}

View File

@ -20,7 +20,7 @@ import org.elasticsearch.xpack.ml.datafeed.DatafeedConfig;
import org.elasticsearch.xpack.ml.datafeed.DatafeedConfigTests;
import org.elasticsearch.xpack.ml.job.config.AnalysisConfig;
import org.elasticsearch.xpack.ml.job.config.Job;
import org.elasticsearch.xpack.ml.job.config.JobStatus;
import org.elasticsearch.xpack.ml.job.config.JobState;
import org.elasticsearch.xpack.ml.job.config.JobTests;
import org.elasticsearch.xpack.ml.support.AbstractSerializingTestCase;
import org.elasticsearch.xpack.persistent.PersistentTasksInProgress;
@ -59,11 +59,11 @@ public class MlMetadataTests extends AbstractSerializingTestCase<MlMetadata> {
builder.putJob(job, false);
}
if (randomBoolean()) {
builder.updateStatus(job.getId(), JobStatus.OPENING, randomBoolean() ? "first reason" : null);
builder.updateState(job.getId(), JobState.OPENING, randomBoolean() ? "first reason" : null);
if (randomBoolean()) {
builder.updateStatus(job.getId(), JobStatus.OPENED, randomBoolean() ? "second reason" : null);
builder.updateState(job.getId(), JobState.OPENED, randomBoolean() ? "second reason" : null);
if (randomBoolean()) {
builder.updateStatus(job.getId(), JobStatus.CLOSING, randomBoolean() ? "third reason" : null);
builder.updateState(job.getId(), JobState.CLOSING, randomBoolean() ? "third reason" : null);
}
}
}
@ -105,10 +105,10 @@ public class MlMetadataTests extends AbstractSerializingTestCase<MlMetadata> {
MlMetadata result = builder.build();
assertThat(result.getJobs().get("1"), sameInstance(job1));
assertThat(result.getAllocations().get("1").getStatus(), equalTo(JobStatus.CLOSED));
assertThat(result.getAllocations().get("1").getState(), equalTo(JobState.CLOSED));
assertThat(result.getDatafeeds().get("1"), nullValue());
assertThat(result.getJobs().get("2"), sameInstance(job2));
assertThat(result.getAllocations().get("2").getStatus(), equalTo(JobStatus.CLOSED));
assertThat(result.getAllocations().get("2").getState(), equalTo(JobState.CLOSED));
assertThat(result.getDatafeeds().get("2"), nullValue());
builder = new MlMetadata.Builder(result);
@ -132,13 +132,13 @@ public class MlMetadataTests extends AbstractSerializingTestCase<MlMetadata> {
MlMetadata result = builder.build();
assertThat(result.getJobs().get("1"), sameInstance(job1));
assertThat(result.getAllocations().get("1").getStatus(), equalTo(JobStatus.CLOSED));
assertThat(result.getAllocations().get("1").getState(), equalTo(JobState.CLOSED));
assertThat(result.getDatafeeds().get("1"), nullValue());
builder = new MlMetadata.Builder(result);
builder.updateStatus("1", JobStatus.DELETING, null);
builder.updateState("1", JobState.DELETING, null);
assertThat(result.getJobs().get("1"), sameInstance(job1));
assertThat(result.getAllocations().get("1").getStatus(), equalTo(JobStatus.CLOSED));
assertThat(result.getAllocations().get("1").getState(), equalTo(JobState.CLOSED));
assertThat(result.getDatafeeds().get("1"), nullValue());
builder.deleteJob("1");
@ -152,12 +152,12 @@ public class MlMetadataTests extends AbstractSerializingTestCase<MlMetadata> {
Job job1 = buildJobBuilder("1").build();
MlMetadata.Builder builder1 = new MlMetadata.Builder();
builder1.putJob(job1, false);
builder1.updateStatus("1", JobStatus.OPENING, null);
builder1.updateStatus("1", JobStatus.OPENED, null);
builder1.updateState("1", JobState.OPENING, null);
builder1.updateState("1", JobState.OPENED, null);
MlMetadata result = builder1.build();
assertThat(result.getJobs().get("1"), sameInstance(job1));
assertThat(result.getAllocations().get("1").getStatus(), equalTo(JobStatus.OPENED));
assertThat(result.getAllocations().get("1").getState(), equalTo(JobState.OPENED));
assertThat(result.getDatafeeds().get("1"), nullValue());
MlMetadata.Builder builder2 = new MlMetadata.Builder(result);
@ -192,14 +192,14 @@ public class MlMetadataTests extends AbstractSerializingTestCase<MlMetadata> {
MlMetadata result = builder.build();
assertThat(result.getJobs().get("foo"), sameInstance(job1));
assertThat(result.getAllocations().get("foo").getStatus(), equalTo(JobStatus.CLOSED));
assertThat(result.getAllocations().get("foo").getState(), equalTo(JobState.CLOSED));
assertThat(result.getDatafeeds().get("datafeed1"), sameInstance(datafeedConfig1));
builder = new MlMetadata.Builder(result);
builder.removeDatafeed("datafeed1", new PersistentTasksInProgress(0, Collections.emptyList()));
result = builder.build();
assertThat(result.getJobs().get("foo"), sameInstance(job1));
assertThat(result.getAllocations().get("foo").getStatus(), equalTo(JobStatus.CLOSED));
assertThat(result.getAllocations().get("foo").getState(), equalTo(JobState.CLOSED));
assertThat(result.getDatafeeds().get("datafeed1"), nullValue());
}
@ -251,12 +251,12 @@ public class MlMetadataTests extends AbstractSerializingTestCase<MlMetadata> {
MlMetadata.Builder builder = new MlMetadata.Builder();
builder.putJob(job1, false);
builder.putDatafeed(datafeedConfig1);
builder.updateStatus("foo", JobStatus.OPENING, null);
builder.updateStatus("foo", JobStatus.OPENED, null);
builder.updateState("foo", JobState.OPENING, null);
builder.updateState("foo", JobState.OPENED, null);
MlMetadata result = builder.build();
assertThat(result.getJobs().get("foo"), sameInstance(job1));
assertThat(result.getAllocations().get("foo").getStatus(), equalTo(JobStatus.OPENED));
assertThat(result.getAllocations().get("foo").getState(), equalTo(JobState.OPENED));
assertThat(result.getDatafeeds().get("datafeed1"), sameInstance(datafeedConfig1));
StartDatafeedAction.Request request = new StartDatafeedAction.Request("datafeed1", 0L);
@ -273,20 +273,20 @@ public class MlMetadataTests extends AbstractSerializingTestCase<MlMetadata> {
public void testUpdateAllocation_setFinishedTime() {
MlMetadata.Builder builder = new MlMetadata.Builder();
builder.putJob(buildJobBuilder("my_job_id").build(), false);
builder.updateStatus("my_job_id", JobStatus.OPENING, null);
builder.updateState("my_job_id", JobState.OPENING, null);
builder.updateStatus("my_job_id", JobStatus.OPENED, null);
builder.updateState("my_job_id", JobState.OPENED, null);
MlMetadata mlMetadata = builder.build();
assertThat(mlMetadata.getJobs().get("my_job_id").getFinishedTime(), nullValue());
builder.updateStatus("my_job_id", JobStatus.CLOSED, null);
builder.updateState("my_job_id", JobState.CLOSED, null);
mlMetadata = builder.build();
assertThat(mlMetadata.getJobs().get("my_job_id").getFinishedTime(), notNullValue());
}
public void testUpdateStatus_failBecauseJobDoesNotExist() {
public void testUpdateState_failBecauseJobDoesNotExist() {
MlMetadata.Builder builder = new MlMetadata.Builder();
expectThrows(ResourceNotFoundException.class, () -> builder.updateStatus("missing-job", JobStatus.CLOSED, "for testting"));
expectThrows(ResourceNotFoundException.class, () -> builder.updateState("missing-job", JobState.CLOSED, "for testting"));
}
public void testSetIgnoreDowntime_failBecauseJobDoesNotExist() {

View File

@ -101,7 +101,7 @@ public class ElasticsearchMappingsTests extends ESTestCase {
// These are not reserved because they're analyzed strings, i.e. the
// same type as user-specified fields
overridden.add(Job.DESCRIPTION.getPreferredName());
overridden.add(Allocation.STATUS.getPreferredName());
overridden.add(Allocation.STATE.getPreferredName());
overridden.add(ModelSnapshot.DESCRIPTION.getPreferredName());
Set<String> expected = new HashSet<>();

View File

@ -13,14 +13,14 @@ import org.elasticsearch.common.util.concurrent.EsRejectedExecutionException;
import org.elasticsearch.test.ESTestCase;
import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.xpack.ml.MlPlugin;
import org.elasticsearch.xpack.ml.action.UpdateJobStatusAction;
import org.elasticsearch.xpack.ml.action.UpdateJobStateAction;
import org.elasticsearch.xpack.ml.action.util.QueryPage;
import org.elasticsearch.xpack.ml.job.JobManager;
import org.elasticsearch.xpack.ml.job.config.AnalysisConfig;
import org.elasticsearch.xpack.ml.job.config.DataDescription;
import org.elasticsearch.xpack.ml.job.config.Detector;
import org.elasticsearch.xpack.ml.job.config.Job;
import org.elasticsearch.xpack.ml.job.config.JobStatus;
import org.elasticsearch.xpack.ml.job.config.JobState;
import org.elasticsearch.xpack.ml.job.config.MlFilter;
import org.elasticsearch.xpack.ml.job.metadata.Allocation;
import org.elasticsearch.xpack.ml.job.persistence.JobDataCountsPersister;
@ -91,7 +91,7 @@ public class AutodetectProcessManagerTests extends ESTestCase {
jobResultsPersister = mock(JobResultsPersister.class);
jobDataCountsPersister = mock(JobDataCountsPersister.class);
normalizerFactory = mock(NormalizerFactory.class);
givenAllocationWithStatus(JobStatus.OPENED);
givenAllocationWithState(JobState.OPENED);
when(jobManager.getJobOrThrowIfUnknown("foo")).thenReturn(createJobDetails("foo"));
doAnswer(invocationOnMock -> {
@ -123,8 +123,8 @@ public class AutodetectProcessManagerTests extends ESTestCase {
manager.openJob("foo", false, e -> {});
assertEquals(1, manager.numberOfOpenJobs());
assertTrue(manager.jobHasActiveAutodetectProcess("foo"));
UpdateJobStatusAction.Request expectedRequest = new UpdateJobStatusAction.Request("foo", JobStatus.OPENED);
verify(client).execute(eq(UpdateJobStatusAction.INSTANCE), eq(expectedRequest), any());
UpdateJobStateAction.Request expectedRequest = new UpdateJobStateAction.Request("foo", JobState.OPENED);
verify(client).execute(eq(UpdateJobStateAction.INSTANCE), eq(expectedRequest), any());
}
public void testOpenJob_exceedMaxNumJobs() {
@ -284,14 +284,12 @@ public class AutodetectProcessManagerTests extends ESTestCase {
assertFalse(manager.jobHasActiveAutodetectProcess("bar"));
}
public void testProcessData_GivenStatusNotStarted() throws IOException {
public void testProcessData_GivenStateNotOpened() throws IOException {
AutodetectCommunicator communicator = mock(AutodetectCommunicator.class);
when(communicator.writeToJob(any(), any())).thenReturn(new DataCounts("foo"));
AutodetectProcessManager manager = createManager(communicator);
Job job = createJobDetails("foo");
givenAllocationWithStatus(JobStatus.OPENED);
givenAllocationWithState(JobState.OPENED);
InputStream inputStream = createInputStream("");
manager.openJob("foo", false, e -> {});
@ -325,9 +323,9 @@ public class AutodetectProcessManagerTests extends ESTestCase {
verify(autodetectProcess, times(1)).close();
}
private void givenAllocationWithStatus(JobStatus status) {
private void givenAllocationWithState(JobState state) {
Allocation.Builder allocation = new Allocation.Builder();
allocation.setStatus(status);
allocation.setState(state);
when(jobManager.getJobAllocation("foo")).thenReturn(allocation.build());
}

View File

@ -66,13 +66,13 @@ setup:
xpack.ml.get_datafeed_stats:
datafeed_id: datafeed-1
- match: { datafeeds.0.datafeed_id: "datafeed-1"}
- match: { datafeeds.0.status: "STOPPED"}
- match: { datafeeds.0.state: "STOPPED"}
- do:
xpack.ml.get_datafeed_stats:
datafeed_id: datafeed-2
- match: { datafeeds.0.datafeed_id: "datafeed-2"}
- match: { datafeeds.0.status: "STOPPED"}
- match: { datafeeds.0.state: "STOPPED"}
---
"Test explicit get all datafeed stats":
@ -82,9 +82,9 @@ setup:
datafeed_id: _all
- match: { count: 2 }
- match: { datafeeds.0.datafeed_id: "datafeed-1"}
- match: { datafeeds.0.status: "STOPPED"}
- match: { datafeeds.0.state: "STOPPED"}
- match: { datafeeds.1.datafeed_id: "datafeed-2"}
- match: { datafeeds.1.status: "STOPPED"}
- match: { datafeeds.1.state: "STOPPED"}
---
"Test implicit get all datafeed stats":
@ -93,6 +93,6 @@ setup:
xpack.ml.get_datafeed_stats: {}
- match: { count: 2 }
- match: { datafeeds.0.datafeed_id: "datafeed-1"}
- match: { datafeeds.0.status: "STOPPED"}
- match: { datafeeds.0.state: "STOPPED"}
- match: { datafeeds.1.datafeed_id: "datafeed-2"}
- match: { datafeeds.1.status: "STOPPED"}
- match: { datafeeds.1.state: "STOPPED"}

View File

@ -76,7 +76,7 @@ setup:
- match: { jobs.0.data_counts.processed_field_count: 4}
- match: { jobs.0.data_counts.input_field_count: 4 }
- match: { jobs.0.model_size_stats.model_bytes: 0 }
- match: { jobs.0.status: OPENED }
- match: { jobs.0.state: OPENED }
---
"Test get job stats for closed job":
@ -101,7 +101,7 @@ setup:
- match: { jobs.0.data_counts.processed_field_count: 4}
- match: { jobs.0.data_counts.input_field_count: 4 }
- gt: { jobs.0.model_size_stats.model_bytes: 0 }
- match: { jobs.0.status: CLOSED }
- match: { jobs.0.state: CLOSED }
---
"Test get job stats of datafeed job that has not received and data":
@ -112,7 +112,7 @@ setup:
- match: { jobs.0.job_id : datafeed-job }
- match: { jobs.0.data_counts.processed_record_count: 0 }
- match: { jobs.0.model_size_stats.model_bytes : 0 }
- match: { jobs.0.status: OPENED }
- match: { jobs.0.state: OPENED }
---
"Test get all job stats explicitly":

View File

@ -68,7 +68,7 @@ setup:
- do:
xpack.ml.get_job_stats:
job_id: farequote
- match: { jobs.0.status: "CLOSED" }
- match: { jobs.0.state: "CLOSED" }
- do:
get:
@ -101,7 +101,7 @@ setup:
- do:
xpack.ml.get_job_stats:
job_id: farequote
- match: { jobs.0.status: "CLOSED" }
- match: { jobs.0.state: "CLOSED" }
---
"Test POST data with invalid parameters":

View File

@ -47,14 +47,14 @@ setup:
- do:
xpack.ml.get_datafeed_stats:
datafeed_id: "datafeed-1"
- match: { datafeeds.0.status: STARTED }
- match: { datafeeds.0.state: STARTED }
- do:
xpack.ml.stop_datafeed:
"datafeed_id": "datafeed-1"
- do:
xpack.ml.get_datafeed_stats:
datafeed_id: "datafeed-1"
- match: { datafeeds.0.status: STOPPED }
- match: { datafeeds.0.state: STOPPED }
---
"Test start non existing datafeed":
- do:
@ -71,7 +71,7 @@ setup:
"datafeed_id": "datafeed-1"
"start": 0
- do:
catch: /cannot start datafeed, expected job status \[OPENED\], but got \[CLOSED\]/
catch: /cannot start datafeed, expected job state \[OPENED\], but got \[CLOSED\]/
xpack.ml.start_datafeed:
"datafeed_id": "datafeed-1"
"start": 0
@ -92,7 +92,7 @@ setup:
"start": 0
- do:
catch: /datafeed already started, expected datafeed status \[STOPPED\], but got \[STARTED\]/
catch: /datafeed already started, expected datafeed state \[STOPPED\], but got \[STARTED\]/
xpack.ml.start_datafeed:
"datafeed_id": "datafeed-1"
"start": 0
@ -111,6 +111,6 @@ setup:
xpack.ml.stop_datafeed:
"datafeed_id": "datafeed-1"
- do:
catch: /datafeed already stopped, expected datafeed status \[STARTED\], but got \[STOPPED\]/
catch: /datafeed already stopped, expected datafeed state \[STARTED\], but got \[STOPPED\]/
xpack.ml.stop_datafeed:
"datafeed_id": "datafeed-1"