Moved start and stop datafeeder apis over the persistent task infrastructure

Original commit: elastic/x-pack-elasticsearch@8e15578fb7
This commit is contained in:
Martijn van Groningen 2017-01-30 16:23:05 +01:00
parent 22282e9d56
commit 051d8d8fdf
31 changed files with 428 additions and 1119 deletions

View File

@ -25,16 +25,6 @@ import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.settings.SettingsFilter;
import org.elasticsearch.common.xcontent.NamedXContentRegistry;
import org.elasticsearch.env.Environment;
import org.elasticsearch.xpack.ml.action.ValidateJobConfigAction;
import org.elasticsearch.xpack.ml.rest.validate.RestValidateJobConfigAction;
import org.elasticsearch.xpack.persistent.RemovePersistentTaskAction;
import org.elasticsearch.xpack.persistent.PersistentActionCoordinator;
import org.elasticsearch.xpack.persistent.PersistentActionRegistry;
import org.elasticsearch.xpack.persistent.PersistentActionService;
import org.elasticsearch.xpack.persistent.PersistentTaskClusterService;
import org.elasticsearch.xpack.persistent.PersistentTasksInProgress;
import org.elasticsearch.xpack.persistent.CompletionPersistentTaskAction;
import org.elasticsearch.xpack.persistent.StartPersistentTaskAction;
import org.elasticsearch.plugins.ActionPlugin;
import org.elasticsearch.plugins.Plugin;
import org.elasticsearch.rest.RestController;
@ -61,7 +51,6 @@ import org.elasticsearch.xpack.ml.action.GetJobsStatsAction;
import org.elasticsearch.xpack.ml.action.GetModelSnapshotsAction;
import org.elasticsearch.xpack.ml.action.GetRecordsAction;
import org.elasticsearch.xpack.ml.action.InternalOpenJobAction;
import org.elasticsearch.xpack.ml.action.InternalStartDatafeedAction;
import org.elasticsearch.xpack.ml.action.OpenJobAction;
import org.elasticsearch.xpack.ml.action.PostDataAction;
import org.elasticsearch.xpack.ml.action.PutDatafeedAction;
@ -70,17 +59,16 @@ import org.elasticsearch.xpack.ml.action.PutJobAction;
import org.elasticsearch.xpack.ml.action.RevertModelSnapshotAction;
import org.elasticsearch.xpack.ml.action.StartDatafeedAction;
import org.elasticsearch.xpack.ml.action.StopDatafeedAction;
import org.elasticsearch.xpack.ml.action.UpdateDatafeedStatusAction;
import org.elasticsearch.xpack.ml.action.UpdateJobStatusAction;
import org.elasticsearch.xpack.ml.action.UpdateModelSnapshotAction;
import org.elasticsearch.xpack.ml.action.ValidateDetectorAction;
import org.elasticsearch.xpack.ml.action.ValidateJobConfigAction;
import org.elasticsearch.xpack.ml.datafeed.DatafeedJobRunner;
import org.elasticsearch.xpack.ml.job.JobManager;
import org.elasticsearch.xpack.ml.job.metadata.MlInitializationService;
import org.elasticsearch.xpack.ml.job.metadata.MlMetadata;
import org.elasticsearch.xpack.ml.job.persistence.JobDataCountsPersister;
import org.elasticsearch.xpack.ml.job.persistence.JobProvider;
import org.elasticsearch.xpack.ml.job.persistence.JobRenormalizedResultsPersister;
import org.elasticsearch.xpack.ml.job.persistence.JobResultsPersister;
import org.elasticsearch.xpack.ml.job.process.DataCountsReporter;
import org.elasticsearch.xpack.ml.job.process.NativeController;
@ -120,7 +108,17 @@ import org.elasticsearch.xpack.ml.rest.results.RestGetCategoriesAction;
import org.elasticsearch.xpack.ml.rest.results.RestGetInfluencersAction;
import org.elasticsearch.xpack.ml.rest.results.RestGetRecordsAction;
import org.elasticsearch.xpack.ml.rest.validate.RestValidateDetectorAction;
import org.elasticsearch.xpack.ml.rest.validate.RestValidateJobConfigAction;
import org.elasticsearch.xpack.ml.utils.NamedPipeHelper;
import org.elasticsearch.xpack.persistent.CompletionPersistentTaskAction;
import org.elasticsearch.xpack.persistent.PersistentActionCoordinator;
import org.elasticsearch.xpack.persistent.PersistentActionRegistry;
import org.elasticsearch.xpack.persistent.PersistentActionRequest;
import org.elasticsearch.xpack.persistent.PersistentActionService;
import org.elasticsearch.xpack.persistent.PersistentTaskClusterService;
import org.elasticsearch.xpack.persistent.PersistentTasksInProgress;
import org.elasticsearch.xpack.persistent.RemovePersistentTaskAction;
import org.elasticsearch.xpack.persistent.StartPersistentTaskAction;
import java.io.IOException;
import java.nio.file.Path;
@ -180,7 +178,8 @@ public class MlPlugin extends Plugin implements ActionPlugin {
new NamedWriteableRegistry.Entry(PersistentActionCoordinator.Status.class,
PersistentActionCoordinator.Status.NAME, PersistentActionCoordinator.Status::new),
new NamedWriteableRegistry.Entry(ClusterState.Custom.class, PersistentTasksInProgress.TYPE, PersistentTasksInProgress::new),
new NamedWriteableRegistry.Entry(NamedDiff.class, PersistentTasksInProgress.TYPE, PersistentTasksInProgress::readDiffFrom)
new NamedWriteableRegistry.Entry(NamedDiff.class, PersistentTasksInProgress.TYPE, PersistentTasksInProgress::readDiffFrom),
new NamedWriteableRegistry.Entry(PersistentActionRequest.class, StartDatafeedAction.NAME, StartDatafeedAction.Request::new)
);
}
@ -299,7 +298,6 @@ public class MlPlugin extends Plugin implements ActionPlugin {
new ActionHandler<>(OpenJobAction.INSTANCE, OpenJobAction.TransportAction.class),
new ActionHandler<>(InternalOpenJobAction.INSTANCE, InternalOpenJobAction.TransportAction.class),
new ActionHandler<>(UpdateJobStatusAction.INSTANCE, UpdateJobStatusAction.TransportAction.class),
new ActionHandler<>(UpdateDatafeedStatusAction.INSTANCE, UpdateDatafeedStatusAction.TransportAction.class),
new ActionHandler<>(GetFiltersAction.INSTANCE, GetFiltersAction.TransportAction.class),
new ActionHandler<>(PutFilterAction.INSTANCE, PutFilterAction.TransportAction.class),
new ActionHandler<>(DeleteFilterAction.INSTANCE, DeleteFilterAction.TransportAction.class),
@ -320,7 +318,6 @@ public class MlPlugin extends Plugin implements ActionPlugin {
new ActionHandler<>(PutDatafeedAction.INSTANCE, PutDatafeedAction.TransportAction.class),
new ActionHandler<>(DeleteDatafeedAction.INSTANCE, DeleteDatafeedAction.TransportAction.class),
new ActionHandler<>(StartDatafeedAction.INSTANCE, StartDatafeedAction.TransportAction.class),
new ActionHandler<>(InternalStartDatafeedAction.INSTANCE, InternalStartDatafeedAction.TransportAction.class),
new ActionHandler<>(StopDatafeedAction.INSTANCE, StopDatafeedAction.TransportAction.class),
new ActionHandler<>(DeleteModelSnapshotAction.INSTANCE, DeleteModelSnapshotAction.TransportAction.class),
new ActionHandler<>(StartPersistentTaskAction.INSTANCE, StartPersistentTaskAction.TransportAction.class),

View File

@ -32,6 +32,7 @@ import org.elasticsearch.transport.TransportService;
import org.elasticsearch.xpack.ml.datafeed.DatafeedConfig;
import org.elasticsearch.xpack.ml.job.metadata.MlMetadata;
import org.elasticsearch.xpack.ml.utils.ExceptionsHelper;
import org.elasticsearch.xpack.persistent.PersistentTasksInProgress;
import java.io.IOException;
import java.util.Objects;
@ -169,8 +170,9 @@ public class DeleteDatafeedAction extends Action<DeleteDatafeedAction.Request, D
@Override
public ClusterState execute(ClusterState currentState) throws Exception {
MlMetadata currentMetadata = state.getMetaData().custom(MlMetadata.TYPE);
PersistentTasksInProgress persistentTasksInProgress = state.custom(PersistentTasksInProgress.TYPE);
MlMetadata newMetadata = new MlMetadata.Builder(currentMetadata)
.removeDatafeed(request.getDatafeedId()).build();
.removeDatafeed(request.getDatafeedId(), persistentTasksInProgress).build();
return ClusterState.builder(state).metaData(
MetaData.builder(currentState.getMetaData()).putCustom(MlMetadata.TYPE, newMetadata).build())
.build();

View File

@ -29,16 +29,15 @@ import org.elasticsearch.common.xcontent.XContentBuilder;
import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.transport.TransportService;
import org.elasticsearch.xpack.ml.action.util.QueryPage;
import org.elasticsearch.xpack.ml.datafeed.Datafeed;
import org.elasticsearch.xpack.ml.datafeed.DatafeedConfig;
import org.elasticsearch.xpack.ml.job.metadata.MlMetadata;
import org.elasticsearch.xpack.ml.utils.ExceptionsHelper;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Objects;
import java.util.stream.Collectors;
public class GetDatafeedsAction extends Action<GetDatafeedsAction.Request, GetDatafeedsAction.Response,
GetDatafeedsAction.RequestBuilder> {
@ -198,18 +197,17 @@ public class GetDatafeedsAction extends Action<GetDatafeedsAction.Request, GetDa
protected void masterOperation(Request request, ClusterState state, ActionListener<Response> listener) throws Exception {
logger.debug("Get datafeed '{}'", request.getDatafeedId());
QueryPage<DatafeedConfig> response = null;
QueryPage<DatafeedConfig> response;
MlMetadata mlMetadata = state.metaData().custom(MlMetadata.TYPE);
if (ALL.equals(request.getDatafeedId())) {
List<DatafeedConfig> datafeedConfigs = mlMetadata.getDatafeeds().values().stream().map(
s -> s.getConfig()).collect(Collectors.toList());
response = new QueryPage<>(datafeedConfigs, datafeedConfigs.size(), Datafeed.RESULTS_FIELD);
List<DatafeedConfig> datafeedConfigs = new ArrayList<>(mlMetadata.getDatafeeds().values());
response = new QueryPage<>(datafeedConfigs, datafeedConfigs.size(), DatafeedConfig.RESULTS_FIELD);
} else {
Datafeed datafeed = mlMetadata.getDatafeed(request.getDatafeedId());
DatafeedConfig datafeed = mlMetadata.getDatafeed(request.getDatafeedId());
if (datafeed == null) {
throw ExceptionsHelper.missingDatafeedException(request.getDatafeedId());
}
response = new QueryPage<>(Collections.singletonList(datafeed.getConfig()), 1, Datafeed.RESULTS_FIELD);
response = new QueryPage<>(Collections.singletonList(datafeed), 1, DatafeedConfig.RESULTS_FIELD);
}
listener.onResponse(new Response(response));

View File

@ -31,17 +31,21 @@ import org.elasticsearch.common.xcontent.XContentBuilder;
import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.transport.TransportService;
import org.elasticsearch.xpack.ml.action.util.QueryPage;
import org.elasticsearch.xpack.ml.datafeed.Datafeed;
import org.elasticsearch.xpack.ml.datafeed.DatafeedConfig;
import org.elasticsearch.xpack.ml.datafeed.DatafeedStatus;
import org.elasticsearch.xpack.ml.job.metadata.MlMetadata;
import org.elasticsearch.xpack.ml.utils.ExceptionsHelper;
import org.elasticsearch.xpack.persistent.PersistentTasksInProgress;
import org.elasticsearch.xpack.persistent.PersistentTasksInProgress.PersistentTaskInProgress;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Collection;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.function.Predicate;
public class GetDatafeedsStatsAction extends Action<GetDatafeedsStatsAction.Request, GetDatafeedsStatsAction.Response,
GetDatafeedsStatsAction.RequestBuilder> {
@ -240,7 +244,8 @@ public class GetDatafeedsStatsAction extends Action<GetDatafeedsStatsAction.Requ
@Inject
public TransportAction(Settings settings, TransportService transportService, ClusterService clusterService,
ThreadPool threadPool, ActionFilters actionFilters, IndexNameExpressionResolver indexNameExpressionResolver) {
ThreadPool threadPool, ActionFilters actionFilters,
IndexNameExpressionResolver indexNameExpressionResolver) {
super(settings, GetDatafeedsStatsAction.NAME, transportService, clusterService, threadPool, actionFilters,
indexNameExpressionResolver, Request::new);
}
@ -259,22 +264,34 @@ public class GetDatafeedsStatsAction extends Action<GetDatafeedsStatsAction.Requ
protected void masterOperation(Request request, ClusterState state, ActionListener<Response> listener) throws Exception {
logger.debug("Get stats for datafeed '{}'", request.getDatafeedId());
Map<String, DatafeedStatus> statuses = new HashMap<>();
PersistentTasksInProgress tasksInProgress = state.custom(PersistentTasksInProgress.TYPE);
if (tasksInProgress != null) {
Predicate<PersistentTaskInProgress<?>> predicate = ALL.equals(request.getDatafeedId()) ? p -> true :
p -> request.getDatafeedId().equals(((StartDatafeedAction.Request) p.getRequest()).getDatafeedId());
for (PersistentTaskInProgress<?> taskInProgress : tasksInProgress.findEntries(StartDatafeedAction.NAME, predicate)) {
StartDatafeedAction.Request storedRequest = (StartDatafeedAction.Request) taskInProgress.getRequest();
statuses.put(storedRequest.getDatafeedId(), DatafeedStatus.STARTED);
}
}
List<Response.DatafeedStats> stats = new ArrayList<>();
MlMetadata mlMetadata = state.metaData().custom(MlMetadata.TYPE);
if (ALL.equals(request.getDatafeedId())) {
Collection<Datafeed> datafeeds = mlMetadata.getDatafeeds().values();
for (Datafeed datafeed : datafeeds) {
stats.add(new Response.DatafeedStats(datafeed.getId(), datafeed.getStatus()));
Collection<DatafeedConfig> datafeeds = mlMetadata.getDatafeeds().values();
for (DatafeedConfig datafeed : datafeeds) {
DatafeedStatus status = statuses.getOrDefault(datafeed.getId(), DatafeedStatus.STOPPED);
stats.add(new Response.DatafeedStats(datafeed.getId(), status));
}
} else {
Datafeed datafeed = mlMetadata.getDatafeed(request.getDatafeedId());
DatafeedConfig datafeed = mlMetadata.getDatafeed(request.getDatafeedId());
if (datafeed == null) {
throw ExceptionsHelper.missingDatafeedException(request.getDatafeedId());
}
stats.add(new Response.DatafeedStats(datafeed.getId(), datafeed.getStatus()));
DatafeedStatus status = statuses.getOrDefault(datafeed.getId(), DatafeedStatus.STOPPED);
stats.add(new Response.DatafeedStats(datafeed.getId(), status));
}
QueryPage<Response.DatafeedStats> statsPage = new QueryPage<>(stats, stats.size(), Datafeed.RESULTS_FIELD);
QueryPage<Response.DatafeedStats> statsPage = new QueryPage<>(stats, stats.size(), DatafeedConfig.RESULTS_FIELD);
listener.onResponse(new Response(statsPage));
}

View File

@ -1,135 +0,0 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License;
* you may not use this file except in compliance with the Elastic License.
*/
package org.elasticsearch.xpack.ml.action;
import org.elasticsearch.action.Action;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.ActionRequestBuilder;
import org.elasticsearch.action.ActionResponse;
import org.elasticsearch.action.support.ActionFilters;
import org.elasticsearch.action.support.HandledTransportAction;
import org.elasticsearch.client.ElasticsearchClient;
import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver;
import org.elasticsearch.common.inject.Inject;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.tasks.CancellableTask;
import org.elasticsearch.tasks.Task;
import org.elasticsearch.tasks.TaskId;
import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.transport.TransportService;
import org.elasticsearch.xpack.ml.datafeed.DatafeedJobRunner;
public class InternalStartDatafeedAction extends
Action<InternalStartDatafeedAction.Request, InternalStartDatafeedAction.Response, InternalStartDatafeedAction.RequestBuilder> {
public static final InternalStartDatafeedAction INSTANCE = new InternalStartDatafeedAction();
public static final String NAME = "cluster:admin/ml/datafeeds/internal_start";
private InternalStartDatafeedAction() {
super(NAME);
}
@Override
public RequestBuilder newRequestBuilder(ElasticsearchClient client) {
return new RequestBuilder(client, this);
}
@Override
public Response newResponse() {
return new Response();
}
public static class Request extends StartDatafeedAction.Request {
Request(String datafeedId, long startTime) {
super(datafeedId, startTime);
}
Request() {
}
@Override
public Task createTask(long id, String type, String action, TaskId parentTaskId) {
return new DatafeedTask(id, type, action, parentTaskId, getDatafeedId());
}
}
static class RequestBuilder extends ActionRequestBuilder<Request, Response, RequestBuilder> {
public RequestBuilder(ElasticsearchClient client, InternalStartDatafeedAction action) {
super(client, action, new Request());
}
}
public static class Response extends ActionResponse {
Response() {
}
}
public static class DatafeedTask extends CancellableTask {
private volatile DatafeedJobRunner.Holder holder;
public DatafeedTask(long id, String type, String action, TaskId parentTaskId, String datafeedId) {
super(id, type, action, "datafeed-" + datafeedId, parentTaskId);
}
public void setHolder(DatafeedJobRunner.Holder holder) {
this.holder = holder;
}
@Override
public boolean shouldCancelChildrenOnCancellation() {
return true;
}
@Override
protected void onCancelled() {
stop();
}
/* public for testing */
public void stop() {
if (holder == null) {
throw new IllegalStateException("task cancel ran before datafeed runner assigned the holder");
}
holder.stop("cancel", null);
}
}
public static class TransportAction extends HandledTransportAction<Request, Response> {
private final DatafeedJobRunner datafeedJobRunner;
@Inject
public TransportAction(Settings settings, TransportService transportService, ThreadPool threadPool,
ActionFilters actionFilters, IndexNameExpressionResolver indexNameExpressionResolver,
DatafeedJobRunner datafeedJobRunner) {
super(settings, InternalStartDatafeedAction.NAME, threadPool, transportService, actionFilters, indexNameExpressionResolver,
Request::new);
this.datafeedJobRunner = datafeedJobRunner;
}
@Override
protected void doExecute(Task task, Request request, ActionListener<Response> listener) {
DatafeedTask datafeedTask = (DatafeedTask) task;
datafeedJobRunner.run(request.getDatafeedId(), request.getStartTime(), request.getEndTime(), datafeedTask, (error) -> {
if (error != null) {
listener.onFailure(error);
} else {
listener.onResponse(new Response());
}
});
}
@Override
protected void doExecute(Request request, ActionListener<Response> listener) {
throw new UnsupportedOperationException("the task parameter is required");
}
}
}

View File

@ -5,43 +5,53 @@
*/
package org.elasticsearch.xpack.ml.action;
import org.elasticsearch.ElasticsearchStatusException;
import org.elasticsearch.action.Action;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.ActionRequest;
import org.elasticsearch.action.ActionRequestBuilder;
import org.elasticsearch.action.ActionRequestValidationException;
import org.elasticsearch.action.ActionResponse;
import org.elasticsearch.action.support.ActionFilters;
import org.elasticsearch.action.support.HandledTransportAction;
import org.elasticsearch.client.ElasticsearchClient;
import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver;
import org.elasticsearch.cluster.service.ClusterService;
import org.elasticsearch.common.ParseField;
import org.elasticsearch.common.inject.Inject;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.common.xcontent.ObjectParser;
import org.elasticsearch.common.xcontent.ToXContent;
import org.elasticsearch.common.xcontent.ToXContentObject;
import org.elasticsearch.common.xcontent.XContentBuilder;
import org.elasticsearch.common.xcontent.XContentParser;
import org.elasticsearch.tasks.LoggingTaskListener;
import org.elasticsearch.rest.RestStatus;
import org.elasticsearch.tasks.Task;
import org.elasticsearch.tasks.TaskId;
import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.transport.TransportResponse;
import org.elasticsearch.transport.TransportService;
import org.elasticsearch.xpack.ml.datafeed.DatafeedConfig;
import org.elasticsearch.xpack.ml.datafeed.DatafeedJobRunner;
import org.elasticsearch.xpack.ml.datafeed.DatafeedJobValidator;
import org.elasticsearch.xpack.ml.datafeed.DatafeedStatus;
import org.elasticsearch.xpack.ml.job.config.Job;
import org.elasticsearch.xpack.ml.job.config.JobStatus;
import org.elasticsearch.xpack.ml.job.metadata.Allocation;
import org.elasticsearch.xpack.ml.job.metadata.MlMetadata;
import org.elasticsearch.xpack.ml.utils.DatafeedStatusObserver;
import org.elasticsearch.xpack.ml.utils.ExceptionsHelper;
import org.elasticsearch.xpack.persistent.PersistentActionRegistry;
import org.elasticsearch.xpack.persistent.PersistentActionRequest;
import org.elasticsearch.xpack.persistent.PersistentActionResponse;
import org.elasticsearch.xpack.persistent.PersistentActionService;
import org.elasticsearch.xpack.persistent.PersistentTask;
import org.elasticsearch.xpack.persistent.PersistentTasksInProgress;
import org.elasticsearch.xpack.persistent.TransportPersistentAction;
import java.io.IOException;
import java.util.Objects;
import java.util.function.Predicate;
public class StartDatafeedAction
extends Action<StartDatafeedAction.Request, StartDatafeedAction.Response, StartDatafeedAction.RequestBuilder> {
extends Action<StartDatafeedAction.Request, PersistentActionResponse, StartDatafeedAction.RequestBuilder> {
public static final ParseField START_TIME = new ParseField("start");
public static final ParseField END_TIME = new ParseField("end");
@ -60,11 +70,11 @@ public class StartDatafeedAction
}
@Override
public Response newResponse() {
return new Response();
public PersistentActionResponse newResponse() {
return new PersistentActionResponse();
}
public static class Request extends ActionRequest implements ToXContent {
public static class Request extends PersistentActionRequest implements ToXContent {
public static ObjectParser<Request, Void> PARSER = new ObjectParser<>(NAME, Request::new);
@ -72,8 +82,6 @@ public class StartDatafeedAction
PARSER.declareString((request, datafeedId) -> request.datafeedId = datafeedId, DatafeedConfig.ID);
PARSER.declareLong((request, startTime) -> request.startTime = startTime, START_TIME);
PARSER.declareLong(Request::setEndTime, END_TIME);
PARSER.declareString((request, val) -> request.setStartTimeout(TimeValue.parseTimeValue(val,
START_TIME.getPreferredName())), START_TIMEOUT);
}
public static Request parseRequest(String datafeedId, XContentParser parser) {
@ -87,13 +95,16 @@ public class StartDatafeedAction
private String datafeedId;
private long startTime;
private Long endTime;
private TimeValue startTimeout = TimeValue.timeValueSeconds(20);
public Request(String datafeedId, long startTime) {
this.datafeedId = ExceptionsHelper.requireNonNull(datafeedId, DatafeedConfig.ID.getPreferredName());
this.startTime = startTime;
}
public Request(StreamInput in) throws IOException {
readFrom(in);
}
Request() {
}
@ -113,26 +124,22 @@ public class StartDatafeedAction
this.endTime = endTime;
}
public TimeValue getStartTimeout() {
return startTimeout;
}
public void setStartTimeout(TimeValue startTimeout) {
this.startTimeout = startTimeout;
}
@Override
public ActionRequestValidationException validate() {
return null;
}
@Override
public Task createTask(long id, String type, String action, TaskId parentTaskId) {
return new DatafeedTask(id, type, action, parentTaskId, datafeedId);
}
@Override
public void readFrom(StreamInput in) throws IOException {
super.readFrom(in);
datafeedId = in.readString();
startTime = in.readVLong();
endTime = in.readOptionalLong();
startTimeout = new TimeValue(in.readVLong());
}
@Override
@ -141,7 +148,11 @@ public class StartDatafeedAction
out.writeString(datafeedId);
out.writeVLong(startTime);
out.writeOptionalLong(endTime);
out.writeVLong(startTimeout.millis());
}
@Override
public String getWriteableName() {
return NAME;
}
@Override
@ -152,16 +163,13 @@ public class StartDatafeedAction
if (endTime != null) {
builder.field(END_TIME.getPreferredName(), endTime);
}
if (startTimeout != null) {
builder.field(START_TIMEOUT.getPreferredName(), startTimeout.getStringRep());
}
builder.endObject();
return builder;
}
@Override
public int hashCode() {
return Objects.hash(datafeedId, startTime, endTime, startTimeout);
return Objects.hash(datafeedId, startTime, endTime);
}
@Override
@ -175,90 +183,111 @@ public class StartDatafeedAction
Request other = (Request) obj;
return Objects.equals(datafeedId, other.datafeedId) &&
Objects.equals(startTime, other.startTime) &&
Objects.equals(endTime, other.endTime) &&
Objects.equals(startTimeout, other.startTimeout);
Objects.equals(endTime, other.endTime);
}
}
static class RequestBuilder extends ActionRequestBuilder<Request, Response, RequestBuilder> {
static class RequestBuilder extends ActionRequestBuilder<Request, PersistentActionResponse, RequestBuilder> {
public RequestBuilder(ElasticsearchClient client, StartDatafeedAction action) {
super(client, action, new Request());
}
}
public static class Response extends ActionResponse implements ToXContentObject {
public static class DatafeedTask extends PersistentTask {
private boolean started;
private volatile DatafeedJobRunner.Holder holder;
Response(boolean started) {
this.started = started;
public DatafeedTask(long id, String type, String action, TaskId parentTaskId, String datafeedId) {
super(id, type, action, "datafeed-" + datafeedId, parentTaskId);
}
Response() {
}
public boolean isStarted() {
return started;
public void setHolder(DatafeedJobRunner.Holder holder) {
this.holder = holder;
}
@Override
public void readFrom(StreamInput in) throws IOException {
super.readFrom(in);
started = in.readBoolean();
public boolean shouldCancelChildrenOnCancellation() {
return true;
}
@Override
public void writeTo(StreamOutput out) throws IOException {
super.writeTo(out);
out.writeBoolean(started);
protected void onCancelled() {
stop();
}
@Override
public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException {
builder.startObject();
builder.field("started", started);
builder.endObject();
return builder;
/* public for testing */
public void stop() {
if (holder == null) {
throw new IllegalStateException("task cancel ran before datafeed runner assigned the holder");
}
holder.stop("cancel", null);
}
}
public static class TransportAction extends HandledTransportAction<Request, Response> {
public static class TransportAction extends TransportPersistentAction<Request> {
private final ClusterService clusterService;
private final DatafeedStatusObserver datafeedStatusObserver;
private final InternalStartDatafeedAction.TransportAction transportAction;
private final DatafeedJobRunner datafeedJobRunner;
@Inject
public TransportAction(Settings settings, TransportService transportService, ThreadPool threadPool,
PersistentActionService persistentActionService, PersistentActionRegistry persistentActionRegistry,
ActionFilters actionFilters, IndexNameExpressionResolver indexNameExpressionResolver,
ClusterService clusterService, InternalStartDatafeedAction.TransportAction transportAction) {
super(settings, StartDatafeedAction.NAME, threadPool, transportService, actionFilters, indexNameExpressionResolver,
Request::new);
this.clusterService = clusterService;
this.datafeedStatusObserver = new DatafeedStatusObserver(threadPool, clusterService);
this.transportAction = transportAction;
DatafeedJobRunner datafeedJobRunner) {
super(settings, NAME, false, threadPool, transportService, persistentActionService, persistentActionRegistry,
actionFilters, indexNameExpressionResolver, Request::new, ThreadPool.Names.MANAGEMENT);
this.datafeedJobRunner = datafeedJobRunner;
}
@Override
protected void doExecute(Request request, ActionListener<Response> listener) {
// This validation happens also in DatafeedJobRunner, the reason we do it here too is that if it fails there
// we are unable to provide the user immediate feedback. We would create the task and the validation would fail
// in the background, whereas now the validation failure is part of the response being returned.
MlMetadata mlMetadata = clusterService.state().metaData().custom(MlMetadata.TYPE);
DatafeedJobRunner.validate(request.datafeedId, mlMetadata);
public void validate(Request request, ClusterState clusterState) {
MlMetadata mlMetadata = clusterState.metaData().custom(MlMetadata.TYPE);
StartDatafeedAction.validate(request.getDatafeedId(), mlMetadata);
PersistentTasksInProgress persistentTasksInProgress = clusterState.custom(PersistentTasksInProgress.TYPE);
if (persistentTasksInProgress == null) {
return;
}
InternalStartDatafeedAction.Request internalRequest =
new InternalStartDatafeedAction.Request(request.datafeedId, request.startTime);
internalRequest.setEndTime(request.endTime);
transportAction.execute(internalRequest, LoggingTaskListener.instance());
datafeedStatusObserver.waitForStatus(request.datafeedId, request.startTimeout, DatafeedStatus.STARTED, e -> {
if (e != null) {
listener.onFailure(e);
} else {
listener.onResponse(new Response(true));
}
});
Predicate<PersistentTasksInProgress.PersistentTaskInProgress<?>> predicate = taskInProgress -> {
Request storedRequest = (Request) taskInProgress.getRequest();
return storedRequest.getDatafeedId().equals(request.getDatafeedId());
};
if (persistentTasksInProgress.entriesExist(NAME, predicate)) {
throw new ElasticsearchStatusException("datafeed already started, expected datafeed status [{}], but got [{}]",
RestStatus.CONFLICT, DatafeedStatus.STOPPED, DatafeedStatus.STARTED);
}
}
@Override
protected void nodeOperation(PersistentTask task, Request request, ActionListener<TransportResponse.Empty> listener) {
DatafeedTask datafeedTask = (DatafeedTask) task;
datafeedJobRunner.run(request.getDatafeedId(), request.getStartTime(), request.getEndTime(),
datafeedTask,
(error) -> {
if (error != null) {
listener.onFailure(error);
} else {
listener.onResponse(TransportResponse.Empty.INSTANCE);
}
});
}
}
public static void validate(String datafeedId, MlMetadata mlMetadata) {
DatafeedConfig datafeed = mlMetadata.getDatafeed(datafeedId);
if (datafeed == null) {
throw ExceptionsHelper.missingDatafeedException(datafeedId);
}
Job job = mlMetadata.getJobs().get(datafeed.getJobId());
if (job == null) {
throw ExceptionsHelper.missingJobException(datafeed.getJobId());
}
Allocation allocation = mlMetadata.getAllocations().get(datafeed.getJobId());
if (allocation.getStatus() != JobStatus.OPENED) {
throw new ElasticsearchStatusException("cannot start datafeed, expected job status [{}], but got [{}]",
RestStatus.CONFLICT, JobStatus.OPENED, allocation.getStatus());
}
DatafeedJobValidator.validate(datafeed, job);
}
}

View File

@ -9,43 +9,38 @@ import org.elasticsearch.ElasticsearchStatusException;
import org.elasticsearch.ResourceNotFoundException;
import org.elasticsearch.action.Action;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.ActionRequest;
import org.elasticsearch.action.ActionRequestBuilder;
import org.elasticsearch.action.ActionRequestValidationException;
import org.elasticsearch.action.admin.cluster.node.tasks.cancel.CancelTasksRequest;
import org.elasticsearch.action.admin.cluster.node.tasks.cancel.CancelTasksResponse;
import org.elasticsearch.action.admin.cluster.node.tasks.cancel.TransportCancelTasksAction;
import org.elasticsearch.action.admin.cluster.node.tasks.list.ListTasksRequest;
import org.elasticsearch.action.admin.cluster.node.tasks.list.ListTasksResponse;
import org.elasticsearch.action.admin.cluster.node.tasks.list.TransportListTasksAction;
import org.elasticsearch.action.support.ActionFilters;
import org.elasticsearch.action.support.HandledTransportAction;
import org.elasticsearch.action.support.master.AcknowledgedResponse;
import org.elasticsearch.action.support.master.MasterNodeRequest;
import org.elasticsearch.action.support.master.TransportMasterNodeAction;
import org.elasticsearch.client.ElasticsearchClient;
import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.block.ClusterBlockException;
import org.elasticsearch.cluster.block.ClusterBlockLevel;
import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver;
import org.elasticsearch.cluster.service.ClusterService;
import org.elasticsearch.common.inject.Inject;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.rest.RestStatus;
import org.elasticsearch.tasks.TaskInfo;
import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.transport.TransportService;
import org.elasticsearch.xpack.ml.datafeed.Datafeed;
import org.elasticsearch.xpack.ml.datafeed.DatafeedConfig;
import org.elasticsearch.xpack.ml.datafeed.DatafeedStatus;
import org.elasticsearch.xpack.ml.job.messages.Messages;
import org.elasticsearch.xpack.ml.job.metadata.MlMetadata;
import org.elasticsearch.xpack.ml.utils.DatafeedStatusObserver;
import org.elasticsearch.xpack.ml.utils.ExceptionsHelper;
import org.elasticsearch.xpack.persistent.PersistentTasksInProgress;
import org.elasticsearch.xpack.persistent.PersistentTasksInProgress.PersistentTaskInProgress;
import org.elasticsearch.xpack.persistent.RemovePersistentTaskAction;
import java.io.IOException;
import java.util.Objects;
public class StopDatafeedAction
extends Action<StopDatafeedAction.Request, StopDatafeedAction.Response, StopDatafeedAction.RequestBuilder> {
extends Action<StopDatafeedAction.Request, RemovePersistentTaskAction.Response, StopDatafeedAction.RequestBuilder> {
public static final StopDatafeedAction INSTANCE = new StopDatafeedAction();
public static final String NAME = "cluster:admin/ml/datafeeds/stop";
@ -60,14 +55,13 @@ public class StopDatafeedAction
}
@Override
public Response newResponse() {
return new Response();
public RemovePersistentTaskAction.Response newResponse() {
return new RemovePersistentTaskAction.Response();
}
public static class Request extends ActionRequest {
public static class Request extends MasterNodeRequest<Request> {
private String datafeedId;
private TimeValue stopTimeout = TimeValue.timeValueSeconds(20);
public Request(String jobId) {
this.datafeedId = ExceptionsHelper.requireNonNull(jobId, DatafeedConfig.ID.getPreferredName());
@ -80,10 +74,6 @@ public class StopDatafeedAction
return datafeedId;
}
public void setStopTimeout(TimeValue stopTimeout) {
this.stopTimeout = stopTimeout;
}
@Override
public ActionRequestValidationException validate() {
return null;
@ -93,19 +83,17 @@ public class StopDatafeedAction
public void readFrom(StreamInput in) throws IOException {
super.readFrom(in);
datafeedId = in.readString();
stopTimeout = TimeValue.timeValueMillis(in.readVLong());
}
@Override
public void writeTo(StreamOutput out) throws IOException {
super.writeTo(out);
out.writeString(datafeedId);
out.writeVLong(stopTimeout.millis());
}
@Override
public int hashCode() {
return Objects.hash(datafeedId, stopTimeout);
return Objects.hash(datafeedId);
}
@Override
@ -117,114 +105,74 @@ public class StopDatafeedAction
return false;
}
Request other = (Request) obj;
return Objects.equals(datafeedId, other.datafeedId) &&
Objects.equals(stopTimeout, other.stopTimeout);
return Objects.equals(datafeedId, other.datafeedId);
}
}
static class RequestBuilder extends ActionRequestBuilder<Request, Response, RequestBuilder> {
static class RequestBuilder extends ActionRequestBuilder<Request, RemovePersistentTaskAction.Response, RequestBuilder> {
public RequestBuilder(ElasticsearchClient client, StopDatafeedAction action) {
super(client, action, new Request());
}
}
public static class Response extends AcknowledgedResponse {
public static class TransportAction extends TransportMasterNodeAction<Request, RemovePersistentTaskAction.Response> {
private Response() {
super(true);
}
@Override
public void readFrom(StreamInput in) throws IOException {
super.readFrom(in);
}
@Override
public void writeTo(StreamOutput out) throws IOException {
super.writeTo(out);
}
}
public static class TransportAction extends HandledTransportAction<Request, Response> {
private final ClusterService clusterService;
private final TransportListTasksAction listTasksAction;
private final TransportCancelTasksAction cancelTasksAction;
private final DatafeedStatusObserver datafeedStatusObserver;
private final RemovePersistentTaskAction.TransportAction removePersistentTaskAction;
@Inject
public TransportAction(Settings settings, TransportService transportService, ThreadPool threadPool,
ActionFilters actionFilters, IndexNameExpressionResolver indexNameExpressionResolver,
ClusterService clusterService, TransportCancelTasksAction cancelTasksAction,
TransportListTasksAction listTasksAction) {
super(settings, StopDatafeedAction.NAME, threadPool, transportService, actionFilters,
ClusterService clusterService, RemovePersistentTaskAction.TransportAction removePersistentTaskAction) {
super(settings, StopDatafeedAction.NAME, transportService, clusterService, threadPool, actionFilters,
indexNameExpressionResolver, Request::new);
this.clusterService = clusterService;
this.listTasksAction = listTasksAction;
this.cancelTasksAction = cancelTasksAction;
this.datafeedStatusObserver = new DatafeedStatusObserver(threadPool, clusterService);
this.removePersistentTaskAction = removePersistentTaskAction;
}
@Override
protected void doExecute(Request request, ActionListener<Response> listener) {
String datafeedId = request.getDatafeedId();
MlMetadata mlMetadata = clusterService.state().metaData().custom(MlMetadata.TYPE);
validate(datafeedId, mlMetadata);
ListTasksRequest listTasksRequest = new ListTasksRequest();
listTasksRequest.setActions(InternalStartDatafeedAction.NAME);
listTasksRequest.setDetailed(true);
listTasksAction.execute(listTasksRequest, new ActionListener<ListTasksResponse>() {
@Override
public void onResponse(ListTasksResponse listTasksResponse) {
String expectedJobDescription = "datafeed-" + datafeedId;
for (TaskInfo taskInfo : listTasksResponse.getTasks()) {
if (expectedJobDescription.equals(taskInfo.getDescription())) {
CancelTasksRequest cancelTasksRequest = new CancelTasksRequest();
cancelTasksRequest.setTaskId(taskInfo.getTaskId());
cancelTasksAction.execute(cancelTasksRequest, new ActionListener<CancelTasksResponse>() {
@Override
public void onResponse(CancelTasksResponse cancelTasksResponse) {
datafeedStatusObserver.waitForStatus(datafeedId, request.stopTimeout, DatafeedStatus.STOPPED, e -> {
if (e != null) {
listener.onFailure(e);
} else {
listener.onResponse(new Response());
}
});
}
@Override
public void onFailure(Exception e) {
listener.onFailure(e);
}
});
return;
}
}
listener.onFailure(new ResourceNotFoundException("No datafeed [" + datafeedId + "] running"));
}
@Override
public void onFailure(Exception e) {
listener.onFailure(e);
}
});
protected String executor() {
return ThreadPool.Names.SAME;
}
@Override
protected RemovePersistentTaskAction.Response newResponse() {
return new RemovePersistentTaskAction.Response();
}
@Override
protected void masterOperation(Request request, ClusterState state,
ActionListener<RemovePersistentTaskAction.Response> listener) throws Exception {
String datafeedId = request.getDatafeedId();
MlMetadata mlMetadata = state.metaData().custom(MlMetadata.TYPE);
validate(datafeedId, mlMetadata);
PersistentTasksInProgress tasksInProgress = state.custom(PersistentTasksInProgress.TYPE);
if (tasksInProgress != null) {
for (PersistentTaskInProgress<?> taskInProgress : tasksInProgress.findEntries(StartDatafeedAction.NAME, p -> true)) {
StartDatafeedAction.Request storedRequest = (StartDatafeedAction.Request) taskInProgress.getRequest();
if (storedRequest.getDatafeedId().equals(datafeedId)) {
RemovePersistentTaskAction.Request cancelTasksRequest = new RemovePersistentTaskAction.Request();
cancelTasksRequest.setTaskId(taskInProgress.getId());
removePersistentTaskAction.execute(cancelTasksRequest, listener);
return;
}
}
}
listener.onFailure(new ElasticsearchStatusException("datafeed already stopped, expected datafeed status [{}], but got [{}]",
RestStatus.CONFLICT, DatafeedStatus.STARTED, DatafeedStatus.STOPPED));
}
@Override
protected ClusterBlockException checkBlock(Request request, ClusterState state) {
return state.blocks().globalBlockedException(ClusterBlockLevel.METADATA_READ);
}
}
static void validate(String datafeedId, MlMetadata mlMetadata) {
Datafeed datafeed = mlMetadata.getDatafeed(datafeedId);
DatafeedConfig datafeed = mlMetadata.getDatafeed(datafeedId);
if (datafeed == null) {
throw new ResourceNotFoundException(Messages.getMessage(Messages.DATAFEED_NOT_FOUND, datafeedId));
}
if (datafeed.getStatus() == DatafeedStatus.STOPPED) {
throw new ElasticsearchStatusException("datafeed already stopped, expected datafeed status [{}], but got [{}]",
RestStatus.CONFLICT, DatafeedStatus.STARTED, datafeed.getStatus());
}
}
}

View File

@ -1,190 +0,0 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License;
* you may not use this file except in compliance with the Elastic License.
*/
package org.elasticsearch.xpack.ml.action;
import org.elasticsearch.action.Action;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.ActionRequestValidationException;
import org.elasticsearch.action.support.ActionFilters;
import org.elasticsearch.action.support.master.AcknowledgedRequest;
import org.elasticsearch.action.support.master.AcknowledgedResponse;
import org.elasticsearch.action.support.master.MasterNodeOperationRequestBuilder;
import org.elasticsearch.action.support.master.TransportMasterNodeAction;
import org.elasticsearch.client.ElasticsearchClient;
import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.block.ClusterBlockException;
import org.elasticsearch.cluster.block.ClusterBlockLevel;
import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver;
import org.elasticsearch.cluster.service.ClusterService;
import org.elasticsearch.common.inject.Inject;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.transport.TransportService;
import org.elasticsearch.xpack.ml.datafeed.DatafeedConfig;
import org.elasticsearch.xpack.ml.datafeed.DatafeedStatus;
import org.elasticsearch.xpack.ml.job.JobManager;
import org.elasticsearch.xpack.ml.utils.ExceptionsHelper;
import java.io.IOException;
import java.util.Objects;
public class UpdateDatafeedStatusAction extends Action<UpdateDatafeedStatusAction.Request,
UpdateDatafeedStatusAction.Response, UpdateDatafeedStatusAction.RequestBuilder> {
public static final UpdateDatafeedStatusAction INSTANCE = new UpdateDatafeedStatusAction();
public static final String NAME = "cluster:admin/ml/datafeeds/status/update";
private UpdateDatafeedStatusAction() {
super(NAME);
}
@Override
public RequestBuilder newRequestBuilder(ElasticsearchClient client) {
return new RequestBuilder(client, this);
}
@Override
public Response newResponse() {
return new Response();
}
public static class Request extends AcknowledgedRequest<Request> {
private String datafeedId;
private DatafeedStatus datafeedStatus;
public Request(String datafeedId, DatafeedStatus datafeedStatus) {
this.datafeedId = ExceptionsHelper.requireNonNull(datafeedId, DatafeedConfig.ID.getPreferredName());
this.datafeedStatus = ExceptionsHelper.requireNonNull(datafeedStatus, "status");
}
Request() {}
public String getDatafeedId() {
return datafeedId;
}
public void setDatafeedId(String datafeedId) {
this.datafeedId = datafeedId;
}
public DatafeedStatus getDatafeedStatus() {
return datafeedStatus;
}
public void setDatafeedStatus(DatafeedStatus datafeedStatus) {
this.datafeedStatus = datafeedStatus;
}
@Override
public ActionRequestValidationException validate() {
return null;
}
@Override
public void readFrom(StreamInput in) throws IOException {
super.readFrom(in);
datafeedId = in.readString();
datafeedStatus = DatafeedStatus.fromStream(in);
}
@Override
public void writeTo(StreamOutput out) throws IOException {
super.writeTo(out);
out.writeString(datafeedId);
datafeedStatus.writeTo(out);
}
@Override
public int hashCode() {
return Objects.hash(datafeedId, datafeedStatus);
}
@Override
public boolean equals(Object obj) {
if (this == obj) {
return true;
}
if (obj == null || obj.getClass() != getClass()) {
return false;
}
UpdateDatafeedStatusAction.Request other = (UpdateDatafeedStatusAction.Request) obj;
return Objects.equals(datafeedId, other.datafeedId) && Objects.equals(datafeedStatus, other.datafeedStatus);
}
@Override
public String toString() {
return "Request{" +
DatafeedConfig.ID.getPreferredName() + "='" + datafeedId + "', " +
"status=" + datafeedStatus +
'}';
}
}
static class RequestBuilder extends MasterNodeOperationRequestBuilder<Request, Response, RequestBuilder> {
public RequestBuilder(ElasticsearchClient client, UpdateDatafeedStatusAction action) {
super(client, action, new Request());
}
}
public static class Response extends AcknowledgedResponse {
public Response(boolean acknowledged) {
super(acknowledged);
}
public Response() {}
@Override
public void readFrom(StreamInput in) throws IOException {
super.readFrom(in);
readAcknowledged(in);
}
@Override
public void writeTo(StreamOutput out) throws IOException {
super.writeTo(out);
writeAcknowledged(out);
}
}
public static class TransportAction extends TransportMasterNodeAction<Request, Response> {
private final JobManager jobManager;
@Inject
public TransportAction(Settings settings, TransportService transportService, ClusterService clusterService,
ThreadPool threadPool, ActionFilters actionFilters, IndexNameExpressionResolver indexNameExpressionResolver,
JobManager jobManager) {
super(settings, UpdateDatafeedStatusAction.NAME, transportService, clusterService, threadPool, actionFilters,
indexNameExpressionResolver, Request::new);
this.jobManager = jobManager;
}
@Override
protected String executor() {
return ThreadPool.Names.SAME;
}
@Override
protected Response newResponse() {
return new Response();
}
@Override
protected void masterOperation(Request request, ClusterState state, ActionListener<Response> listener) throws Exception {
jobManager.updateDatafeedStatus(request, listener);
}
@Override
protected ClusterBlockException checkBlock(Request request, ClusterState state) {
return state.blocks().globalBlockedException(ClusterBlockLevel.METADATA_WRITE);
}
}
}

View File

@ -1,101 +0,0 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License;
* you may not use this file except in compliance with the Elastic License.
*/
package org.elasticsearch.xpack.ml.datafeed;
import org.elasticsearch.cluster.AbstractDiffable;
import org.elasticsearch.common.ParseField;
import org.elasticsearch.common.Strings;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput;
import org.elasticsearch.common.xcontent.ConstructingObjectParser;
import org.elasticsearch.common.xcontent.ObjectParser;
import org.elasticsearch.common.xcontent.ToXContent;
import org.elasticsearch.common.xcontent.XContentBuilder;
import java.io.IOException;
import java.util.Objects;
public class Datafeed extends AbstractDiffable<Datafeed> implements ToXContent {
private static final ParseField CONFIG_FIELD = new ParseField("config");
private static final ParseField STATUS_FIELD = new ParseField("status");
// Used for QueryPage
public static final ParseField RESULTS_FIELD = new ParseField("datafeeds");
public static final ConstructingObjectParser<Datafeed, Void> PARSER = new ConstructingObjectParser<>("datafeed",
a -> new Datafeed(((DatafeedConfig.Builder) a[0]).build(), (DatafeedStatus) a[1]));
static {
PARSER.declareObject(ConstructingObjectParser.constructorArg(), DatafeedConfig.PARSER, CONFIG_FIELD);
PARSER.declareField(ConstructingObjectParser.constructorArg(), (p, c) -> DatafeedStatus.fromString(p.text()), STATUS_FIELD,
ObjectParser.ValueType.STRING);
}
private final DatafeedConfig config;
private final DatafeedStatus status;
public Datafeed(DatafeedConfig config, DatafeedStatus status) {
this.config = config;
this.status = status;
}
public Datafeed(StreamInput in) throws IOException {
this.config = new DatafeedConfig(in);
this.status = DatafeedStatus.fromStream(in);
}
public String getId() {
return config.getId();
}
public String getJobId() {
return config.getJobId();
}
public DatafeedConfig getConfig() {
return config;
}
public DatafeedStatus getStatus() {
return status;
}
@Override
public void writeTo(StreamOutput out) throws IOException {
config.writeTo(out);
status.writeTo(out);
}
@Override
public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException {
builder.startObject();
builder.field(CONFIG_FIELD.getPreferredName(), config);
builder.field(STATUS_FIELD.getPreferredName(), status);
builder.endObject();
return builder;
}
@Override
public boolean equals(Object o) {
if (this == o) return true;
if (o == null || getClass() != o.getClass()) return false;
Datafeed that = (Datafeed) o;
return Objects.equals(config, that.config) &&
Objects.equals(status, that.status);
}
@Override
public int hashCode() {
return Objects.hash(config, status);
}
// Class already extends from AbstractDiffable, so copied from ToXContentToBytes#toString()
@Override
public final String toString() {
return Strings.toString(this);
}
}

View File

@ -6,12 +6,13 @@
package org.elasticsearch.xpack.ml.datafeed;
import org.elasticsearch.ElasticsearchException;
import org.elasticsearch.action.support.ToXContentToBytes;
import org.elasticsearch.cluster.AbstractDiffable;
import org.elasticsearch.common.ParseField;
import org.elasticsearch.common.Strings;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput;
import org.elasticsearch.common.io.stream.Writeable;
import org.elasticsearch.common.xcontent.ObjectParser;
import org.elasticsearch.common.xcontent.ToXContent;
import org.elasticsearch.common.xcontent.XContentBuilder;
import org.elasticsearch.common.xcontent.XContentParser;
import org.elasticsearch.index.query.QueryBuilder;
@ -39,7 +40,10 @@ import java.util.Objects;
* used around integral types and booleans so they can take <code>null</code>
* values.
*/
public class DatafeedConfig extends ToXContentToBytes implements Writeable {
public class DatafeedConfig extends AbstractDiffable<DatafeedConfig> implements ToXContent {
// Used for QueryPage
public static final ParseField RESULTS_FIELD = new ParseField("datafeeds");
/**
* The field name used to specify aggregation fields in Elasticsearch
@ -309,6 +313,11 @@ public class DatafeedConfig extends ToXContentToBytes implements Writeable {
return Objects.hash(id, jobId, frequency, queryDelay, indexes, types, query, scrollSize, aggregations, scriptFields, source);
}
@Override
public String toString() {
return Strings.toString(this);
}
public static class Builder {
private static final int DEFAULT_SCROLL_SIZE = 1000;

View File

@ -5,20 +5,16 @@
*/
package org.elasticsearch.xpack.ml.datafeed;
import org.elasticsearch.ElasticsearchStatusException;
import org.elasticsearch.ResourceNotFoundException;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.client.Client;
import org.elasticsearch.cluster.service.ClusterService;
import org.elasticsearch.common.component.AbstractComponent;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.common.util.concurrent.FutureUtils;
import org.elasticsearch.rest.RestStatus;
import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.xpack.ml.MlPlugin;
import org.elasticsearch.xpack.ml.action.InternalStartDatafeedAction;
import org.elasticsearch.xpack.ml.action.UpdateDatafeedStatusAction;
import org.elasticsearch.xpack.ml.action.StartDatafeedAction;
import org.elasticsearch.xpack.ml.action.util.QueryPage;
import org.elasticsearch.xpack.ml.datafeed.extractor.DataExtractorFactory;
import org.elasticsearch.xpack.ml.datafeed.extractor.aggregation.AggregationDataExtractorFactory;
@ -26,15 +22,12 @@ import org.elasticsearch.xpack.ml.datafeed.extractor.scroll.ScrollDataExtractorF
import org.elasticsearch.xpack.ml.job.config.DataDescription;
import org.elasticsearch.xpack.ml.job.config.DefaultFrequency;
import org.elasticsearch.xpack.ml.job.config.Job;
import org.elasticsearch.xpack.ml.job.config.JobStatus;
import org.elasticsearch.xpack.ml.job.metadata.Allocation;
import org.elasticsearch.xpack.ml.job.metadata.MlMetadata;
import org.elasticsearch.xpack.ml.job.persistence.BucketsQueryBuilder;
import org.elasticsearch.xpack.ml.job.persistence.JobProvider;
import org.elasticsearch.xpack.ml.job.process.autodetect.state.DataCounts;
import org.elasticsearch.xpack.ml.job.results.Bucket;
import org.elasticsearch.xpack.ml.notifications.Auditor;
import org.elasticsearch.xpack.ml.utils.ExceptionsHelper;
import java.time.Duration;
import java.util.Collections;
@ -62,12 +55,10 @@ public class DatafeedJobRunner extends AbstractComponent {
this.currentTimeSupplier = Objects.requireNonNull(currentTimeSupplier);
}
public void run(String datafeedId, long startTime, Long endTime, InternalStartDatafeedAction.DatafeedTask task,
public void run(String datafeedId, long startTime, Long endTime, StartDatafeedAction.DatafeedTask task,
Consumer<Exception> handler) {
MlMetadata mlMetadata = clusterService.state().metaData().custom(MlMetadata.TYPE);
validate(datafeedId, mlMetadata);
Datafeed datafeed = mlMetadata.getDatafeed(datafeedId);
DatafeedConfig datafeed = mlMetadata.getDatafeed(datafeedId);
Job job = mlMetadata.getJobs().get(datafeed.getJobId());
gatherInformation(job.getId(), (buckets, dataCounts) -> {
long latestFinalBucketEndMs = -1L;
@ -88,43 +79,36 @@ public class DatafeedJobRunner extends AbstractComponent {
// otherwise if a stop datafeed call is made immediately after the start datafeed call we could cancel
// the DatafeedTask without stopping datafeed, which causes the datafeed to keep on running.
private void innerRun(Holder holder, long startTime, Long endTime) {
setJobDatafeedStatus(holder.datafeed.getId(), DatafeedStatus.STARTED, error -> {
if (error != null) {
holder.stop("unable_to_set_datafeed_status", error);
logger.info("Starting datafeed [{}] for job [{}]", holder.datafeed.getId(), holder.datafeed.getJobId());
holder.future = threadPool.executor(MlPlugin.DATAFEED_RUNNER_THREAD_POOL_NAME).submit(() -> {
Long next = null;
try {
next = holder.datafeedJob.runLookBack(startTime, endTime);
} catch (DatafeedJob.ExtractionProblemException e) {
if (endTime == null) {
next = e.nextDelayInMsSinceEpoch;
}
holder.problemTracker.reportExtractionProblem(e.getCause().getMessage());
} catch (DatafeedJob.AnalysisProblemException e) {
if (endTime == null) {
next = e.nextDelayInMsSinceEpoch;
}
holder.problemTracker.reportAnalysisProblem(e.getCause().getMessage());
} catch (DatafeedJob.EmptyDataCountException e) {
if (endTime == null && holder.problemTracker.updateEmptyDataCount(true) == false) {
next = e.nextDelayInMsSinceEpoch;
}
} catch (Exception e) {
logger.error("Failed lookback import for job [" + holder.datafeed.getJobId() + "]", e);
holder.stop("general_lookback_failure", e);
return;
}
logger.info("Starting datafeed [{}] for job [{}]", holder.datafeed.getId(), holder.datafeed.getJobId());
holder.future = threadPool.executor(MlPlugin.DATAFEED_RUNNER_THREAD_POOL_NAME).submit(() -> {
Long next = null;
try {
next = holder.datafeedJob.runLookBack(startTime, endTime);
} catch (DatafeedJob.ExtractionProblemException e) {
if (endTime == null) {
next = e.nextDelayInMsSinceEpoch;
}
holder.problemTracker.reportExtractionProblem(e.getCause().getMessage());
} catch (DatafeedJob.AnalysisProblemException e) {
if (endTime == null) {
next = e.nextDelayInMsSinceEpoch;
}
holder.problemTracker.reportAnalysisProblem(e.getCause().getMessage());
} catch (DatafeedJob.EmptyDataCountException e) {
if (endTime == null && holder.problemTracker.updateEmptyDataCount(true) == false) {
next = e.nextDelayInMsSinceEpoch;
}
} catch (Exception e) {
logger.error("Failed lookback import for job [" + holder.datafeed.getJobId() + "]", e);
holder.stop("general_lookback_failure", e);
return;
}
if (next != null) {
doDatafeedRealtime(next, holder.datafeed.getJobId(), holder);
} else {
holder.stop("no_realtime", null);
holder.problemTracker.finishReport();
}
});
if (next != null) {
doDatafeedRealtime(next, holder.datafeed.getJobId(), holder);
} else {
holder.stop("no_realtime", null);
holder.problemTracker.finishReport();
}
});
}
@ -160,37 +144,12 @@ public class DatafeedJobRunner extends AbstractComponent {
}
}
public static void validate(String datafeedId, MlMetadata mlMetadata) {
Datafeed datafeed = mlMetadata.getDatafeed(datafeedId);
if (datafeed == null) {
throw ExceptionsHelper.missingDatafeedException(datafeedId);
}
Job job = mlMetadata.getJobs().get(datafeed.getJobId());
if (job == null) {
throw ExceptionsHelper.missingJobException(datafeed.getJobId());
}
Allocation allocation = mlMetadata.getAllocations().get(datafeed.getJobId());
if (allocation.getStatus() != JobStatus.OPENED) {
throw new ElasticsearchStatusException("cannot start datafeed, expected job status [{}], but got [{}]",
RestStatus.CONFLICT, JobStatus.OPENED, allocation.getStatus());
}
DatafeedStatus status = datafeed.getStatus();
if (status != DatafeedStatus.STOPPED) {
throw new ElasticsearchStatusException("datafeed already started, expected datafeed status [{}], but got [{}]",
RestStatus.CONFLICT, DatafeedStatus.STOPPED, status);
}
DatafeedJobValidator.validate(datafeed.getConfig(), job);
}
private Holder createJobDatafeed(Datafeed datafeed, Job job, long finalBucketEndMs, long latestRecordTimeMs,
Consumer<Exception> handler, InternalStartDatafeedAction.DatafeedTask task) {
private Holder createJobDatafeed(DatafeedConfig datafeed, Job job, long finalBucketEndMs, long latestRecordTimeMs,
Consumer<Exception> handler, StartDatafeedAction.DatafeedTask task) {
Auditor auditor = jobProvider.audit(job.getId());
Duration frequency = getFrequencyOrDefault(datafeed, job);
Duration queryDelay = Duration.ofSeconds(datafeed.getConfig().getQueryDelay());
DataExtractorFactory dataExtractorFactory = createDataExtractorFactory(datafeed.getConfig(), job);
Duration queryDelay = Duration.ofSeconds(datafeed.getQueryDelay());
DataExtractorFactory dataExtractorFactory = createDataExtractorFactory(datafeed, job);
DatafeedJob datafeedJob = new DatafeedJob(job.getId(), buildDataDescription(job), frequency.toMillis(), queryDelay.toMillis(),
dataExtractorFactory, client, auditor, currentTimeSupplier, finalBucketEndMs, latestRecordTimeMs);
Holder holder = new Holder(datafeed, datafeedJob, new ProblemTracker(() -> auditor), handler);
@ -231,8 +190,8 @@ public class DatafeedJobRunner extends AbstractComponent {
});
}
private static Duration getFrequencyOrDefault(Datafeed datafeed, Job job) {
Long frequency = datafeed.getConfig().getFrequency();
private static Duration getFrequencyOrDefault(DatafeedConfig datafeed, Job job) {
Long frequency = datafeed.getFrequency();
Long bucketSpan = job.getAnalysisConfig().getBucketSpan();
return frequency == null ? DefaultFrequency.ofBucketSpan(bucketSpan) : Duration.ofSeconds(frequency);
}
@ -241,36 +200,15 @@ public class DatafeedJobRunner extends AbstractComponent {
return new TimeValue(Math.max(1, next - currentTimeSupplier.get()));
}
private void setJobDatafeedStatus(String datafeedId, DatafeedStatus status, Consumer<Exception> handler) {
UpdateDatafeedStatusAction.Request request = new UpdateDatafeedStatusAction.Request(datafeedId, status);
client.execute(UpdateDatafeedStatusAction.INSTANCE, request, new ActionListener<UpdateDatafeedStatusAction.Response>() {
@Override
public void onResponse(UpdateDatafeedStatusAction.Response response) {
if (response.isAcknowledged()) {
logger.debug("successfully set datafeed [{}] status to [{}]", datafeedId, status);
} else {
logger.info("set datafeed [{}] status to [{}], but was not acknowledged", datafeedId, status);
}
handler.accept(null);
}
@Override
public void onFailure(Exception e) {
logger.error("could not set datafeed [" + datafeedId + "] status to [" + status + "]", e);
handler.accept(e);
}
});
}
public class Holder {
private final Datafeed datafeed;
private final DatafeedConfig datafeed;
private final DatafeedJob datafeedJob;
private final ProblemTracker problemTracker;
private final Consumer<Exception> handler;
volatile Future<?> future;
private Holder(Datafeed datafeed, DatafeedJob datafeedJob, ProblemTracker problemTracker, Consumer<Exception> handler) {
private Holder(DatafeedConfig datafeed, DatafeedJob datafeedJob, ProblemTracker problemTracker, Consumer<Exception> handler) {
this.datafeed = datafeed;
this.datafeedJob = datafeedJob;
this.problemTracker = problemTracker;
@ -285,7 +223,7 @@ public class DatafeedJobRunner extends AbstractComponent {
logger.info("[{}] attempt to stop datafeed [{}] for job [{}]", source, datafeed.getId(), datafeed.getJobId());
if (datafeedJob.stop()) {
FutureUtils.cancel(future);
setJobDatafeedStatus(datafeed.getId(), DatafeedStatus.STOPPED, error -> handler.accept(e));
handler.accept(e);
logger.info("[{}] datafeed [{}] for job [{}] has been stopped", source, datafeed.getId(), datafeed.getJobId());
} else {
logger.info("[{}] datafeed [{}] for job [{}] was already stopped", source, datafeed.getId(), datafeed.getJobId());

View File

@ -22,21 +22,19 @@ import org.elasticsearch.xpack.ml.action.DeleteJobAction;
import org.elasticsearch.xpack.ml.action.PutJobAction;
import org.elasticsearch.xpack.ml.action.RevertModelSnapshotAction;
import org.elasticsearch.xpack.ml.action.UpdateJobStatusAction;
import org.elasticsearch.xpack.ml.action.UpdateDatafeedStatusAction;
import org.elasticsearch.xpack.ml.action.util.QueryPage;
import org.elasticsearch.xpack.ml.job.config.IgnoreDowntime;
import org.elasticsearch.xpack.ml.job.config.Job;
import org.elasticsearch.xpack.ml.job.config.JobStatus;
import org.elasticsearch.xpack.ml.job.process.autodetect.state.ModelSnapshot;
import org.elasticsearch.xpack.ml.notifications.Auditor;
import org.elasticsearch.xpack.ml.job.messages.Messages;
import org.elasticsearch.xpack.ml.job.metadata.Allocation;
import org.elasticsearch.xpack.ml.job.metadata.MlMetadata;
import org.elasticsearch.xpack.ml.job.persistence.AnomalyDetectorsIndex;
import org.elasticsearch.xpack.ml.job.persistence.JobProvider;
import org.elasticsearch.xpack.ml.job.persistence.JobResultsPersister;
import org.elasticsearch.xpack.ml.action.util.QueryPage;
import org.elasticsearch.xpack.ml.job.process.autodetect.state.ModelSnapshot;
import org.elasticsearch.xpack.ml.job.results.AnomalyRecord;
import org.elasticsearch.xpack.ml.datafeed.DatafeedStatus;
import org.elasticsearch.xpack.ml.notifications.Auditor;
import org.elasticsearch.xpack.ml.utils.ExceptionsHelper;
import java.util.Collections;
@ -295,27 +293,6 @@ public class JobManager extends AbstractComponent {
return buildNewClusterState(currentState, builder);
}
public void updateDatafeedStatus(UpdateDatafeedStatusAction.Request request,
ActionListener<UpdateDatafeedStatusAction.Response> actionListener) {
String datafeedId = request.getDatafeedId();
DatafeedStatus newStatus = request.getDatafeedStatus();
clusterService.submitStateUpdateTask("update-datafeed-status-" + datafeedId,
new AckedClusterStateUpdateTask<UpdateDatafeedStatusAction.Response>(request, actionListener) {
@Override
public ClusterState execute(ClusterState currentState) throws Exception {
MlMetadata.Builder builder = createMlMetadataBuilder(currentState);
builder.updateDatafeedStatus(datafeedId, newStatus);
return buildNewClusterState(currentState, builder);
}
@Override
protected UpdateDatafeedStatusAction.Response newResponse(boolean acknowledged) {
return new UpdateDatafeedStatusAction.Response(acknowledged);
}
});
}
private Allocation getAllocation(ClusterState state, String jobId) {
MlMetadata mlMetadata = state.metaData().custom(MlMetadata.TYPE);
Allocation allocation = mlMetadata.getAllocations().get(jobId);

View File

@ -15,22 +15,24 @@ import org.elasticsearch.cluster.NamedDiff;
import org.elasticsearch.cluster.metadata.MetaData;
import org.elasticsearch.common.Nullable;
import org.elasticsearch.common.ParseField;
import org.elasticsearch.common.Strings;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput;
import org.elasticsearch.common.io.stream.Writeable;
import org.elasticsearch.common.xcontent.ObjectParser;
import org.elasticsearch.common.xcontent.ToXContent;
import org.elasticsearch.common.xcontent.XContentBuilder;
import org.elasticsearch.common.xcontent.XContentFactory;
import org.elasticsearch.rest.RestStatus;
import org.elasticsearch.xpack.ml.action.StartDatafeedAction;
import org.elasticsearch.xpack.ml.datafeed.DatafeedConfig;
import org.elasticsearch.xpack.ml.datafeed.DatafeedJobValidator;
import org.elasticsearch.xpack.ml.datafeed.DatafeedStatus;
import org.elasticsearch.xpack.ml.job.config.Job;
import org.elasticsearch.xpack.ml.job.config.JobStatus;
import org.elasticsearch.xpack.ml.job.messages.Messages;
import org.elasticsearch.xpack.ml.datafeed.DatafeedJobValidator;
import org.elasticsearch.xpack.ml.datafeed.Datafeed;
import org.elasticsearch.xpack.ml.datafeed.DatafeedConfig;
import org.elasticsearch.xpack.ml.datafeed.DatafeedStatus;
import org.elasticsearch.xpack.ml.utils.ExceptionsHelper;
import org.elasticsearch.xpack.persistent.PersistentTasksInProgress;
import org.elasticsearch.xpack.persistent.PersistentTasksInProgress.PersistentTaskInProgress;
import java.io.IOException;
import java.util.Collection;
@ -42,6 +44,7 @@ import java.util.Objects;
import java.util.Optional;
import java.util.SortedMap;
import java.util.TreeMap;
import java.util.function.Predicate;
public class MlMetadata implements MetaData.Custom {
@ -59,15 +62,15 @@ public class MlMetadata implements MetaData.Custom {
static {
ML_METADATA_PARSER.declareObjectArray(Builder::putJobs, (p, c) -> Job.PARSER.apply(p, c).build(), JOBS_FIELD);
ML_METADATA_PARSER.declareObjectArray(Builder::putAllocations, Allocation.PARSER, ALLOCATIONS_FIELD);
ML_METADATA_PARSER.declareObjectArray(Builder::putDatafeeds, Datafeed.PARSER, DATAFEEDS_FIELD);
ML_METADATA_PARSER.declareObjectArray(Builder::putDatafeeds, (p, c) -> DatafeedConfig.PARSER.apply(p, c).build(), DATAFEEDS_FIELD);
}
private final SortedMap<String, Job> jobs;
private final SortedMap<String, Allocation> allocations;
private final SortedMap<String, Datafeed> datafeeds;
private final SortedMap<String, DatafeedConfig> datafeeds;
private MlMetadata(SortedMap<String, Job> jobs, SortedMap<String, Allocation> allocations,
SortedMap<String, Datafeed> datafeeds) {
SortedMap<String, DatafeedConfig> datafeeds) {
this.jobs = Collections.unmodifiableSortedMap(jobs);
this.allocations = Collections.unmodifiableSortedMap(allocations);
this.datafeeds = Collections.unmodifiableSortedMap(datafeeds);
@ -81,11 +84,11 @@ public class MlMetadata implements MetaData.Custom {
return allocations;
}
public SortedMap<String, Datafeed> getDatafeeds() {
public SortedMap<String, DatafeedConfig> getDatafeeds() {
return datafeeds;
}
public Datafeed getDatafeed(String datafeedId) {
public DatafeedConfig getDatafeed(String datafeedId) {
return datafeeds.get(datafeedId);
}
@ -120,9 +123,9 @@ public class MlMetadata implements MetaData.Custom {
}
this.allocations = allocations;
size = in.readVInt();
TreeMap<String, Datafeed> datafeeds = new TreeMap<>();
TreeMap<String, DatafeedConfig> datafeeds = new TreeMap<>();
for (int i = 0; i < size; i++) {
datafeeds.put(in.readString(), new Datafeed(in));
datafeeds.put(in.readString(), new DatafeedConfig(in));
}
this.datafeeds = datafeeds;
}
@ -163,7 +166,7 @@ public class MlMetadata implements MetaData.Custom {
final Diff<Map<String, Job>> jobs;
final Diff<Map<String, Allocation>> allocations;
final Diff<Map<String, Datafeed>> datafeeds;
final Diff<Map<String, DatafeedConfig>> datafeeds;
MlMetadataDiff(MlMetadata before, MlMetadata after) {
this.jobs = DiffableUtils.diff(before.jobs, after.jobs, DiffableUtils.getStringKeySerializer());
@ -176,7 +179,7 @@ public class MlMetadata implements MetaData.Custom {
MlMetadataDiff::readJobDiffFrom);
this.allocations = DiffableUtils.readJdkMapDiff(in, DiffableUtils.getStringKeySerializer(), Allocation::new,
MlMetadataDiff::readAllocationDiffFrom);
this.datafeeds = DiffableUtils.readJdkMapDiff(in, DiffableUtils.getStringKeySerializer(), Datafeed::new,
this.datafeeds = DiffableUtils.readJdkMapDiff(in, DiffableUtils.getStringKeySerializer(), DatafeedConfig::new,
MlMetadataDiff::readSchedulerDiffFrom);
}
@ -184,7 +187,7 @@ public class MlMetadata implements MetaData.Custom {
public MetaData.Custom apply(MetaData.Custom part) {
TreeMap<String, Job> newJobs = new TreeMap<>(jobs.apply(((MlMetadata) part).jobs));
TreeMap<String, Allocation> newAllocations = new TreeMap<>(allocations.apply(((MlMetadata) part).allocations));
TreeMap<String, Datafeed> newDatafeeds = new TreeMap<>(datafeeds.apply(((MlMetadata) part).datafeeds));
TreeMap<String, DatafeedConfig> newDatafeeds = new TreeMap<>(datafeeds.apply(((MlMetadata) part).datafeeds));
return new MlMetadata(newJobs, newAllocations, newDatafeeds);
}
@ -208,8 +211,8 @@ public class MlMetadata implements MetaData.Custom {
return AbstractDiffable.readDiffFrom(Allocation::new, in);
}
static Diff<Datafeed> readSchedulerDiffFrom(StreamInput in) throws IOException {
return AbstractDiffable.readDiffFrom(Datafeed::new, in);
static Diff<DatafeedConfig> readSchedulerDiffFrom(StreamInput in) throws IOException {
return AbstractDiffable.readDiffFrom(DatafeedConfig::new, in);
}
}
@ -227,17 +230,7 @@ public class MlMetadata implements MetaData.Custom {
@Override
public final String toString() {
try {
XContentBuilder builder = XContentFactory.jsonBuilder();
builder.prettyPrint();
builder.startObject();
toXContent(builder, EMPTY_PARAMS);
builder.endObject();
return builder.string();
} catch (Exception e) {
// So we have a stack trace logged somewhere
return "{ \"error\" : \"" + org.elasticsearch.ExceptionsHelper.detailedMessage(e) + "\"}";
}
return Strings.toString(this);
}
@Override
@ -249,7 +242,7 @@ public class MlMetadata implements MetaData.Custom {
private TreeMap<String, Job> jobs;
private TreeMap<String, Allocation> allocations;
private TreeMap<String, Datafeed> datafeeds;
private TreeMap<String, DatafeedConfig> datafeeds;
public Builder() {
this.jobs = new TreeMap<>();
@ -285,7 +278,7 @@ public class MlMetadata implements MetaData.Custom {
throw new ResourceNotFoundException("job [" + jobId + "] does not exist");
}
Optional<Datafeed> datafeed = getDatafeedByJobId(jobId);
Optional<DatafeedConfig> datafeed = getDatafeedByJobId(jobId);
if (datafeed.isPresent()) {
throw ExceptionsHelper.conflictStatusException("Cannot delete job [" + jobId + "] while datafeed ["
+ datafeed.get().getId() + "] refers to it");
@ -313,35 +306,38 @@ public class MlMetadata implements MetaData.Custom {
if (job == null) {
throw ExceptionsHelper.missingJobException(jobId);
}
Optional<Datafeed> existingDatafeed = getDatafeedByJobId(jobId);
Optional<DatafeedConfig> existingDatafeed = getDatafeedByJobId(jobId);
if (existingDatafeed.isPresent()) {
throw ExceptionsHelper.conflictStatusException("A datafeed [" + existingDatafeed.get().getId()
+ "] already exists for job [" + jobId + "]");
}
DatafeedJobValidator.validate(datafeedConfig, job);
return putDatafeed(new Datafeed(datafeedConfig, DatafeedStatus.STOPPED));
}
private Builder putDatafeed(Datafeed datafeed) {
datafeeds.put(datafeed.getId(), datafeed);
datafeeds.put(datafeedConfig.getId(), datafeedConfig);
return this;
}
public Builder removeDatafeed(String datafeedId) {
Datafeed datafeed = datafeeds.get(datafeedId);
public Builder removeDatafeed(String datafeedId, PersistentTasksInProgress persistentTasksInProgress) {
DatafeedConfig datafeed = datafeeds.get(datafeedId);
if (datafeed == null) {
throw ExceptionsHelper.missingDatafeedException(datafeedId);
}
if (datafeed.getStatus() != DatafeedStatus.STOPPED) {
String msg = Messages.getMessage(Messages.DATAFEED_CANNOT_DELETE_IN_CURRENT_STATE, datafeedId, datafeed.getStatus());
throw ExceptionsHelper.conflictStatusException(msg);
if (persistentTasksInProgress != null) {
Predicate<PersistentTaskInProgress<?>> predicate = t -> {
StartDatafeedAction.Request storedRequest = (StartDatafeedAction.Request) t.getRequest();
return storedRequest.getDatafeedId().equals(datafeedId);
};
if (persistentTasksInProgress.entriesExist(StartDatafeedAction.NAME, predicate)) {
String msg = Messages.getMessage(Messages.DATAFEED_CANNOT_DELETE_IN_CURRENT_STATE, datafeedId,
DatafeedStatus.STARTED);
throw ExceptionsHelper.conflictStatusException(msg);
}
}
datafeeds.remove(datafeedId);
return this;
}
private Optional<Datafeed> getDatafeedByJobId(String jobId) {
private Optional<DatafeedConfig> getDatafeedByJobId(String jobId) {
return datafeeds.values().stream().filter(s -> s.getJobId().equals(jobId)).findFirst();
}
@ -361,9 +357,9 @@ public class MlMetadata implements MetaData.Custom {
return this;
}
private Builder putDatafeeds(Collection<Datafeed> datafeeds) {
for (Datafeed datafeed : datafeeds) {
putDatafeed(datafeed);
private Builder putDatafeeds(Collection<DatafeedConfig> datafeeds) {
for (DatafeedConfig datafeed : datafeeds) {
this.datafeeds.put(datafeed.getId(), datafeed);
}
return this;
}
@ -395,7 +391,7 @@ public class MlMetadata implements MetaData.Custom {
// Cannot update the status to DELETING if there are datafeeds attached
if (jobStatus.equals(JobStatus.DELETING)) {
Optional<Datafeed> datafeed = getDatafeedByJobId(jobId);
Optional<DatafeedConfig> datafeed = getDatafeedByJobId(jobId);
if (datafeed.isPresent()) {
throw ExceptionsHelper.conflictStatusException("Cannot delete job [" + jobId + "] while datafeed ["
+ datafeed.get().getId() + "] refers to it");
@ -440,33 +436,6 @@ public class MlMetadata implements MetaData.Custom {
allocations.put(jobId, builder.build());
return this;
}
public Builder updateDatafeedStatus(String datafeedId, DatafeedStatus newStatus) {
Datafeed datafeed = datafeeds.get(datafeedId);
if (datafeed == null) {
throw ExceptionsHelper.missingDatafeedException(datafeedId);
}
DatafeedStatus currentStatus = datafeed.getStatus();
switch (newStatus) {
case STARTED:
if (currentStatus != DatafeedStatus.STOPPED) {
String msg = Messages.getMessage(Messages.DATAFEED_CANNOT_START, datafeedId, newStatus);
throw ExceptionsHelper.conflictStatusException(msg);
}
break;
case STOPPED:
if (currentStatus != DatafeedStatus.STARTED) {
String msg = Messages.getMessage(Messages.DATAFEED_CANNOT_STOP_IN_CURRENT_STATE, datafeedId, newStatus);
throw ExceptionsHelper.conflictStatusException(msg);
}
break;
default:
throw new IllegalArgumentException("[" + datafeedId + "] requested invalid datafeed status [" + newStatus + "]");
}
datafeeds.put(datafeedId, new Datafeed(datafeed.getConfig(), newStatus));
return this;
}
}
}

View File

@ -8,17 +8,21 @@ package org.elasticsearch.xpack.ml.rest.datafeeds;
import org.elasticsearch.ElasticsearchParseException;
import org.elasticsearch.client.node.NodeClient;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.common.xcontent.XContentBuilder;
import org.elasticsearch.common.xcontent.XContentParser;
import org.elasticsearch.index.mapper.DateFieldMapper;
import org.elasticsearch.rest.BaseRestHandler;
import org.elasticsearch.rest.BytesRestResponse;
import org.elasticsearch.rest.RestController;
import org.elasticsearch.rest.RestRequest;
import org.elasticsearch.rest.action.RestToXContentListener;
import org.elasticsearch.rest.RestResponse;
import org.elasticsearch.rest.RestStatus;
import org.elasticsearch.rest.action.RestBuilderListener;
import org.elasticsearch.xpack.ml.MlPlugin;
import org.elasticsearch.xpack.ml.action.StartDatafeedAction;
import org.elasticsearch.xpack.ml.datafeed.DatafeedConfig;
import org.elasticsearch.xpack.ml.job.messages.Messages;
import org.elasticsearch.xpack.persistent.PersistentActionResponse;
import java.io.IOException;
@ -49,12 +53,19 @@ public class RestStartDatafeedAction extends BaseRestHandler {
}
jobDatafeedRequest = new StartDatafeedAction.Request(datafeedId, startTimeMillis);
jobDatafeedRequest.setEndTime(endTimeMillis);
TimeValue startTimeout = restRequest.paramAsTime(StartDatafeedAction.START_TIMEOUT.getPreferredName(),
TimeValue.timeValueSeconds(30));
jobDatafeedRequest.setStartTimeout(startTimeout);
}
return channel -> {
client.execute(StartDatafeedAction.INSTANCE, jobDatafeedRequest, new RestToXContentListener<>(channel));
client.execute(StartDatafeedAction.INSTANCE, jobDatafeedRequest,
new RestBuilderListener<PersistentActionResponse>(channel) {
@Override
public RestResponse buildResponse(PersistentActionResponse r, XContentBuilder builder) throws Exception {
builder.startObject();
builder.field("started", true);
builder.endObject();
return new BytesRestResponse(RestStatus.OK, builder);
}
});
};
}

View File

@ -7,7 +7,6 @@ package org.elasticsearch.xpack.ml.rest.datafeeds;
import org.elasticsearch.client.node.NodeClient;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.rest.BaseRestHandler;
import org.elasticsearch.rest.RestController;
import org.elasticsearch.rest.RestRequest;
@ -30,9 +29,6 @@ public class RestStopDatafeedAction extends BaseRestHandler {
protected RestChannelConsumer prepareRequest(RestRequest restRequest, NodeClient client) throws IOException {
StopDatafeedAction.Request jobDatafeedRequest = new StopDatafeedAction.Request(
restRequest.param(DatafeedConfig.ID.getPreferredName()));
if (restRequest.hasParam("stop_timeout")) {
jobDatafeedRequest.setStopTimeout(TimeValue.parseTimeValue(restRequest.param("stop_timeout"), "stop_timeout"));
}
return channel -> client.execute(StopDatafeedAction.INSTANCE, jobDatafeedRequest, new AcknowledgedRestListener<>(channel));
}
}

View File

@ -1,88 +0,0 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License;
* you may not use this file except in compliance with the Elastic License.
*/
package org.elasticsearch.xpack.ml.utils;
import org.apache.logging.log4j.Logger;
import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.ClusterStateObserver;
import org.elasticsearch.cluster.service.ClusterService;
import org.elasticsearch.common.logging.Loggers;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.xpack.ml.job.metadata.MlMetadata;
import org.elasticsearch.xpack.ml.datafeed.Datafeed;
import org.elasticsearch.xpack.ml.datafeed.DatafeedStatus;
import java.util.function.Consumer;
import java.util.function.Predicate;
public class DatafeedStatusObserver {
private static final Logger LOGGER = Loggers.getLogger(DatafeedStatusObserver.class);
private final ThreadPool threadPool;
private final ClusterService clusterService;
public DatafeedStatusObserver(ThreadPool threadPool, ClusterService clusterService) {
this.threadPool = threadPool;
this.clusterService = clusterService;
}
public void waitForStatus(String datafeedId, TimeValue waitTimeout, DatafeedStatus expectedStatus, Consumer<Exception> handler) {
ClusterStateObserver observer =
new ClusterStateObserver(clusterService, LOGGER, threadPool.getThreadContext());
DatafeedPredicate datafeedPredicate = new DatafeedPredicate(datafeedId, expectedStatus);
observer.waitForNextChange(new ClusterStateObserver.Listener() {
@Override
public void onNewClusterState(ClusterState state) {
handler.accept(null);
}
@Override
public void onClusterServiceClose() {
Exception e = new IllegalArgumentException("Cluster service closed while waiting for datafeed status to change to ["
+ expectedStatus + "]");
handler.accept(new IllegalStateException(e));
}
@Override
public void onTimeout(TimeValue timeout) {
if (datafeedPredicate.test(clusterService.state())) {
handler.accept(null);
} else {
Exception e = new IllegalArgumentException("Timeout expired while waiting for datafeed status to change to ["
+ expectedStatus + "]");
handler.accept(e);
}
}
}, datafeedPredicate, waitTimeout);
}
private static class DatafeedPredicate implements Predicate<ClusterState> {
private final String datafeedId;
private final DatafeedStatus expectedStatus;
DatafeedPredicate(String datafeedId, DatafeedStatus expectedStatus) {
this.datafeedId = datafeedId;
this.expectedStatus = expectedStatus;
}
@Override
public boolean test(ClusterState newState) {
MlMetadata metadata = newState.getMetaData().custom(MlMetadata.TYPE);
if (metadata != null) {
Datafeed datafeed = metadata.getDatafeed(datafeedId);
if (datafeed != null) {
return datafeed.getStatus() == expectedStatus;
}
}
return false;
}
}
}

View File

@ -17,8 +17,11 @@ import org.elasticsearch.common.xcontent.ToXContent;
import org.elasticsearch.common.xcontent.XContentBuilder;
import java.io.IOException;
import java.util.Collection;
import java.util.List;
import java.util.Objects;
import java.util.function.Predicate;
import java.util.stream.Collectors;
/**
* A cluster state record that contains a list of all running persistent tasks
@ -40,6 +43,19 @@ public final class PersistentTasksInProgress extends AbstractNamedDiffable<Clust
return this.entries;
}
public Collection<PersistentTaskInProgress<?>> findEntries(String actionName, Predicate<PersistentTaskInProgress<?>> predicate) {
return this.entries().stream()
.filter(p -> actionName.equals(p.getAction()))
.filter(predicate)
.collect(Collectors.toList());
}
public boolean entriesExist(String actionName, Predicate<PersistentTaskInProgress<?>> predicate) {
return this.entries().stream()
.filter(p -> actionName.equals(p.getAction()))
.anyMatch(predicate);
}
@Override
public boolean equals(Object o) {
if (this == o) return true;

View File

@ -99,7 +99,7 @@ public class RemovePersistentTaskAction extends Action<RemovePersistentTaskActio
}
public static class Response extends AcknowledgedResponse {
protected Response() {
public Response() {
super();
}

View File

@ -22,7 +22,6 @@ import org.elasticsearch.plugins.Plugin;
import org.elasticsearch.rest.RestStatus;
import org.elasticsearch.test.ESIntegTestCase;
import org.elasticsearch.xpack.ml.MlPlugin;
import org.elasticsearch.xpack.ml.datafeed.Datafeed;
import org.elasticsearch.xpack.ml.datafeed.DatafeedConfig;
import org.elasticsearch.xpack.ml.datafeed.DatafeedStatus;
import org.elasticsearch.xpack.ml.job.config.AnalysisConfig;
@ -33,6 +32,8 @@ import org.elasticsearch.xpack.ml.job.config.JobStatus;
import org.elasticsearch.xpack.ml.job.metadata.MlMetadata;
import org.elasticsearch.xpack.ml.job.persistence.AnomalyDetectorsIndex;
import org.elasticsearch.xpack.ml.job.process.autodetect.state.DataCounts;
import org.elasticsearch.xpack.persistent.PersistentActionResponse;
import org.elasticsearch.xpack.persistent.RemovePersistentTaskAction;
import org.junit.After;
import java.io.IOException;
@ -99,17 +100,16 @@ public class DatafeedJobsIT extends ESIntegTestCase {
StartDatafeedAction.Request startDatafeedRequest = new StartDatafeedAction.Request(datafeedConfig.getId(), 0L);
startDatafeedRequest.setEndTime(now);
StartDatafeedAction.Response startDatafeedResponse =
PersistentActionResponse startDatafeedResponse =
client().execute(StartDatafeedAction.INSTANCE, startDatafeedRequest).get();
assertTrue(startDatafeedResponse.isStarted());
assertBusy(() -> {
DataCounts dataCounts = getDataCounts(job.getId());
assertThat(dataCounts.getProcessedRecordCount(), equalTo(numDocs + numDocs2));
assertThat(dataCounts.getOutOfOrderTimeStampCount(), equalTo(0L));
MlMetadata mlMetadata = client().admin().cluster().prepareState().all().get()
.getState().metaData().custom(MlMetadata.TYPE);
assertThat(mlMetadata.getDatafeed(datafeedConfig.getId()).get().getStatus(), equalTo(DatafeedStatus.STOPPED));
GetDatafeedsStatsAction.Request request = new GetDatafeedsStatsAction.Request(datafeedConfig.getId());
GetDatafeedsStatsAction.Response response = client().execute(GetDatafeedsStatsAction.INSTANCE, request).actionGet();
assertThat(response.getResponse().results().get(0).getDatafeedStatus(), equalTo(DatafeedStatus.STOPPED));
});
}
@ -139,9 +139,8 @@ public class DatafeedJobsIT extends ESIntegTestCase {
assertTrue(putDatafeedResponse.isAcknowledged());
StartDatafeedAction.Request startDatafeedRequest = new StartDatafeedAction.Request(datafeedConfig.getId(), 0L);
StartDatafeedAction.Response startDatafeedResponse =
PersistentActionResponse startDatafeedResponse =
client().execute(StartDatafeedAction.INSTANCE, startDatafeedRequest).get();
assertTrue(startDatafeedResponse.isStarted());
assertBusy(() -> {
DataCounts dataCounts = getDataCounts(job.getId());
assertThat(dataCounts.getProcessedRecordCount(), equalTo(numDocs1));
@ -159,7 +158,7 @@ public class DatafeedJobsIT extends ESIntegTestCase {
StopDatafeedAction.Request stopDatafeedRequest = new StopDatafeedAction.Request(datafeedConfig.getId());
try {
StopDatafeedAction.Response stopJobResponse = client().execute(StopDatafeedAction.INSTANCE, stopDatafeedRequest).get();
RemovePersistentTaskAction.Response stopJobResponse = client().execute(StopDatafeedAction.INSTANCE, stopDatafeedRequest).get();
assertTrue(stopJobResponse.isAcknowledged());
} catch (Exception e) {
NodesHotThreadsResponse nodesHotThreadsResponse = client().admin().cluster().prepareNodesHotThreads().get();
@ -170,9 +169,9 @@ public class DatafeedJobsIT extends ESIntegTestCase {
throw e;
}
assertBusy(() -> {
MlMetadata mlMetadata = client().admin().cluster().prepareState().all().get()
.getState().metaData().custom(MlMetadata.TYPE);
assertThat(mlMetadata.getDatafeed(datafeedConfig.getId()).get().getStatus(), equalTo(DatafeedStatus.STOPPED));
GetDatafeedsStatsAction.Request request = new GetDatafeedsStatsAction.Request(datafeedConfig.getId());
GetDatafeedsStatsAction.Response response = client().execute(GetDatafeedsStatsAction.INSTANCE, request).actionGet();
assertThat(response.getResponse().results().get(0).getDatafeedStatus(), equalTo(DatafeedStatus.STOPPED));
});
}
@ -240,10 +239,10 @@ public class DatafeedJobsIT extends ESIntegTestCase {
private static void deleteAllDatafeeds(Client client) throws Exception {
MetaData metaData = client.admin().cluster().prepareState().get().getState().getMetaData();
MlMetadata mlMetadata = metaData.custom(MlMetadata.TYPE);
for (Datafeed datafeed : mlMetadata.getDatafeeds().values()) {
for (DatafeedConfig datafeed : mlMetadata.getDatafeeds().values()) {
String datafeedId = datafeed.getId();
try {
StopDatafeedAction.Response stopResponse =
RemovePersistentTaskAction.Response stopResponse =
client.execute(StopDatafeedAction.INSTANCE, new StopDatafeedAction.Request(datafeedId)).get();
assertTrue(stopResponse.isAcknowledged());
} catch (ExecutionException e) {

View File

@ -7,7 +7,7 @@ package org.elasticsearch.xpack.ml.action;
import org.elasticsearch.xpack.ml.action.GetDatafeedsStatsAction.Response;
import org.elasticsearch.xpack.ml.action.util.QueryPage;
import org.elasticsearch.xpack.ml.datafeed.Datafeed;
import org.elasticsearch.xpack.ml.datafeed.DatafeedConfig;
import org.elasticsearch.xpack.ml.datafeed.DatafeedStatus;
import org.elasticsearch.xpack.ml.support.AbstractStreamableTestCase;
@ -30,7 +30,7 @@ public class GetDatafeedStatsActionResponseTests extends AbstractStreamableTestC
datafeedStatsList.add(datafeedStats);
}
result = new Response(new QueryPage<>(datafeedStatsList, datafeedStatsList.size(), Datafeed.RESULTS_FIELD));
result = new Response(new QueryPage<>(datafeedStatsList, datafeedStatsList.size(), DatafeedConfig.RESULTS_FIELD));
return result;
}

View File

@ -12,7 +12,6 @@ import org.elasticsearch.search.aggregations.AggregatorFactories;
import org.elasticsearch.search.builder.SearchSourceBuilder;
import org.elasticsearch.xpack.ml.action.GetDatafeedsAction.Response;
import org.elasticsearch.xpack.ml.action.util.QueryPage;
import org.elasticsearch.xpack.ml.datafeed.Datafeed;
import org.elasticsearch.xpack.ml.datafeed.DatafeedConfig;
import org.elasticsearch.xpack.ml.datafeed.DatafeedConfigTests;
import org.elasticsearch.xpack.ml.support.AbstractStreamableTestCase;
@ -61,7 +60,7 @@ public class GetDatafeedsActionResponseTests extends AbstractStreamableTestCase<
datafeedList.add(datafeedConfig.build());
}
result = new Response(new QueryPage<>(datafeedList, datafeedList.size(), Datafeed.RESULTS_FIELD));
result = new Response(new QueryPage<>(datafeedList, datafeedList.size(), DatafeedConfig.RESULTS_FIELD));
return result;
}

View File

@ -5,11 +5,18 @@
*/
package org.elasticsearch.xpack.ml.action;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.ElasticsearchStatusException;
import org.elasticsearch.ResourceNotFoundException;
import org.elasticsearch.common.xcontent.XContentParser;
import org.elasticsearch.xpack.ml.action.StartDatafeedAction.Request;
import org.elasticsearch.xpack.ml.datafeed.DatafeedConfig;
import org.elasticsearch.xpack.ml.datafeed.DatafeedJobRunnerTests;
import org.elasticsearch.xpack.ml.job.config.Job;
import org.elasticsearch.xpack.ml.job.metadata.MlMetadata;
import org.elasticsearch.xpack.ml.support.AbstractStreamableXContentTestCase;
import static org.hamcrest.Matchers.equalTo;
public class StartDatafeedActionRequestTests extends AbstractStreamableXContentTestCase<StartDatafeedAction.Request> {
@Override
@ -18,9 +25,6 @@ public class StartDatafeedActionRequestTests extends AbstractStreamableXContentT
if (randomBoolean()) {
request.setEndTime(randomNonNegativeLong());
}
if (randomBoolean()) {
request.setStartTimeout(TimeValue.timeValueMillis(randomNonNegativeLong()));
}
return request;
}
@ -34,4 +38,22 @@ public class StartDatafeedActionRequestTests extends AbstractStreamableXContentT
return Request.parseRequest(null, parser);
}
public void testValidate() {
Job job1 = DatafeedJobRunnerTests.createDatafeedJob().build();
MlMetadata mlMetadata1 = new MlMetadata.Builder()
.putJob(job1, false)
.build();
Exception e = expectThrows(ResourceNotFoundException.class,
() -> StartDatafeedAction.validate("some-datafeed", mlMetadata1));
assertThat(e.getMessage(), equalTo("No datafeed with id [some-datafeed] exists"));
DatafeedConfig datafeedConfig1 = DatafeedJobRunnerTests.createDatafeedConfig("foo-datafeed", "foo").build();
MlMetadata mlMetadata2 = new MlMetadata.Builder(mlMetadata1)
.putDatafeed(datafeedConfig1)
.build();
e = expectThrows(ElasticsearchStatusException.class,
() -> StartDatafeedAction.validate("foo-datafeed", mlMetadata2));
assertThat(e.getMessage(), equalTo("cannot start datafeed, expected job status [OPENED], but got [CLOSED]"));
}
}

View File

@ -5,12 +5,9 @@
*/
package org.elasticsearch.xpack.ml.action;
import org.elasticsearch.ElasticsearchStatusException;
import org.elasticsearch.ResourceNotFoundException;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.xpack.ml.action.StopDatafeedAction.Request;
import org.elasticsearch.xpack.ml.datafeed.DatafeedConfig;
import org.elasticsearch.xpack.ml.datafeed.DatafeedStatus;
import org.elasticsearch.xpack.ml.job.config.Job;
import org.elasticsearch.xpack.ml.job.metadata.MlMetadata;
import org.elasticsearch.xpack.ml.support.AbstractStreamableTestCase;
@ -24,9 +21,6 @@ public class StopDatafeedActionRequestTests extends AbstractStreamableTestCase<S
@Override
protected Request createTestInstance() {
Request r = new Request(randomAsciiOfLengthBetween(1, 20));
if (randomBoolean()) {
r.setStopTimeout(TimeValue.timeValueMillis(randomNonNegativeLong()));
}
return r;
}
@ -38,21 +32,15 @@ public class StopDatafeedActionRequestTests extends AbstractStreamableTestCase<S
public void testValidate() {
Job job = createDatafeedJob().build();
MlMetadata mlMetadata1 = new MlMetadata.Builder().putJob(job, false).build();
Exception e = expectThrows(ResourceNotFoundException.class, () -> StopDatafeedAction.validate("foo", mlMetadata1));
Exception e = expectThrows(ResourceNotFoundException.class,
() -> StopDatafeedAction.validate("foo", mlMetadata1));
assertThat(e.getMessage(), equalTo("No datafeed with id [foo] exists"));
DatafeedConfig datafeedConfig = createDatafeedConfig("foo", "foo").build();
MlMetadata mlMetadata2 = new MlMetadata.Builder().putJob(job, false)
.putDatafeed(datafeedConfig)
.build();
e = expectThrows(ElasticsearchStatusException.class, () -> StopDatafeedAction.validate("foo", mlMetadata2));
assertThat(e.getMessage(), equalTo("datafeed already stopped, expected datafeed status [STARTED], but got [STOPPED]"));
MlMetadata mlMetadata3 = new MlMetadata.Builder().putJob(job, false)
.putDatafeed(datafeedConfig)
.updateDatafeedStatus("foo", DatafeedStatus.STARTED)
.build();
StopDatafeedAction.validate("foo", mlMetadata3);
StopDatafeedAction.validate("foo", mlMetadata2);
}
}

View File

@ -1,23 +0,0 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License;
* you may not use this file except in compliance with the Elastic License.
*/
package org.elasticsearch.xpack.ml.action;
import org.elasticsearch.xpack.ml.action.UpdateDatafeedStatusAction.Request;
import org.elasticsearch.xpack.ml.datafeed.DatafeedStatus;
import org.elasticsearch.xpack.ml.support.AbstractStreamableTestCase;
public class UpdateDatafeedStatusRequestTests extends AbstractStreamableTestCase<Request> {
@Override
protected Request createTestInstance() {
return new Request(randomAsciiOfLengthBetween(1, 20), randomFrom(DatafeedStatus.values()));
}
@Override
protected Request createBlankInstance() {
return new Request();
}
}

View File

@ -5,10 +5,8 @@
*/
package org.elasticsearch.xpack.ml.datafeed;
import org.elasticsearch.ElasticsearchStatusException;
import org.elasticsearch.ResourceNotFoundException;
import org.elasticsearch.action.ActionFuture;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.client.Client;
import org.elasticsearch.cluster.ClusterName;
import org.elasticsearch.cluster.ClusterState;
@ -20,20 +18,19 @@ import org.elasticsearch.test.ESTestCase;
import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.xpack.ml.MlPlugin;
import org.elasticsearch.xpack.ml.action.FlushJobAction;
import org.elasticsearch.xpack.ml.action.InternalStartDatafeedAction;
import org.elasticsearch.xpack.ml.action.PostDataAction;
import org.elasticsearch.xpack.ml.action.UpdateDatafeedStatusAction;
import org.elasticsearch.xpack.ml.action.StartDatafeedAction;
import org.elasticsearch.xpack.ml.datafeed.extractor.DataExtractor;
import org.elasticsearch.xpack.ml.datafeed.extractor.DataExtractorFactory;
import org.elasticsearch.xpack.ml.job.config.AnalysisConfig;
import org.elasticsearch.xpack.ml.job.process.autodetect.state.DataCounts;
import org.elasticsearch.xpack.ml.job.config.DataDescription;
import org.elasticsearch.xpack.ml.job.config.Detector;
import org.elasticsearch.xpack.ml.job.config.Job;
import org.elasticsearch.xpack.ml.job.config.JobStatus;
import org.elasticsearch.xpack.ml.notifications.Auditor;
import org.elasticsearch.xpack.ml.job.metadata.MlMetadata;
import org.elasticsearch.xpack.ml.job.persistence.JobProvider;
import org.elasticsearch.xpack.ml.datafeed.extractor.DataExtractor;
import org.elasticsearch.xpack.ml.datafeed.extractor.DataExtractorFactory;
import org.elasticsearch.xpack.ml.job.process.autodetect.state.DataCounts;
import org.elasticsearch.xpack.ml.notifications.Auditor;
import org.junit.Before;
import java.io.ByteArrayInputStream;
@ -45,9 +42,6 @@ import java.util.Optional;
import java.util.concurrent.ExecutorService;
import java.util.function.Consumer;
import static org.elasticsearch.xpack.ml.action.UpdateDatafeedStatusAction.INSTANCE;
import static org.elasticsearch.xpack.ml.action.UpdateDatafeedStatusAction.Request;
import static org.hamcrest.Matchers.equalTo;
import static org.mockito.Matchers.any;
import static org.mockito.Matchers.anyString;
import static org.mockito.Matchers.same;
@ -77,12 +71,6 @@ public class DatafeedJobRunnerTests extends ESTestCase {
jobDataFuture = mock(ActionFuture.class);
flushJobFuture = mock(ActionFuture.class);
clusterService = mock(ClusterService.class);
doAnswer(invocation -> {
@SuppressWarnings("rawtypes")
ActionListener<Object> actionListener = (ActionListener) invocation.getArguments()[2];
actionListener.onResponse(new UpdateDatafeedStatusAction.Response());
return null;
}).when(client).execute(same(UpdateDatafeedStatusAction.INSTANCE), any(), any());
JobProvider jobProvider = mock(JobProvider.class);
Mockito.doAnswer(invocationOnMock -> {
@ -141,15 +129,13 @@ public class DatafeedJobRunnerTests extends ESTestCase {
when(dataExtractor.next()).thenReturn(Optional.of(in));
when(jobDataFuture.get()).thenReturn(new PostDataAction.Response(dataCounts));
Consumer<Exception> handler = mockConsumer();
InternalStartDatafeedAction.DatafeedTask task = mock(InternalStartDatafeedAction.DatafeedTask.class);
StartDatafeedAction.DatafeedTask task = mock(StartDatafeedAction.DatafeedTask.class);
datafeedJobRunner.run("datafeed1", 0L, 60000L, task, handler);
verify(threadPool, times(1)).executor(MlPlugin.DATAFEED_RUNNER_THREAD_POOL_NAME);
verify(threadPool, never()).schedule(any(), any(), any());
verify(client).execute(same(PostDataAction.INSTANCE), eq(createExpectedPostDataRequest(job)));
verify(client).execute(same(FlushJobAction.INSTANCE), any());
verify(client).execute(same(INSTANCE), eq(new Request("datafeed1", DatafeedStatus.STARTED)), any());
verify(client).execute(same(INSTANCE), eq(new Request("datafeed1", DatafeedStatus.STOPPED)), any());
}
private static PostDataAction.Request createExpectedPostDataRequest(Job job) {
@ -181,15 +167,13 @@ public class DatafeedJobRunnerTests extends ESTestCase {
when(dataExtractor.next()).thenThrow(new RuntimeException("dummy"));
when(jobDataFuture.get()).thenReturn(new PostDataAction.Response(dataCounts));
Consumer<Exception> handler = mockConsumer();
InternalStartDatafeedAction.DatafeedTask task = mock(InternalStartDatafeedAction.DatafeedTask.class);
StartDatafeedAction.DatafeedTask task = mock(StartDatafeedAction.DatafeedTask.class);
datafeedJobRunner.run("datafeed1", 0L, 60000L, task, handler);
verify(threadPool, times(1)).executor(MlPlugin.DATAFEED_RUNNER_THREAD_POOL_NAME);
verify(threadPool, never()).schedule(any(), any(), any());
verify(client, never()).execute(same(PostDataAction.INSTANCE), eq(new PostDataAction.Request("foo")));
verify(client, never()).execute(same(FlushJobAction.INSTANCE), any());
verify(client).execute(same(INSTANCE), eq(new Request("datafeed1", DatafeedStatus.STARTED)), any());
verify(client).execute(same(INSTANCE), eq(new Request("datafeed1", DatafeedStatus.STOPPED)), any());
}
public void testStart_GivenNewlyCreatedJobLoopBackAndRealtime() throws Exception {
@ -214,14 +198,13 @@ public class DatafeedJobRunnerTests extends ESTestCase {
when(jobDataFuture.get()).thenReturn(new PostDataAction.Response(dataCounts));
Consumer<Exception> handler = mockConsumer();
boolean cancelled = randomBoolean();
InternalStartDatafeedAction.DatafeedTask task =
new InternalStartDatafeedAction.DatafeedTask(1, "type", "action", null, "datafeed1");
StartDatafeedAction.DatafeedTask task = new StartDatafeedAction.DatafeedTask(1, "type", "action", null, "datafeed1");
datafeedJobRunner.run("datafeed1", 0L, null, task, handler);
verify(threadPool, times(1)).executor(MlPlugin.DATAFEED_RUNNER_THREAD_POOL_NAME);
if (cancelled) {
task.stop();
verify(client).execute(same(INSTANCE), eq(new Request("datafeed1", DatafeedStatus.STOPPED)), any());
verify(handler).accept(null);
} else {
verify(client).execute(same(PostDataAction.INSTANCE), eq(createExpectedPostDataRequest(job)));
verify(client).execute(same(FlushJobAction.INSTANCE), any());
@ -246,32 +229,6 @@ public class DatafeedJobRunnerTests extends ESTestCase {
return builder;
}
public void testValidate() {
Job job1 = createDatafeedJob().build();
MlMetadata mlMetadata1 = new MlMetadata.Builder()
.putJob(job1, false)
.build();
Exception e = expectThrows(ResourceNotFoundException.class,
() -> DatafeedJobRunner.validate("some-datafeed", mlMetadata1));
assertThat(e.getMessage(), equalTo("No datafeed with id [some-datafeed] exists"));
DatafeedConfig datafeedConfig1 = createDatafeedConfig("foo-datafeed", "foo").build();
MlMetadata mlMetadata2 = new MlMetadata.Builder(mlMetadata1)
.putDatafeed(datafeedConfig1)
.build();
e = expectThrows(ElasticsearchStatusException.class,
() -> DatafeedJobRunner.validate("foo-datafeed", mlMetadata2));
assertThat(e.getMessage(), equalTo("cannot start datafeed, expected job status [OPENED], but got [CLOSED]"));
MlMetadata mlMetadata3 = new MlMetadata.Builder(mlMetadata2)
.updateStatus("foo", JobStatus.OPENED, null)
.updateDatafeedStatus("foo-datafeed", DatafeedStatus.STARTED)
.build();
e = expectThrows(ElasticsearchStatusException.class,
() -> DatafeedJobRunner.validate("foo-datafeed", mlMetadata3));
assertThat(e.getMessage(), equalTo("datafeed already started, expected datafeed status [STOPPED], but got [STARTED]"));
}
@SuppressWarnings("unchecked")
private Consumer<Exception> mockConsumer() {
return mock(Consumer.class);

View File

@ -1,29 +0,0 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License;
* you may not use this file except in compliance with the Elastic License.
*/
package org.elasticsearch.xpack.ml.datafeed;
import org.elasticsearch.common.io.stream.Writeable;
import org.elasticsearch.common.xcontent.XContentParser;
import org.elasticsearch.xpack.ml.support.AbstractSerializingTestCase;
public class DatafeedTests extends AbstractSerializingTestCase<Datafeed> {
@Override
protected Datafeed createTestInstance() {
return new Datafeed(DatafeedConfigTests.createRandomizedDatafeedConfig(randomAsciiOfLength(10)),
randomFrom(DatafeedStatus.values()));
}
@Override
protected Writeable.Reader<Datafeed> instanceReader() {
return Datafeed::new;
}
@Override
protected Datafeed parseInstance(XContentParser parser) {
return Datafeed.PARSER.apply(parser, null);
}
}

View File

@ -40,8 +40,7 @@ public class MlRestTestStateCleaner {
}
for (Map<String, Object> datafeed : datafeeds) {
Map<String, Object> datafeedMap = (Map<String, Object>) datafeed.get("config");
String datafeedId = (String) datafeedMap.get("datafeed_id");
String datafeedId = (String) datafeed.get("datafeed_id");
try {
client.performRequest("POST", "/_xpack/ml/datafeeds/" + datafeedId + "/_stop");
} catch (Exception e) {

View File

@ -12,19 +12,23 @@ import org.elasticsearch.cluster.metadata.MetaData;
import org.elasticsearch.common.io.stream.NamedWriteableRegistry;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.plugins.Plugin;
import org.elasticsearch.search.SearchModule;
import org.elasticsearch.test.ESIntegTestCase;
import org.elasticsearch.xpack.ml.MlPlugin;
import org.elasticsearch.xpack.ml.action.DatafeedJobsIT;
import org.elasticsearch.xpack.ml.action.GetJobsStatsAction;
import org.elasticsearch.xpack.ml.action.OpenJobAction;
import org.elasticsearch.xpack.ml.action.PutJobAction;
import org.elasticsearch.xpack.ml.action.DatafeedJobsIT;
import org.elasticsearch.xpack.ml.action.StartDatafeedAction;
import org.elasticsearch.xpack.ml.job.config.AnalysisConfig;
import org.elasticsearch.xpack.ml.job.config.DataDescription;
import org.elasticsearch.xpack.ml.job.config.Detector;
import org.elasticsearch.xpack.ml.job.config.Job;
import org.elasticsearch.xpack.ml.job.config.JobStatus;
import org.elasticsearch.xpack.ml.job.process.autodetect.AutodetectProcessManager;
import org.elasticsearch.xpack.ml.job.metadata.MlMetadata;
import org.elasticsearch.xpack.ml.job.process.autodetect.AutodetectProcessManager;
import org.elasticsearch.xpack.persistent.PersistentActionRequest;
import org.elasticsearch.xpack.persistent.PersistentTasksInProgress;
import org.junit.After;
import java.io.IOException;
@ -127,7 +131,13 @@ public class TooManyJobsIT extends ESIntegTestCase {
public static void ensureClusterStateConsistencyWorkAround() throws IOException {
if (cluster() != null && cluster().size() > 0) {
List<NamedWriteableRegistry.Entry> namedWritables = new ArrayList<>(ClusterModule.getNamedWriteables());
SearchModule searchModule = new SearchModule(Settings.EMPTY, false, Collections.emptyList());
namedWritables.addAll(searchModule.getNamedWriteables());
namedWritables.add(new NamedWriteableRegistry.Entry(MetaData.Custom.class, "ml", MlMetadata::new));
namedWritables.add(new NamedWriteableRegistry.Entry(ClusterState.Custom.class, PersistentTasksInProgress.TYPE,
PersistentTasksInProgress::new));
namedWritables.add(new NamedWriteableRegistry.Entry(PersistentActionRequest.class, StartDatafeedAction.NAME,
StartDatafeedAction.Request::new));
final NamedWriteableRegistry namedWriteableRegistry = new NamedWriteableRegistry(namedWritables);
ClusterState masterClusterState = client().admin().cluster().prepareState().all().get().getState();
byte[] masterClusterStateBytes = ClusterState.Builder.toBytes(masterClusterState);

View File

@ -15,20 +15,22 @@ import org.elasticsearch.common.xcontent.XContentFactory;
import org.elasticsearch.common.xcontent.XContentParser;
import org.elasticsearch.common.xcontent.XContentType;
import org.elasticsearch.rest.RestStatus;
import org.elasticsearch.xpack.ml.action.StartDatafeedAction;
import org.elasticsearch.xpack.ml.datafeed.DatafeedConfig;
import org.elasticsearch.xpack.ml.datafeed.DatafeedConfigTests;
import org.elasticsearch.xpack.ml.job.config.AnalysisConfig;
import org.elasticsearch.xpack.ml.job.config.Job;
import org.elasticsearch.xpack.ml.job.config.JobStatus;
import org.elasticsearch.xpack.ml.job.config.JobTests;
import org.elasticsearch.xpack.ml.datafeed.DatafeedConfig;
import org.elasticsearch.xpack.ml.datafeed.DatafeedConfigTests;
import org.elasticsearch.xpack.ml.datafeed.DatafeedStatus;
import org.elasticsearch.xpack.ml.support.AbstractSerializingTestCase;
import org.elasticsearch.xpack.persistent.PersistentTasksInProgress;
import java.io.IOException;
import java.util.Collections;
import static org.elasticsearch.xpack.ml.job.config.JobTests.buildJobBuilder;
import static org.elasticsearch.xpack.ml.datafeed.DatafeedJobRunnerTests.createDatafeedJob;
import static org.elasticsearch.xpack.ml.datafeed.DatafeedJobRunnerTests.createDatafeedConfig;
import static org.elasticsearch.xpack.ml.datafeed.DatafeedJobRunnerTests.createDatafeedJob;
import static org.elasticsearch.xpack.ml.job.config.JobTests.buildJobBuilder;
import static org.hamcrest.Matchers.equalTo;
import static org.hamcrest.Matchers.notNullValue;
import static org.hamcrest.Matchers.nullValue;
@ -53,9 +55,6 @@ public class MlMetadataTests extends AbstractSerializingTestCase<MlMetadata> {
}
builder.putJob(job, false);
builder.putDatafeed(datafeedConfig);
if (randomBoolean()) {
builder.updateDatafeedStatus(datafeedConfig.getId(), DatafeedStatus.STARTED);
}
} else {
builder.putJob(job, false);
}
@ -74,7 +73,7 @@ public class MlMetadataTests extends AbstractSerializingTestCase<MlMetadata> {
@Override
protected Writeable.Reader<MlMetadata> instanceReader() {
return in -> new MlMetadata(in);
return MlMetadata::new;
}
@Override
@ -194,11 +193,10 @@ public class MlMetadataTests extends AbstractSerializingTestCase<MlMetadata> {
MlMetadata result = builder.build();
assertThat(result.getJobs().get("foo"), sameInstance(job1));
assertThat(result.getAllocations().get("foo").getStatus(), equalTo(JobStatus.CLOSED));
assertThat(result.getDatafeeds().get("datafeed1").getConfig(), sameInstance(datafeedConfig1));
assertThat(result.getDatafeeds().get("datafeed1").getStatus(), equalTo(DatafeedStatus.STOPPED));
assertThat(result.getDatafeeds().get("datafeed1"), sameInstance(datafeedConfig1));
builder = new MlMetadata.Builder(result);
builder.removeDatafeed("datafeed1");
builder.removeDatafeed("datafeed1", new PersistentTasksInProgress(0, Collections.emptyList()));
result = builder.build();
assertThat(result.getJobs().get("foo"), sameInstance(job1));
assertThat(result.getAllocations().get("foo").getStatus(), equalTo(JobStatus.CLOSED));
@ -255,16 +253,20 @@ public class MlMetadataTests extends AbstractSerializingTestCase<MlMetadata> {
builder.putDatafeed(datafeedConfig1);
builder.updateStatus("foo", JobStatus.OPENING, null);
builder.updateStatus("foo", JobStatus.OPENED, null);
builder.updateDatafeedStatus("datafeed1", DatafeedStatus.STARTED);
MlMetadata result = builder.build();
assertThat(result.getJobs().get("foo"), sameInstance(job1));
assertThat(result.getAllocations().get("foo").getStatus(), equalTo(JobStatus.OPENED));
assertThat(result.getDatafeeds().get("datafeed1").getConfig(), sameInstance(datafeedConfig1));
assertThat(result.getDatafeeds().get("datafeed1").getStatus(), equalTo(DatafeedStatus.STARTED));
assertThat(result.getDatafeeds().get("datafeed1"), sameInstance(datafeedConfig1));
StartDatafeedAction.Request request = new StartDatafeedAction.Request("datafeed1", 0L);
PersistentTasksInProgress.PersistentTaskInProgress<StartDatafeedAction.Request> taskInProgress =
new PersistentTasksInProgress.PersistentTaskInProgress<>(0, StartDatafeedAction.NAME, request, null);
PersistentTasksInProgress tasksInProgress = new PersistentTasksInProgress(1, Collections.singletonList(taskInProgress));
MlMetadata.Builder builder2 = new MlMetadata.Builder(result);
ElasticsearchStatusException e = expectThrows(ElasticsearchStatusException.class, () -> builder2.removeDatafeed("datafeed1"));
ElasticsearchStatusException e = expectThrows(ElasticsearchStatusException.class,
() -> builder2.removeDatafeed("datafeed1", tasksInProgress));
assertThat(e.status(), equalTo(RestStatus.CONFLICT));
}

View File

@ -21,10 +21,6 @@
"type": "string",
"required": false,
"description": "The end time when the datafeed should stop. When not set, the datafeed continues in real time"
},
"start_timeout": {
"type": "time",
"description": "Controls the time to wait until a datafeed has started. Default to 30 seconds"
}
}
},

View File

@ -13,10 +13,6 @@
"type": "string",
"required": true,
"description": "The ID of the datafeed to stop"
},
"stop_timeout": {
"type": "time",
"description": "Controls the time to wait until a datafeed has stopped. Default to 30 seconds"
}
},
"body": null