From 6783f823a86aa7c776dfea18cdba4a3e1671ef58 Mon Sep 17 00:00:00 2001 From: Martijn van Groningen Date: Mon, 27 Feb 2017 17:47:38 +0100 Subject: [PATCH] [ML] Change stop datafeed api delegate to node hosting datafeed task and execute cancel locally, instead of only removing the persistent task from cluster state. Original commit: elastic/x-pack-elasticsearch@3974b20827f6d0ab93bae73f489d2712ecbbfa36 --- .../xpack/ml/action/StopDatafeedAction.java | 190 +++++++++++++----- .../ml/action/TransportJobTaskAction.java | 20 +- .../ml/client/MachineLearningClient.java | 3 +- .../xpack/ml/datafeed/DatafeedJobRunner.java | 42 ++-- .../datafeeds/RestStopDatafeedAction.java | 35 +++- .../MachineLearningLicensingTests.java | 3 +- .../xpack/ml/action/DatafeedJobsIT.java | 5 +- .../ml/action/OpenJobActionRequestTests.java | 3 + .../StartDatafeedActionRequestTests.java | 4 + .../StopDatafeedActionRequestTests.java | 56 +++++- .../xpack/ml/integration/DatafeedJobIT.java | 2 +- .../xpack/ml/support/BaseMlIntegTestCase.java | 5 +- .../api/xpack.ml.stop_datafeed.json | 5 + .../ml/integration/MlBasicMultiNodeIT.java | 2 +- 14 files changed, 268 insertions(+), 107 deletions(-) diff --git a/plugin/src/main/java/org/elasticsearch/xpack/ml/action/StopDatafeedAction.java b/plugin/src/main/java/org/elasticsearch/xpack/ml/action/StopDatafeedAction.java index 55ba827d586..66c9cee2b6d 100644 --- a/plugin/src/main/java/org/elasticsearch/xpack/ml/action/StopDatafeedAction.java +++ b/plugin/src/main/java/org/elasticsearch/xpack/ml/action/StopDatafeedAction.java @@ -11,39 +11,51 @@ import org.elasticsearch.action.Action; import org.elasticsearch.action.ActionListener; import org.elasticsearch.action.ActionRequestBuilder; import org.elasticsearch.action.ActionRequestValidationException; +import org.elasticsearch.action.FailedNodeException; +import org.elasticsearch.action.TaskOperationFailure; import org.elasticsearch.action.support.ActionFilters; -import org.elasticsearch.action.support.master.MasterNodeRequest; -import org.elasticsearch.action.support.master.TransportMasterNodeAction; +import org.elasticsearch.action.support.tasks.BaseTasksRequest; +import org.elasticsearch.action.support.tasks.BaseTasksResponse; +import org.elasticsearch.action.support.tasks.TransportTasksAction; import org.elasticsearch.client.ElasticsearchClient; import org.elasticsearch.cluster.ClusterState; -import org.elasticsearch.cluster.block.ClusterBlockException; -import org.elasticsearch.cluster.block.ClusterBlockLevel; import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver; +import org.elasticsearch.cluster.metadata.MetaData; import org.elasticsearch.cluster.service.ClusterService; +import org.elasticsearch.common.ParseField; import org.elasticsearch.common.inject.Inject; import org.elasticsearch.common.io.stream.StreamInput; import org.elasticsearch.common.io.stream.StreamOutput; +import org.elasticsearch.common.io.stream.Writeable; import org.elasticsearch.common.settings.Settings; +import org.elasticsearch.common.unit.TimeValue; +import org.elasticsearch.common.xcontent.ObjectParser; +import org.elasticsearch.common.xcontent.ToXContent; +import org.elasticsearch.common.xcontent.XContentBuilder; +import org.elasticsearch.common.xcontent.XContentParser; import org.elasticsearch.rest.RestStatus; +import org.elasticsearch.tasks.Task; import org.elasticsearch.threadpool.ThreadPool; import org.elasticsearch.transport.TransportService; +import org.elasticsearch.xpack.ml.MachineLearning; +import org.elasticsearch.xpack.ml.MlMetadata; import org.elasticsearch.xpack.ml.datafeed.DatafeedConfig; import org.elasticsearch.xpack.ml.datafeed.DatafeedState; import org.elasticsearch.xpack.ml.job.messages.Messages; -import org.elasticsearch.xpack.ml.MlMetadata; +import org.elasticsearch.xpack.ml.utils.DatafeedStateObserver; import org.elasticsearch.xpack.ml.utils.ExceptionsHelper; import org.elasticsearch.xpack.persistent.PersistentTasksInProgress; -import org.elasticsearch.xpack.persistent.PersistentTasksInProgress.PersistentTaskInProgress; -import org.elasticsearch.xpack.persistent.RemovePersistentTaskAction; import java.io.IOException; +import java.util.List; import java.util.Objects; public class StopDatafeedAction - extends Action { + extends Action { public static final StopDatafeedAction INSTANCE = new StopDatafeedAction(); public static final String NAME = "cluster:admin/ml/datafeeds/stop"; + public static final ParseField TIMEOUT = new ParseField("timeout"); private StopDatafeedAction() { super(NAME); @@ -55,16 +67,37 @@ public class StopDatafeedAction } @Override - public RemovePersistentTaskAction.Response newResponse() { - return new RemovePersistentTaskAction.Response(); + public Response newResponse() { + return new Response(); } - public static class Request extends MasterNodeRequest { + public static class Request extends BaseTasksRequest implements ToXContent { + + public static ObjectParser PARSER = new ObjectParser<>(NAME, Request::new); + + static { + PARSER.declareString((request, datafeedId) -> request.datafeedId = datafeedId, DatafeedConfig.ID); + PARSER.declareString((request, val) -> + request.setTimeout(TimeValue.parseTimeValue(val, TIMEOUT.getPreferredName())), TIMEOUT); + } + + public static Request fromXContent(XContentParser parser) { + return parseRequest(null, parser); + } + + public static Request parseRequest(String datafeedId, XContentParser parser) { + Request request = PARSER.apply(parser, null); + if (datafeedId != null) { + request.datafeedId = datafeedId; + } + return request; + } private String datafeedId; public Request(String jobId) { this.datafeedId = ExceptionsHelper.requireNonNull(jobId, DatafeedConfig.ID.getPreferredName()); + setActions(StartDatafeedAction.NAME); } Request() { @@ -74,6 +107,12 @@ public class StopDatafeedAction return datafeedId; } + @Override + public boolean match(Task task) { + String expectedDescription = "datafeed-" + datafeedId; + return task instanceof StartDatafeedAction.DatafeedTask && expectedDescription.equals(task.getDescription()); + } + @Override public ActionRequestValidationException validate() { return null; @@ -93,7 +132,18 @@ public class StopDatafeedAction @Override public int hashCode() { - return Objects.hash(datafeedId); + return Objects.hash(datafeedId, getTimeout()); + } + + @Override + public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException { + builder.startObject(); + builder.field(DatafeedConfig.ID.getPreferredName(), datafeedId); + if (getTimeout() != null) { + builder.field(TIMEOUT.getPreferredName(), getTimeout().getStringRep()); + } + builder.endObject(); + return builder; } @Override @@ -105,71 +155,121 @@ public class StopDatafeedAction return false; } Request other = (Request) obj; - return Objects.equals(datafeedId, other.datafeedId); + return Objects.equals(datafeedId, other.datafeedId) && + Objects.equals(getTimeout(), other.getTimeout()); } } - static class RequestBuilder extends ActionRequestBuilder { + public static class Response extends BaseTasksResponse implements Writeable { + + private boolean stopped; + + public Response(boolean stopped) { + super(null, null); + this.stopped = stopped; + } + + public Response(StreamInput in) throws IOException { + readFrom(in); + } + + public Response() { + } + + public boolean isStopped() { + return stopped; + } + + @Override + public void readFrom(StreamInput in) throws IOException { + super.readFrom(in); + stopped = in.readBoolean(); + } + + @Override + public void writeTo(StreamOutput out) throws IOException { + super.writeTo(out); + out.writeBoolean(stopped); + } + } + + static class RequestBuilder extends ActionRequestBuilder { RequestBuilder(ElasticsearchClient client, StopDatafeedAction action) { super(client, action, new Request()); } } - public static class TransportAction extends TransportMasterNodeAction { - - private final RemovePersistentTaskAction.TransportAction removePersistentTaskAction; + public static class TransportAction extends TransportTasksAction { @Inject public TransportAction(Settings settings, TransportService transportService, ThreadPool threadPool, ActionFilters actionFilters, IndexNameExpressionResolver indexNameExpressionResolver, - ClusterService clusterService, RemovePersistentTaskAction.TransportAction removePersistentTaskAction) { - super(settings, StopDatafeedAction.NAME, transportService, clusterService, threadPool, actionFilters, - indexNameExpressionResolver, Request::new); - this.removePersistentTaskAction = removePersistentTaskAction; + ClusterService clusterService) { + super(settings, StopDatafeedAction.NAME, threadPool, clusterService, transportService, actionFilters, + indexNameExpressionResolver, Request::new, Response::new, MachineLearning.THREAD_POOL_NAME); } @Override - protected String executor() { - return ThreadPool.Names.SAME; + protected void doExecute(Task task, Request request, ActionListener listener) { + ClusterState state = clusterService.state(); + MetaData metaData = state.metaData(); + MlMetadata mlMetadata = metaData.custom(MlMetadata.TYPE); + PersistentTasksInProgress tasks = metaData.custom(PersistentTasksInProgress.TYPE); + String nodeId = validateAndReturnNodeId(request.getDatafeedId(), mlMetadata, tasks); + request.setNodes(nodeId); + ActionListener finalListener = + ActionListener.wrap(r -> waitForDatafeedStopped(request, r, listener), listener::onFailure); + super.doExecute(task, request, finalListener); + } + + // Wait for datafeed to be marked as stopped in cluster state, which means the datafeed persistent task has been removed + // This api returns when task has been cancelled, but that doesn't mean the persistent task has been removed from cluster state, + // so wait for that to happen here. + void waitForDatafeedStopped(Request request, Response response, ActionListener listener) { + DatafeedStateObserver observer = new DatafeedStateObserver(threadPool, clusterService); + observer.waitForState(request.getDatafeedId(), request.getTimeout(), DatafeedState.STOPPED, e -> { + if (e != null) { + listener.onFailure(e); + } else { + listener.onResponse(response); + } + }); } @Override - protected RemovePersistentTaskAction.Response newResponse() { - return new RemovePersistentTaskAction.Response(); + protected Response newResponse(Request request, List tasks, List taskOperationFailures, + List failedNodeExceptions) { + return TransportJobTaskAction.selectFirst(tasks, taskOperationFailures, failedNodeExceptions); } @Override - protected void masterOperation(Request request, ClusterState state, - ActionListener listener) throws Exception { - String datafeedId = request.getDatafeedId(); - MlMetadata mlMetadata = state.metaData().custom(MlMetadata.TYPE); - validate(datafeedId, mlMetadata); - - PersistentTasksInProgress tasks = state.getMetaData().custom(PersistentTasksInProgress.TYPE); - PersistentTaskInProgress task = MlMetadata.getDatafeedTask(request.getDatafeedId(), tasks); - if (task != null) { - RemovePersistentTaskAction.Request removeTaskRequest = new RemovePersistentTaskAction.Request(); - removeTaskRequest.setTaskId(task.getId()); - removePersistentTaskAction.execute(removeTaskRequest, listener); - } else { - listener.onFailure(new ElasticsearchStatusException("datafeed already stopped, expected datafeed state [{}], but got [{}]", - RestStatus.CONFLICT, DatafeedState.STARTED, DatafeedState.STOPPED)); - } + protected Response readTaskResponse(StreamInput in) throws IOException { + return new Response(in); } @Override - protected ClusterBlockException checkBlock(Request request, ClusterState state) { - // Remove persistent action actually updates cs, here we just read it. - return state.blocks().globalBlockedException(ClusterBlockLevel.METADATA_READ); + protected void taskOperation(Request request, StartDatafeedAction.DatafeedTask task, ActionListener listener) { + task.stop(); + listener.onResponse(new Response(true)); } + @Override + protected boolean accumulateExceptions() { + return true; + } } - static void validate(String datafeedId, MlMetadata mlMetadata) { + static String validateAndReturnNodeId(String datafeedId, MlMetadata mlMetadata, PersistentTasksInProgress tasks) { DatafeedConfig datafeed = mlMetadata.getDatafeed(datafeedId); if (datafeed == null) { throw new ResourceNotFoundException(Messages.getMessage(Messages.DATAFEED_NOT_FOUND, datafeedId)); } + PersistentTasksInProgress.PersistentTaskInProgress task = MlMetadata.getDatafeedTask(datafeedId, tasks); + if (task == null || task.getStatus() != DatafeedState.STARTED) { + throw new ElasticsearchStatusException("datafeed already stopped, expected datafeed state [{}], but got [{}]", + RestStatus.CONFLICT, DatafeedState.STARTED, DatafeedState.STOPPED); + } + return task.getExecutorNode(); } } diff --git a/plugin/src/main/java/org/elasticsearch/xpack/ml/action/TransportJobTaskAction.java b/plugin/src/main/java/org/elasticsearch/xpack/ml/action/TransportJobTaskAction.java index d372c1d0ae7..e5a3ce90d69 100644 --- a/plugin/src/main/java/org/elasticsearch/xpack/ml/action/TransportJobTaskAction.java +++ b/plugin/src/main/java/org/elasticsearch/xpack/ml/action/TransportJobTaskAction.java @@ -5,7 +5,6 @@ */ package org.elasticsearch.xpack.ml.action; -import org.elasticsearch.ElasticsearchException; import org.elasticsearch.ElasticsearchStatusException; import org.elasticsearch.action.ActionListener; import org.elasticsearch.action.FailedNodeException; @@ -93,14 +92,21 @@ public abstract class TransportJobTaskAction tasks, List taskOperationFailures, List failedNodeExceptions) { + return selectFirst(tasks, taskOperationFailures, failedNodeExceptions); + + } + + static Response selectFirst(List tasks, + List taskOperationFailures, + List failedNodeExceptions) { // no need to accumulate sub responses, since we only perform an operation on one task only // not ideal, but throwing exceptions here works, because higher up the stack there is a try-catch block delegating to // the actionlistener's onFailure if (tasks.isEmpty()) { if (taskOperationFailures.isEmpty() == false) { - throw wrapThrowable(taskOperationFailures.get(0).getCause()); + throw org.elasticsearch.ExceptionsHelper.convertToElastic(taskOperationFailures.get(0).getCause()); } else if (failedNodeExceptions.isEmpty() == false) { - throw wrapThrowable(failedNodeExceptions.get(0).getCause()); + throw org.elasticsearch.ExceptionsHelper.convertToElastic(failedNodeExceptions.get(0)); } else { throw new IllegalStateException("No errors or response"); } @@ -112,14 +118,6 @@ public abstract class TransportJobTaskAction listener) { + public void stopDatafeed(StopDatafeedAction.Request request, ActionListener listener) { client.execute(StopDatafeedAction.INSTANCE, request, listener); } diff --git a/plugin/src/main/java/org/elasticsearch/xpack/ml/datafeed/DatafeedJobRunner.java b/plugin/src/main/java/org/elasticsearch/xpack/ml/datafeed/DatafeedJobRunner.java index 70df4cdb01b..4343f5af91d 100644 --- a/plugin/src/main/java/org/elasticsearch/xpack/ml/datafeed/DatafeedJobRunner.java +++ b/plugin/src/main/java/org/elasticsearch/xpack/ml/datafeed/DatafeedJobRunner.java @@ -14,7 +14,6 @@ import org.elasticsearch.cluster.service.ClusterService; import org.elasticsearch.common.component.AbstractComponent; import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.unit.TimeValue; -import org.elasticsearch.common.util.concurrent.AbstractRunnable; import org.elasticsearch.common.util.concurrent.FutureUtils; import org.elasticsearch.index.mapper.DateFieldMapper; import org.elasticsearch.rest.RestStatus; @@ -25,9 +24,6 @@ import org.elasticsearch.xpack.ml.action.CloseJobAction; import org.elasticsearch.xpack.ml.action.StartDatafeedAction; import org.elasticsearch.xpack.ml.action.util.QueryPage; import org.elasticsearch.xpack.ml.datafeed.extractor.DataExtractorFactory; -import org.elasticsearch.xpack.ml.datafeed.extractor.aggregation.AggregationDataExtractorFactory; -import org.elasticsearch.xpack.ml.datafeed.extractor.chunked.ChunkedDataExtractorFactory; -import org.elasticsearch.xpack.ml.datafeed.extractor.scroll.ScrollDataExtractorFactory; import org.elasticsearch.xpack.ml.job.config.DataDescription; import org.elasticsearch.xpack.ml.job.config.DefaultFrequency; import org.elasticsearch.xpack.ml.job.config.Job; @@ -266,33 +262,21 @@ public class DatafeedJobRunner extends AbstractComponent { public void stop(String source, Exception e) { logger.info("[{}] attempt to stop datafeed [{}] for job [{}]", source, datafeed.getId(), datafeed.getJobId()); - // We need to fork, because: - // 1) We are being called from cluster state update thread and we should return as soon as possible - // 2) We also index into the notifaction index and that is forbidden from the cluster state update thread: - // (Caused by: java.lang.AssertionError: should not be called by a cluster state applier. reason [the applied - // cluster state is not yet available]) - threadPool.executor(ThreadPool.Names.GENERIC).submit(new AbstractRunnable() { - @Override - public void onFailure(Exception e) { - logger.warn("failed to stop [{}] datafeed [{}] for job [{}]", source, datafeed.getId(), datafeed.getJobId()); + if (datafeedJob.stop()) { + try { + logger.info("[{}] stopping datafeed [{}] for job [{}]...", source, datafeed.getId(), datafeed.getJobId()); + FutureUtils.cancel(future); + auditor.info(datafeed.getJobId(), Messages.getMessage(Messages.JOB_AUDIT_DATAFEED_STOPPED)); + logger.info("[{}] datafeed [{}] for job [{}] has been stopped", source, datafeed.getId(), datafeed.getJobId()); + if (autoCloseJob) { + closeJob(); + } + } finally { handler.accept(e); } - - @Override - protected void doRun() throws Exception { - if (datafeedJob.stop()) { - FutureUtils.cancel(future); - handler.accept(e); - auditor.info(datafeed.getJobId(), Messages.getMessage(Messages.JOB_AUDIT_DATAFEED_STOPPED)); - logger.info("[{}] datafeed [{}] for job [{}] has been stopped", source, datafeed.getId(), datafeed.getJobId()); - if (autoCloseJob) { - closeJob(); - } - } else { - logger.info("[{}] datafeed [{}] for job [{}] was already stopped", source, datafeed.getId(), datafeed.getJobId()); - } - } - }); + } else { + logger.info("[{}] datafeed [{}] for job [{}] was already stopped", source, datafeed.getId(), datafeed.getJobId()); + } } private void closeJob() { diff --git a/plugin/src/main/java/org/elasticsearch/xpack/ml/rest/datafeeds/RestStopDatafeedAction.java b/plugin/src/main/java/org/elasticsearch/xpack/ml/rest/datafeeds/RestStopDatafeedAction.java index 5c92a971539..676ebc16426 100644 --- a/plugin/src/main/java/org/elasticsearch/xpack/ml/rest/datafeeds/RestStopDatafeedAction.java +++ b/plugin/src/main/java/org/elasticsearch/xpack/ml/rest/datafeeds/RestStopDatafeedAction.java @@ -7,12 +7,19 @@ package org.elasticsearch.xpack.ml.rest.datafeeds; import org.elasticsearch.client.node.NodeClient; import org.elasticsearch.common.settings.Settings; +import org.elasticsearch.common.unit.TimeValue; +import org.elasticsearch.common.xcontent.XContentBuilder; +import org.elasticsearch.common.xcontent.XContentParser; import org.elasticsearch.rest.BaseRestHandler; +import org.elasticsearch.rest.BytesRestResponse; import org.elasticsearch.rest.RestController; import org.elasticsearch.rest.RestRequest; -import org.elasticsearch.rest.action.AcknowledgedRestListener; +import org.elasticsearch.rest.RestResponse; +import org.elasticsearch.rest.RestStatus; +import org.elasticsearch.rest.action.RestBuilderListener; import org.elasticsearch.xpack.ml.MachineLearning; import org.elasticsearch.xpack.ml.action.StopDatafeedAction; +import org.elasticsearch.xpack.ml.action.StopDatafeedAction.Response; import org.elasticsearch.xpack.ml.datafeed.DatafeedConfig; import java.io.IOException; @@ -27,8 +34,28 @@ public class RestStopDatafeedAction extends BaseRestHandler { @Override protected RestChannelConsumer prepareRequest(RestRequest restRequest, NodeClient client) throws IOException { - StopDatafeedAction.Request jobDatafeedRequest = new StopDatafeedAction.Request( - restRequest.param(DatafeedConfig.ID.getPreferredName())); - return channel -> client.execute(StopDatafeedAction.INSTANCE, jobDatafeedRequest, new AcknowledgedRestListener<>(channel)); + String datafeedId = restRequest.param(DatafeedConfig.ID.getPreferredName()); + StopDatafeedAction.Request jobDatafeedRequest; + if (restRequest.hasContentOrSourceParam()) { + XContentParser parser = restRequest.contentOrSourceParamParser(); + jobDatafeedRequest = StopDatafeedAction.Request.parseRequest(datafeedId, parser); + } else { + jobDatafeedRequest = new StopDatafeedAction.Request(datafeedId); + if (restRequest.hasParam(StopDatafeedAction.TIMEOUT.getPreferredName())) { + TimeValue openTimeout = restRequest.paramAsTime( + StopDatafeedAction.TIMEOUT.getPreferredName(), TimeValue.timeValueSeconds(20)); + jobDatafeedRequest.setTimeout(openTimeout); + } + } + return channel -> client.execute(StopDatafeedAction.INSTANCE, jobDatafeedRequest, new RestBuilderListener(channel) { + + @Override + public RestResponse buildResponse(Response response, XContentBuilder builder) throws Exception { + builder.startObject(); + builder.field("stopped", response.isStopped()); + builder.endObject(); + return new BytesRestResponse(RestStatus.OK, builder); + } + }); } } diff --git a/plugin/src/test/java/org/elasticsearch/license/MachineLearningLicensingTests.java b/plugin/src/test/java/org/elasticsearch/license/MachineLearningLicensingTests.java index 8b05578ef9e..74e0cb9fffb 100644 --- a/plugin/src/test/java/org/elasticsearch/license/MachineLearningLicensingTests.java +++ b/plugin/src/test/java/org/elasticsearch/license/MachineLearningLicensingTests.java @@ -25,7 +25,6 @@ import org.elasticsearch.xpack.ml.action.StopDatafeedAction; import org.elasticsearch.xpack.ml.client.MachineLearningClient; import org.elasticsearch.xpack.ml.support.BaseMlIntegTestCase; import org.elasticsearch.xpack.persistent.PersistentActionResponse; -import org.elasticsearch.xpack.persistent.RemovePersistentTaskAction; import org.junit.Before; import java.util.Collections; @@ -250,7 +249,7 @@ public class MachineLearningLicensingTests extends BaseMlIntegTestCase { try (TransportClient client = new TestXPackTransportClient(internalCluster().transportClient().settings())) { client.addTransportAddress(internalCluster().getDataNodeInstance(Transport.class).boundAddress().publishAddress()); - PlainListenableActionFuture listener = new PlainListenableActionFuture<>( + PlainListenableActionFuture listener = new PlainListenableActionFuture<>( client.threadPool()); new MachineLearningClient(client).stopDatafeed(new StopDatafeedAction.Request("foobar"), listener); listener.actionGet(); diff --git a/plugin/src/test/java/org/elasticsearch/xpack/ml/action/DatafeedJobsIT.java b/plugin/src/test/java/org/elasticsearch/xpack/ml/action/DatafeedJobsIT.java index c05ca10fd55..0a5a89069d3 100644 --- a/plugin/src/test/java/org/elasticsearch/xpack/ml/action/DatafeedJobsIT.java +++ b/plugin/src/test/java/org/elasticsearch/xpack/ml/action/DatafeedJobsIT.java @@ -14,7 +14,6 @@ import org.elasticsearch.xpack.ml.job.config.JobState; import org.elasticsearch.xpack.ml.job.process.autodetect.state.DataCounts; import org.elasticsearch.xpack.ml.support.BaseMlIntegTestCase; import org.elasticsearch.xpack.persistent.PersistentActionResponse; -import org.elasticsearch.xpack.persistent.RemovePersistentTaskAction; import org.junit.Before; import java.util.Collections; @@ -120,8 +119,8 @@ public class DatafeedJobsIT extends BaseMlIntegTestCase { StopDatafeedAction.Request stopDatafeedRequest = new StopDatafeedAction.Request(datafeedConfig.getId()); try { - RemovePersistentTaskAction.Response stopJobResponse = client().execute(StopDatafeedAction.INSTANCE, stopDatafeedRequest).get(); - assertTrue(stopJobResponse.isAcknowledged()); + StopDatafeedAction.Response stopJobResponse = client().execute(StopDatafeedAction.INSTANCE, stopDatafeedRequest).get(); + assertTrue(stopJobResponse.isStopped()); } catch (Exception e) { NodesHotThreadsResponse nodesHotThreadsResponse = client().admin().cluster().prepareNodesHotThreads().get(); int i = 0; diff --git a/plugin/src/test/java/org/elasticsearch/xpack/ml/action/OpenJobActionRequestTests.java b/plugin/src/test/java/org/elasticsearch/xpack/ml/action/OpenJobActionRequestTests.java index 99e7c232d27..414f2373e2f 100644 --- a/plugin/src/test/java/org/elasticsearch/xpack/ml/action/OpenJobActionRequestTests.java +++ b/plugin/src/test/java/org/elasticsearch/xpack/ml/action/OpenJobActionRequestTests.java @@ -18,6 +18,9 @@ public class OpenJobActionRequestTests extends AbstractStreamableXContentTestCas if (randomBoolean()) { request.setTimeout(TimeValue.timeValueMillis(randomNonNegativeLong())); } + if (randomBoolean()) { + request.setIgnoreDowntime(randomBoolean()); + } return request; } diff --git a/plugin/src/test/java/org/elasticsearch/xpack/ml/action/StartDatafeedActionRequestTests.java b/plugin/src/test/java/org/elasticsearch/xpack/ml/action/StartDatafeedActionRequestTests.java index f510b55708c..9483857a4d3 100644 --- a/plugin/src/test/java/org/elasticsearch/xpack/ml/action/StartDatafeedActionRequestTests.java +++ b/plugin/src/test/java/org/elasticsearch/xpack/ml/action/StartDatafeedActionRequestTests.java @@ -5,6 +5,7 @@ */ package org.elasticsearch.xpack.ml.action; +import org.elasticsearch.common.unit.TimeValue; import org.elasticsearch.common.xcontent.XContentParser; import org.elasticsearch.xpack.ml.action.StartDatafeedAction.Request; import org.elasticsearch.xpack.ml.support.AbstractStreamableXContentTestCase; @@ -17,6 +18,9 @@ public class StartDatafeedActionRequestTests extends AbstractStreamableXContentT if (randomBoolean()) { request.setEndTime(randomNonNegativeLong()); } + if (randomBoolean()) { + request.setTimeout(TimeValue.timeValueMillis(randomNonNegativeLong())); + } return request; } diff --git a/plugin/src/test/java/org/elasticsearch/xpack/ml/action/StopDatafeedActionRequestTests.java b/plugin/src/test/java/org/elasticsearch/xpack/ml/action/StopDatafeedActionRequestTests.java index c47fbeb7496..fa899ffaa4a 100644 --- a/plugin/src/test/java/org/elasticsearch/xpack/ml/action/StopDatafeedActionRequestTests.java +++ b/plugin/src/test/java/org/elasticsearch/xpack/ml/action/StopDatafeedActionRequestTests.java @@ -5,23 +5,35 @@ */ package org.elasticsearch.xpack.ml.action; +import org.elasticsearch.ElasticsearchStatusException; import org.elasticsearch.ResourceNotFoundException; +import org.elasticsearch.common.unit.TimeValue; +import org.elasticsearch.common.xcontent.XContentParser; import org.elasticsearch.xpack.ml.MlMetadata; import org.elasticsearch.xpack.ml.action.StopDatafeedAction.Request; import org.elasticsearch.xpack.ml.datafeed.DatafeedConfig; +import org.elasticsearch.xpack.ml.datafeed.DatafeedState; import org.elasticsearch.xpack.ml.job.config.Job; -import org.elasticsearch.xpack.ml.support.AbstractStreamableTestCase; +import org.elasticsearch.xpack.ml.support.AbstractStreamableXContentTestCase; +import org.elasticsearch.xpack.persistent.PersistentActionRequest; +import org.elasticsearch.xpack.persistent.PersistentTasksInProgress; +import org.elasticsearch.xpack.persistent.PersistentTasksInProgress.PersistentTaskInProgress; + +import java.util.Collections; import static org.elasticsearch.xpack.ml.datafeed.DatafeedJobRunnerTests.createDatafeedConfig; import static org.elasticsearch.xpack.ml.datafeed.DatafeedJobRunnerTests.createDatafeedJob; import static org.hamcrest.Matchers.equalTo; -public class StopDatafeedActionRequestTests extends AbstractStreamableTestCase { +public class StopDatafeedActionRequestTests extends AbstractStreamableXContentTestCase { @Override protected Request createTestInstance() { - Request r = new Request(randomAsciiOfLengthBetween(1, 20)); - return r; + Request request = new Request(randomAsciiOfLengthBetween(1, 20)); + if (randomBoolean()) { + request.setTimeout(TimeValue.timeValueMillis(randomNonNegativeLong())); + } + return request; } @Override @@ -29,18 +41,50 @@ public class StopDatafeedActionRequestTests extends AbstractStreamableTestCase task = new PersistentTaskInProgress(1L, StartDatafeedAction.NAME, + new StartDatafeedAction.Request("foo", 0L), false, false, new PersistentTasksInProgress.Assignment("node_id", "")); + task = new PersistentTaskInProgress<>(task, DatafeedState.STARTED); + PersistentTasksInProgress tasks = new PersistentTasksInProgress(1L, Collections.singletonMap(1L, task)); + Job job = createDatafeedJob().build(); MlMetadata mlMetadata1 = new MlMetadata.Builder().putJob(job, false).build(); Exception e = expectThrows(ResourceNotFoundException.class, - () -> StopDatafeedAction.validate("foo", mlMetadata1)); + () -> StopDatafeedAction.validateAndReturnNodeId("foo", mlMetadata1, tasks)); assertThat(e.getMessage(), equalTo("No datafeed with id [foo] exists")); DatafeedConfig datafeedConfig = createDatafeedConfig("foo", "job_id").build(); MlMetadata mlMetadata2 = new MlMetadata.Builder().putJob(job, false) .putDatafeed(datafeedConfig) .build(); - StopDatafeedAction.validate("foo", mlMetadata2); + StopDatafeedAction.validateAndReturnNodeId("foo", mlMetadata2, tasks); + } + + public void testValidate_alreadyStopped() { + PersistentTasksInProgress tasks; + if (randomBoolean()) { + PersistentTaskInProgress task = new PersistentTaskInProgress(1L, StartDatafeedAction.NAME, + new StartDatafeedAction.Request("foo", 0L), false, false, new PersistentTasksInProgress.Assignment("node_id", "")); + task = new PersistentTaskInProgress<>(task, DatafeedState.STOPPED); + tasks = new PersistentTasksInProgress(1L, Collections.singletonMap(1L, task)); + } else { + tasks = randomBoolean() ? null : new PersistentTasksInProgress(0L, Collections.emptyMap()); + } + + Job job = createDatafeedJob().build(); + DatafeedConfig datafeedConfig = createDatafeedConfig("foo", "job_id").build(); + MlMetadata mlMetadata1 = new MlMetadata.Builder() + .putJob(job, false) + .putDatafeed(datafeedConfig) + .build(); + Exception e = expectThrows(ElasticsearchStatusException.class, + () -> StopDatafeedAction.validateAndReturnNodeId("foo", mlMetadata1, tasks)); + assertThat(e.getMessage(), equalTo("datafeed already stopped, expected datafeed state [started], but got [stopped]")); } } diff --git a/plugin/src/test/java/org/elasticsearch/xpack/ml/integration/DatafeedJobIT.java b/plugin/src/test/java/org/elasticsearch/xpack/ml/integration/DatafeedJobIT.java index eb43e5ab3a3..0253779cfec 100644 --- a/plugin/src/test/java/org/elasticsearch/xpack/ml/integration/DatafeedJobIT.java +++ b/plugin/src/test/java/org/elasticsearch/xpack/ml/integration/DatafeedJobIT.java @@ -306,7 +306,7 @@ public class DatafeedJobIT extends ESRestTestCase { response = client().performRequest("post", MachineLearning.BASE_PATH + "datafeeds/" + datafeedId + "/_stop"); assertThat(response.getStatusLine().getStatusCode(), equalTo(200)); - assertThat(responseEntityToString(response), equalTo("{\"acknowledged\":true}")); + assertThat(responseEntityToString(response), equalTo("{\"stopped\":true}")); client().performRequest("POST", "/_xpack/ml/anomaly_detectors/" + jobId + "/_close"); diff --git a/plugin/src/test/java/org/elasticsearch/xpack/ml/support/BaseMlIntegTestCase.java b/plugin/src/test/java/org/elasticsearch/xpack/ml/support/BaseMlIntegTestCase.java index a8ac66211ca..c917c5b56b5 100644 --- a/plugin/src/test/java/org/elasticsearch/xpack/ml/support/BaseMlIntegTestCase.java +++ b/plugin/src/test/java/org/elasticsearch/xpack/ml/support/BaseMlIntegTestCase.java @@ -36,7 +36,6 @@ import org.elasticsearch.xpack.ml.job.config.Detector; import org.elasticsearch.xpack.ml.job.config.Job; import org.elasticsearch.xpack.ml.job.config.JobState; import org.elasticsearch.xpack.ml.job.process.autodetect.state.DataCounts; -import org.elasticsearch.xpack.persistent.RemovePersistentTaskAction; import org.junit.After; import java.util.Collections; @@ -193,9 +192,9 @@ public abstract class BaseMlIntegTestCase extends SecurityIntegTestCase { for (DatafeedConfig datafeed : mlMetadata.getDatafeeds().values()) { String datafeedId = datafeed.getId(); try { - RemovePersistentTaskAction.Response stopResponse = + StopDatafeedAction.Response stopResponse = client.execute(StopDatafeedAction.INSTANCE, new StopDatafeedAction.Request(datafeedId)).get(); - assertTrue(stopResponse.isAcknowledged()); + assertTrue(stopResponse.isStopped()); } catch (ExecutionException e) { // CONFLICT is ok, as it means the datafeed has already stopped, which isn't an issue at all. if (RestStatus.CONFLICT != ExceptionsHelper.status(e.getCause())) { diff --git a/plugin/src/test/resources/rest-api-spec/api/xpack.ml.stop_datafeed.json b/plugin/src/test/resources/rest-api-spec/api/xpack.ml.stop_datafeed.json index fe8fa172f55..44071c4da1b 100644 --- a/plugin/src/test/resources/rest-api-spec/api/xpack.ml.stop_datafeed.json +++ b/plugin/src/test/resources/rest-api-spec/api/xpack.ml.stop_datafeed.json @@ -13,6 +13,11 @@ "type": "string", "required": true, "description": "The ID of the datafeed to stop" + }, + "timeout": { + "type": "time", + "required": false, + "description": "Controls the time to wait until a datafeed has stopped. Default to 20 seconds" } }, "body": null diff --git a/qa/ml-basic-multi-node/src/test/java/org/elasticsearch/xpack/ml/integration/MlBasicMultiNodeIT.java b/qa/ml-basic-multi-node/src/test/java/org/elasticsearch/xpack/ml/integration/MlBasicMultiNodeIT.java index ad05e719efc..b57d0a43de1 100644 --- a/qa/ml-basic-multi-node/src/test/java/org/elasticsearch/xpack/ml/integration/MlBasicMultiNodeIT.java +++ b/qa/ml-basic-multi-node/src/test/java/org/elasticsearch/xpack/ml/integration/MlBasicMultiNodeIT.java @@ -145,7 +145,7 @@ public class MlBasicMultiNodeIT extends ESRestTestCase { response = client().performRequest("post", MachineLearning.BASE_PATH + "datafeeds/" + datafeedId + "/_stop"); assertEquals(200, response.getStatusLine().getStatusCode()); - assertEquals(Collections.singletonMap("acknowledged", true), responseEntityToMap(response)); + assertEquals(Collections.singletonMap("stopped", true), responseEntityToMap(response)); response = client().performRequest("post", MachineLearning.BASE_PATH + "anomaly_detectors/" + jobId + "/_close"); assertEquals(200, response.getStatusLine().getStatusCode());