diff --git a/elasticsearch/src/main/java/org/elasticsearch/xpack/ml/MlPlugin.java b/elasticsearch/src/main/java/org/elasticsearch/xpack/ml/MlPlugin.java index a9be600f8fc..cf98817990d 100644 --- a/elasticsearch/src/main/java/org/elasticsearch/xpack/ml/MlPlugin.java +++ b/elasticsearch/src/main/java/org/elasticsearch/xpack/ml/MlPlugin.java @@ -25,16 +25,6 @@ import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.settings.SettingsFilter; import org.elasticsearch.common.xcontent.NamedXContentRegistry; import org.elasticsearch.env.Environment; -import org.elasticsearch.xpack.ml.action.ValidateJobConfigAction; -import org.elasticsearch.xpack.ml.rest.validate.RestValidateJobConfigAction; -import org.elasticsearch.xpack.persistent.RemovePersistentTaskAction; -import org.elasticsearch.xpack.persistent.PersistentActionCoordinator; -import org.elasticsearch.xpack.persistent.PersistentActionRegistry; -import org.elasticsearch.xpack.persistent.PersistentActionService; -import org.elasticsearch.xpack.persistent.PersistentTaskClusterService; -import org.elasticsearch.xpack.persistent.PersistentTasksInProgress; -import org.elasticsearch.xpack.persistent.CompletionPersistentTaskAction; -import org.elasticsearch.xpack.persistent.StartPersistentTaskAction; import org.elasticsearch.plugins.ActionPlugin; import org.elasticsearch.plugins.Plugin; import org.elasticsearch.rest.RestController; @@ -61,7 +51,6 @@ import org.elasticsearch.xpack.ml.action.GetJobsStatsAction; import org.elasticsearch.xpack.ml.action.GetModelSnapshotsAction; import org.elasticsearch.xpack.ml.action.GetRecordsAction; import org.elasticsearch.xpack.ml.action.InternalOpenJobAction; -import org.elasticsearch.xpack.ml.action.InternalStartDatafeedAction; import org.elasticsearch.xpack.ml.action.OpenJobAction; import org.elasticsearch.xpack.ml.action.PostDataAction; import org.elasticsearch.xpack.ml.action.PutDatafeedAction; @@ -70,17 +59,16 @@ import org.elasticsearch.xpack.ml.action.PutJobAction; import org.elasticsearch.xpack.ml.action.RevertModelSnapshotAction; import org.elasticsearch.xpack.ml.action.StartDatafeedAction; import org.elasticsearch.xpack.ml.action.StopDatafeedAction; -import org.elasticsearch.xpack.ml.action.UpdateDatafeedStatusAction; import org.elasticsearch.xpack.ml.action.UpdateJobStatusAction; import org.elasticsearch.xpack.ml.action.UpdateModelSnapshotAction; import org.elasticsearch.xpack.ml.action.ValidateDetectorAction; +import org.elasticsearch.xpack.ml.action.ValidateJobConfigAction; import org.elasticsearch.xpack.ml.datafeed.DatafeedJobRunner; import org.elasticsearch.xpack.ml.job.JobManager; import org.elasticsearch.xpack.ml.job.metadata.MlInitializationService; import org.elasticsearch.xpack.ml.job.metadata.MlMetadata; import org.elasticsearch.xpack.ml.job.persistence.JobDataCountsPersister; import org.elasticsearch.xpack.ml.job.persistence.JobProvider; -import org.elasticsearch.xpack.ml.job.persistence.JobRenormalizedResultsPersister; import org.elasticsearch.xpack.ml.job.persistence.JobResultsPersister; import org.elasticsearch.xpack.ml.job.process.DataCountsReporter; import org.elasticsearch.xpack.ml.job.process.NativeController; @@ -120,7 +108,17 @@ import org.elasticsearch.xpack.ml.rest.results.RestGetCategoriesAction; import org.elasticsearch.xpack.ml.rest.results.RestGetInfluencersAction; import org.elasticsearch.xpack.ml.rest.results.RestGetRecordsAction; import org.elasticsearch.xpack.ml.rest.validate.RestValidateDetectorAction; +import org.elasticsearch.xpack.ml.rest.validate.RestValidateJobConfigAction; import org.elasticsearch.xpack.ml.utils.NamedPipeHelper; +import org.elasticsearch.xpack.persistent.CompletionPersistentTaskAction; +import org.elasticsearch.xpack.persistent.PersistentActionCoordinator; +import org.elasticsearch.xpack.persistent.PersistentActionRegistry; +import org.elasticsearch.xpack.persistent.PersistentActionRequest; +import org.elasticsearch.xpack.persistent.PersistentActionService; +import org.elasticsearch.xpack.persistent.PersistentTaskClusterService; +import org.elasticsearch.xpack.persistent.PersistentTasksInProgress; +import org.elasticsearch.xpack.persistent.RemovePersistentTaskAction; +import org.elasticsearch.xpack.persistent.StartPersistentTaskAction; import java.io.IOException; import java.nio.file.Path; @@ -180,7 +178,8 @@ public class MlPlugin extends Plugin implements ActionPlugin { new NamedWriteableRegistry.Entry(PersistentActionCoordinator.Status.class, PersistentActionCoordinator.Status.NAME, PersistentActionCoordinator.Status::new), new NamedWriteableRegistry.Entry(ClusterState.Custom.class, PersistentTasksInProgress.TYPE, PersistentTasksInProgress::new), - new NamedWriteableRegistry.Entry(NamedDiff.class, PersistentTasksInProgress.TYPE, PersistentTasksInProgress::readDiffFrom) + new NamedWriteableRegistry.Entry(NamedDiff.class, PersistentTasksInProgress.TYPE, PersistentTasksInProgress::readDiffFrom), + new NamedWriteableRegistry.Entry(PersistentActionRequest.class, StartDatafeedAction.NAME, StartDatafeedAction.Request::new) ); } @@ -299,7 +298,6 @@ public class MlPlugin extends Plugin implements ActionPlugin { new ActionHandler<>(OpenJobAction.INSTANCE, OpenJobAction.TransportAction.class), new ActionHandler<>(InternalOpenJobAction.INSTANCE, InternalOpenJobAction.TransportAction.class), new ActionHandler<>(UpdateJobStatusAction.INSTANCE, UpdateJobStatusAction.TransportAction.class), - new ActionHandler<>(UpdateDatafeedStatusAction.INSTANCE, UpdateDatafeedStatusAction.TransportAction.class), new ActionHandler<>(GetFiltersAction.INSTANCE, GetFiltersAction.TransportAction.class), new ActionHandler<>(PutFilterAction.INSTANCE, PutFilterAction.TransportAction.class), new ActionHandler<>(DeleteFilterAction.INSTANCE, DeleteFilterAction.TransportAction.class), @@ -320,7 +318,6 @@ public class MlPlugin extends Plugin implements ActionPlugin { new ActionHandler<>(PutDatafeedAction.INSTANCE, PutDatafeedAction.TransportAction.class), new ActionHandler<>(DeleteDatafeedAction.INSTANCE, DeleteDatafeedAction.TransportAction.class), new ActionHandler<>(StartDatafeedAction.INSTANCE, StartDatafeedAction.TransportAction.class), - new ActionHandler<>(InternalStartDatafeedAction.INSTANCE, InternalStartDatafeedAction.TransportAction.class), new ActionHandler<>(StopDatafeedAction.INSTANCE, StopDatafeedAction.TransportAction.class), new ActionHandler<>(DeleteModelSnapshotAction.INSTANCE, DeleteModelSnapshotAction.TransportAction.class), new ActionHandler<>(StartPersistentTaskAction.INSTANCE, StartPersistentTaskAction.TransportAction.class), diff --git a/elasticsearch/src/main/java/org/elasticsearch/xpack/ml/action/DeleteDatafeedAction.java b/elasticsearch/src/main/java/org/elasticsearch/xpack/ml/action/DeleteDatafeedAction.java index 5504e030d9e..92d589e6a46 100644 --- a/elasticsearch/src/main/java/org/elasticsearch/xpack/ml/action/DeleteDatafeedAction.java +++ b/elasticsearch/src/main/java/org/elasticsearch/xpack/ml/action/DeleteDatafeedAction.java @@ -32,6 +32,7 @@ import org.elasticsearch.transport.TransportService; import org.elasticsearch.xpack.ml.datafeed.DatafeedConfig; import org.elasticsearch.xpack.ml.job.metadata.MlMetadata; import org.elasticsearch.xpack.ml.utils.ExceptionsHelper; +import org.elasticsearch.xpack.persistent.PersistentTasksInProgress; import java.io.IOException; import java.util.Objects; @@ -169,8 +170,9 @@ public class DeleteDatafeedAction extends Action { @@ -198,18 +197,17 @@ public class GetDatafeedsAction extends Action listener) throws Exception { logger.debug("Get datafeed '{}'", request.getDatafeedId()); - QueryPage response = null; + QueryPage response; MlMetadata mlMetadata = state.metaData().custom(MlMetadata.TYPE); if (ALL.equals(request.getDatafeedId())) { - List datafeedConfigs = mlMetadata.getDatafeeds().values().stream().map( - s -> s.getConfig()).collect(Collectors.toList()); - response = new QueryPage<>(datafeedConfigs, datafeedConfigs.size(), Datafeed.RESULTS_FIELD); + List datafeedConfigs = new ArrayList<>(mlMetadata.getDatafeeds().values()); + response = new QueryPage<>(datafeedConfigs, datafeedConfigs.size(), DatafeedConfig.RESULTS_FIELD); } else { - Datafeed datafeed = mlMetadata.getDatafeed(request.getDatafeedId()); + DatafeedConfig datafeed = mlMetadata.getDatafeed(request.getDatafeedId()); if (datafeed == null) { throw ExceptionsHelper.missingDatafeedException(request.getDatafeedId()); } - response = new QueryPage<>(Collections.singletonList(datafeed.getConfig()), 1, Datafeed.RESULTS_FIELD); + response = new QueryPage<>(Collections.singletonList(datafeed), 1, DatafeedConfig.RESULTS_FIELD); } listener.onResponse(new Response(response)); diff --git a/elasticsearch/src/main/java/org/elasticsearch/xpack/ml/action/GetDatafeedsStatsAction.java b/elasticsearch/src/main/java/org/elasticsearch/xpack/ml/action/GetDatafeedsStatsAction.java index 4530f3164e6..71db8316e43 100644 --- a/elasticsearch/src/main/java/org/elasticsearch/xpack/ml/action/GetDatafeedsStatsAction.java +++ b/elasticsearch/src/main/java/org/elasticsearch/xpack/ml/action/GetDatafeedsStatsAction.java @@ -31,17 +31,21 @@ import org.elasticsearch.common.xcontent.XContentBuilder; import org.elasticsearch.threadpool.ThreadPool; import org.elasticsearch.transport.TransportService; import org.elasticsearch.xpack.ml.action.util.QueryPage; -import org.elasticsearch.xpack.ml.datafeed.Datafeed; import org.elasticsearch.xpack.ml.datafeed.DatafeedConfig; import org.elasticsearch.xpack.ml.datafeed.DatafeedStatus; import org.elasticsearch.xpack.ml.job.metadata.MlMetadata; import org.elasticsearch.xpack.ml.utils.ExceptionsHelper; +import org.elasticsearch.xpack.persistent.PersistentTasksInProgress; +import org.elasticsearch.xpack.persistent.PersistentTasksInProgress.PersistentTaskInProgress; import java.io.IOException; import java.util.ArrayList; import java.util.Collection; +import java.util.HashMap; import java.util.List; +import java.util.Map; import java.util.Objects; +import java.util.function.Predicate; public class GetDatafeedsStatsAction extends Action { @@ -240,7 +244,8 @@ public class GetDatafeedsStatsAction extends Action listener) throws Exception { logger.debug("Get stats for datafeed '{}'", request.getDatafeedId()); + Map statuses = new HashMap<>(); + PersistentTasksInProgress tasksInProgress = state.custom(PersistentTasksInProgress.TYPE); + if (tasksInProgress != null) { + Predicate> predicate = ALL.equals(request.getDatafeedId()) ? p -> true : + p -> request.getDatafeedId().equals(((StartDatafeedAction.Request) p.getRequest()).getDatafeedId()); + for (PersistentTaskInProgress taskInProgress : tasksInProgress.findEntries(StartDatafeedAction.NAME, predicate)) { + StartDatafeedAction.Request storedRequest = (StartDatafeedAction.Request) taskInProgress.getRequest(); + statuses.put(storedRequest.getDatafeedId(), DatafeedStatus.STARTED); + } + } + List stats = new ArrayList<>(); MlMetadata mlMetadata = state.metaData().custom(MlMetadata.TYPE); if (ALL.equals(request.getDatafeedId())) { - Collection datafeeds = mlMetadata.getDatafeeds().values(); - for (Datafeed datafeed : datafeeds) { - stats.add(new Response.DatafeedStats(datafeed.getId(), datafeed.getStatus())); + Collection datafeeds = mlMetadata.getDatafeeds().values(); + for (DatafeedConfig datafeed : datafeeds) { + DatafeedStatus status = statuses.getOrDefault(datafeed.getId(), DatafeedStatus.STOPPED); + stats.add(new Response.DatafeedStats(datafeed.getId(), status)); } } else { - Datafeed datafeed = mlMetadata.getDatafeed(request.getDatafeedId()); + DatafeedConfig datafeed = mlMetadata.getDatafeed(request.getDatafeedId()); if (datafeed == null) { throw ExceptionsHelper.missingDatafeedException(request.getDatafeedId()); } - stats.add(new Response.DatafeedStats(datafeed.getId(), datafeed.getStatus())); + DatafeedStatus status = statuses.getOrDefault(datafeed.getId(), DatafeedStatus.STOPPED); + stats.add(new Response.DatafeedStats(datafeed.getId(), status)); } - - QueryPage statsPage = new QueryPage<>(stats, stats.size(), Datafeed.RESULTS_FIELD); + QueryPage statsPage = new QueryPage<>(stats, stats.size(), DatafeedConfig.RESULTS_FIELD); listener.onResponse(new Response(statsPage)); } diff --git a/elasticsearch/src/main/java/org/elasticsearch/xpack/ml/action/InternalStartDatafeedAction.java b/elasticsearch/src/main/java/org/elasticsearch/xpack/ml/action/InternalStartDatafeedAction.java deleted file mode 100644 index 1adbb7f8912..00000000000 --- a/elasticsearch/src/main/java/org/elasticsearch/xpack/ml/action/InternalStartDatafeedAction.java +++ /dev/null @@ -1,135 +0,0 @@ -/* - * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one - * or more contributor license agreements. Licensed under the Elastic License; - * you may not use this file except in compliance with the Elastic License. - */ -package org.elasticsearch.xpack.ml.action; - -import org.elasticsearch.action.Action; -import org.elasticsearch.action.ActionListener; -import org.elasticsearch.action.ActionRequestBuilder; -import org.elasticsearch.action.ActionResponse; -import org.elasticsearch.action.support.ActionFilters; -import org.elasticsearch.action.support.HandledTransportAction; -import org.elasticsearch.client.ElasticsearchClient; -import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver; -import org.elasticsearch.common.inject.Inject; -import org.elasticsearch.common.settings.Settings; -import org.elasticsearch.tasks.CancellableTask; -import org.elasticsearch.tasks.Task; -import org.elasticsearch.tasks.TaskId; -import org.elasticsearch.threadpool.ThreadPool; -import org.elasticsearch.transport.TransportService; -import org.elasticsearch.xpack.ml.datafeed.DatafeedJobRunner; - -public class InternalStartDatafeedAction extends - Action { - - public static final InternalStartDatafeedAction INSTANCE = new InternalStartDatafeedAction(); - public static final String NAME = "cluster:admin/ml/datafeeds/internal_start"; - - private InternalStartDatafeedAction() { - super(NAME); - } - - @Override - public RequestBuilder newRequestBuilder(ElasticsearchClient client) { - return new RequestBuilder(client, this); - } - - @Override - public Response newResponse() { - return new Response(); - } - - public static class Request extends StartDatafeedAction.Request { - - Request(String datafeedId, long startTime) { - super(datafeedId, startTime); - } - - Request() { - } - - @Override - public Task createTask(long id, String type, String action, TaskId parentTaskId) { - return new DatafeedTask(id, type, action, parentTaskId, getDatafeedId()); - } - } - - static class RequestBuilder extends ActionRequestBuilder { - - public RequestBuilder(ElasticsearchClient client, InternalStartDatafeedAction action) { - super(client, action, new Request()); - } - } - - public static class Response extends ActionResponse { - - Response() { - } - - } - - public static class DatafeedTask extends CancellableTask { - - private volatile DatafeedJobRunner.Holder holder; - - public DatafeedTask(long id, String type, String action, TaskId parentTaskId, String datafeedId) { - super(id, type, action, "datafeed-" + datafeedId, parentTaskId); - } - - public void setHolder(DatafeedJobRunner.Holder holder) { - this.holder = holder; - } - - @Override - public boolean shouldCancelChildrenOnCancellation() { - return true; - } - - @Override - protected void onCancelled() { - stop(); - } - - /* public for testing */ - public void stop() { - if (holder == null) { - throw new IllegalStateException("task cancel ran before datafeed runner assigned the holder"); - } - holder.stop("cancel", null); - } - } - - public static class TransportAction extends HandledTransportAction { - - private final DatafeedJobRunner datafeedJobRunner; - - @Inject - public TransportAction(Settings settings, TransportService transportService, ThreadPool threadPool, - ActionFilters actionFilters, IndexNameExpressionResolver indexNameExpressionResolver, - DatafeedJobRunner datafeedJobRunner) { - super(settings, InternalStartDatafeedAction.NAME, threadPool, transportService, actionFilters, indexNameExpressionResolver, - Request::new); - this.datafeedJobRunner = datafeedJobRunner; - } - - @Override - protected void doExecute(Task task, Request request, ActionListener listener) { - DatafeedTask datafeedTask = (DatafeedTask) task; - datafeedJobRunner.run(request.getDatafeedId(), request.getStartTime(), request.getEndTime(), datafeedTask, (error) -> { - if (error != null) { - listener.onFailure(error); - } else { - listener.onResponse(new Response()); - } - }); - } - - @Override - protected void doExecute(Request request, ActionListener listener) { - throw new UnsupportedOperationException("the task parameter is required"); - } - } -} diff --git a/elasticsearch/src/main/java/org/elasticsearch/xpack/ml/action/StartDatafeedAction.java b/elasticsearch/src/main/java/org/elasticsearch/xpack/ml/action/StartDatafeedAction.java index 43bd18954ea..e7f091c48fd 100644 --- a/elasticsearch/src/main/java/org/elasticsearch/xpack/ml/action/StartDatafeedAction.java +++ b/elasticsearch/src/main/java/org/elasticsearch/xpack/ml/action/StartDatafeedAction.java @@ -5,43 +5,53 @@ */ package org.elasticsearch.xpack.ml.action; +import org.elasticsearch.ElasticsearchStatusException; import org.elasticsearch.action.Action; import org.elasticsearch.action.ActionListener; -import org.elasticsearch.action.ActionRequest; import org.elasticsearch.action.ActionRequestBuilder; import org.elasticsearch.action.ActionRequestValidationException; -import org.elasticsearch.action.ActionResponse; import org.elasticsearch.action.support.ActionFilters; -import org.elasticsearch.action.support.HandledTransportAction; import org.elasticsearch.client.ElasticsearchClient; +import org.elasticsearch.cluster.ClusterState; import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver; -import org.elasticsearch.cluster.service.ClusterService; import org.elasticsearch.common.ParseField; import org.elasticsearch.common.inject.Inject; import org.elasticsearch.common.io.stream.StreamInput; import org.elasticsearch.common.io.stream.StreamOutput; import org.elasticsearch.common.settings.Settings; -import org.elasticsearch.common.unit.TimeValue; import org.elasticsearch.common.xcontent.ObjectParser; import org.elasticsearch.common.xcontent.ToXContent; -import org.elasticsearch.common.xcontent.ToXContentObject; import org.elasticsearch.common.xcontent.XContentBuilder; import org.elasticsearch.common.xcontent.XContentParser; -import org.elasticsearch.tasks.LoggingTaskListener; +import org.elasticsearch.rest.RestStatus; +import org.elasticsearch.tasks.Task; +import org.elasticsearch.tasks.TaskId; import org.elasticsearch.threadpool.ThreadPool; +import org.elasticsearch.transport.TransportResponse; import org.elasticsearch.transport.TransportService; import org.elasticsearch.xpack.ml.datafeed.DatafeedConfig; import org.elasticsearch.xpack.ml.datafeed.DatafeedJobRunner; +import org.elasticsearch.xpack.ml.datafeed.DatafeedJobValidator; import org.elasticsearch.xpack.ml.datafeed.DatafeedStatus; +import org.elasticsearch.xpack.ml.job.config.Job; +import org.elasticsearch.xpack.ml.job.config.JobStatus; +import org.elasticsearch.xpack.ml.job.metadata.Allocation; import org.elasticsearch.xpack.ml.job.metadata.MlMetadata; -import org.elasticsearch.xpack.ml.utils.DatafeedStatusObserver; import org.elasticsearch.xpack.ml.utils.ExceptionsHelper; +import org.elasticsearch.xpack.persistent.PersistentActionRegistry; +import org.elasticsearch.xpack.persistent.PersistentActionRequest; +import org.elasticsearch.xpack.persistent.PersistentActionResponse; +import org.elasticsearch.xpack.persistent.PersistentActionService; +import org.elasticsearch.xpack.persistent.PersistentTask; +import org.elasticsearch.xpack.persistent.PersistentTasksInProgress; +import org.elasticsearch.xpack.persistent.TransportPersistentAction; import java.io.IOException; import java.util.Objects; +import java.util.function.Predicate; public class StartDatafeedAction - extends Action { + extends Action { public static final ParseField START_TIME = new ParseField("start"); public static final ParseField END_TIME = new ParseField("end"); @@ -60,11 +70,11 @@ public class StartDatafeedAction } @Override - public Response newResponse() { - return new Response(); + public PersistentActionResponse newResponse() { + return new PersistentActionResponse(); } - public static class Request extends ActionRequest implements ToXContent { + public static class Request extends PersistentActionRequest implements ToXContent { public static ObjectParser PARSER = new ObjectParser<>(NAME, Request::new); @@ -72,8 +82,6 @@ public class StartDatafeedAction PARSER.declareString((request, datafeedId) -> request.datafeedId = datafeedId, DatafeedConfig.ID); PARSER.declareLong((request, startTime) -> request.startTime = startTime, START_TIME); PARSER.declareLong(Request::setEndTime, END_TIME); - PARSER.declareString((request, val) -> request.setStartTimeout(TimeValue.parseTimeValue(val, - START_TIME.getPreferredName())), START_TIMEOUT); } public static Request parseRequest(String datafeedId, XContentParser parser) { @@ -87,13 +95,16 @@ public class StartDatafeedAction private String datafeedId; private long startTime; private Long endTime; - private TimeValue startTimeout = TimeValue.timeValueSeconds(20); public Request(String datafeedId, long startTime) { this.datafeedId = ExceptionsHelper.requireNonNull(datafeedId, DatafeedConfig.ID.getPreferredName()); this.startTime = startTime; } + public Request(StreamInput in) throws IOException { + readFrom(in); + } + Request() { } @@ -113,26 +124,22 @@ public class StartDatafeedAction this.endTime = endTime; } - public TimeValue getStartTimeout() { - return startTimeout; - } - - public void setStartTimeout(TimeValue startTimeout) { - this.startTimeout = startTimeout; - } - @Override public ActionRequestValidationException validate() { return null; } + @Override + public Task createTask(long id, String type, String action, TaskId parentTaskId) { + return new DatafeedTask(id, type, action, parentTaskId, datafeedId); + } + @Override public void readFrom(StreamInput in) throws IOException { super.readFrom(in); datafeedId = in.readString(); startTime = in.readVLong(); endTime = in.readOptionalLong(); - startTimeout = new TimeValue(in.readVLong()); } @Override @@ -141,7 +148,11 @@ public class StartDatafeedAction out.writeString(datafeedId); out.writeVLong(startTime); out.writeOptionalLong(endTime); - out.writeVLong(startTimeout.millis()); + } + + @Override + public String getWriteableName() { + return NAME; } @Override @@ -152,16 +163,13 @@ public class StartDatafeedAction if (endTime != null) { builder.field(END_TIME.getPreferredName(), endTime); } - if (startTimeout != null) { - builder.field(START_TIMEOUT.getPreferredName(), startTimeout.getStringRep()); - } builder.endObject(); return builder; } @Override public int hashCode() { - return Objects.hash(datafeedId, startTime, endTime, startTimeout); + return Objects.hash(datafeedId, startTime, endTime); } @Override @@ -175,90 +183,111 @@ public class StartDatafeedAction Request other = (Request) obj; return Objects.equals(datafeedId, other.datafeedId) && Objects.equals(startTime, other.startTime) && - Objects.equals(endTime, other.endTime) && - Objects.equals(startTimeout, other.startTimeout); + Objects.equals(endTime, other.endTime); } } - static class RequestBuilder extends ActionRequestBuilder { + static class RequestBuilder extends ActionRequestBuilder { public RequestBuilder(ElasticsearchClient client, StartDatafeedAction action) { super(client, action, new Request()); } } - public static class Response extends ActionResponse implements ToXContentObject { + public static class DatafeedTask extends PersistentTask { - private boolean started; + private volatile DatafeedJobRunner.Holder holder; - Response(boolean started) { - this.started = started; + public DatafeedTask(long id, String type, String action, TaskId parentTaskId, String datafeedId) { + super(id, type, action, "datafeed-" + datafeedId, parentTaskId); } - Response() { - } - - public boolean isStarted() { - return started; + public void setHolder(DatafeedJobRunner.Holder holder) { + this.holder = holder; } @Override - public void readFrom(StreamInput in) throws IOException { - super.readFrom(in); - started = in.readBoolean(); + public boolean shouldCancelChildrenOnCancellation() { + return true; } @Override - public void writeTo(StreamOutput out) throws IOException { - super.writeTo(out); - out.writeBoolean(started); + protected void onCancelled() { + stop(); } - @Override - public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException { - builder.startObject(); - builder.field("started", started); - builder.endObject(); - return builder; + /* public for testing */ + public void stop() { + if (holder == null) { + throw new IllegalStateException("task cancel ran before datafeed runner assigned the holder"); + } + holder.stop("cancel", null); } } - public static class TransportAction extends HandledTransportAction { + public static class TransportAction extends TransportPersistentAction { - private final ClusterService clusterService; - private final DatafeedStatusObserver datafeedStatusObserver; - private final InternalStartDatafeedAction.TransportAction transportAction; + private final DatafeedJobRunner datafeedJobRunner; @Inject public TransportAction(Settings settings, TransportService transportService, ThreadPool threadPool, + PersistentActionService persistentActionService, PersistentActionRegistry persistentActionRegistry, ActionFilters actionFilters, IndexNameExpressionResolver indexNameExpressionResolver, - ClusterService clusterService, InternalStartDatafeedAction.TransportAction transportAction) { - super(settings, StartDatafeedAction.NAME, threadPool, transportService, actionFilters, indexNameExpressionResolver, - Request::new); - this.clusterService = clusterService; - this.datafeedStatusObserver = new DatafeedStatusObserver(threadPool, clusterService); - this.transportAction = transportAction; + DatafeedJobRunner datafeedJobRunner) { + super(settings, NAME, false, threadPool, transportService, persistentActionService, persistentActionRegistry, + actionFilters, indexNameExpressionResolver, Request::new, ThreadPool.Names.MANAGEMENT); + this.datafeedJobRunner = datafeedJobRunner; } @Override - protected void doExecute(Request request, ActionListener listener) { - // This validation happens also in DatafeedJobRunner, the reason we do it here too is that if it fails there - // we are unable to provide the user immediate feedback. We would create the task and the validation would fail - // in the background, whereas now the validation failure is part of the response being returned. - MlMetadata mlMetadata = clusterService.state().metaData().custom(MlMetadata.TYPE); - DatafeedJobRunner.validate(request.datafeedId, mlMetadata); + public void validate(Request request, ClusterState clusterState) { + MlMetadata mlMetadata = clusterState.metaData().custom(MlMetadata.TYPE); + StartDatafeedAction.validate(request.getDatafeedId(), mlMetadata); + PersistentTasksInProgress persistentTasksInProgress = clusterState.custom(PersistentTasksInProgress.TYPE); + if (persistentTasksInProgress == null) { + return; + } - InternalStartDatafeedAction.Request internalRequest = - new InternalStartDatafeedAction.Request(request.datafeedId, request.startTime); - internalRequest.setEndTime(request.endTime); - transportAction.execute(internalRequest, LoggingTaskListener.instance()); - datafeedStatusObserver.waitForStatus(request.datafeedId, request.startTimeout, DatafeedStatus.STARTED, e -> { - if (e != null) { - listener.onFailure(e); - } else { - listener.onResponse(new Response(true)); - } - }); + Predicate> predicate = taskInProgress -> { + Request storedRequest = (Request) taskInProgress.getRequest(); + return storedRequest.getDatafeedId().equals(request.getDatafeedId()); + }; + if (persistentTasksInProgress.entriesExist(NAME, predicate)) { + throw new ElasticsearchStatusException("datafeed already started, expected datafeed status [{}], but got [{}]", + RestStatus.CONFLICT, DatafeedStatus.STOPPED, DatafeedStatus.STARTED); + } } + + @Override + protected void nodeOperation(PersistentTask task, Request request, ActionListener listener) { + DatafeedTask datafeedTask = (DatafeedTask) task; + datafeedJobRunner.run(request.getDatafeedId(), request.getStartTime(), request.getEndTime(), + datafeedTask, + (error) -> { + if (error != null) { + listener.onFailure(error); + } else { + listener.onResponse(TransportResponse.Empty.INSTANCE); + } + }); + } + + } + + public static void validate(String datafeedId, MlMetadata mlMetadata) { + DatafeedConfig datafeed = mlMetadata.getDatafeed(datafeedId); + if (datafeed == null) { + throw ExceptionsHelper.missingDatafeedException(datafeedId); + } + Job job = mlMetadata.getJobs().get(datafeed.getJobId()); + if (job == null) { + throw ExceptionsHelper.missingJobException(datafeed.getJobId()); + } + Allocation allocation = mlMetadata.getAllocations().get(datafeed.getJobId()); + if (allocation.getStatus() != JobStatus.OPENED) { + throw new ElasticsearchStatusException("cannot start datafeed, expected job status [{}], but got [{}]", + RestStatus.CONFLICT, JobStatus.OPENED, allocation.getStatus()); + } + DatafeedJobValidator.validate(datafeed, job); } } diff --git a/elasticsearch/src/main/java/org/elasticsearch/xpack/ml/action/StopDatafeedAction.java b/elasticsearch/src/main/java/org/elasticsearch/xpack/ml/action/StopDatafeedAction.java index 72696ef31ca..edd5e2ac243 100644 --- a/elasticsearch/src/main/java/org/elasticsearch/xpack/ml/action/StopDatafeedAction.java +++ b/elasticsearch/src/main/java/org/elasticsearch/xpack/ml/action/StopDatafeedAction.java @@ -9,43 +9,38 @@ import org.elasticsearch.ElasticsearchStatusException; import org.elasticsearch.ResourceNotFoundException; import org.elasticsearch.action.Action; import org.elasticsearch.action.ActionListener; -import org.elasticsearch.action.ActionRequest; import org.elasticsearch.action.ActionRequestBuilder; import org.elasticsearch.action.ActionRequestValidationException; -import org.elasticsearch.action.admin.cluster.node.tasks.cancel.CancelTasksRequest; -import org.elasticsearch.action.admin.cluster.node.tasks.cancel.CancelTasksResponse; -import org.elasticsearch.action.admin.cluster.node.tasks.cancel.TransportCancelTasksAction; -import org.elasticsearch.action.admin.cluster.node.tasks.list.ListTasksRequest; -import org.elasticsearch.action.admin.cluster.node.tasks.list.ListTasksResponse; -import org.elasticsearch.action.admin.cluster.node.tasks.list.TransportListTasksAction; import org.elasticsearch.action.support.ActionFilters; -import org.elasticsearch.action.support.HandledTransportAction; -import org.elasticsearch.action.support.master.AcknowledgedResponse; +import org.elasticsearch.action.support.master.MasterNodeRequest; +import org.elasticsearch.action.support.master.TransportMasterNodeAction; import org.elasticsearch.client.ElasticsearchClient; +import org.elasticsearch.cluster.ClusterState; +import org.elasticsearch.cluster.block.ClusterBlockException; +import org.elasticsearch.cluster.block.ClusterBlockLevel; import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver; import org.elasticsearch.cluster.service.ClusterService; import org.elasticsearch.common.inject.Inject; import org.elasticsearch.common.io.stream.StreamInput; import org.elasticsearch.common.io.stream.StreamOutput; import org.elasticsearch.common.settings.Settings; -import org.elasticsearch.common.unit.TimeValue; import org.elasticsearch.rest.RestStatus; -import org.elasticsearch.tasks.TaskInfo; import org.elasticsearch.threadpool.ThreadPool; import org.elasticsearch.transport.TransportService; -import org.elasticsearch.xpack.ml.datafeed.Datafeed; import org.elasticsearch.xpack.ml.datafeed.DatafeedConfig; import org.elasticsearch.xpack.ml.datafeed.DatafeedStatus; import org.elasticsearch.xpack.ml.job.messages.Messages; import org.elasticsearch.xpack.ml.job.metadata.MlMetadata; -import org.elasticsearch.xpack.ml.utils.DatafeedStatusObserver; import org.elasticsearch.xpack.ml.utils.ExceptionsHelper; +import org.elasticsearch.xpack.persistent.PersistentTasksInProgress; +import org.elasticsearch.xpack.persistent.PersistentTasksInProgress.PersistentTaskInProgress; +import org.elasticsearch.xpack.persistent.RemovePersistentTaskAction; import java.io.IOException; import java.util.Objects; public class StopDatafeedAction - extends Action { + extends Action { public static final StopDatafeedAction INSTANCE = new StopDatafeedAction(); public static final String NAME = "cluster:admin/ml/datafeeds/stop"; @@ -60,14 +55,13 @@ public class StopDatafeedAction } @Override - public Response newResponse() { - return new Response(); + public RemovePersistentTaskAction.Response newResponse() { + return new RemovePersistentTaskAction.Response(); } - public static class Request extends ActionRequest { + public static class Request extends MasterNodeRequest { private String datafeedId; - private TimeValue stopTimeout = TimeValue.timeValueSeconds(20); public Request(String jobId) { this.datafeedId = ExceptionsHelper.requireNonNull(jobId, DatafeedConfig.ID.getPreferredName()); @@ -80,10 +74,6 @@ public class StopDatafeedAction return datafeedId; } - public void setStopTimeout(TimeValue stopTimeout) { - this.stopTimeout = stopTimeout; - } - @Override public ActionRequestValidationException validate() { return null; @@ -93,19 +83,17 @@ public class StopDatafeedAction public void readFrom(StreamInput in) throws IOException { super.readFrom(in); datafeedId = in.readString(); - stopTimeout = TimeValue.timeValueMillis(in.readVLong()); } @Override public void writeTo(StreamOutput out) throws IOException { super.writeTo(out); out.writeString(datafeedId); - out.writeVLong(stopTimeout.millis()); } @Override public int hashCode() { - return Objects.hash(datafeedId, stopTimeout); + return Objects.hash(datafeedId); } @Override @@ -117,114 +105,74 @@ public class StopDatafeedAction return false; } Request other = (Request) obj; - return Objects.equals(datafeedId, other.datafeedId) && - Objects.equals(stopTimeout, other.stopTimeout); + return Objects.equals(datafeedId, other.datafeedId); } } - static class RequestBuilder extends ActionRequestBuilder { + static class RequestBuilder extends ActionRequestBuilder { public RequestBuilder(ElasticsearchClient client, StopDatafeedAction action) { super(client, action, new Request()); } } - public static class Response extends AcknowledgedResponse { + public static class TransportAction extends TransportMasterNodeAction { - private Response() { - super(true); - } - - @Override - public void readFrom(StreamInput in) throws IOException { - super.readFrom(in); - } - - @Override - public void writeTo(StreamOutput out) throws IOException { - super.writeTo(out); - } - } - - public static class TransportAction extends HandledTransportAction { - - private final ClusterService clusterService; - private final TransportListTasksAction listTasksAction; - private final TransportCancelTasksAction cancelTasksAction; - private final DatafeedStatusObserver datafeedStatusObserver; + private final RemovePersistentTaskAction.TransportAction removePersistentTaskAction; @Inject public TransportAction(Settings settings, TransportService transportService, ThreadPool threadPool, ActionFilters actionFilters, IndexNameExpressionResolver indexNameExpressionResolver, - ClusterService clusterService, TransportCancelTasksAction cancelTasksAction, - TransportListTasksAction listTasksAction) { - super(settings, StopDatafeedAction.NAME, threadPool, transportService, actionFilters, + ClusterService clusterService, RemovePersistentTaskAction.TransportAction removePersistentTaskAction) { + super(settings, StopDatafeedAction.NAME, transportService, clusterService, threadPool, actionFilters, indexNameExpressionResolver, Request::new); - this.clusterService = clusterService; - this.listTasksAction = listTasksAction; - this.cancelTasksAction = cancelTasksAction; - this.datafeedStatusObserver = new DatafeedStatusObserver(threadPool, clusterService); + this.removePersistentTaskAction = removePersistentTaskAction; } @Override - protected void doExecute(Request request, ActionListener listener) { - String datafeedId = request.getDatafeedId(); - MlMetadata mlMetadata = clusterService.state().metaData().custom(MlMetadata.TYPE); - validate(datafeedId, mlMetadata); - - ListTasksRequest listTasksRequest = new ListTasksRequest(); - listTasksRequest.setActions(InternalStartDatafeedAction.NAME); - listTasksRequest.setDetailed(true); - listTasksAction.execute(listTasksRequest, new ActionListener() { - @Override - public void onResponse(ListTasksResponse listTasksResponse) { - String expectedJobDescription = "datafeed-" + datafeedId; - for (TaskInfo taskInfo : listTasksResponse.getTasks()) { - if (expectedJobDescription.equals(taskInfo.getDescription())) { - CancelTasksRequest cancelTasksRequest = new CancelTasksRequest(); - cancelTasksRequest.setTaskId(taskInfo.getTaskId()); - cancelTasksAction.execute(cancelTasksRequest, new ActionListener() { - @Override - public void onResponse(CancelTasksResponse cancelTasksResponse) { - datafeedStatusObserver.waitForStatus(datafeedId, request.stopTimeout, DatafeedStatus.STOPPED, e -> { - if (e != null) { - listener.onFailure(e); - } else { - listener.onResponse(new Response()); - } - }); - } - - @Override - public void onFailure(Exception e) { - listener.onFailure(e); - } - }); - return; - } - } - listener.onFailure(new ResourceNotFoundException("No datafeed [" + datafeedId + "] running")); - } - - @Override - public void onFailure(Exception e) { - listener.onFailure(e); - } - }); + protected String executor() { + return ThreadPool.Names.SAME; } + @Override + protected RemovePersistentTaskAction.Response newResponse() { + return new RemovePersistentTaskAction.Response(); + } + + @Override + protected void masterOperation(Request request, ClusterState state, + ActionListener listener) throws Exception { + String datafeedId = request.getDatafeedId(); + MlMetadata mlMetadata = state.metaData().custom(MlMetadata.TYPE); + validate(datafeedId, mlMetadata); + + PersistentTasksInProgress tasksInProgress = state.custom(PersistentTasksInProgress.TYPE); + if (tasksInProgress != null) { + for (PersistentTaskInProgress taskInProgress : tasksInProgress.findEntries(StartDatafeedAction.NAME, p -> true)) { + StartDatafeedAction.Request storedRequest = (StartDatafeedAction.Request) taskInProgress.getRequest(); + if (storedRequest.getDatafeedId().equals(datafeedId)) { + RemovePersistentTaskAction.Request cancelTasksRequest = new RemovePersistentTaskAction.Request(); + cancelTasksRequest.setTaskId(taskInProgress.getId()); + removePersistentTaskAction.execute(cancelTasksRequest, listener); + return; + } + } + } + listener.onFailure(new ElasticsearchStatusException("datafeed already stopped, expected datafeed status [{}], but got [{}]", + RestStatus.CONFLICT, DatafeedStatus.STARTED, DatafeedStatus.STOPPED)); + } + + @Override + protected ClusterBlockException checkBlock(Request request, ClusterState state) { + return state.blocks().globalBlockedException(ClusterBlockLevel.METADATA_READ); + } } static void validate(String datafeedId, MlMetadata mlMetadata) { - Datafeed datafeed = mlMetadata.getDatafeed(datafeedId); + DatafeedConfig datafeed = mlMetadata.getDatafeed(datafeedId); if (datafeed == null) { throw new ResourceNotFoundException(Messages.getMessage(Messages.DATAFEED_NOT_FOUND, datafeedId)); } - - if (datafeed.getStatus() == DatafeedStatus.STOPPED) { - throw new ElasticsearchStatusException("datafeed already stopped, expected datafeed status [{}], but got [{}]", - RestStatus.CONFLICT, DatafeedStatus.STARTED, datafeed.getStatus()); - } } } diff --git a/elasticsearch/src/main/java/org/elasticsearch/xpack/ml/action/UpdateDatafeedStatusAction.java b/elasticsearch/src/main/java/org/elasticsearch/xpack/ml/action/UpdateDatafeedStatusAction.java deleted file mode 100644 index 570453c996e..00000000000 --- a/elasticsearch/src/main/java/org/elasticsearch/xpack/ml/action/UpdateDatafeedStatusAction.java +++ /dev/null @@ -1,190 +0,0 @@ -/* - * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one - * or more contributor license agreements. Licensed under the Elastic License; - * you may not use this file except in compliance with the Elastic License. - */ -package org.elasticsearch.xpack.ml.action; - -import org.elasticsearch.action.Action; -import org.elasticsearch.action.ActionListener; -import org.elasticsearch.action.ActionRequestValidationException; -import org.elasticsearch.action.support.ActionFilters; -import org.elasticsearch.action.support.master.AcknowledgedRequest; -import org.elasticsearch.action.support.master.AcknowledgedResponse; -import org.elasticsearch.action.support.master.MasterNodeOperationRequestBuilder; -import org.elasticsearch.action.support.master.TransportMasterNodeAction; -import org.elasticsearch.client.ElasticsearchClient; -import org.elasticsearch.cluster.ClusterState; -import org.elasticsearch.cluster.block.ClusterBlockException; -import org.elasticsearch.cluster.block.ClusterBlockLevel; -import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver; -import org.elasticsearch.cluster.service.ClusterService; -import org.elasticsearch.common.inject.Inject; -import org.elasticsearch.common.io.stream.StreamInput; -import org.elasticsearch.common.io.stream.StreamOutput; -import org.elasticsearch.common.settings.Settings; -import org.elasticsearch.threadpool.ThreadPool; -import org.elasticsearch.transport.TransportService; -import org.elasticsearch.xpack.ml.datafeed.DatafeedConfig; -import org.elasticsearch.xpack.ml.datafeed.DatafeedStatus; -import org.elasticsearch.xpack.ml.job.JobManager; -import org.elasticsearch.xpack.ml.utils.ExceptionsHelper; - -import java.io.IOException; -import java.util.Objects; - -public class UpdateDatafeedStatusAction extends Action { - - public static final UpdateDatafeedStatusAction INSTANCE = new UpdateDatafeedStatusAction(); - public static final String NAME = "cluster:admin/ml/datafeeds/status/update"; - - private UpdateDatafeedStatusAction() { - super(NAME); - } - - @Override - public RequestBuilder newRequestBuilder(ElasticsearchClient client) { - return new RequestBuilder(client, this); - } - - @Override - public Response newResponse() { - return new Response(); - } - - public static class Request extends AcknowledgedRequest { - - private String datafeedId; - private DatafeedStatus datafeedStatus; - - public Request(String datafeedId, DatafeedStatus datafeedStatus) { - this.datafeedId = ExceptionsHelper.requireNonNull(datafeedId, DatafeedConfig.ID.getPreferredName()); - this.datafeedStatus = ExceptionsHelper.requireNonNull(datafeedStatus, "status"); - } - - Request() {} - - public String getDatafeedId() { - return datafeedId; - } - - public void setDatafeedId(String datafeedId) { - this.datafeedId = datafeedId; - } - - public DatafeedStatus getDatafeedStatus() { - return datafeedStatus; - } - - public void setDatafeedStatus(DatafeedStatus datafeedStatus) { - this.datafeedStatus = datafeedStatus; - } - - @Override - public ActionRequestValidationException validate() { - return null; - } - - @Override - public void readFrom(StreamInput in) throws IOException { - super.readFrom(in); - datafeedId = in.readString(); - datafeedStatus = DatafeedStatus.fromStream(in); - } - - @Override - public void writeTo(StreamOutput out) throws IOException { - super.writeTo(out); - out.writeString(datafeedId); - datafeedStatus.writeTo(out); - } - - @Override - public int hashCode() { - return Objects.hash(datafeedId, datafeedStatus); - } - - @Override - public boolean equals(Object obj) { - if (this == obj) { - return true; - } - if (obj == null || obj.getClass() != getClass()) { - return false; - } - UpdateDatafeedStatusAction.Request other = (UpdateDatafeedStatusAction.Request) obj; - return Objects.equals(datafeedId, other.datafeedId) && Objects.equals(datafeedStatus, other.datafeedStatus); - } - - @Override - public String toString() { - return "Request{" + - DatafeedConfig.ID.getPreferredName() + "='" + datafeedId + "', " + - "status=" + datafeedStatus + - '}'; - } - } - - static class RequestBuilder extends MasterNodeOperationRequestBuilder { - - public RequestBuilder(ElasticsearchClient client, UpdateDatafeedStatusAction action) { - super(client, action, new Request()); - } - } - - public static class Response extends AcknowledgedResponse { - - public Response(boolean acknowledged) { - super(acknowledged); - } - - public Response() {} - - @Override - public void readFrom(StreamInput in) throws IOException { - super.readFrom(in); - readAcknowledged(in); - } - - @Override - public void writeTo(StreamOutput out) throws IOException { - super.writeTo(out); - writeAcknowledged(out); - } - } - - public static class TransportAction extends TransportMasterNodeAction { - - private final JobManager jobManager; - - @Inject - public TransportAction(Settings settings, TransportService transportService, ClusterService clusterService, - ThreadPool threadPool, ActionFilters actionFilters, IndexNameExpressionResolver indexNameExpressionResolver, - JobManager jobManager) { - super(settings, UpdateDatafeedStatusAction.NAME, transportService, clusterService, threadPool, actionFilters, - indexNameExpressionResolver, Request::new); - this.jobManager = jobManager; - } - - @Override - protected String executor() { - return ThreadPool.Names.SAME; - } - - @Override - protected Response newResponse() { - return new Response(); - } - - @Override - protected void masterOperation(Request request, ClusterState state, ActionListener listener) throws Exception { - jobManager.updateDatafeedStatus(request, listener); - } - - @Override - protected ClusterBlockException checkBlock(Request request, ClusterState state) { - return state.blocks().globalBlockedException(ClusterBlockLevel.METADATA_WRITE); - } - } -} diff --git a/elasticsearch/src/main/java/org/elasticsearch/xpack/ml/datafeed/Datafeed.java b/elasticsearch/src/main/java/org/elasticsearch/xpack/ml/datafeed/Datafeed.java deleted file mode 100644 index 0771f887c0a..00000000000 --- a/elasticsearch/src/main/java/org/elasticsearch/xpack/ml/datafeed/Datafeed.java +++ /dev/null @@ -1,101 +0,0 @@ -/* - * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one - * or more contributor license agreements. Licensed under the Elastic License; - * you may not use this file except in compliance with the Elastic License. - */ -package org.elasticsearch.xpack.ml.datafeed; - -import org.elasticsearch.cluster.AbstractDiffable; -import org.elasticsearch.common.ParseField; -import org.elasticsearch.common.Strings; -import org.elasticsearch.common.io.stream.StreamInput; -import org.elasticsearch.common.io.stream.StreamOutput; -import org.elasticsearch.common.xcontent.ConstructingObjectParser; -import org.elasticsearch.common.xcontent.ObjectParser; -import org.elasticsearch.common.xcontent.ToXContent; -import org.elasticsearch.common.xcontent.XContentBuilder; - -import java.io.IOException; -import java.util.Objects; - -public class Datafeed extends AbstractDiffable implements ToXContent { - - private static final ParseField CONFIG_FIELD = new ParseField("config"); - private static final ParseField STATUS_FIELD = new ParseField("status"); - - // Used for QueryPage - public static final ParseField RESULTS_FIELD = new ParseField("datafeeds"); - - public static final ConstructingObjectParser PARSER = new ConstructingObjectParser<>("datafeed", - a -> new Datafeed(((DatafeedConfig.Builder) a[0]).build(), (DatafeedStatus) a[1])); - - static { - PARSER.declareObject(ConstructingObjectParser.constructorArg(), DatafeedConfig.PARSER, CONFIG_FIELD); - PARSER.declareField(ConstructingObjectParser.constructorArg(), (p, c) -> DatafeedStatus.fromString(p.text()), STATUS_FIELD, - ObjectParser.ValueType.STRING); - } - - private final DatafeedConfig config; - private final DatafeedStatus status; - - public Datafeed(DatafeedConfig config, DatafeedStatus status) { - this.config = config; - this.status = status; - } - - public Datafeed(StreamInput in) throws IOException { - this.config = new DatafeedConfig(in); - this.status = DatafeedStatus.fromStream(in); - } - - public String getId() { - return config.getId(); - } - - public String getJobId() { - return config.getJobId(); - } - - public DatafeedConfig getConfig() { - return config; - } - - public DatafeedStatus getStatus() { - return status; - } - - @Override - public void writeTo(StreamOutput out) throws IOException { - config.writeTo(out); - status.writeTo(out); - } - - @Override - public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException { - builder.startObject(); - builder.field(CONFIG_FIELD.getPreferredName(), config); - builder.field(STATUS_FIELD.getPreferredName(), status); - builder.endObject(); - return builder; - } - - @Override - public boolean equals(Object o) { - if (this == o) return true; - if (o == null || getClass() != o.getClass()) return false; - Datafeed that = (Datafeed) o; - return Objects.equals(config, that.config) && - Objects.equals(status, that.status); - } - - @Override - public int hashCode() { - return Objects.hash(config, status); - } - - // Class already extends from AbstractDiffable, so copied from ToXContentToBytes#toString() - @Override - public final String toString() { - return Strings.toString(this); - } -} diff --git a/elasticsearch/src/main/java/org/elasticsearch/xpack/ml/datafeed/DatafeedConfig.java b/elasticsearch/src/main/java/org/elasticsearch/xpack/ml/datafeed/DatafeedConfig.java index ed9a67e8ee3..17b306c2c08 100644 --- a/elasticsearch/src/main/java/org/elasticsearch/xpack/ml/datafeed/DatafeedConfig.java +++ b/elasticsearch/src/main/java/org/elasticsearch/xpack/ml/datafeed/DatafeedConfig.java @@ -6,12 +6,13 @@ package org.elasticsearch.xpack.ml.datafeed; import org.elasticsearch.ElasticsearchException; -import org.elasticsearch.action.support.ToXContentToBytes; +import org.elasticsearch.cluster.AbstractDiffable; import org.elasticsearch.common.ParseField; +import org.elasticsearch.common.Strings; import org.elasticsearch.common.io.stream.StreamInput; import org.elasticsearch.common.io.stream.StreamOutput; -import org.elasticsearch.common.io.stream.Writeable; import org.elasticsearch.common.xcontent.ObjectParser; +import org.elasticsearch.common.xcontent.ToXContent; import org.elasticsearch.common.xcontent.XContentBuilder; import org.elasticsearch.common.xcontent.XContentParser; import org.elasticsearch.index.query.QueryBuilder; @@ -39,7 +40,10 @@ import java.util.Objects; * used around integral types and booleans so they can take null * values. */ -public class DatafeedConfig extends ToXContentToBytes implements Writeable { +public class DatafeedConfig extends AbstractDiffable implements ToXContent { + + // Used for QueryPage + public static final ParseField RESULTS_FIELD = new ParseField("datafeeds"); /** * The field name used to specify aggregation fields in Elasticsearch @@ -309,6 +313,11 @@ public class DatafeedConfig extends ToXContentToBytes implements Writeable { return Objects.hash(id, jobId, frequency, queryDelay, indexes, types, query, scrollSize, aggregations, scriptFields, source); } + @Override + public String toString() { + return Strings.toString(this); + } + public static class Builder { private static final int DEFAULT_SCROLL_SIZE = 1000; diff --git a/elasticsearch/src/main/java/org/elasticsearch/xpack/ml/datafeed/DatafeedJobRunner.java b/elasticsearch/src/main/java/org/elasticsearch/xpack/ml/datafeed/DatafeedJobRunner.java index c5e674dbd89..628075c692d 100644 --- a/elasticsearch/src/main/java/org/elasticsearch/xpack/ml/datafeed/DatafeedJobRunner.java +++ b/elasticsearch/src/main/java/org/elasticsearch/xpack/ml/datafeed/DatafeedJobRunner.java @@ -5,20 +5,16 @@ */ package org.elasticsearch.xpack.ml.datafeed; -import org.elasticsearch.ElasticsearchStatusException; import org.elasticsearch.ResourceNotFoundException; -import org.elasticsearch.action.ActionListener; import org.elasticsearch.client.Client; import org.elasticsearch.cluster.service.ClusterService; import org.elasticsearch.common.component.AbstractComponent; import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.unit.TimeValue; import org.elasticsearch.common.util.concurrent.FutureUtils; -import org.elasticsearch.rest.RestStatus; import org.elasticsearch.threadpool.ThreadPool; import org.elasticsearch.xpack.ml.MlPlugin; -import org.elasticsearch.xpack.ml.action.InternalStartDatafeedAction; -import org.elasticsearch.xpack.ml.action.UpdateDatafeedStatusAction; +import org.elasticsearch.xpack.ml.action.StartDatafeedAction; import org.elasticsearch.xpack.ml.action.util.QueryPage; import org.elasticsearch.xpack.ml.datafeed.extractor.DataExtractorFactory; import org.elasticsearch.xpack.ml.datafeed.extractor.aggregation.AggregationDataExtractorFactory; @@ -26,15 +22,12 @@ import org.elasticsearch.xpack.ml.datafeed.extractor.scroll.ScrollDataExtractorF import org.elasticsearch.xpack.ml.job.config.DataDescription; import org.elasticsearch.xpack.ml.job.config.DefaultFrequency; import org.elasticsearch.xpack.ml.job.config.Job; -import org.elasticsearch.xpack.ml.job.config.JobStatus; -import org.elasticsearch.xpack.ml.job.metadata.Allocation; import org.elasticsearch.xpack.ml.job.metadata.MlMetadata; import org.elasticsearch.xpack.ml.job.persistence.BucketsQueryBuilder; import org.elasticsearch.xpack.ml.job.persistence.JobProvider; import org.elasticsearch.xpack.ml.job.process.autodetect.state.DataCounts; import org.elasticsearch.xpack.ml.job.results.Bucket; import org.elasticsearch.xpack.ml.notifications.Auditor; -import org.elasticsearch.xpack.ml.utils.ExceptionsHelper; import java.time.Duration; import java.util.Collections; @@ -62,12 +55,10 @@ public class DatafeedJobRunner extends AbstractComponent { this.currentTimeSupplier = Objects.requireNonNull(currentTimeSupplier); } - public void run(String datafeedId, long startTime, Long endTime, InternalStartDatafeedAction.DatafeedTask task, + public void run(String datafeedId, long startTime, Long endTime, StartDatafeedAction.DatafeedTask task, Consumer handler) { MlMetadata mlMetadata = clusterService.state().metaData().custom(MlMetadata.TYPE); - validate(datafeedId, mlMetadata); - - Datafeed datafeed = mlMetadata.getDatafeed(datafeedId); + DatafeedConfig datafeed = mlMetadata.getDatafeed(datafeedId); Job job = mlMetadata.getJobs().get(datafeed.getJobId()); gatherInformation(job.getId(), (buckets, dataCounts) -> { long latestFinalBucketEndMs = -1L; @@ -88,43 +79,36 @@ public class DatafeedJobRunner extends AbstractComponent { // otherwise if a stop datafeed call is made immediately after the start datafeed call we could cancel // the DatafeedTask without stopping datafeed, which causes the datafeed to keep on running. private void innerRun(Holder holder, long startTime, Long endTime) { - setJobDatafeedStatus(holder.datafeed.getId(), DatafeedStatus.STARTED, error -> { - if (error != null) { - holder.stop("unable_to_set_datafeed_status", error); + logger.info("Starting datafeed [{}] for job [{}]", holder.datafeed.getId(), holder.datafeed.getJobId()); + holder.future = threadPool.executor(MlPlugin.DATAFEED_RUNNER_THREAD_POOL_NAME).submit(() -> { + Long next = null; + try { + next = holder.datafeedJob.runLookBack(startTime, endTime); + } catch (DatafeedJob.ExtractionProblemException e) { + if (endTime == null) { + next = e.nextDelayInMsSinceEpoch; + } + holder.problemTracker.reportExtractionProblem(e.getCause().getMessage()); + } catch (DatafeedJob.AnalysisProblemException e) { + if (endTime == null) { + next = e.nextDelayInMsSinceEpoch; + } + holder.problemTracker.reportAnalysisProblem(e.getCause().getMessage()); + } catch (DatafeedJob.EmptyDataCountException e) { + if (endTime == null && holder.problemTracker.updateEmptyDataCount(true) == false) { + next = e.nextDelayInMsSinceEpoch; + } + } catch (Exception e) { + logger.error("Failed lookback import for job [" + holder.datafeed.getJobId() + "]", e); + holder.stop("general_lookback_failure", e); return; } - - logger.info("Starting datafeed [{}] for job [{}]", holder.datafeed.getId(), holder.datafeed.getJobId()); - holder.future = threadPool.executor(MlPlugin.DATAFEED_RUNNER_THREAD_POOL_NAME).submit(() -> { - Long next = null; - try { - next = holder.datafeedJob.runLookBack(startTime, endTime); - } catch (DatafeedJob.ExtractionProblemException e) { - if (endTime == null) { - next = e.nextDelayInMsSinceEpoch; - } - holder.problemTracker.reportExtractionProblem(e.getCause().getMessage()); - } catch (DatafeedJob.AnalysisProblemException e) { - if (endTime == null) { - next = e.nextDelayInMsSinceEpoch; - } - holder.problemTracker.reportAnalysisProblem(e.getCause().getMessage()); - } catch (DatafeedJob.EmptyDataCountException e) { - if (endTime == null && holder.problemTracker.updateEmptyDataCount(true) == false) { - next = e.nextDelayInMsSinceEpoch; - } - } catch (Exception e) { - logger.error("Failed lookback import for job [" + holder.datafeed.getJobId() + "]", e); - holder.stop("general_lookback_failure", e); - return; - } - if (next != null) { - doDatafeedRealtime(next, holder.datafeed.getJobId(), holder); - } else { - holder.stop("no_realtime", null); - holder.problemTracker.finishReport(); - } - }); + if (next != null) { + doDatafeedRealtime(next, holder.datafeed.getJobId(), holder); + } else { + holder.stop("no_realtime", null); + holder.problemTracker.finishReport(); + } }); } @@ -160,37 +144,12 @@ public class DatafeedJobRunner extends AbstractComponent { } } - public static void validate(String datafeedId, MlMetadata mlMetadata) { - Datafeed datafeed = mlMetadata.getDatafeed(datafeedId); - if (datafeed == null) { - throw ExceptionsHelper.missingDatafeedException(datafeedId); - } - Job job = mlMetadata.getJobs().get(datafeed.getJobId()); - if (job == null) { - throw ExceptionsHelper.missingJobException(datafeed.getJobId()); - } - - Allocation allocation = mlMetadata.getAllocations().get(datafeed.getJobId()); - if (allocation.getStatus() != JobStatus.OPENED) { - throw new ElasticsearchStatusException("cannot start datafeed, expected job status [{}], but got [{}]", - RestStatus.CONFLICT, JobStatus.OPENED, allocation.getStatus()); - } - - DatafeedStatus status = datafeed.getStatus(); - if (status != DatafeedStatus.STOPPED) { - throw new ElasticsearchStatusException("datafeed already started, expected datafeed status [{}], but got [{}]", - RestStatus.CONFLICT, DatafeedStatus.STOPPED, status); - } - - DatafeedJobValidator.validate(datafeed.getConfig(), job); - } - - private Holder createJobDatafeed(Datafeed datafeed, Job job, long finalBucketEndMs, long latestRecordTimeMs, - Consumer handler, InternalStartDatafeedAction.DatafeedTask task) { + private Holder createJobDatafeed(DatafeedConfig datafeed, Job job, long finalBucketEndMs, long latestRecordTimeMs, + Consumer handler, StartDatafeedAction.DatafeedTask task) { Auditor auditor = jobProvider.audit(job.getId()); Duration frequency = getFrequencyOrDefault(datafeed, job); - Duration queryDelay = Duration.ofSeconds(datafeed.getConfig().getQueryDelay()); - DataExtractorFactory dataExtractorFactory = createDataExtractorFactory(datafeed.getConfig(), job); + Duration queryDelay = Duration.ofSeconds(datafeed.getQueryDelay()); + DataExtractorFactory dataExtractorFactory = createDataExtractorFactory(datafeed, job); DatafeedJob datafeedJob = new DatafeedJob(job.getId(), buildDataDescription(job), frequency.toMillis(), queryDelay.toMillis(), dataExtractorFactory, client, auditor, currentTimeSupplier, finalBucketEndMs, latestRecordTimeMs); Holder holder = new Holder(datafeed, datafeedJob, new ProblemTracker(() -> auditor), handler); @@ -231,8 +190,8 @@ public class DatafeedJobRunner extends AbstractComponent { }); } - private static Duration getFrequencyOrDefault(Datafeed datafeed, Job job) { - Long frequency = datafeed.getConfig().getFrequency(); + private static Duration getFrequencyOrDefault(DatafeedConfig datafeed, Job job) { + Long frequency = datafeed.getFrequency(); Long bucketSpan = job.getAnalysisConfig().getBucketSpan(); return frequency == null ? DefaultFrequency.ofBucketSpan(bucketSpan) : Duration.ofSeconds(frequency); } @@ -241,36 +200,15 @@ public class DatafeedJobRunner extends AbstractComponent { return new TimeValue(Math.max(1, next - currentTimeSupplier.get())); } - private void setJobDatafeedStatus(String datafeedId, DatafeedStatus status, Consumer handler) { - UpdateDatafeedStatusAction.Request request = new UpdateDatafeedStatusAction.Request(datafeedId, status); - client.execute(UpdateDatafeedStatusAction.INSTANCE, request, new ActionListener() { - @Override - public void onResponse(UpdateDatafeedStatusAction.Response response) { - if (response.isAcknowledged()) { - logger.debug("successfully set datafeed [{}] status to [{}]", datafeedId, status); - } else { - logger.info("set datafeed [{}] status to [{}], but was not acknowledged", datafeedId, status); - } - handler.accept(null); - } - - @Override - public void onFailure(Exception e) { - logger.error("could not set datafeed [" + datafeedId + "] status to [" + status + "]", e); - handler.accept(e); - } - }); - } - public class Holder { - private final Datafeed datafeed; + private final DatafeedConfig datafeed; private final DatafeedJob datafeedJob; private final ProblemTracker problemTracker; private final Consumer handler; volatile Future future; - private Holder(Datafeed datafeed, DatafeedJob datafeedJob, ProblemTracker problemTracker, Consumer handler) { + private Holder(DatafeedConfig datafeed, DatafeedJob datafeedJob, ProblemTracker problemTracker, Consumer handler) { this.datafeed = datafeed; this.datafeedJob = datafeedJob; this.problemTracker = problemTracker; @@ -285,7 +223,7 @@ public class DatafeedJobRunner extends AbstractComponent { logger.info("[{}] attempt to stop datafeed [{}] for job [{}]", source, datafeed.getId(), datafeed.getJobId()); if (datafeedJob.stop()) { FutureUtils.cancel(future); - setJobDatafeedStatus(datafeed.getId(), DatafeedStatus.STOPPED, error -> handler.accept(e)); + handler.accept(e); logger.info("[{}] datafeed [{}] for job [{}] has been stopped", source, datafeed.getId(), datafeed.getJobId()); } else { logger.info("[{}] datafeed [{}] for job [{}] was already stopped", source, datafeed.getId(), datafeed.getJobId()); diff --git a/elasticsearch/src/main/java/org/elasticsearch/xpack/ml/job/JobManager.java b/elasticsearch/src/main/java/org/elasticsearch/xpack/ml/job/JobManager.java index 1db2a1e6abb..43eee1ed6f7 100644 --- a/elasticsearch/src/main/java/org/elasticsearch/xpack/ml/job/JobManager.java +++ b/elasticsearch/src/main/java/org/elasticsearch/xpack/ml/job/JobManager.java @@ -22,21 +22,19 @@ import org.elasticsearch.xpack.ml.action.DeleteJobAction; import org.elasticsearch.xpack.ml.action.PutJobAction; import org.elasticsearch.xpack.ml.action.RevertModelSnapshotAction; import org.elasticsearch.xpack.ml.action.UpdateJobStatusAction; -import org.elasticsearch.xpack.ml.action.UpdateDatafeedStatusAction; +import org.elasticsearch.xpack.ml.action.util.QueryPage; import org.elasticsearch.xpack.ml.job.config.IgnoreDowntime; import org.elasticsearch.xpack.ml.job.config.Job; import org.elasticsearch.xpack.ml.job.config.JobStatus; -import org.elasticsearch.xpack.ml.job.process.autodetect.state.ModelSnapshot; -import org.elasticsearch.xpack.ml.notifications.Auditor; import org.elasticsearch.xpack.ml.job.messages.Messages; import org.elasticsearch.xpack.ml.job.metadata.Allocation; import org.elasticsearch.xpack.ml.job.metadata.MlMetadata; import org.elasticsearch.xpack.ml.job.persistence.AnomalyDetectorsIndex; import org.elasticsearch.xpack.ml.job.persistence.JobProvider; import org.elasticsearch.xpack.ml.job.persistence.JobResultsPersister; -import org.elasticsearch.xpack.ml.action.util.QueryPage; +import org.elasticsearch.xpack.ml.job.process.autodetect.state.ModelSnapshot; import org.elasticsearch.xpack.ml.job.results.AnomalyRecord; -import org.elasticsearch.xpack.ml.datafeed.DatafeedStatus; +import org.elasticsearch.xpack.ml.notifications.Auditor; import org.elasticsearch.xpack.ml.utils.ExceptionsHelper; import java.util.Collections; @@ -295,27 +293,6 @@ public class JobManager extends AbstractComponent { return buildNewClusterState(currentState, builder); } - public void updateDatafeedStatus(UpdateDatafeedStatusAction.Request request, - ActionListener actionListener) { - String datafeedId = request.getDatafeedId(); - DatafeedStatus newStatus = request.getDatafeedStatus(); - clusterService.submitStateUpdateTask("update-datafeed-status-" + datafeedId, - new AckedClusterStateUpdateTask(request, actionListener) { - - @Override - public ClusterState execute(ClusterState currentState) throws Exception { - MlMetadata.Builder builder = createMlMetadataBuilder(currentState); - builder.updateDatafeedStatus(datafeedId, newStatus); - return buildNewClusterState(currentState, builder); - } - - @Override - protected UpdateDatafeedStatusAction.Response newResponse(boolean acknowledged) { - return new UpdateDatafeedStatusAction.Response(acknowledged); - } - }); - } - private Allocation getAllocation(ClusterState state, String jobId) { MlMetadata mlMetadata = state.metaData().custom(MlMetadata.TYPE); Allocation allocation = mlMetadata.getAllocations().get(jobId); diff --git a/elasticsearch/src/main/java/org/elasticsearch/xpack/ml/job/metadata/MlMetadata.java b/elasticsearch/src/main/java/org/elasticsearch/xpack/ml/job/metadata/MlMetadata.java index d079829a5bf..3e12da1878b 100644 --- a/elasticsearch/src/main/java/org/elasticsearch/xpack/ml/job/metadata/MlMetadata.java +++ b/elasticsearch/src/main/java/org/elasticsearch/xpack/ml/job/metadata/MlMetadata.java @@ -15,22 +15,24 @@ import org.elasticsearch.cluster.NamedDiff; import org.elasticsearch.cluster.metadata.MetaData; import org.elasticsearch.common.Nullable; import org.elasticsearch.common.ParseField; +import org.elasticsearch.common.Strings; import org.elasticsearch.common.io.stream.StreamInput; import org.elasticsearch.common.io.stream.StreamOutput; import org.elasticsearch.common.io.stream.Writeable; import org.elasticsearch.common.xcontent.ObjectParser; import org.elasticsearch.common.xcontent.ToXContent; import org.elasticsearch.common.xcontent.XContentBuilder; -import org.elasticsearch.common.xcontent.XContentFactory; import org.elasticsearch.rest.RestStatus; +import org.elasticsearch.xpack.ml.action.StartDatafeedAction; +import org.elasticsearch.xpack.ml.datafeed.DatafeedConfig; +import org.elasticsearch.xpack.ml.datafeed.DatafeedJobValidator; +import org.elasticsearch.xpack.ml.datafeed.DatafeedStatus; import org.elasticsearch.xpack.ml.job.config.Job; import org.elasticsearch.xpack.ml.job.config.JobStatus; import org.elasticsearch.xpack.ml.job.messages.Messages; -import org.elasticsearch.xpack.ml.datafeed.DatafeedJobValidator; -import org.elasticsearch.xpack.ml.datafeed.Datafeed; -import org.elasticsearch.xpack.ml.datafeed.DatafeedConfig; -import org.elasticsearch.xpack.ml.datafeed.DatafeedStatus; import org.elasticsearch.xpack.ml.utils.ExceptionsHelper; +import org.elasticsearch.xpack.persistent.PersistentTasksInProgress; +import org.elasticsearch.xpack.persistent.PersistentTasksInProgress.PersistentTaskInProgress; import java.io.IOException; import java.util.Collection; @@ -42,6 +44,7 @@ import java.util.Objects; import java.util.Optional; import java.util.SortedMap; import java.util.TreeMap; +import java.util.function.Predicate; public class MlMetadata implements MetaData.Custom { @@ -59,15 +62,15 @@ public class MlMetadata implements MetaData.Custom { static { ML_METADATA_PARSER.declareObjectArray(Builder::putJobs, (p, c) -> Job.PARSER.apply(p, c).build(), JOBS_FIELD); ML_METADATA_PARSER.declareObjectArray(Builder::putAllocations, Allocation.PARSER, ALLOCATIONS_FIELD); - ML_METADATA_PARSER.declareObjectArray(Builder::putDatafeeds, Datafeed.PARSER, DATAFEEDS_FIELD); + ML_METADATA_PARSER.declareObjectArray(Builder::putDatafeeds, (p, c) -> DatafeedConfig.PARSER.apply(p, c).build(), DATAFEEDS_FIELD); } private final SortedMap jobs; private final SortedMap allocations; - private final SortedMap datafeeds; + private final SortedMap datafeeds; private MlMetadata(SortedMap jobs, SortedMap allocations, - SortedMap datafeeds) { + SortedMap datafeeds) { this.jobs = Collections.unmodifiableSortedMap(jobs); this.allocations = Collections.unmodifiableSortedMap(allocations); this.datafeeds = Collections.unmodifiableSortedMap(datafeeds); @@ -81,11 +84,11 @@ public class MlMetadata implements MetaData.Custom { return allocations; } - public SortedMap getDatafeeds() { + public SortedMap getDatafeeds() { return datafeeds; } - public Datafeed getDatafeed(String datafeedId) { + public DatafeedConfig getDatafeed(String datafeedId) { return datafeeds.get(datafeedId); } @@ -120,9 +123,9 @@ public class MlMetadata implements MetaData.Custom { } this.allocations = allocations; size = in.readVInt(); - TreeMap datafeeds = new TreeMap<>(); + TreeMap datafeeds = new TreeMap<>(); for (int i = 0; i < size; i++) { - datafeeds.put(in.readString(), new Datafeed(in)); + datafeeds.put(in.readString(), new DatafeedConfig(in)); } this.datafeeds = datafeeds; } @@ -163,7 +166,7 @@ public class MlMetadata implements MetaData.Custom { final Diff> jobs; final Diff> allocations; - final Diff> datafeeds; + final Diff> datafeeds; MlMetadataDiff(MlMetadata before, MlMetadata after) { this.jobs = DiffableUtils.diff(before.jobs, after.jobs, DiffableUtils.getStringKeySerializer()); @@ -176,7 +179,7 @@ public class MlMetadata implements MetaData.Custom { MlMetadataDiff::readJobDiffFrom); this.allocations = DiffableUtils.readJdkMapDiff(in, DiffableUtils.getStringKeySerializer(), Allocation::new, MlMetadataDiff::readAllocationDiffFrom); - this.datafeeds = DiffableUtils.readJdkMapDiff(in, DiffableUtils.getStringKeySerializer(), Datafeed::new, + this.datafeeds = DiffableUtils.readJdkMapDiff(in, DiffableUtils.getStringKeySerializer(), DatafeedConfig::new, MlMetadataDiff::readSchedulerDiffFrom); } @@ -184,7 +187,7 @@ public class MlMetadata implements MetaData.Custom { public MetaData.Custom apply(MetaData.Custom part) { TreeMap newJobs = new TreeMap<>(jobs.apply(((MlMetadata) part).jobs)); TreeMap newAllocations = new TreeMap<>(allocations.apply(((MlMetadata) part).allocations)); - TreeMap newDatafeeds = new TreeMap<>(datafeeds.apply(((MlMetadata) part).datafeeds)); + TreeMap newDatafeeds = new TreeMap<>(datafeeds.apply(((MlMetadata) part).datafeeds)); return new MlMetadata(newJobs, newAllocations, newDatafeeds); } @@ -208,8 +211,8 @@ public class MlMetadata implements MetaData.Custom { return AbstractDiffable.readDiffFrom(Allocation::new, in); } - static Diff readSchedulerDiffFrom(StreamInput in) throws IOException { - return AbstractDiffable.readDiffFrom(Datafeed::new, in); + static Diff readSchedulerDiffFrom(StreamInput in) throws IOException { + return AbstractDiffable.readDiffFrom(DatafeedConfig::new, in); } } @@ -227,17 +230,7 @@ public class MlMetadata implements MetaData.Custom { @Override public final String toString() { - try { - XContentBuilder builder = XContentFactory.jsonBuilder(); - builder.prettyPrint(); - builder.startObject(); - toXContent(builder, EMPTY_PARAMS); - builder.endObject(); - return builder.string(); - } catch (Exception e) { - // So we have a stack trace logged somewhere - return "{ \"error\" : \"" + org.elasticsearch.ExceptionsHelper.detailedMessage(e) + "\"}"; - } + return Strings.toString(this); } @Override @@ -249,7 +242,7 @@ public class MlMetadata implements MetaData.Custom { private TreeMap jobs; private TreeMap allocations; - private TreeMap datafeeds; + private TreeMap datafeeds; public Builder() { this.jobs = new TreeMap<>(); @@ -285,7 +278,7 @@ public class MlMetadata implements MetaData.Custom { throw new ResourceNotFoundException("job [" + jobId + "] does not exist"); } - Optional datafeed = getDatafeedByJobId(jobId); + Optional datafeed = getDatafeedByJobId(jobId); if (datafeed.isPresent()) { throw ExceptionsHelper.conflictStatusException("Cannot delete job [" + jobId + "] while datafeed [" + datafeed.get().getId() + "] refers to it"); @@ -313,35 +306,38 @@ public class MlMetadata implements MetaData.Custom { if (job == null) { throw ExceptionsHelper.missingJobException(jobId); } - Optional existingDatafeed = getDatafeedByJobId(jobId); + Optional existingDatafeed = getDatafeedByJobId(jobId); if (existingDatafeed.isPresent()) { throw ExceptionsHelper.conflictStatusException("A datafeed [" + existingDatafeed.get().getId() + "] already exists for job [" + jobId + "]"); } DatafeedJobValidator.validate(datafeedConfig, job); - return putDatafeed(new Datafeed(datafeedConfig, DatafeedStatus.STOPPED)); - } - - private Builder putDatafeed(Datafeed datafeed) { - datafeeds.put(datafeed.getId(), datafeed); + datafeeds.put(datafeedConfig.getId(), datafeedConfig); return this; } - public Builder removeDatafeed(String datafeedId) { - Datafeed datafeed = datafeeds.get(datafeedId); + public Builder removeDatafeed(String datafeedId, PersistentTasksInProgress persistentTasksInProgress) { + DatafeedConfig datafeed = datafeeds.get(datafeedId); if (datafeed == null) { throw ExceptionsHelper.missingDatafeedException(datafeedId); } - if (datafeed.getStatus() != DatafeedStatus.STOPPED) { - String msg = Messages.getMessage(Messages.DATAFEED_CANNOT_DELETE_IN_CURRENT_STATE, datafeedId, datafeed.getStatus()); - throw ExceptionsHelper.conflictStatusException(msg); + if (persistentTasksInProgress != null) { + Predicate> predicate = t -> { + StartDatafeedAction.Request storedRequest = (StartDatafeedAction.Request) t.getRequest(); + return storedRequest.getDatafeedId().equals(datafeedId); + }; + if (persistentTasksInProgress.entriesExist(StartDatafeedAction.NAME, predicate)) { + String msg = Messages.getMessage(Messages.DATAFEED_CANNOT_DELETE_IN_CURRENT_STATE, datafeedId, + DatafeedStatus.STARTED); + throw ExceptionsHelper.conflictStatusException(msg); + } } datafeeds.remove(datafeedId); return this; } - private Optional getDatafeedByJobId(String jobId) { + private Optional getDatafeedByJobId(String jobId) { return datafeeds.values().stream().filter(s -> s.getJobId().equals(jobId)).findFirst(); } @@ -361,9 +357,9 @@ public class MlMetadata implements MetaData.Custom { return this; } - private Builder putDatafeeds(Collection datafeeds) { - for (Datafeed datafeed : datafeeds) { - putDatafeed(datafeed); + private Builder putDatafeeds(Collection datafeeds) { + for (DatafeedConfig datafeed : datafeeds) { + this.datafeeds.put(datafeed.getId(), datafeed); } return this; } @@ -395,7 +391,7 @@ public class MlMetadata implements MetaData.Custom { // Cannot update the status to DELETING if there are datafeeds attached if (jobStatus.equals(JobStatus.DELETING)) { - Optional datafeed = getDatafeedByJobId(jobId); + Optional datafeed = getDatafeedByJobId(jobId); if (datafeed.isPresent()) { throw ExceptionsHelper.conflictStatusException("Cannot delete job [" + jobId + "] while datafeed [" + datafeed.get().getId() + "] refers to it"); @@ -440,33 +436,6 @@ public class MlMetadata implements MetaData.Custom { allocations.put(jobId, builder.build()); return this; } - - public Builder updateDatafeedStatus(String datafeedId, DatafeedStatus newStatus) { - Datafeed datafeed = datafeeds.get(datafeedId); - if (datafeed == null) { - throw ExceptionsHelper.missingDatafeedException(datafeedId); - } - - DatafeedStatus currentStatus = datafeed.getStatus(); - switch (newStatus) { - case STARTED: - if (currentStatus != DatafeedStatus.STOPPED) { - String msg = Messages.getMessage(Messages.DATAFEED_CANNOT_START, datafeedId, newStatus); - throw ExceptionsHelper.conflictStatusException(msg); - } - break; - case STOPPED: - if (currentStatus != DatafeedStatus.STARTED) { - String msg = Messages.getMessage(Messages.DATAFEED_CANNOT_STOP_IN_CURRENT_STATE, datafeedId, newStatus); - throw ExceptionsHelper.conflictStatusException(msg); - } - break; - default: - throw new IllegalArgumentException("[" + datafeedId + "] requested invalid datafeed status [" + newStatus + "]"); - } - datafeeds.put(datafeedId, new Datafeed(datafeed.getConfig(), newStatus)); - return this; - } } } diff --git a/elasticsearch/src/main/java/org/elasticsearch/xpack/ml/rest/datafeeds/RestStartDatafeedAction.java b/elasticsearch/src/main/java/org/elasticsearch/xpack/ml/rest/datafeeds/RestStartDatafeedAction.java index 4a07c514eb0..6db1572f955 100644 --- a/elasticsearch/src/main/java/org/elasticsearch/xpack/ml/rest/datafeeds/RestStartDatafeedAction.java +++ b/elasticsearch/src/main/java/org/elasticsearch/xpack/ml/rest/datafeeds/RestStartDatafeedAction.java @@ -8,17 +8,21 @@ package org.elasticsearch.xpack.ml.rest.datafeeds; import org.elasticsearch.ElasticsearchParseException; import org.elasticsearch.client.node.NodeClient; import org.elasticsearch.common.settings.Settings; -import org.elasticsearch.common.unit.TimeValue; +import org.elasticsearch.common.xcontent.XContentBuilder; import org.elasticsearch.common.xcontent.XContentParser; import org.elasticsearch.index.mapper.DateFieldMapper; import org.elasticsearch.rest.BaseRestHandler; +import org.elasticsearch.rest.BytesRestResponse; import org.elasticsearch.rest.RestController; import org.elasticsearch.rest.RestRequest; -import org.elasticsearch.rest.action.RestToXContentListener; +import org.elasticsearch.rest.RestResponse; +import org.elasticsearch.rest.RestStatus; +import org.elasticsearch.rest.action.RestBuilderListener; import org.elasticsearch.xpack.ml.MlPlugin; import org.elasticsearch.xpack.ml.action.StartDatafeedAction; import org.elasticsearch.xpack.ml.datafeed.DatafeedConfig; import org.elasticsearch.xpack.ml.job.messages.Messages; +import org.elasticsearch.xpack.persistent.PersistentActionResponse; import java.io.IOException; @@ -49,12 +53,19 @@ public class RestStartDatafeedAction extends BaseRestHandler { } jobDatafeedRequest = new StartDatafeedAction.Request(datafeedId, startTimeMillis); jobDatafeedRequest.setEndTime(endTimeMillis); - TimeValue startTimeout = restRequest.paramAsTime(StartDatafeedAction.START_TIMEOUT.getPreferredName(), - TimeValue.timeValueSeconds(30)); - jobDatafeedRequest.setStartTimeout(startTimeout); } return channel -> { - client.execute(StartDatafeedAction.INSTANCE, jobDatafeedRequest, new RestToXContentListener<>(channel)); + client.execute(StartDatafeedAction.INSTANCE, jobDatafeedRequest, + new RestBuilderListener(channel) { + + @Override + public RestResponse buildResponse(PersistentActionResponse r, XContentBuilder builder) throws Exception { + builder.startObject(); + builder.field("started", true); + builder.endObject(); + return new BytesRestResponse(RestStatus.OK, builder); + } + }); }; } diff --git a/elasticsearch/src/main/java/org/elasticsearch/xpack/ml/rest/datafeeds/RestStopDatafeedAction.java b/elasticsearch/src/main/java/org/elasticsearch/xpack/ml/rest/datafeeds/RestStopDatafeedAction.java index 456472a0d4e..6e2fb892af6 100644 --- a/elasticsearch/src/main/java/org/elasticsearch/xpack/ml/rest/datafeeds/RestStopDatafeedAction.java +++ b/elasticsearch/src/main/java/org/elasticsearch/xpack/ml/rest/datafeeds/RestStopDatafeedAction.java @@ -7,7 +7,6 @@ package org.elasticsearch.xpack.ml.rest.datafeeds; import org.elasticsearch.client.node.NodeClient; import org.elasticsearch.common.settings.Settings; -import org.elasticsearch.common.unit.TimeValue; import org.elasticsearch.rest.BaseRestHandler; import org.elasticsearch.rest.RestController; import org.elasticsearch.rest.RestRequest; @@ -30,9 +29,6 @@ public class RestStopDatafeedAction extends BaseRestHandler { protected RestChannelConsumer prepareRequest(RestRequest restRequest, NodeClient client) throws IOException { StopDatafeedAction.Request jobDatafeedRequest = new StopDatafeedAction.Request( restRequest.param(DatafeedConfig.ID.getPreferredName())); - if (restRequest.hasParam("stop_timeout")) { - jobDatafeedRequest.setStopTimeout(TimeValue.parseTimeValue(restRequest.param("stop_timeout"), "stop_timeout")); - } return channel -> client.execute(StopDatafeedAction.INSTANCE, jobDatafeedRequest, new AcknowledgedRestListener<>(channel)); } } diff --git a/elasticsearch/src/main/java/org/elasticsearch/xpack/ml/utils/DatafeedStatusObserver.java b/elasticsearch/src/main/java/org/elasticsearch/xpack/ml/utils/DatafeedStatusObserver.java deleted file mode 100644 index 8904030665d..00000000000 --- a/elasticsearch/src/main/java/org/elasticsearch/xpack/ml/utils/DatafeedStatusObserver.java +++ /dev/null @@ -1,88 +0,0 @@ -/* - * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one - * or more contributor license agreements. Licensed under the Elastic License; - * you may not use this file except in compliance with the Elastic License. - */ -package org.elasticsearch.xpack.ml.utils; - -import org.apache.logging.log4j.Logger; -import org.elasticsearch.cluster.ClusterState; -import org.elasticsearch.cluster.ClusterStateObserver; -import org.elasticsearch.cluster.service.ClusterService; -import org.elasticsearch.common.logging.Loggers; -import org.elasticsearch.common.unit.TimeValue; -import org.elasticsearch.threadpool.ThreadPool; -import org.elasticsearch.xpack.ml.job.metadata.MlMetadata; -import org.elasticsearch.xpack.ml.datafeed.Datafeed; -import org.elasticsearch.xpack.ml.datafeed.DatafeedStatus; - -import java.util.function.Consumer; -import java.util.function.Predicate; - -public class DatafeedStatusObserver { - - private static final Logger LOGGER = Loggers.getLogger(DatafeedStatusObserver.class); - - private final ThreadPool threadPool; - private final ClusterService clusterService; - - public DatafeedStatusObserver(ThreadPool threadPool, ClusterService clusterService) { - this.threadPool = threadPool; - this.clusterService = clusterService; - } - - public void waitForStatus(String datafeedId, TimeValue waitTimeout, DatafeedStatus expectedStatus, Consumer handler) { - ClusterStateObserver observer = - new ClusterStateObserver(clusterService, LOGGER, threadPool.getThreadContext()); - DatafeedPredicate datafeedPredicate = new DatafeedPredicate(datafeedId, expectedStatus); - observer.waitForNextChange(new ClusterStateObserver.Listener() { - @Override - public void onNewClusterState(ClusterState state) { - handler.accept(null); - } - - @Override - public void onClusterServiceClose() { - Exception e = new IllegalArgumentException("Cluster service closed while waiting for datafeed status to change to [" - + expectedStatus + "]"); - handler.accept(new IllegalStateException(e)); - } - - @Override - public void onTimeout(TimeValue timeout) { - if (datafeedPredicate.test(clusterService.state())) { - handler.accept(null); - } else { - Exception e = new IllegalArgumentException("Timeout expired while waiting for datafeed status to change to [" - + expectedStatus + "]"); - handler.accept(e); - } - } - }, datafeedPredicate, waitTimeout); - } - - private static class DatafeedPredicate implements Predicate { - - private final String datafeedId; - private final DatafeedStatus expectedStatus; - - DatafeedPredicate(String datafeedId, DatafeedStatus expectedStatus) { - this.datafeedId = datafeedId; - this.expectedStatus = expectedStatus; - } - - @Override - public boolean test(ClusterState newState) { - MlMetadata metadata = newState.getMetaData().custom(MlMetadata.TYPE); - if (metadata != null) { - Datafeed datafeed = metadata.getDatafeed(datafeedId); - if (datafeed != null) { - return datafeed.getStatus() == expectedStatus; - } - } - return false; - } - - } - -} diff --git a/elasticsearch/src/main/java/org/elasticsearch/xpack/persistent/PersistentTasksInProgress.java b/elasticsearch/src/main/java/org/elasticsearch/xpack/persistent/PersistentTasksInProgress.java index 1e0d5bd9b72..49b02be881d 100644 --- a/elasticsearch/src/main/java/org/elasticsearch/xpack/persistent/PersistentTasksInProgress.java +++ b/elasticsearch/src/main/java/org/elasticsearch/xpack/persistent/PersistentTasksInProgress.java @@ -17,8 +17,11 @@ import org.elasticsearch.common.xcontent.ToXContent; import org.elasticsearch.common.xcontent.XContentBuilder; import java.io.IOException; +import java.util.Collection; import java.util.List; import java.util.Objects; +import java.util.function.Predicate; +import java.util.stream.Collectors; /** * A cluster state record that contains a list of all running persistent tasks @@ -40,6 +43,19 @@ public final class PersistentTasksInProgress extends AbstractNamedDiffable> findEntries(String actionName, Predicate> predicate) { + return this.entries().stream() + .filter(p -> actionName.equals(p.getAction())) + .filter(predicate) + .collect(Collectors.toList()); + } + + public boolean entriesExist(String actionName, Predicate> predicate) { + return this.entries().stream() + .filter(p -> actionName.equals(p.getAction())) + .anyMatch(predicate); + } + @Override public boolean equals(Object o) { if (this == o) return true; diff --git a/elasticsearch/src/main/java/org/elasticsearch/xpack/persistent/RemovePersistentTaskAction.java b/elasticsearch/src/main/java/org/elasticsearch/xpack/persistent/RemovePersistentTaskAction.java index db9ef511af4..77af43c6c99 100644 --- a/elasticsearch/src/main/java/org/elasticsearch/xpack/persistent/RemovePersistentTaskAction.java +++ b/elasticsearch/src/main/java/org/elasticsearch/xpack/persistent/RemovePersistentTaskAction.java @@ -99,7 +99,7 @@ public class RemovePersistentTaskAction extends Action { DataCounts dataCounts = getDataCounts(job.getId()); assertThat(dataCounts.getProcessedRecordCount(), equalTo(numDocs + numDocs2)); assertThat(dataCounts.getOutOfOrderTimeStampCount(), equalTo(0L)); - MlMetadata mlMetadata = client().admin().cluster().prepareState().all().get() - .getState().metaData().custom(MlMetadata.TYPE); - assertThat(mlMetadata.getDatafeed(datafeedConfig.getId()).get().getStatus(), equalTo(DatafeedStatus.STOPPED)); + GetDatafeedsStatsAction.Request request = new GetDatafeedsStatsAction.Request(datafeedConfig.getId()); + GetDatafeedsStatsAction.Response response = client().execute(GetDatafeedsStatsAction.INSTANCE, request).actionGet(); + assertThat(response.getResponse().results().get(0).getDatafeedStatus(), equalTo(DatafeedStatus.STOPPED)); }); } @@ -139,9 +139,8 @@ public class DatafeedJobsIT extends ESIntegTestCase { assertTrue(putDatafeedResponse.isAcknowledged()); StartDatafeedAction.Request startDatafeedRequest = new StartDatafeedAction.Request(datafeedConfig.getId(), 0L); - StartDatafeedAction.Response startDatafeedResponse = + PersistentActionResponse startDatafeedResponse = client().execute(StartDatafeedAction.INSTANCE, startDatafeedRequest).get(); - assertTrue(startDatafeedResponse.isStarted()); assertBusy(() -> { DataCounts dataCounts = getDataCounts(job.getId()); assertThat(dataCounts.getProcessedRecordCount(), equalTo(numDocs1)); @@ -159,7 +158,7 @@ public class DatafeedJobsIT extends ESIntegTestCase { StopDatafeedAction.Request stopDatafeedRequest = new StopDatafeedAction.Request(datafeedConfig.getId()); try { - StopDatafeedAction.Response stopJobResponse = client().execute(StopDatafeedAction.INSTANCE, stopDatafeedRequest).get(); + RemovePersistentTaskAction.Response stopJobResponse = client().execute(StopDatafeedAction.INSTANCE, stopDatafeedRequest).get(); assertTrue(stopJobResponse.isAcknowledged()); } catch (Exception e) { NodesHotThreadsResponse nodesHotThreadsResponse = client().admin().cluster().prepareNodesHotThreads().get(); @@ -170,9 +169,9 @@ public class DatafeedJobsIT extends ESIntegTestCase { throw e; } assertBusy(() -> { - MlMetadata mlMetadata = client().admin().cluster().prepareState().all().get() - .getState().metaData().custom(MlMetadata.TYPE); - assertThat(mlMetadata.getDatafeed(datafeedConfig.getId()).get().getStatus(), equalTo(DatafeedStatus.STOPPED)); + GetDatafeedsStatsAction.Request request = new GetDatafeedsStatsAction.Request(datafeedConfig.getId()); + GetDatafeedsStatsAction.Response response = client().execute(GetDatafeedsStatsAction.INSTANCE, request).actionGet(); + assertThat(response.getResponse().results().get(0).getDatafeedStatus(), equalTo(DatafeedStatus.STOPPED)); }); } @@ -240,10 +239,10 @@ public class DatafeedJobsIT extends ESIntegTestCase { private static void deleteAllDatafeeds(Client client) throws Exception { MetaData metaData = client.admin().cluster().prepareState().get().getState().getMetaData(); MlMetadata mlMetadata = metaData.custom(MlMetadata.TYPE); - for (Datafeed datafeed : mlMetadata.getDatafeeds().values()) { + for (DatafeedConfig datafeed : mlMetadata.getDatafeeds().values()) { String datafeedId = datafeed.getId(); try { - StopDatafeedAction.Response stopResponse = + RemovePersistentTaskAction.Response stopResponse = client.execute(StopDatafeedAction.INSTANCE, new StopDatafeedAction.Request(datafeedId)).get(); assertTrue(stopResponse.isAcknowledged()); } catch (ExecutionException e) { diff --git a/elasticsearch/src/test/java/org/elasticsearch/xpack/ml/action/GetDatafeedStatsActionResponseTests.java b/elasticsearch/src/test/java/org/elasticsearch/xpack/ml/action/GetDatafeedStatsActionResponseTests.java index bfda5a520a4..646141c173d 100644 --- a/elasticsearch/src/test/java/org/elasticsearch/xpack/ml/action/GetDatafeedStatsActionResponseTests.java +++ b/elasticsearch/src/test/java/org/elasticsearch/xpack/ml/action/GetDatafeedStatsActionResponseTests.java @@ -7,7 +7,7 @@ package org.elasticsearch.xpack.ml.action; import org.elasticsearch.xpack.ml.action.GetDatafeedsStatsAction.Response; import org.elasticsearch.xpack.ml.action.util.QueryPage; -import org.elasticsearch.xpack.ml.datafeed.Datafeed; +import org.elasticsearch.xpack.ml.datafeed.DatafeedConfig; import org.elasticsearch.xpack.ml.datafeed.DatafeedStatus; import org.elasticsearch.xpack.ml.support.AbstractStreamableTestCase; @@ -30,7 +30,7 @@ public class GetDatafeedStatsActionResponseTests extends AbstractStreamableTestC datafeedStatsList.add(datafeedStats); } - result = new Response(new QueryPage<>(datafeedStatsList, datafeedStatsList.size(), Datafeed.RESULTS_FIELD)); + result = new Response(new QueryPage<>(datafeedStatsList, datafeedStatsList.size(), DatafeedConfig.RESULTS_FIELD)); return result; } diff --git a/elasticsearch/src/test/java/org/elasticsearch/xpack/ml/action/GetDatafeedsActionResponseTests.java b/elasticsearch/src/test/java/org/elasticsearch/xpack/ml/action/GetDatafeedsActionResponseTests.java index 48ea779ee24..4538801486d 100644 --- a/elasticsearch/src/test/java/org/elasticsearch/xpack/ml/action/GetDatafeedsActionResponseTests.java +++ b/elasticsearch/src/test/java/org/elasticsearch/xpack/ml/action/GetDatafeedsActionResponseTests.java @@ -12,7 +12,6 @@ import org.elasticsearch.search.aggregations.AggregatorFactories; import org.elasticsearch.search.builder.SearchSourceBuilder; import org.elasticsearch.xpack.ml.action.GetDatafeedsAction.Response; import org.elasticsearch.xpack.ml.action.util.QueryPage; -import org.elasticsearch.xpack.ml.datafeed.Datafeed; import org.elasticsearch.xpack.ml.datafeed.DatafeedConfig; import org.elasticsearch.xpack.ml.datafeed.DatafeedConfigTests; import org.elasticsearch.xpack.ml.support.AbstractStreamableTestCase; @@ -61,7 +60,7 @@ public class GetDatafeedsActionResponseTests extends AbstractStreamableTestCase< datafeedList.add(datafeedConfig.build()); } - result = new Response(new QueryPage<>(datafeedList, datafeedList.size(), Datafeed.RESULTS_FIELD)); + result = new Response(new QueryPage<>(datafeedList, datafeedList.size(), DatafeedConfig.RESULTS_FIELD)); return result; } diff --git a/elasticsearch/src/test/java/org/elasticsearch/xpack/ml/action/StartDatafeedActionRequestTests.java b/elasticsearch/src/test/java/org/elasticsearch/xpack/ml/action/StartDatafeedActionRequestTests.java index 7591112f82d..d27b01f2522 100644 --- a/elasticsearch/src/test/java/org/elasticsearch/xpack/ml/action/StartDatafeedActionRequestTests.java +++ b/elasticsearch/src/test/java/org/elasticsearch/xpack/ml/action/StartDatafeedActionRequestTests.java @@ -5,11 +5,18 @@ */ package org.elasticsearch.xpack.ml.action; -import org.elasticsearch.common.unit.TimeValue; +import org.elasticsearch.ElasticsearchStatusException; +import org.elasticsearch.ResourceNotFoundException; import org.elasticsearch.common.xcontent.XContentParser; import org.elasticsearch.xpack.ml.action.StartDatafeedAction.Request; +import org.elasticsearch.xpack.ml.datafeed.DatafeedConfig; +import org.elasticsearch.xpack.ml.datafeed.DatafeedJobRunnerTests; +import org.elasticsearch.xpack.ml.job.config.Job; +import org.elasticsearch.xpack.ml.job.metadata.MlMetadata; import org.elasticsearch.xpack.ml.support.AbstractStreamableXContentTestCase; +import static org.hamcrest.Matchers.equalTo; + public class StartDatafeedActionRequestTests extends AbstractStreamableXContentTestCase { @Override @@ -18,9 +25,6 @@ public class StartDatafeedActionRequestTests extends AbstractStreamableXContentT if (randomBoolean()) { request.setEndTime(randomNonNegativeLong()); } - if (randomBoolean()) { - request.setStartTimeout(TimeValue.timeValueMillis(randomNonNegativeLong())); - } return request; } @@ -34,4 +38,22 @@ public class StartDatafeedActionRequestTests extends AbstractStreamableXContentT return Request.parseRequest(null, parser); } + public void testValidate() { + Job job1 = DatafeedJobRunnerTests.createDatafeedJob().build(); + MlMetadata mlMetadata1 = new MlMetadata.Builder() + .putJob(job1, false) + .build(); + Exception e = expectThrows(ResourceNotFoundException.class, + () -> StartDatafeedAction.validate("some-datafeed", mlMetadata1)); + assertThat(e.getMessage(), equalTo("No datafeed with id [some-datafeed] exists")); + + DatafeedConfig datafeedConfig1 = DatafeedJobRunnerTests.createDatafeedConfig("foo-datafeed", "foo").build(); + MlMetadata mlMetadata2 = new MlMetadata.Builder(mlMetadata1) + .putDatafeed(datafeedConfig1) + .build(); + e = expectThrows(ElasticsearchStatusException.class, + () -> StartDatafeedAction.validate("foo-datafeed", mlMetadata2)); + assertThat(e.getMessage(), equalTo("cannot start datafeed, expected job status [OPENED], but got [CLOSED]")); + } + } diff --git a/elasticsearch/src/test/java/org/elasticsearch/xpack/ml/action/StopDatafeedActionRequestTests.java b/elasticsearch/src/test/java/org/elasticsearch/xpack/ml/action/StopDatafeedActionRequestTests.java index 6d32879fed3..a9ee0609163 100644 --- a/elasticsearch/src/test/java/org/elasticsearch/xpack/ml/action/StopDatafeedActionRequestTests.java +++ b/elasticsearch/src/test/java/org/elasticsearch/xpack/ml/action/StopDatafeedActionRequestTests.java @@ -5,12 +5,9 @@ */ package org.elasticsearch.xpack.ml.action; -import org.elasticsearch.ElasticsearchStatusException; import org.elasticsearch.ResourceNotFoundException; -import org.elasticsearch.common.unit.TimeValue; import org.elasticsearch.xpack.ml.action.StopDatafeedAction.Request; import org.elasticsearch.xpack.ml.datafeed.DatafeedConfig; -import org.elasticsearch.xpack.ml.datafeed.DatafeedStatus; import org.elasticsearch.xpack.ml.job.config.Job; import org.elasticsearch.xpack.ml.job.metadata.MlMetadata; import org.elasticsearch.xpack.ml.support.AbstractStreamableTestCase; @@ -24,9 +21,6 @@ public class StopDatafeedActionRequestTests extends AbstractStreamableTestCase StopDatafeedAction.validate("foo", mlMetadata1)); + Exception e = expectThrows(ResourceNotFoundException.class, + () -> StopDatafeedAction.validate("foo", mlMetadata1)); assertThat(e.getMessage(), equalTo("No datafeed with id [foo] exists")); DatafeedConfig datafeedConfig = createDatafeedConfig("foo", "foo").build(); MlMetadata mlMetadata2 = new MlMetadata.Builder().putJob(job, false) .putDatafeed(datafeedConfig) .build(); - e = expectThrows(ElasticsearchStatusException.class, () -> StopDatafeedAction.validate("foo", mlMetadata2)); - assertThat(e.getMessage(), equalTo("datafeed already stopped, expected datafeed status [STARTED], but got [STOPPED]")); - - MlMetadata mlMetadata3 = new MlMetadata.Builder().putJob(job, false) - .putDatafeed(datafeedConfig) - .updateDatafeedStatus("foo", DatafeedStatus.STARTED) - .build(); - StopDatafeedAction.validate("foo", mlMetadata3); + StopDatafeedAction.validate("foo", mlMetadata2); } } diff --git a/elasticsearch/src/test/java/org/elasticsearch/xpack/ml/action/UpdateDatafeedStatusRequestTests.java b/elasticsearch/src/test/java/org/elasticsearch/xpack/ml/action/UpdateDatafeedStatusRequestTests.java deleted file mode 100644 index 680582c2ce6..00000000000 --- a/elasticsearch/src/test/java/org/elasticsearch/xpack/ml/action/UpdateDatafeedStatusRequestTests.java +++ /dev/null @@ -1,23 +0,0 @@ -/* - * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one - * or more contributor license agreements. Licensed under the Elastic License; - * you may not use this file except in compliance with the Elastic License. - */ -package org.elasticsearch.xpack.ml.action; - -import org.elasticsearch.xpack.ml.action.UpdateDatafeedStatusAction.Request; -import org.elasticsearch.xpack.ml.datafeed.DatafeedStatus; -import org.elasticsearch.xpack.ml.support.AbstractStreamableTestCase; - -public class UpdateDatafeedStatusRequestTests extends AbstractStreamableTestCase { - - @Override - protected Request createTestInstance() { - return new Request(randomAsciiOfLengthBetween(1, 20), randomFrom(DatafeedStatus.values())); - } - - @Override - protected Request createBlankInstance() { - return new Request(); - } -} diff --git a/elasticsearch/src/test/java/org/elasticsearch/xpack/ml/datafeed/DatafeedJobRunnerTests.java b/elasticsearch/src/test/java/org/elasticsearch/xpack/ml/datafeed/DatafeedJobRunnerTests.java index c5658f0fb4d..b09b5d0d4c4 100644 --- a/elasticsearch/src/test/java/org/elasticsearch/xpack/ml/datafeed/DatafeedJobRunnerTests.java +++ b/elasticsearch/src/test/java/org/elasticsearch/xpack/ml/datafeed/DatafeedJobRunnerTests.java @@ -5,10 +5,8 @@ */ package org.elasticsearch.xpack.ml.datafeed; -import org.elasticsearch.ElasticsearchStatusException; import org.elasticsearch.ResourceNotFoundException; import org.elasticsearch.action.ActionFuture; -import org.elasticsearch.action.ActionListener; import org.elasticsearch.client.Client; import org.elasticsearch.cluster.ClusterName; import org.elasticsearch.cluster.ClusterState; @@ -20,20 +18,19 @@ import org.elasticsearch.test.ESTestCase; import org.elasticsearch.threadpool.ThreadPool; import org.elasticsearch.xpack.ml.MlPlugin; import org.elasticsearch.xpack.ml.action.FlushJobAction; -import org.elasticsearch.xpack.ml.action.InternalStartDatafeedAction; import org.elasticsearch.xpack.ml.action.PostDataAction; -import org.elasticsearch.xpack.ml.action.UpdateDatafeedStatusAction; +import org.elasticsearch.xpack.ml.action.StartDatafeedAction; +import org.elasticsearch.xpack.ml.datafeed.extractor.DataExtractor; +import org.elasticsearch.xpack.ml.datafeed.extractor.DataExtractorFactory; import org.elasticsearch.xpack.ml.job.config.AnalysisConfig; -import org.elasticsearch.xpack.ml.job.process.autodetect.state.DataCounts; import org.elasticsearch.xpack.ml.job.config.DataDescription; import org.elasticsearch.xpack.ml.job.config.Detector; import org.elasticsearch.xpack.ml.job.config.Job; import org.elasticsearch.xpack.ml.job.config.JobStatus; -import org.elasticsearch.xpack.ml.notifications.Auditor; import org.elasticsearch.xpack.ml.job.metadata.MlMetadata; import org.elasticsearch.xpack.ml.job.persistence.JobProvider; -import org.elasticsearch.xpack.ml.datafeed.extractor.DataExtractor; -import org.elasticsearch.xpack.ml.datafeed.extractor.DataExtractorFactory; +import org.elasticsearch.xpack.ml.job.process.autodetect.state.DataCounts; +import org.elasticsearch.xpack.ml.notifications.Auditor; import org.junit.Before; import java.io.ByteArrayInputStream; @@ -45,9 +42,6 @@ import java.util.Optional; import java.util.concurrent.ExecutorService; import java.util.function.Consumer; -import static org.elasticsearch.xpack.ml.action.UpdateDatafeedStatusAction.INSTANCE; -import static org.elasticsearch.xpack.ml.action.UpdateDatafeedStatusAction.Request; -import static org.hamcrest.Matchers.equalTo; import static org.mockito.Matchers.any; import static org.mockito.Matchers.anyString; import static org.mockito.Matchers.same; @@ -77,12 +71,6 @@ public class DatafeedJobRunnerTests extends ESTestCase { jobDataFuture = mock(ActionFuture.class); flushJobFuture = mock(ActionFuture.class); clusterService = mock(ClusterService.class); - doAnswer(invocation -> { - @SuppressWarnings("rawtypes") - ActionListener actionListener = (ActionListener) invocation.getArguments()[2]; - actionListener.onResponse(new UpdateDatafeedStatusAction.Response()); - return null; - }).when(client).execute(same(UpdateDatafeedStatusAction.INSTANCE), any(), any()); JobProvider jobProvider = mock(JobProvider.class); Mockito.doAnswer(invocationOnMock -> { @@ -141,15 +129,13 @@ public class DatafeedJobRunnerTests extends ESTestCase { when(dataExtractor.next()).thenReturn(Optional.of(in)); when(jobDataFuture.get()).thenReturn(new PostDataAction.Response(dataCounts)); Consumer handler = mockConsumer(); - InternalStartDatafeedAction.DatafeedTask task = mock(InternalStartDatafeedAction.DatafeedTask.class); + StartDatafeedAction.DatafeedTask task = mock(StartDatafeedAction.DatafeedTask.class); datafeedJobRunner.run("datafeed1", 0L, 60000L, task, handler); verify(threadPool, times(1)).executor(MlPlugin.DATAFEED_RUNNER_THREAD_POOL_NAME); verify(threadPool, never()).schedule(any(), any(), any()); verify(client).execute(same(PostDataAction.INSTANCE), eq(createExpectedPostDataRequest(job))); verify(client).execute(same(FlushJobAction.INSTANCE), any()); - verify(client).execute(same(INSTANCE), eq(new Request("datafeed1", DatafeedStatus.STARTED)), any()); - verify(client).execute(same(INSTANCE), eq(new Request("datafeed1", DatafeedStatus.STOPPED)), any()); } private static PostDataAction.Request createExpectedPostDataRequest(Job job) { @@ -181,15 +167,13 @@ public class DatafeedJobRunnerTests extends ESTestCase { when(dataExtractor.next()).thenThrow(new RuntimeException("dummy")); when(jobDataFuture.get()).thenReturn(new PostDataAction.Response(dataCounts)); Consumer handler = mockConsumer(); - InternalStartDatafeedAction.DatafeedTask task = mock(InternalStartDatafeedAction.DatafeedTask.class); + StartDatafeedAction.DatafeedTask task = mock(StartDatafeedAction.DatafeedTask.class); datafeedJobRunner.run("datafeed1", 0L, 60000L, task, handler); verify(threadPool, times(1)).executor(MlPlugin.DATAFEED_RUNNER_THREAD_POOL_NAME); verify(threadPool, never()).schedule(any(), any(), any()); verify(client, never()).execute(same(PostDataAction.INSTANCE), eq(new PostDataAction.Request("foo"))); verify(client, never()).execute(same(FlushJobAction.INSTANCE), any()); - verify(client).execute(same(INSTANCE), eq(new Request("datafeed1", DatafeedStatus.STARTED)), any()); - verify(client).execute(same(INSTANCE), eq(new Request("datafeed1", DatafeedStatus.STOPPED)), any()); } public void testStart_GivenNewlyCreatedJobLoopBackAndRealtime() throws Exception { @@ -214,14 +198,13 @@ public class DatafeedJobRunnerTests extends ESTestCase { when(jobDataFuture.get()).thenReturn(new PostDataAction.Response(dataCounts)); Consumer handler = mockConsumer(); boolean cancelled = randomBoolean(); - InternalStartDatafeedAction.DatafeedTask task = - new InternalStartDatafeedAction.DatafeedTask(1, "type", "action", null, "datafeed1"); + StartDatafeedAction.DatafeedTask task = new StartDatafeedAction.DatafeedTask(1, "type", "action", null, "datafeed1"); datafeedJobRunner.run("datafeed1", 0L, null, task, handler); verify(threadPool, times(1)).executor(MlPlugin.DATAFEED_RUNNER_THREAD_POOL_NAME); if (cancelled) { task.stop(); - verify(client).execute(same(INSTANCE), eq(new Request("datafeed1", DatafeedStatus.STOPPED)), any()); + verify(handler).accept(null); } else { verify(client).execute(same(PostDataAction.INSTANCE), eq(createExpectedPostDataRequest(job))); verify(client).execute(same(FlushJobAction.INSTANCE), any()); @@ -246,32 +229,6 @@ public class DatafeedJobRunnerTests extends ESTestCase { return builder; } - public void testValidate() { - Job job1 = createDatafeedJob().build(); - MlMetadata mlMetadata1 = new MlMetadata.Builder() - .putJob(job1, false) - .build(); - Exception e = expectThrows(ResourceNotFoundException.class, - () -> DatafeedJobRunner.validate("some-datafeed", mlMetadata1)); - assertThat(e.getMessage(), equalTo("No datafeed with id [some-datafeed] exists")); - - DatafeedConfig datafeedConfig1 = createDatafeedConfig("foo-datafeed", "foo").build(); - MlMetadata mlMetadata2 = new MlMetadata.Builder(mlMetadata1) - .putDatafeed(datafeedConfig1) - .build(); - e = expectThrows(ElasticsearchStatusException.class, - () -> DatafeedJobRunner.validate("foo-datafeed", mlMetadata2)); - assertThat(e.getMessage(), equalTo("cannot start datafeed, expected job status [OPENED], but got [CLOSED]")); - - MlMetadata mlMetadata3 = new MlMetadata.Builder(mlMetadata2) - .updateStatus("foo", JobStatus.OPENED, null) - .updateDatafeedStatus("foo-datafeed", DatafeedStatus.STARTED) - .build(); - e = expectThrows(ElasticsearchStatusException.class, - () -> DatafeedJobRunner.validate("foo-datafeed", mlMetadata3)); - assertThat(e.getMessage(), equalTo("datafeed already started, expected datafeed status [STOPPED], but got [STARTED]")); - } - @SuppressWarnings("unchecked") private Consumer mockConsumer() { return mock(Consumer.class); diff --git a/elasticsearch/src/test/java/org/elasticsearch/xpack/ml/datafeed/DatafeedTests.java b/elasticsearch/src/test/java/org/elasticsearch/xpack/ml/datafeed/DatafeedTests.java deleted file mode 100644 index d7bf077e7b8..00000000000 --- a/elasticsearch/src/test/java/org/elasticsearch/xpack/ml/datafeed/DatafeedTests.java +++ /dev/null @@ -1,29 +0,0 @@ -/* - * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one - * or more contributor license agreements. Licensed under the Elastic License; - * you may not use this file except in compliance with the Elastic License. - */ -package org.elasticsearch.xpack.ml.datafeed; - -import org.elasticsearch.common.io.stream.Writeable; -import org.elasticsearch.common.xcontent.XContentParser; -import org.elasticsearch.xpack.ml.support.AbstractSerializingTestCase; - -public class DatafeedTests extends AbstractSerializingTestCase { - - @Override - protected Datafeed createTestInstance() { - return new Datafeed(DatafeedConfigTests.createRandomizedDatafeedConfig(randomAsciiOfLength(10)), - randomFrom(DatafeedStatus.values())); - } - - @Override - protected Writeable.Reader instanceReader() { - return Datafeed::new; - } - - @Override - protected Datafeed parseInstance(XContentParser parser) { - return Datafeed.PARSER.apply(parser, null); - } -} diff --git a/elasticsearch/src/test/java/org/elasticsearch/xpack/ml/integration/MlRestTestStateCleaner.java b/elasticsearch/src/test/java/org/elasticsearch/xpack/ml/integration/MlRestTestStateCleaner.java index aac0394518b..797bfddf7be 100644 --- a/elasticsearch/src/test/java/org/elasticsearch/xpack/ml/integration/MlRestTestStateCleaner.java +++ b/elasticsearch/src/test/java/org/elasticsearch/xpack/ml/integration/MlRestTestStateCleaner.java @@ -40,8 +40,7 @@ public class MlRestTestStateCleaner { } for (Map datafeed : datafeeds) { - Map datafeedMap = (Map) datafeed.get("config"); - String datafeedId = (String) datafeedMap.get("datafeed_id"); + String datafeedId = (String) datafeed.get("datafeed_id"); try { client.performRequest("POST", "/_xpack/ml/datafeeds/" + datafeedId + "/_stop"); } catch (Exception e) { diff --git a/elasticsearch/src/test/java/org/elasticsearch/xpack/ml/integration/TooManyJobsIT.java b/elasticsearch/src/test/java/org/elasticsearch/xpack/ml/integration/TooManyJobsIT.java index d9d1a261cf0..12d684b275a 100644 --- a/elasticsearch/src/test/java/org/elasticsearch/xpack/ml/integration/TooManyJobsIT.java +++ b/elasticsearch/src/test/java/org/elasticsearch/xpack/ml/integration/TooManyJobsIT.java @@ -12,19 +12,23 @@ import org.elasticsearch.cluster.metadata.MetaData; import org.elasticsearch.common.io.stream.NamedWriteableRegistry; import org.elasticsearch.common.settings.Settings; import org.elasticsearch.plugins.Plugin; +import org.elasticsearch.search.SearchModule; import org.elasticsearch.test.ESIntegTestCase; import org.elasticsearch.xpack.ml.MlPlugin; +import org.elasticsearch.xpack.ml.action.DatafeedJobsIT; import org.elasticsearch.xpack.ml.action.GetJobsStatsAction; import org.elasticsearch.xpack.ml.action.OpenJobAction; import org.elasticsearch.xpack.ml.action.PutJobAction; -import org.elasticsearch.xpack.ml.action.DatafeedJobsIT; +import org.elasticsearch.xpack.ml.action.StartDatafeedAction; import org.elasticsearch.xpack.ml.job.config.AnalysisConfig; import org.elasticsearch.xpack.ml.job.config.DataDescription; import org.elasticsearch.xpack.ml.job.config.Detector; import org.elasticsearch.xpack.ml.job.config.Job; import org.elasticsearch.xpack.ml.job.config.JobStatus; -import org.elasticsearch.xpack.ml.job.process.autodetect.AutodetectProcessManager; import org.elasticsearch.xpack.ml.job.metadata.MlMetadata; +import org.elasticsearch.xpack.ml.job.process.autodetect.AutodetectProcessManager; +import org.elasticsearch.xpack.persistent.PersistentActionRequest; +import org.elasticsearch.xpack.persistent.PersistentTasksInProgress; import org.junit.After; import java.io.IOException; @@ -127,7 +131,13 @@ public class TooManyJobsIT extends ESIntegTestCase { public static void ensureClusterStateConsistencyWorkAround() throws IOException { if (cluster() != null && cluster().size() > 0) { List namedWritables = new ArrayList<>(ClusterModule.getNamedWriteables()); + SearchModule searchModule = new SearchModule(Settings.EMPTY, false, Collections.emptyList()); + namedWritables.addAll(searchModule.getNamedWriteables()); namedWritables.add(new NamedWriteableRegistry.Entry(MetaData.Custom.class, "ml", MlMetadata::new)); + namedWritables.add(new NamedWriteableRegistry.Entry(ClusterState.Custom.class, PersistentTasksInProgress.TYPE, + PersistentTasksInProgress::new)); + namedWritables.add(new NamedWriteableRegistry.Entry(PersistentActionRequest.class, StartDatafeedAction.NAME, + StartDatafeedAction.Request::new)); final NamedWriteableRegistry namedWriteableRegistry = new NamedWriteableRegistry(namedWritables); ClusterState masterClusterState = client().admin().cluster().prepareState().all().get().getState(); byte[] masterClusterStateBytes = ClusterState.Builder.toBytes(masterClusterState); diff --git a/elasticsearch/src/test/java/org/elasticsearch/xpack/ml/job/metadata/MlMetadataTests.java b/elasticsearch/src/test/java/org/elasticsearch/xpack/ml/job/metadata/MlMetadataTests.java index 9128cb2e03a..d1b27e31dcf 100644 --- a/elasticsearch/src/test/java/org/elasticsearch/xpack/ml/job/metadata/MlMetadataTests.java +++ b/elasticsearch/src/test/java/org/elasticsearch/xpack/ml/job/metadata/MlMetadataTests.java @@ -15,20 +15,22 @@ import org.elasticsearch.common.xcontent.XContentFactory; import org.elasticsearch.common.xcontent.XContentParser; import org.elasticsearch.common.xcontent.XContentType; import org.elasticsearch.rest.RestStatus; +import org.elasticsearch.xpack.ml.action.StartDatafeedAction; +import org.elasticsearch.xpack.ml.datafeed.DatafeedConfig; +import org.elasticsearch.xpack.ml.datafeed.DatafeedConfigTests; import org.elasticsearch.xpack.ml.job.config.AnalysisConfig; import org.elasticsearch.xpack.ml.job.config.Job; import org.elasticsearch.xpack.ml.job.config.JobStatus; import org.elasticsearch.xpack.ml.job.config.JobTests; -import org.elasticsearch.xpack.ml.datafeed.DatafeedConfig; -import org.elasticsearch.xpack.ml.datafeed.DatafeedConfigTests; -import org.elasticsearch.xpack.ml.datafeed.DatafeedStatus; import org.elasticsearch.xpack.ml.support.AbstractSerializingTestCase; +import org.elasticsearch.xpack.persistent.PersistentTasksInProgress; import java.io.IOException; +import java.util.Collections; -import static org.elasticsearch.xpack.ml.job.config.JobTests.buildJobBuilder; -import static org.elasticsearch.xpack.ml.datafeed.DatafeedJobRunnerTests.createDatafeedJob; import static org.elasticsearch.xpack.ml.datafeed.DatafeedJobRunnerTests.createDatafeedConfig; +import static org.elasticsearch.xpack.ml.datafeed.DatafeedJobRunnerTests.createDatafeedJob; +import static org.elasticsearch.xpack.ml.job.config.JobTests.buildJobBuilder; import static org.hamcrest.Matchers.equalTo; import static org.hamcrest.Matchers.notNullValue; import static org.hamcrest.Matchers.nullValue; @@ -53,9 +55,6 @@ public class MlMetadataTests extends AbstractSerializingTestCase { } builder.putJob(job, false); builder.putDatafeed(datafeedConfig); - if (randomBoolean()) { - builder.updateDatafeedStatus(datafeedConfig.getId(), DatafeedStatus.STARTED); - } } else { builder.putJob(job, false); } @@ -74,7 +73,7 @@ public class MlMetadataTests extends AbstractSerializingTestCase { @Override protected Writeable.Reader instanceReader() { - return in -> new MlMetadata(in); + return MlMetadata::new; } @Override @@ -194,11 +193,10 @@ public class MlMetadataTests extends AbstractSerializingTestCase { MlMetadata result = builder.build(); assertThat(result.getJobs().get("foo"), sameInstance(job1)); assertThat(result.getAllocations().get("foo").getStatus(), equalTo(JobStatus.CLOSED)); - assertThat(result.getDatafeeds().get("datafeed1").getConfig(), sameInstance(datafeedConfig1)); - assertThat(result.getDatafeeds().get("datafeed1").getStatus(), equalTo(DatafeedStatus.STOPPED)); + assertThat(result.getDatafeeds().get("datafeed1"), sameInstance(datafeedConfig1)); builder = new MlMetadata.Builder(result); - builder.removeDatafeed("datafeed1"); + builder.removeDatafeed("datafeed1", new PersistentTasksInProgress(0, Collections.emptyList())); result = builder.build(); assertThat(result.getJobs().get("foo"), sameInstance(job1)); assertThat(result.getAllocations().get("foo").getStatus(), equalTo(JobStatus.CLOSED)); @@ -255,16 +253,20 @@ public class MlMetadataTests extends AbstractSerializingTestCase { builder.putDatafeed(datafeedConfig1); builder.updateStatus("foo", JobStatus.OPENING, null); builder.updateStatus("foo", JobStatus.OPENED, null); - builder.updateDatafeedStatus("datafeed1", DatafeedStatus.STARTED); MlMetadata result = builder.build(); assertThat(result.getJobs().get("foo"), sameInstance(job1)); assertThat(result.getAllocations().get("foo").getStatus(), equalTo(JobStatus.OPENED)); - assertThat(result.getDatafeeds().get("datafeed1").getConfig(), sameInstance(datafeedConfig1)); - assertThat(result.getDatafeeds().get("datafeed1").getStatus(), equalTo(DatafeedStatus.STARTED)); + assertThat(result.getDatafeeds().get("datafeed1"), sameInstance(datafeedConfig1)); + + StartDatafeedAction.Request request = new StartDatafeedAction.Request("datafeed1", 0L); + PersistentTasksInProgress.PersistentTaskInProgress taskInProgress = + new PersistentTasksInProgress.PersistentTaskInProgress<>(0, StartDatafeedAction.NAME, request, null); + PersistentTasksInProgress tasksInProgress = new PersistentTasksInProgress(1, Collections.singletonList(taskInProgress)); MlMetadata.Builder builder2 = new MlMetadata.Builder(result); - ElasticsearchStatusException e = expectThrows(ElasticsearchStatusException.class, () -> builder2.removeDatafeed("datafeed1")); + ElasticsearchStatusException e = expectThrows(ElasticsearchStatusException.class, + () -> builder2.removeDatafeed("datafeed1", tasksInProgress)); assertThat(e.status(), equalTo(RestStatus.CONFLICT)); } diff --git a/elasticsearch/src/test/resources/rest-api-spec/api/xpack.ml.start_datafeed.json b/elasticsearch/src/test/resources/rest-api-spec/api/xpack.ml.start_datafeed.json index 0fc822e477f..3d1d00293c2 100644 --- a/elasticsearch/src/test/resources/rest-api-spec/api/xpack.ml.start_datafeed.json +++ b/elasticsearch/src/test/resources/rest-api-spec/api/xpack.ml.start_datafeed.json @@ -21,10 +21,6 @@ "type": "string", "required": false, "description": "The end time when the datafeed should stop. When not set, the datafeed continues in real time" - }, - "start_timeout": { - "type": "time", - "description": "Controls the time to wait until a datafeed has started. Default to 30 seconds" } } }, diff --git a/elasticsearch/src/test/resources/rest-api-spec/api/xpack.ml.stop_datafeed.json b/elasticsearch/src/test/resources/rest-api-spec/api/xpack.ml.stop_datafeed.json index 3e3b2a51566..fe8fa172f55 100644 --- a/elasticsearch/src/test/resources/rest-api-spec/api/xpack.ml.stop_datafeed.json +++ b/elasticsearch/src/test/resources/rest-api-spec/api/xpack.ml.stop_datafeed.json @@ -13,10 +13,6 @@ "type": "string", "required": true, "description": "The ID of the datafeed to stop" - }, - "stop_timeout": { - "type": "time", - "description": "Controls the time to wait until a datafeed has stopped. Default to 30 seconds" } }, "body": null