diff --git a/elasticsearch/src/main/java/org/elasticsearch/xpack/ml/action/OpenJobAction.java b/elasticsearch/src/main/java/org/elasticsearch/xpack/ml/action/OpenJobAction.java index c45076c41d5..f7cf22ad8df 100644 --- a/elasticsearch/src/main/java/org/elasticsearch/xpack/ml/action/OpenJobAction.java +++ b/elasticsearch/src/main/java/org/elasticsearch/xpack/ml/action/OpenJobAction.java @@ -300,7 +300,7 @@ public class OpenJobAction extends Action task = MlMetadata.getTask(jobId, tasks); + PersistentTaskInProgress task = MlMetadata.getJobTask(jobId, tasks); JobState jobState = MlMetadata.getJobState(jobId, tasks); if (task != null && task.getExecutorNode() != null && jobState == JobState.OPENED) { if (nodes.nodeExists(task.getExecutorNode()) == false) { diff --git a/elasticsearch/src/main/java/org/elasticsearch/xpack/ml/action/StartDatafeedAction.java b/elasticsearch/src/main/java/org/elasticsearch/xpack/ml/action/StartDatafeedAction.java index 756980d4b7e..167efe6fdbe 100644 --- a/elasticsearch/src/main/java/org/elasticsearch/xpack/ml/action/StartDatafeedAction.java +++ b/elasticsearch/src/main/java/org/elasticsearch/xpack/ml/action/StartDatafeedAction.java @@ -5,6 +5,7 @@ */ package org.elasticsearch.xpack.ml.action; +import org.apache.logging.log4j.Logger; import org.elasticsearch.ElasticsearchStatusException; import org.elasticsearch.action.Action; import org.elasticsearch.action.ActionListener; @@ -14,11 +15,15 @@ import org.elasticsearch.action.support.ActionFilters; import org.elasticsearch.client.ElasticsearchClient; import org.elasticsearch.cluster.ClusterState; import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver; +import org.elasticsearch.cluster.node.DiscoveryNode; +import org.elasticsearch.cluster.node.DiscoveryNodes; +import org.elasticsearch.cluster.service.ClusterService; import org.elasticsearch.common.ParseField; import org.elasticsearch.common.inject.Inject; import org.elasticsearch.common.io.stream.StreamInput; import org.elasticsearch.common.io.stream.StreamOutput; import org.elasticsearch.common.settings.Settings; +import org.elasticsearch.common.unit.TimeValue; import org.elasticsearch.common.xcontent.ObjectParser; import org.elasticsearch.common.xcontent.ToXContent; import org.elasticsearch.common.xcontent.XContentBuilder; @@ -36,6 +41,7 @@ import org.elasticsearch.xpack.ml.datafeed.DatafeedState; import org.elasticsearch.xpack.ml.job.config.Job; import org.elasticsearch.xpack.ml.job.config.JobState; import org.elasticsearch.xpack.ml.job.metadata.MlMetadata; +import org.elasticsearch.xpack.ml.utils.DatafeedStateObserver; import org.elasticsearch.xpack.ml.utils.ExceptionsHelper; import org.elasticsearch.xpack.persistent.PersistentActionRegistry; import org.elasticsearch.xpack.persistent.PersistentActionRequest; @@ -43,17 +49,18 @@ import org.elasticsearch.xpack.persistent.PersistentActionResponse; import org.elasticsearch.xpack.persistent.PersistentActionService; import org.elasticsearch.xpack.persistent.PersistentTask; import org.elasticsearch.xpack.persistent.PersistentTasksInProgress; +import org.elasticsearch.xpack.persistent.PersistentTasksInProgress.PersistentTaskInProgress; import org.elasticsearch.xpack.persistent.TransportPersistentAction; import java.io.IOException; import java.util.Objects; -import java.util.function.Predicate; public class StartDatafeedAction extends Action { public static final ParseField START_TIME = new ParseField("start"); public static final ParseField END_TIME = new ParseField("end"); + public static final ParseField TIMEOUT = new ParseField("timeout"); public static final StartDatafeedAction INSTANCE = new StartDatafeedAction(); public static final String NAME = "cluster:admin/ml/datafeeds/start"; @@ -80,6 +87,8 @@ public class StartDatafeedAction PARSER.declareString((request, datafeedId) -> request.datafeedId = datafeedId, DatafeedConfig.ID); PARSER.declareLong((request, startTime) -> request.startTime = startTime, START_TIME); PARSER.declareLong(Request::setEndTime, END_TIME); + PARSER.declareString((request, val) -> + request.setTimeout(TimeValue.parseTimeValue(val, TIMEOUT.getPreferredName())), TIMEOUT); } public static Request parseRequest(String datafeedId, XContentParser parser) { @@ -93,6 +102,7 @@ public class StartDatafeedAction private String datafeedId; private long startTime; private Long endTime; + private TimeValue timeout = TimeValue.timeValueSeconds(20); public Request(String datafeedId, long startTime) { this.datafeedId = ExceptionsHelper.requireNonNull(datafeedId, DatafeedConfig.ID.getPreferredName()); @@ -122,6 +132,14 @@ public class StartDatafeedAction this.endTime = endTime; } + public TimeValue getTimeout() { + return timeout; + } + + public void setTimeout(TimeValue timeout) { + this.timeout = timeout; + } + @Override public ActionRequestValidationException validate() { return null; @@ -138,6 +156,7 @@ public class StartDatafeedAction datafeedId = in.readString(); startTime = in.readVLong(); endTime = in.readOptionalLong(); + timeout = TimeValue.timeValueMillis(in.readVLong()); } @Override @@ -146,6 +165,7 @@ public class StartDatafeedAction out.writeString(datafeedId); out.writeVLong(startTime); out.writeOptionalLong(endTime); + out.writeVLong(timeout.millis()); } @Override @@ -161,13 +181,14 @@ public class StartDatafeedAction if (endTime != null) { builder.field(END_TIME.getPreferredName(), endTime); } + builder.field(TIMEOUT.getPreferredName(), timeout.getStringRep()); builder.endObject(); return builder; } @Override public int hashCode() { - return Objects.hash(datafeedId, startTime, endTime); + return Objects.hash(datafeedId, startTime, endTime, timeout); } @Override @@ -181,7 +202,8 @@ public class StartDatafeedAction Request other = (Request) obj; return Objects.equals(datafeedId, other.datafeedId) && Objects.equals(startTime, other.startTime) && - Objects.equals(endTime, other.endTime); + Objects.equals(endTime, other.endTime) && + Objects.equals(timeout, other.timeout); } } @@ -220,41 +242,59 @@ public class StartDatafeedAction public static class TransportAction extends TransportPersistentAction { + private final DatafeedStateObserver observer; + private final ClusterService clusterService; private final DatafeedJobRunner datafeedJobRunner; @Inject public TransportAction(Settings settings, TransportService transportService, ThreadPool threadPool, PersistentActionService persistentActionService, PersistentActionRegistry persistentActionRegistry, ActionFilters actionFilters, IndexNameExpressionResolver indexNameExpressionResolver, - DatafeedJobRunner datafeedJobRunner) { + ClusterService clusterService, DatafeedJobRunner datafeedJobRunner) { super(settings, NAME, false, threadPool, transportService, persistentActionService, persistentActionRegistry, actionFilters, indexNameExpressionResolver, Request::new, ThreadPool.Names.MANAGEMENT); + this.clusterService = clusterService; this.datafeedJobRunner = datafeedJobRunner; + this.observer = new DatafeedStateObserver(threadPool, clusterService); + } + + @Override + protected void doExecute(Request request, ActionListener listener) { + ClusterState state = clusterService.state(); + StartDatafeedAction.validate(request.datafeedId, state.metaData().custom(MlMetadata.TYPE), + state.custom(PersistentTasksInProgress.TYPE), true); + ActionListener finalListener = + ActionListener.wrap(response -> waitForDatafeedStarted(request, response, listener), listener::onFailure); + super.doExecute(request, finalListener); + } + + void waitForDatafeedStarted(Request request, + PersistentActionResponse response, + ActionListener listener) { + observer.waitForState(request.getDatafeedId(), request.timeout, DatafeedState.STARTED, e -> { + if (e != null) { + listener.onFailure(e); + } else { + listener.onResponse(response); + } + }); + } + + @Override + public DiscoveryNode executorNode(Request request, ClusterState clusterState) { + return selectNode(logger, request, clusterState); } @Override public void validate(Request request, ClusterState clusterState) { MlMetadata mlMetadata = clusterState.metaData().custom(MlMetadata.TYPE); PersistentTasksInProgress tasks = clusterState.custom(PersistentTasksInProgress.TYPE); - StartDatafeedAction.validate(request.getDatafeedId(), mlMetadata, tasks); - PersistentTasksInProgress persistentTasksInProgress = clusterState.custom(PersistentTasksInProgress.TYPE); - if (persistentTasksInProgress == null) { - return; - } - - Predicate> predicate = taskInProgress -> { - Request storedRequest = (Request) taskInProgress.getRequest(); - return storedRequest.getDatafeedId().equals(request.getDatafeedId()); - }; - if (persistentTasksInProgress.tasksExist(NAME, predicate)) { - throw new ElasticsearchStatusException("datafeed already started, expected datafeed state [{}], but got [{}]", - RestStatus.CONFLICT, DatafeedState.STOPPED, DatafeedState.STARTED); - } + StartDatafeedAction.validate(request.getDatafeedId(), mlMetadata, tasks, false); } @Override - protected void nodeOperation(PersistentTask task, Request request, ActionListener listener) { - DatafeedTask datafeedTask = (DatafeedTask) task; + protected void nodeOperation(PersistentTask persistentTask, Request request, ActionListener listener) { + DatafeedTask datafeedTask = (DatafeedTask) persistentTask; datafeedJobRunner.run(request.getDatafeedId(), request.getStartTime(), request.getEndTime(), datafeedTask, (error) -> { @@ -268,7 +308,8 @@ public class StartDatafeedAction } - public static void validate(String datafeedId, MlMetadata mlMetadata, PersistentTasksInProgress tasks) { + static void validate(String datafeedId, MlMetadata mlMetadata, PersistentTasksInProgress tasks, + boolean checkJobState) { DatafeedConfig datafeed = mlMetadata.getDatafeed(datafeedId); if (datafeed == null) { throw ExceptionsHelper.missingDatafeedException(datafeedId); @@ -277,11 +318,46 @@ public class StartDatafeedAction if (job == null) { throw ExceptionsHelper.missingJobException(datafeed.getJobId()); } - JobState jobState = MlMetadata.getJobState(datafeed.getJobId(), tasks); - if (jobState != JobState.OPENED) { - throw new ElasticsearchStatusException("cannot start datafeed, expected job state [{}], but got [{}]", - RestStatus.CONFLICT, JobState.OPENED, jobState); + if (tasks == null) { + return; + } + + // we only check job state when the user submits the start datafeed request, + // but when we persistent task framework validates the request we don't, + // because we don't know if the job task will be started before datafeed task, + // so we just wait until the job task is opened, which will happen at some point in time. + if (checkJobState) { + JobState jobState = MlMetadata.getJobState(datafeed.getJobId(), tasks); + if (jobState != JobState.OPENED) { + throw new ElasticsearchStatusException("cannot start datafeed, expected job state [{}], but got [{}]", + RestStatus.CONFLICT, JobState.OPENED, jobState); + } + } + + DatafeedState datafeedState = MlMetadata.getDatafeedState(datafeedId, tasks); + if (datafeedState == DatafeedState.STARTED) { + throw new ElasticsearchStatusException("datafeed already started, expected datafeed state [{}], but got [{}]", + RestStatus.CONFLICT, DatafeedState.STOPPED, DatafeedState.STARTED); } DatafeedJobValidator.validate(datafeed, job); } + + static DiscoveryNode selectNode(Logger logger, Request request, ClusterState clusterState) { + MlMetadata mlMetadata = clusterState.metaData().custom(MlMetadata.TYPE); + PersistentTasksInProgress tasks = clusterState.custom(PersistentTasksInProgress.TYPE); + DatafeedConfig datafeed = mlMetadata.getDatafeed(request.getDatafeedId()); + DiscoveryNodes nodes = clusterState.getNodes(); + + JobState jobState = MlMetadata.getJobState(datafeed.getJobId(), tasks); + if (jobState == JobState.OPENED) { + PersistentTaskInProgress task = MlMetadata.getJobTask(datafeed.getJobId(), tasks); + return nodes.get(task.getExecutorNode()); + } else { + // lets try again later when the job has been opened: + logger.debug("cannot start datafeeder, because job's [{}] state is [{}] while state [{}] is required", + datafeed.getJobId(), jobState, JobState.OPENED); + return null; + } + } + } diff --git a/elasticsearch/src/main/java/org/elasticsearch/xpack/ml/action/StopDatafeedAction.java b/elasticsearch/src/main/java/org/elasticsearch/xpack/ml/action/StopDatafeedAction.java index 6379c91b2ff..48a36f2cb6f 100644 --- a/elasticsearch/src/main/java/org/elasticsearch/xpack/ml/action/StopDatafeedAction.java +++ b/elasticsearch/src/main/java/org/elasticsearch/xpack/ml/action/StopDatafeedAction.java @@ -147,19 +147,15 @@ public class StopDatafeedAction validate(datafeedId, mlMetadata); PersistentTasksInProgress tasks = state.custom(PersistentTasksInProgress.TYPE); - if (tasks != null) { - for (PersistentTaskInProgress task : tasks.findTasks(StartDatafeedAction.NAME, p -> true)) { - StartDatafeedAction.Request storedRequest = (StartDatafeedAction.Request) task.getRequest(); - if (storedRequest.getDatafeedId().equals(datafeedId)) { - RemovePersistentTaskAction.Request removeTaskRequest = new RemovePersistentTaskAction.Request(); - removeTaskRequest.setTaskId(task.getId()); - removePersistentTaskAction.execute(removeTaskRequest, listener); - return; - } - } + PersistentTaskInProgress task = MlMetadata.getDatafeedTask(request.getDatafeedId(), tasks); + if (task != null) { + RemovePersistentTaskAction.Request removeTaskRequest = new RemovePersistentTaskAction.Request(); + removeTaskRequest.setTaskId(task.getId()); + removePersistentTaskAction.execute(removeTaskRequest, listener); + } else { + listener.onFailure(new ElasticsearchStatusException("datafeed already stopped, expected datafeed state [{}], but got [{}]", + RestStatus.CONFLICT, DatafeedState.STARTED, DatafeedState.STOPPED)); } - listener.onFailure(new ElasticsearchStatusException("datafeed already stopped, expected datafeed state [{}], but got [{}]", - RestStatus.CONFLICT, DatafeedState.STARTED, DatafeedState.STOPPED)); } @Override diff --git a/elasticsearch/src/main/java/org/elasticsearch/xpack/ml/datafeed/DatafeedJobRunner.java b/elasticsearch/src/main/java/org/elasticsearch/xpack/ml/datafeed/DatafeedJobRunner.java index 06e903a39b6..8de5d9da817 100644 --- a/elasticsearch/src/main/java/org/elasticsearch/xpack/ml/datafeed/DatafeedJobRunner.java +++ b/elasticsearch/src/main/java/org/elasticsearch/xpack/ml/datafeed/DatafeedJobRunner.java @@ -6,11 +6,13 @@ package org.elasticsearch.xpack.ml.datafeed; import org.elasticsearch.ResourceNotFoundException; +import org.elasticsearch.action.ActionListener; import org.elasticsearch.client.Client; import org.elasticsearch.cluster.service.ClusterService; import org.elasticsearch.common.component.AbstractComponent; import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.unit.TimeValue; +import org.elasticsearch.common.util.concurrent.AbstractRunnable; import org.elasticsearch.common.util.concurrent.FutureUtils; import org.elasticsearch.index.mapper.DateFieldMapper; import org.elasticsearch.threadpool.ThreadPool; @@ -30,6 +32,7 @@ import org.elasticsearch.xpack.ml.job.persistence.JobProvider; import org.elasticsearch.xpack.ml.job.process.autodetect.state.DataCounts; import org.elasticsearch.xpack.ml.job.results.Bucket; import org.elasticsearch.xpack.ml.notifications.Auditor; +import org.elasticsearch.xpack.persistent.UpdatePersistentTaskStatusAction; import java.time.Duration; import java.util.Collections; @@ -75,7 +78,11 @@ public class DatafeedJobRunner extends AbstractComponent { latestRecordTimeMs = dataCounts.getLatestRecordTimeStamp().getTime(); } Holder holder = createJobDatafeed(datafeed, job, latestFinalBucketEndMs, latestRecordTimeMs, handler, task); - innerRun(holder, startTime, endTime); + UpdatePersistentTaskStatusAction.Request updateDatafeedStatus = + new UpdatePersistentTaskStatusAction.Request(task.getPersistentTaskId(), DatafeedState.STARTED); + client.execute(UpdatePersistentTaskStatusAction.INSTANCE, updateDatafeedStatus, ActionListener.wrap(r -> { + innerRun(holder, startTime, endTime); + }, handler)); }, handler); } @@ -235,13 +242,29 @@ public class DatafeedJobRunner extends AbstractComponent { public void stop(String source, Exception e) { logger.info("[{}] attempt to stop datafeed [{}] for job [{}]", source, datafeed.getId(), datafeed.getJobId()); - if (datafeedJob.stop()) { - FutureUtils.cancel(future); - handler.accept(e); - logger.info("[{}] datafeed [{}] for job [{}] has been stopped", source, datafeed.getId(), datafeed.getJobId()); - } else { - logger.info("[{}] datafeed [{}] for job [{}] was already stopped", source, datafeed.getId(), datafeed.getJobId()); - } + // We need to fork, because: + // 1) We are being called from cluster state update thread and we should return as soon as possible + // 2) We also index into the notifaction index and that is forbidden from the cluster state update thread: + // (Caused by: java.lang.AssertionError: should not be called by a cluster state applier. reason [the applied + // cluster state is not yet available]) + threadPool.executor(ThreadPool.Names.GENERIC).submit(new AbstractRunnable() { + @Override + public void onFailure(Exception e) { + logger.warn("failed to stop [{}] datafeed [{}] for job [{}]", source, datafeed.getId(), datafeed.getJobId()); + handler.accept(e); + } + + @Override + protected void doRun() throws Exception { + if (datafeedJob.stop()) { + FutureUtils.cancel(future); + handler.accept(e); + logger.info("[{}] datafeed [{}] for job [{}] has been stopped", source, datafeed.getId(), datafeed.getJobId()); + } else { + logger.info("[{}] datafeed [{}] for job [{}] was already stopped", source, datafeed.getId(), datafeed.getJobId()); + } + } + }); } } diff --git a/elasticsearch/src/main/java/org/elasticsearch/xpack/ml/job/metadata/MlMetadata.java b/elasticsearch/src/main/java/org/elasticsearch/xpack/ml/job/metadata/MlMetadata.java index 3592285f7b8..6ea1b2a8025 100644 --- a/elasticsearch/src/main/java/org/elasticsearch/xpack/ml/job/metadata/MlMetadata.java +++ b/elasticsearch/src/main/java/org/elasticsearch/xpack/ml/job/metadata/MlMetadata.java @@ -337,7 +337,9 @@ public class MlMetadata implements MetaData.Custom { } - public static PersistentTasksInProgress.PersistentTaskInProgress getTask(String jobId, @Nullable PersistentTasksInProgress tasks) { + @Nullable + public static PersistentTasksInProgress.PersistentTaskInProgress getJobTask(String jobId, + @Nullable PersistentTasksInProgress tasks) { if (tasks != null) { Predicate> p = t -> { OpenJobAction.Request storedRequest = (OpenJobAction.Request) t.getRequest(); @@ -350,8 +352,23 @@ public class MlMetadata implements MetaData.Custom { return null; } + @Nullable + public static PersistentTasksInProgress.PersistentTaskInProgress getDatafeedTask(String datafeedId, + @Nullable PersistentTasksInProgress tasks) { + if (tasks != null) { + Predicate> p = t -> { + StartDatafeedAction.Request storedRequest = (StartDatafeedAction.Request) t.getRequest(); + return storedRequest.getDatafeedId().equals(datafeedId); + }; + for (PersistentTasksInProgress.PersistentTaskInProgress task : tasks.findTasks(StartDatafeedAction.NAME, p)) { + return task; + } + } + return null; + } + public static JobState getJobState(String jobId, @Nullable PersistentTasksInProgress tasks) { - PersistentTasksInProgress.PersistentTaskInProgress task = getTask(jobId, tasks); + PersistentTasksInProgress.PersistentTaskInProgress task = getJobTask(jobId, tasks); if (task != null && task.getStatus() != null) { return (JobState) task.getStatus(); } else { @@ -360,4 +377,15 @@ public class MlMetadata implements MetaData.Custom { } } + public static DatafeedState getDatafeedState(String datafeedId, @Nullable PersistentTasksInProgress tasks) { + PersistentTasksInProgress.PersistentTaskInProgress task = getDatafeedTask(datafeedId, tasks); + if (task != null && task.getStatus() != null) { + return (DatafeedState) task.getStatus(); + } else { + // If we haven't started a datafeed then there will be no persistent task, + // which is the same as if the datafeed was't started + return DatafeedState.STOPPED; + } + } + } diff --git a/elasticsearch/src/main/java/org/elasticsearch/xpack/ml/rest/datafeeds/RestStartDatafeedAction.java b/elasticsearch/src/main/java/org/elasticsearch/xpack/ml/rest/datafeeds/RestStartDatafeedAction.java index 6db1572f955..32a56702aae 100644 --- a/elasticsearch/src/main/java/org/elasticsearch/xpack/ml/rest/datafeeds/RestStartDatafeedAction.java +++ b/elasticsearch/src/main/java/org/elasticsearch/xpack/ml/rest/datafeeds/RestStartDatafeedAction.java @@ -8,6 +8,7 @@ package org.elasticsearch.xpack.ml.rest.datafeeds; import org.elasticsearch.ElasticsearchParseException; import org.elasticsearch.client.node.NodeClient; import org.elasticsearch.common.settings.Settings; +import org.elasticsearch.common.unit.TimeValue; import org.elasticsearch.common.xcontent.XContentBuilder; import org.elasticsearch.common.xcontent.XContentParser; import org.elasticsearch.index.mapper.DateFieldMapper; @@ -53,6 +54,10 @@ public class RestStartDatafeedAction extends BaseRestHandler { } jobDatafeedRequest = new StartDatafeedAction.Request(datafeedId, startTimeMillis); jobDatafeedRequest.setEndTime(endTimeMillis); + if (restRequest.hasParam("timeout")) { + TimeValue openTimeout = restRequest.paramAsTime("timeout", TimeValue.timeValueSeconds(20)); + jobDatafeedRequest.setTimeout(openTimeout); + } } return channel -> { client.execute(StartDatafeedAction.INSTANCE, jobDatafeedRequest, diff --git a/elasticsearch/src/main/java/org/elasticsearch/xpack/ml/utils/DatafeedStateObserver.java b/elasticsearch/src/main/java/org/elasticsearch/xpack/ml/utils/DatafeedStateObserver.java new file mode 100644 index 00000000000..e925e55ea12 --- /dev/null +++ b/elasticsearch/src/main/java/org/elasticsearch/xpack/ml/utils/DatafeedStateObserver.java @@ -0,0 +1,68 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License; + * you may not use this file except in compliance with the Elastic License. + */ +package org.elasticsearch.xpack.ml.utils; + +import org.apache.logging.log4j.Logger; +import org.elasticsearch.cluster.ClusterState; +import org.elasticsearch.cluster.ClusterStateObserver; +import org.elasticsearch.cluster.service.ClusterService; +import org.elasticsearch.common.logging.Loggers; +import org.elasticsearch.common.unit.TimeValue; +import org.elasticsearch.threadpool.ThreadPool; +import org.elasticsearch.xpack.ml.datafeed.DatafeedState; +import org.elasticsearch.xpack.ml.job.metadata.MlMetadata; +import org.elasticsearch.xpack.persistent.PersistentTasksInProgress; + +import java.util.function.Consumer; +import java.util.function.Predicate; + +public class DatafeedStateObserver { + + private static final Logger LOGGER = Loggers.getLogger(DatafeedStateObserver.class); + + private final ThreadPool threadPool; + private final ClusterService clusterService; + + public DatafeedStateObserver(ThreadPool threadPool, ClusterService clusterService) { + this.threadPool = threadPool; + this.clusterService = clusterService; + } + + public void waitForState(String datafeedId, TimeValue waitTimeout, DatafeedState expectedState, Consumer handler) { + ClusterStateObserver observer = + new ClusterStateObserver(clusterService, LOGGER, threadPool.getThreadContext()); + Predicate predicate = (newState) -> { + PersistentTasksInProgress tasks = newState.custom(PersistentTasksInProgress.TYPE); + DatafeedState datafeedState = MlMetadata.getDatafeedState(datafeedId, tasks); + return datafeedState == expectedState; + }; + observer.waitForNextChange(new ClusterStateObserver.Listener() { + @Override + public void onNewClusterState(ClusterState state) { + handler.accept(null); + } + + @Override + public void onClusterServiceClose() { + Exception e = new IllegalArgumentException("Cluster service closed while waiting for datafeed state to change to [" + + expectedState + "]"); + handler.accept(new IllegalStateException(e)); + } + + @Override + public void onTimeout(TimeValue timeout) { + if (predicate.test(clusterService.state())) { + handler.accept(null); + } else { + Exception e = new IllegalArgumentException("Timeout expired while waiting for datafeed state to change to [" + + expectedState + "]"); + handler.accept(e); + } + } + }, predicate, waitTimeout); + } + +} diff --git a/elasticsearch/src/test/java/org/elasticsearch/xpack/ml/action/DatafeedJobsIT.java b/elasticsearch/src/test/java/org/elasticsearch/xpack/ml/action/DatafeedJobsIT.java index be3d8e1b8d1..8a8697b820e 100644 --- a/elasticsearch/src/test/java/org/elasticsearch/xpack/ml/action/DatafeedJobsIT.java +++ b/elasticsearch/src/test/java/org/elasticsearch/xpack/ml/action/DatafeedJobsIT.java @@ -17,9 +17,6 @@ import org.elasticsearch.common.xcontent.XContentHelper; import org.elasticsearch.common.xcontent.XContentParser; import org.elasticsearch.xpack.ml.datafeed.DatafeedConfig; import org.elasticsearch.xpack.ml.datafeed.DatafeedState; -import org.elasticsearch.xpack.ml.job.config.AnalysisConfig; -import org.elasticsearch.xpack.ml.job.config.DataDescription; -import org.elasticsearch.xpack.ml.job.config.Detector; import org.elasticsearch.xpack.ml.job.config.Job; import org.elasticsearch.xpack.ml.job.config.JobState; import org.elasticsearch.xpack.ml.job.persistence.AnomalyDetectorsIndex; @@ -32,7 +29,6 @@ import org.junit.Before; import java.io.IOException; import java.util.Collections; -import java.util.List; import java.util.concurrent.TimeUnit; import static org.hamcrest.Matchers.equalTo; @@ -176,31 +172,6 @@ public class DatafeedJobsIT extends BaseMlIntegTestCase { logger.info("Indexed [{}] documents", numDocs); } - private Job.Builder createScheduledJob(String jobId) { - DataDescription.Builder dataDescription = new DataDescription.Builder(); - dataDescription.setFormat(DataDescription.DataFormat.JSON); - dataDescription.setTimeFormat("yyyy-MM-dd HH:mm:ss"); - - Detector.Builder d = new Detector.Builder("count", null); - AnalysisConfig.Builder analysisConfig = new AnalysisConfig.Builder(Collections.singletonList(d.build())); - - Job.Builder builder = new Job.Builder(); - builder.setId(jobId); - - builder.setAnalysisConfig(analysisConfig); - builder.setDataDescription(dataDescription); - return builder; - } - - private DatafeedConfig createDatafeed(String datafeedId, String jobId, List indexes) { - DatafeedConfig.Builder builder = new DatafeedConfig.Builder(datafeedId, jobId); - builder.setQueryDelay(1); - builder.setFrequency(2); - builder.setIndexes(indexes); - builder.setTypes(Collections.singletonList("type")); - return builder.build(); - } - private DataCounts getDataCounts(String jobId) { GetResponse getResponse = client().prepareGet(AnomalyDetectorsIndex.jobResultsIndexName(jobId), DataCounts.TYPE.getPreferredName(), jobId + "-data-counts").get(); diff --git a/elasticsearch/src/test/java/org/elasticsearch/xpack/ml/action/StartDatafeedActionRequestTests.java b/elasticsearch/src/test/java/org/elasticsearch/xpack/ml/action/StartDatafeedActionRequestTests.java index fecb7353d3a..f510b55708c 100644 --- a/elasticsearch/src/test/java/org/elasticsearch/xpack/ml/action/StartDatafeedActionRequestTests.java +++ b/elasticsearch/src/test/java/org/elasticsearch/xpack/ml/action/StartDatafeedActionRequestTests.java @@ -5,21 +5,9 @@ */ package org.elasticsearch.xpack.ml.action; -import org.elasticsearch.ElasticsearchStatusException; -import org.elasticsearch.ResourceNotFoundException; import org.elasticsearch.common.xcontent.XContentParser; import org.elasticsearch.xpack.ml.action.StartDatafeedAction.Request; -import org.elasticsearch.xpack.ml.datafeed.DatafeedConfig; -import org.elasticsearch.xpack.ml.datafeed.DatafeedJobRunnerTests; -import org.elasticsearch.xpack.ml.job.config.Job; -import org.elasticsearch.xpack.ml.job.metadata.MlMetadata; import org.elasticsearch.xpack.ml.support.AbstractStreamableXContentTestCase; -import org.elasticsearch.xpack.persistent.PersistentTasksInProgress; -import org.elasticsearch.xpack.persistent.PersistentTasksInProgress.PersistentTaskInProgress; - -import java.util.Collections; - -import static org.hamcrest.Matchers.equalTo; public class StartDatafeedActionRequestTests extends AbstractStreamableXContentTestCase { @@ -42,26 +30,4 @@ public class StartDatafeedActionRequestTests extends AbstractStreamableXContentT return Request.parseRequest(null, parser); } - public void testValidate() { - Job job1 = DatafeedJobRunnerTests.createDatafeedJob().build(); - MlMetadata mlMetadata1 = new MlMetadata.Builder() - .putJob(job1, false) - .build(); - Exception e = expectThrows(ResourceNotFoundException.class, - () -> StartDatafeedAction.validate("some-datafeed", mlMetadata1, - new PersistentTasksInProgress(0L, Collections.emptyMap()))); - assertThat(e.getMessage(), equalTo("No datafeed with id [some-datafeed] exists")); - - PersistentTaskInProgress task = - new PersistentTaskInProgress<>(0L, OpenJobAction.NAME, new OpenJobAction.Request("foo"), null); - PersistentTasksInProgress tasks = new PersistentTasksInProgress(0L, Collections.singletonMap(0L, task)); - DatafeedConfig datafeedConfig1 = DatafeedJobRunnerTests.createDatafeedConfig("foo-datafeed", "foo").build(); - MlMetadata mlMetadata2 = new MlMetadata.Builder(mlMetadata1) - .putDatafeed(datafeedConfig1) - .build(); - e = expectThrows(ElasticsearchStatusException.class, - () -> StartDatafeedAction.validate("foo-datafeed", mlMetadata2, tasks)); - assertThat(e.getMessage(), equalTo("cannot start datafeed, expected job state [opened], but got [closed]")); - } - } diff --git a/elasticsearch/src/test/java/org/elasticsearch/xpack/ml/action/StartDatafeedActionTests.java b/elasticsearch/src/test/java/org/elasticsearch/xpack/ml/action/StartDatafeedActionTests.java new file mode 100644 index 00000000000..23b76975c47 --- /dev/null +++ b/elasticsearch/src/test/java/org/elasticsearch/xpack/ml/action/StartDatafeedActionTests.java @@ -0,0 +1,120 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License; + * you may not use this file except in compliance with the Elastic License. + */ +package org.elasticsearch.xpack.ml.action; + +import org.elasticsearch.ElasticsearchStatusException; +import org.elasticsearch.ResourceNotFoundException; +import org.elasticsearch.Version; +import org.elasticsearch.cluster.ClusterName; +import org.elasticsearch.cluster.ClusterState; +import org.elasticsearch.cluster.metadata.MetaData; +import org.elasticsearch.cluster.node.DiscoveryNode; +import org.elasticsearch.cluster.node.DiscoveryNodes; +import org.elasticsearch.common.transport.TransportAddress; +import org.elasticsearch.test.ESTestCase; +import org.elasticsearch.xpack.ml.datafeed.DatafeedConfig; +import org.elasticsearch.xpack.ml.datafeed.DatafeedJobRunnerTests; +import org.elasticsearch.xpack.ml.datafeed.DatafeedState; +import org.elasticsearch.xpack.ml.job.config.Job; +import org.elasticsearch.xpack.ml.job.config.JobState; +import org.elasticsearch.xpack.ml.job.metadata.MlMetadata; +import org.elasticsearch.xpack.persistent.PersistentTasksInProgress; +import org.elasticsearch.xpack.persistent.PersistentTasksInProgress.PersistentTaskInProgress; + +import java.net.InetAddress; +import java.util.Collections; + +import static org.elasticsearch.xpack.ml.support.BaseMlIntegTestCase.createDatafeed; +import static org.elasticsearch.xpack.ml.support.BaseMlIntegTestCase.createScheduledJob; +import static org.hamcrest.Matchers.equalTo; + +public class StartDatafeedActionTests extends ESTestCase { + + public void testSelectNode() throws Exception { + MlMetadata.Builder mlMetadata = new MlMetadata.Builder(); + Job job = createScheduledJob("job_id").build(); + mlMetadata.putJob(job, false); + mlMetadata.putDatafeed(createDatafeed("datafeed_id", job.getId(), Collections.singletonList("*"))); + + PersistentTaskInProgress task = + new PersistentTaskInProgress<>(0L, OpenJobAction.NAME, new OpenJobAction.Request(job.getId()), "node_id"); + task = new PersistentTaskInProgress(task, randomFrom(JobState.FAILED, JobState.CLOSED, + JobState.CLOSING, JobState.OPENING)); + PersistentTasksInProgress tasks = new PersistentTasksInProgress(1L, Collections.singletonMap(0L, task)); + + DiscoveryNodes nodes = DiscoveryNodes.builder() + .add(new DiscoveryNode("node_name", "node_id", new TransportAddress(InetAddress.getLoopbackAddress(), 9300), + Collections.emptyMap(), Collections.emptySet(), Version.CURRENT)) + .build(); + + ClusterState.Builder cs = ClusterState.builder(new ClusterName("cluster_name")) + .metaData(new MetaData.Builder().putCustom(MlMetadata.TYPE, mlMetadata.build())) + .putCustom(PersistentTasksInProgress.TYPE, tasks) + .nodes(nodes); + + StartDatafeedAction.Request request = new StartDatafeedAction.Request("datafeed_id", 0L); + DiscoveryNode node = StartDatafeedAction.selectNode(logger, request, cs.build()); + assertNull(node); + + task = new PersistentTaskInProgress(task, JobState.OPENED); + tasks = new PersistentTasksInProgress(1L, Collections.singletonMap(0L, task)); + cs = ClusterState.builder(new ClusterName("cluster_name")) + .metaData(new MetaData.Builder().putCustom(MlMetadata.TYPE, mlMetadata.build())) + .putCustom(PersistentTasksInProgress.TYPE, tasks) + .nodes(nodes); + node = StartDatafeedAction.selectNode(logger, request, cs.build()); + assertEquals("node_id", node.getId()); + } + + public void testValidate() { + Job job1 = DatafeedJobRunnerTests.createDatafeedJob().build(); + MlMetadata mlMetadata1 = new MlMetadata.Builder() + .putJob(job1, false) + .build(); + Exception e = expectThrows(ResourceNotFoundException.class, + () -> StartDatafeedAction.validate("some-datafeed", mlMetadata1, null, false)); + assertThat(e.getMessage(), equalTo("No datafeed with id [some-datafeed] exists")); + } + + public void testValidate_jobClosed() { + Job job1 = DatafeedJobRunnerTests.createDatafeedJob().build(); + MlMetadata mlMetadata1 = new MlMetadata.Builder() + .putJob(job1, false) + .build(); + PersistentTaskInProgress task = + new PersistentTaskInProgress<>(0L, OpenJobAction.NAME, new OpenJobAction.Request("foo"), null); + PersistentTasksInProgress tasks = new PersistentTasksInProgress(0L, Collections.singletonMap(0L, task)); + DatafeedConfig datafeedConfig1 = DatafeedJobRunnerTests.createDatafeedConfig("foo-datafeed", "foo").build(); + MlMetadata mlMetadata2 = new MlMetadata.Builder(mlMetadata1) + .putDatafeed(datafeedConfig1) + .build(); + Exception e = expectThrows(ElasticsearchStatusException.class, + () -> StartDatafeedAction.validate("foo-datafeed", mlMetadata2, tasks, true)); + assertThat(e.getMessage(), equalTo("cannot start datafeed, expected job state [opened], but got [closed]")); + } + + + + public void testValidate_dataFeedAlreadyStarted() { + Job job1 = createScheduledJob("job_id").build(); + DatafeedConfig datafeedConfig = createDatafeed("datafeed_id", "job_id", Collections.singletonList("*")); + MlMetadata mlMetadata1 = new MlMetadata.Builder() + .putJob(job1, false) + .putDatafeed(datafeedConfig) + .build(); + + PersistentTaskInProgress task = + new PersistentTaskInProgress<>(0L, StartDatafeedAction.NAME, new StartDatafeedAction.Request("datafeed_id", 0L), + "node_id"); + task = new PersistentTaskInProgress<>(task, DatafeedState.STARTED); + PersistentTasksInProgress tasks = new PersistentTasksInProgress(1L, Collections.singletonMap(0L, task)); + + Exception e = expectThrows(ElasticsearchStatusException.class, + () -> StartDatafeedAction.validate("datafeed_id", mlMetadata1, tasks, false)); + assertThat(e.getMessage(), equalTo("datafeed already started, expected datafeed state [stopped], but got [started]")); + } + +} diff --git a/elasticsearch/src/test/java/org/elasticsearch/xpack/ml/datafeed/DatafeedJobRunnerTests.java b/elasticsearch/src/test/java/org/elasticsearch/xpack/ml/datafeed/DatafeedJobRunnerTests.java index d837babaf3a..2145af49676 100644 --- a/elasticsearch/src/test/java/org/elasticsearch/xpack/ml/datafeed/DatafeedJobRunnerTests.java +++ b/elasticsearch/src/test/java/org/elasticsearch/xpack/ml/datafeed/DatafeedJobRunnerTests.java @@ -7,6 +7,7 @@ package org.elasticsearch.xpack.ml.datafeed; import org.elasticsearch.ResourceNotFoundException; import org.elasticsearch.action.ActionFuture; +import org.elasticsearch.action.ActionListener; import org.elasticsearch.client.Client; import org.elasticsearch.cluster.ClusterName; import org.elasticsearch.cluster.ClusterState; @@ -34,6 +35,8 @@ import org.elasticsearch.xpack.ml.job.metadata.MlMetadata; import org.elasticsearch.xpack.ml.job.persistence.JobProvider; import org.elasticsearch.xpack.ml.job.process.autodetect.state.DataCounts; import org.elasticsearch.xpack.ml.notifications.Auditor; +import org.elasticsearch.xpack.persistent.UpdatePersistentTaskStatusAction; +import org.elasticsearch.xpack.persistent.UpdatePersistentTaskStatusAction.Response; import org.junit.Before; import java.io.ByteArrayInputStream; @@ -93,6 +96,7 @@ public class DatafeedJobRunnerTests extends ESTestCase { return null; }).when(executorService).submit(any(Runnable.class)); when(threadPool.executor(MlPlugin.DATAFEED_RUNNER_THREAD_POOL_NAME)).thenReturn(executorService); + when(threadPool.executor(ThreadPool.Names.GENERIC)).thenReturn(executorService); when(client.execute(same(PostDataAction.INSTANCE), any())).thenReturn(jobDataFuture); when(client.execute(same(FlushJobAction.INSTANCE), any())).thenReturn(flushJobFuture); @@ -110,6 +114,13 @@ public class DatafeedJobRunnerTests extends ESTestCase { consumer.accept(new ResourceNotFoundException("dummy")); return null; }).when(jobProvider).buckets(any(), any(), any(), any()); + + doAnswer(invocationOnMock -> { + @SuppressWarnings("rawtypes") + ActionListener listener = (ActionListener) invocationOnMock.getArguments()[2]; + listener.onResponse(new Response(true)); + return null; + }).when(client).execute(same(UpdatePersistentTaskStatusAction.INSTANCE), any(), any()); } public void testStart_GivenNewlyCreatedJobLoopBack() throws Exception { diff --git a/elasticsearch/src/test/java/org/elasticsearch/xpack/ml/integration/BasicDistributedJobsIT.java b/elasticsearch/src/test/java/org/elasticsearch/xpack/ml/integration/BasicDistributedJobsIT.java index 431ede72c2c..dbd6696bb8a 100644 --- a/elasticsearch/src/test/java/org/elasticsearch/xpack/ml/integration/BasicDistributedJobsIT.java +++ b/elasticsearch/src/test/java/org/elasticsearch/xpack/ml/integration/BasicDistributedJobsIT.java @@ -5,13 +5,20 @@ */ package org.elasticsearch.xpack.ml.integration; +import org.elasticsearch.xpack.ml.action.GetDatafeedsStatsAction; import org.elasticsearch.xpack.ml.action.GetJobsStatsAction; import org.elasticsearch.xpack.ml.action.OpenJobAction; +import org.elasticsearch.xpack.ml.action.PutDatafeedAction; import org.elasticsearch.xpack.ml.action.PutJobAction; +import org.elasticsearch.xpack.ml.action.StartDatafeedAction; +import org.elasticsearch.xpack.ml.datafeed.DatafeedConfig; +import org.elasticsearch.xpack.ml.datafeed.DatafeedState; import org.elasticsearch.xpack.ml.job.config.Job; import org.elasticsearch.xpack.ml.job.config.JobState; import org.elasticsearch.xpack.ml.support.BaseMlIntegTestCase; +import java.util.Collections; + public class BasicDistributedJobsIT extends BaseMlIntegTestCase { public void testFailOverBasics() throws Exception { @@ -51,4 +58,66 @@ public class BasicDistributedJobsIT extends BaseMlIntegTestCase { cleanupWorkaround(2); } + public void testFailOverBasics_withDataFeeder() throws Exception { + internalCluster().ensureAtLeastNumDataNodes(4); + ensureStableCluster(4); + + Job.Builder job = createScheduledJob("job_id"); + PutJobAction.Request putJobRequest = new PutJobAction.Request(job.build(true, job.getId())); + PutJobAction.Response putJobResponse = client().execute(PutJobAction.INSTANCE, putJobRequest).get(); + assertTrue(putJobResponse.isAcknowledged()); + DatafeedConfig config = createDatafeed("data_feed_id", job.getId(), Collections.singletonList("*")); + PutDatafeedAction.Request putDatafeedRequest = new PutDatafeedAction.Request(config); + PutDatafeedAction.Response putDatadeedResponse = client().execute(PutDatafeedAction.INSTANCE, putDatafeedRequest).get(); + assertTrue(putDatadeedResponse.isAcknowledged()); + + ensureGreen(); + OpenJobAction.Request openJobRequest = new OpenJobAction.Request(job.getId()); + client().execute(OpenJobAction.INSTANCE, openJobRequest).get(); + assertBusy(() -> { + GetJobsStatsAction.Response statsResponse = + client().execute(GetJobsStatsAction.INSTANCE, new GetJobsStatsAction.Request(job.getId())).actionGet(); + assertEquals(JobState.OPENED, statsResponse.getResponse().results().get(0).getState()); + }); + StartDatafeedAction.Request startDataFeedRequest = new StartDatafeedAction.Request(config.getId(), 0L); + client().execute(StartDatafeedAction.INSTANCE, startDataFeedRequest); + assertBusy(() -> { + GetDatafeedsStatsAction.Response statsResponse = + client().execute(GetDatafeedsStatsAction.INSTANCE, new GetDatafeedsStatsAction.Request(config.getId())).actionGet(); + assertEquals(1, statsResponse.getResponse().results().size()); + assertEquals(DatafeedState.STARTED, statsResponse.getResponse().results().get(0).getDatafeedState()); + }); + + internalCluster().stopRandomDataNode(); + ensureStableCluster(3); + ensureGreen(); + assertBusy(() -> { + GetJobsStatsAction.Response statsResponse = + client().execute(GetJobsStatsAction.INSTANCE, new GetJobsStatsAction.Request(job.getId())).actionGet(); + assertEquals(JobState.OPENED, statsResponse.getResponse().results().get(0).getState()); + }); + assertBusy(() -> { + GetDatafeedsStatsAction.Response statsResponse = + client().execute(GetDatafeedsStatsAction.INSTANCE, new GetDatafeedsStatsAction.Request(config.getId())).actionGet(); + assertEquals(1, statsResponse.getResponse().results().size()); + assertEquals(DatafeedState.STARTED, statsResponse.getResponse().results().get(0).getDatafeedState()); + }); + + internalCluster().stopRandomDataNode(); + ensureStableCluster(2); + ensureGreen(); + assertBusy(() -> { + GetJobsStatsAction.Response statsResponse = + client().execute(GetJobsStatsAction.INSTANCE, new GetJobsStatsAction.Request(job.getId())).actionGet(); + assertEquals(JobState.OPENED, statsResponse.getResponse().results().get(0).getState()); + }); + assertBusy(() -> { + GetDatafeedsStatsAction.Response statsResponse = + client().execute(GetDatafeedsStatsAction.INSTANCE, new GetDatafeedsStatsAction.Request(config.getId())).actionGet(); + assertEquals(1, statsResponse.getResponse().results().size()); + assertEquals(DatafeedState.STARTED, statsResponse.getResponse().results().get(0).getDatafeedState()); + }); + cleanupWorkaround(2); + } + } diff --git a/elasticsearch/src/test/java/org/elasticsearch/xpack/ml/support/BaseMlIntegTestCase.java b/elasticsearch/src/test/java/org/elasticsearch/xpack/ml/support/BaseMlIntegTestCase.java index 747379838e1..78ec4b44955 100644 --- a/elasticsearch/src/test/java/org/elasticsearch/xpack/ml/support/BaseMlIntegTestCase.java +++ b/elasticsearch/src/test/java/org/elasticsearch/xpack/ml/support/BaseMlIntegTestCase.java @@ -17,6 +17,7 @@ import org.elasticsearch.xpack.ml.action.CloseJobAction; import org.elasticsearch.xpack.ml.action.DeleteDatafeedAction; import org.elasticsearch.xpack.ml.action.DeleteJobAction; import org.elasticsearch.xpack.ml.action.GetDatafeedsStatsAction; +import org.elasticsearch.xpack.ml.action.GetJobsStatsAction; import org.elasticsearch.xpack.ml.action.StopDatafeedAction; import org.elasticsearch.xpack.ml.datafeed.DatafeedConfig; import org.elasticsearch.xpack.ml.datafeed.DatafeedState; @@ -24,10 +25,12 @@ import org.elasticsearch.xpack.ml.job.config.AnalysisConfig; import org.elasticsearch.xpack.ml.job.config.DataDescription; import org.elasticsearch.xpack.ml.job.config.Detector; import org.elasticsearch.xpack.ml.job.config.Job; +import org.elasticsearch.xpack.ml.job.config.JobState; import org.elasticsearch.xpack.ml.job.metadata.MlMetadata; import org.elasticsearch.xpack.persistent.RemovePersistentTaskAction; import java.util.Collections; +import java.util.List; import java.util.Map; import java.util.concurrent.ExecutionException; @@ -72,6 +75,31 @@ public abstract class BaseMlIntegTestCase extends SecurityIntegTestCase { return builder; } + public static Job.Builder createScheduledJob(String jobId) { + DataDescription.Builder dataDescription = new DataDescription.Builder(); + dataDescription.setFormat(DataDescription.DataFormat.JSON); + dataDescription.setTimeFormat("yyyy-MM-dd HH:mm:ss"); + + Detector.Builder d = new Detector.Builder("count", null); + AnalysisConfig.Builder analysisConfig = new AnalysisConfig.Builder(Collections.singletonList(d.build())); + + Job.Builder builder = new Job.Builder(); + builder.setId(jobId); + + builder.setAnalysisConfig(analysisConfig); + builder.setDataDescription(dataDescription); + return builder; + } + + public static DatafeedConfig createDatafeed(String datafeedId, String jobId, List indexes) { + DatafeedConfig.Builder builder = new DatafeedConfig.Builder(datafeedId, jobId); + builder.setQueryDelay(1); + builder.setFrequency(2); + builder.setIndexes(indexes); + builder.setTypes(Collections.singletonList("type")); + return builder.build(); + } + // Due to the fact that ml plugin creates the state, notifications and meta indices automatically // when the test framework removes all indices then ml plugin adds them back. Causing validation to fail // we should move to templates instead as that will fix the test problem @@ -129,6 +157,11 @@ public abstract class BaseMlIntegTestCase extends SecurityIntegTestCase { throw new RuntimeException(e); } } + assertBusy(() -> { + GetJobsStatsAction.Response statsResponse = + client().execute(GetJobsStatsAction.INSTANCE, new GetJobsStatsAction.Request(jobId)).actionGet(); + assertEquals(JobState.CLOSED, statsResponse.getResponse().results().get(0).getState()); + }); DeleteJobAction.Response response = client.execute(DeleteJobAction.INSTANCE, new DeleteJobAction.Request(jobId)).get(); assertTrue(response.isAcknowledged()); diff --git a/elasticsearch/src/test/resources/rest-api-spec/api/xpack.ml.start_datafeed.json b/elasticsearch/src/test/resources/rest-api-spec/api/xpack.ml.start_datafeed.json index 3d1d00293c2..c2f497a2a2a 100644 --- a/elasticsearch/src/test/resources/rest-api-spec/api/xpack.ml.start_datafeed.json +++ b/elasticsearch/src/test/resources/rest-api-spec/api/xpack.ml.start_datafeed.json @@ -21,6 +21,10 @@ "type": "string", "required": false, "description": "The end time when the datafeed should stop. When not set, the datafeed continues in real time" + }, + "timeout": { + "type": "time", + "description": "Controls the time to wait until a datafeed has started. Default to 20 seconds" } } },