[ML] Change stop datafeed api delegate to node hosting datafeed task and execute cancel locally,

instead of only removing the persistent task from cluster state.

Original commit: elastic/x-pack-elasticsearch@3974b20827
This commit is contained in:
Martijn van Groningen 2017-02-27 17:47:38 +01:00
parent 59b50bb18c
commit 6783f823a8
14 changed files with 268 additions and 107 deletions

View File

@ -11,39 +11,51 @@ import org.elasticsearch.action.Action;
import org.elasticsearch.action.ActionListener; import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.ActionRequestBuilder; import org.elasticsearch.action.ActionRequestBuilder;
import org.elasticsearch.action.ActionRequestValidationException; import org.elasticsearch.action.ActionRequestValidationException;
import org.elasticsearch.action.FailedNodeException;
import org.elasticsearch.action.TaskOperationFailure;
import org.elasticsearch.action.support.ActionFilters; import org.elasticsearch.action.support.ActionFilters;
import org.elasticsearch.action.support.master.MasterNodeRequest; import org.elasticsearch.action.support.tasks.BaseTasksRequest;
import org.elasticsearch.action.support.master.TransportMasterNodeAction; import org.elasticsearch.action.support.tasks.BaseTasksResponse;
import org.elasticsearch.action.support.tasks.TransportTasksAction;
import org.elasticsearch.client.ElasticsearchClient; import org.elasticsearch.client.ElasticsearchClient;
import org.elasticsearch.cluster.ClusterState; import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.block.ClusterBlockException;
import org.elasticsearch.cluster.block.ClusterBlockLevel;
import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver; import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver;
import org.elasticsearch.cluster.metadata.MetaData;
import org.elasticsearch.cluster.service.ClusterService; import org.elasticsearch.cluster.service.ClusterService;
import org.elasticsearch.common.ParseField;
import org.elasticsearch.common.inject.Inject; import org.elasticsearch.common.inject.Inject;
import org.elasticsearch.common.io.stream.StreamInput; import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput; import org.elasticsearch.common.io.stream.StreamOutput;
import org.elasticsearch.common.io.stream.Writeable;
import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.common.xcontent.ObjectParser;
import org.elasticsearch.common.xcontent.ToXContent;
import org.elasticsearch.common.xcontent.XContentBuilder;
import org.elasticsearch.common.xcontent.XContentParser;
import org.elasticsearch.rest.RestStatus; import org.elasticsearch.rest.RestStatus;
import org.elasticsearch.tasks.Task;
import org.elasticsearch.threadpool.ThreadPool; import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.transport.TransportService; import org.elasticsearch.transport.TransportService;
import org.elasticsearch.xpack.ml.MachineLearning;
import org.elasticsearch.xpack.ml.MlMetadata;
import org.elasticsearch.xpack.ml.datafeed.DatafeedConfig; import org.elasticsearch.xpack.ml.datafeed.DatafeedConfig;
import org.elasticsearch.xpack.ml.datafeed.DatafeedState; import org.elasticsearch.xpack.ml.datafeed.DatafeedState;
import org.elasticsearch.xpack.ml.job.messages.Messages; import org.elasticsearch.xpack.ml.job.messages.Messages;
import org.elasticsearch.xpack.ml.MlMetadata; import org.elasticsearch.xpack.ml.utils.DatafeedStateObserver;
import org.elasticsearch.xpack.ml.utils.ExceptionsHelper; import org.elasticsearch.xpack.ml.utils.ExceptionsHelper;
import org.elasticsearch.xpack.persistent.PersistentTasksInProgress; import org.elasticsearch.xpack.persistent.PersistentTasksInProgress;
import org.elasticsearch.xpack.persistent.PersistentTasksInProgress.PersistentTaskInProgress;
import org.elasticsearch.xpack.persistent.RemovePersistentTaskAction;
import java.io.IOException; import java.io.IOException;
import java.util.List;
import java.util.Objects; import java.util.Objects;
public class StopDatafeedAction public class StopDatafeedAction
extends Action<StopDatafeedAction.Request, RemovePersistentTaskAction.Response, StopDatafeedAction.RequestBuilder> { extends Action<StopDatafeedAction.Request, StopDatafeedAction.Response, StopDatafeedAction.RequestBuilder> {
public static final StopDatafeedAction INSTANCE = new StopDatafeedAction(); public static final StopDatafeedAction INSTANCE = new StopDatafeedAction();
public static final String NAME = "cluster:admin/ml/datafeeds/stop"; public static final String NAME = "cluster:admin/ml/datafeeds/stop";
public static final ParseField TIMEOUT = new ParseField("timeout");
private StopDatafeedAction() { private StopDatafeedAction() {
super(NAME); super(NAME);
@ -55,16 +67,37 @@ public class StopDatafeedAction
} }
@Override @Override
public RemovePersistentTaskAction.Response newResponse() { public Response newResponse() {
return new RemovePersistentTaskAction.Response(); return new Response();
} }
public static class Request extends MasterNodeRequest<Request> { public static class Request extends BaseTasksRequest<Request> implements ToXContent {
public static ObjectParser<Request, Void> PARSER = new ObjectParser<>(NAME, Request::new);
static {
PARSER.declareString((request, datafeedId) -> request.datafeedId = datafeedId, DatafeedConfig.ID);
PARSER.declareString((request, val) ->
request.setTimeout(TimeValue.parseTimeValue(val, TIMEOUT.getPreferredName())), TIMEOUT);
}
public static Request fromXContent(XContentParser parser) {
return parseRequest(null, parser);
}
public static Request parseRequest(String datafeedId, XContentParser parser) {
Request request = PARSER.apply(parser, null);
if (datafeedId != null) {
request.datafeedId = datafeedId;
}
return request;
}
private String datafeedId; private String datafeedId;
public Request(String jobId) { public Request(String jobId) {
this.datafeedId = ExceptionsHelper.requireNonNull(jobId, DatafeedConfig.ID.getPreferredName()); this.datafeedId = ExceptionsHelper.requireNonNull(jobId, DatafeedConfig.ID.getPreferredName());
setActions(StartDatafeedAction.NAME);
} }
Request() { Request() {
@ -74,6 +107,12 @@ public class StopDatafeedAction
return datafeedId; return datafeedId;
} }
@Override
public boolean match(Task task) {
String expectedDescription = "datafeed-" + datafeedId;
return task instanceof StartDatafeedAction.DatafeedTask && expectedDescription.equals(task.getDescription());
}
@Override @Override
public ActionRequestValidationException validate() { public ActionRequestValidationException validate() {
return null; return null;
@ -93,7 +132,18 @@ public class StopDatafeedAction
@Override @Override
public int hashCode() { public int hashCode() {
return Objects.hash(datafeedId); return Objects.hash(datafeedId, getTimeout());
}
@Override
public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException {
builder.startObject();
builder.field(DatafeedConfig.ID.getPreferredName(), datafeedId);
if (getTimeout() != null) {
builder.field(TIMEOUT.getPreferredName(), getTimeout().getStringRep());
}
builder.endObject();
return builder;
} }
@Override @Override
@ -105,71 +155,121 @@ public class StopDatafeedAction
return false; return false;
} }
Request other = (Request) obj; Request other = (Request) obj;
return Objects.equals(datafeedId, other.datafeedId); return Objects.equals(datafeedId, other.datafeedId) &&
Objects.equals(getTimeout(), other.getTimeout());
} }
} }
static class RequestBuilder extends ActionRequestBuilder<Request, RemovePersistentTaskAction.Response, RequestBuilder> { public static class Response extends BaseTasksResponse implements Writeable {
private boolean stopped;
public Response(boolean stopped) {
super(null, null);
this.stopped = stopped;
}
public Response(StreamInput in) throws IOException {
readFrom(in);
}
public Response() {
}
public boolean isStopped() {
return stopped;
}
@Override
public void readFrom(StreamInput in) throws IOException {
super.readFrom(in);
stopped = in.readBoolean();
}
@Override
public void writeTo(StreamOutput out) throws IOException {
super.writeTo(out);
out.writeBoolean(stopped);
}
}
static class RequestBuilder extends ActionRequestBuilder<Request, Response, RequestBuilder> {
RequestBuilder(ElasticsearchClient client, StopDatafeedAction action) { RequestBuilder(ElasticsearchClient client, StopDatafeedAction action) {
super(client, action, new Request()); super(client, action, new Request());
} }
} }
public static class TransportAction extends TransportMasterNodeAction<Request, RemovePersistentTaskAction.Response> { public static class TransportAction extends TransportTasksAction<StartDatafeedAction.DatafeedTask, Request, Response, Response> {
private final RemovePersistentTaskAction.TransportAction removePersistentTaskAction;
@Inject @Inject
public TransportAction(Settings settings, TransportService transportService, ThreadPool threadPool, public TransportAction(Settings settings, TransportService transportService, ThreadPool threadPool,
ActionFilters actionFilters, IndexNameExpressionResolver indexNameExpressionResolver, ActionFilters actionFilters, IndexNameExpressionResolver indexNameExpressionResolver,
ClusterService clusterService, RemovePersistentTaskAction.TransportAction removePersistentTaskAction) { ClusterService clusterService) {
super(settings, StopDatafeedAction.NAME, transportService, clusterService, threadPool, actionFilters, super(settings, StopDatafeedAction.NAME, threadPool, clusterService, transportService, actionFilters,
indexNameExpressionResolver, Request::new); indexNameExpressionResolver, Request::new, Response::new, MachineLearning.THREAD_POOL_NAME);
this.removePersistentTaskAction = removePersistentTaskAction;
} }
@Override @Override
protected String executor() { protected void doExecute(Task task, Request request, ActionListener<Response> listener) {
return ThreadPool.Names.SAME; ClusterState state = clusterService.state();
MetaData metaData = state.metaData();
MlMetadata mlMetadata = metaData.custom(MlMetadata.TYPE);
PersistentTasksInProgress tasks = metaData.custom(PersistentTasksInProgress.TYPE);
String nodeId = validateAndReturnNodeId(request.getDatafeedId(), mlMetadata, tasks);
request.setNodes(nodeId);
ActionListener<Response> finalListener =
ActionListener.wrap(r -> waitForDatafeedStopped(request, r, listener), listener::onFailure);
super.doExecute(task, request, finalListener);
} }
@Override // Wait for datafeed to be marked as stopped in cluster state, which means the datafeed persistent task has been removed
protected RemovePersistentTaskAction.Response newResponse() { // This api returns when task has been cancelled, but that doesn't mean the persistent task has been removed from cluster state,
return new RemovePersistentTaskAction.Response(); // so wait for that to happen here.
} void waitForDatafeedStopped(Request request, Response response, ActionListener<Response> listener) {
DatafeedStateObserver observer = new DatafeedStateObserver(threadPool, clusterService);
@Override observer.waitForState(request.getDatafeedId(), request.getTimeout(), DatafeedState.STOPPED, e -> {
protected void masterOperation(Request request, ClusterState state, if (e != null) {
ActionListener<RemovePersistentTaskAction.Response> listener) throws Exception { listener.onFailure(e);
String datafeedId = request.getDatafeedId();
MlMetadata mlMetadata = state.metaData().custom(MlMetadata.TYPE);
validate(datafeedId, mlMetadata);
PersistentTasksInProgress tasks = state.getMetaData().custom(PersistentTasksInProgress.TYPE);
PersistentTaskInProgress<?> task = MlMetadata.getDatafeedTask(request.getDatafeedId(), tasks);
if (task != null) {
RemovePersistentTaskAction.Request removeTaskRequest = new RemovePersistentTaskAction.Request();
removeTaskRequest.setTaskId(task.getId());
removePersistentTaskAction.execute(removeTaskRequest, listener);
} else { } else {
listener.onFailure(new ElasticsearchStatusException("datafeed already stopped, expected datafeed state [{}], but got [{}]", listener.onResponse(response);
RestStatus.CONFLICT, DatafeedState.STARTED, DatafeedState.STOPPED));
} }
});
} }
@Override @Override
protected ClusterBlockException checkBlock(Request request, ClusterState state) { protected Response newResponse(Request request, List<Response> tasks, List<TaskOperationFailure> taskOperationFailures,
// Remove persistent action actually updates cs, here we just read it. List<FailedNodeException> failedNodeExceptions) {
return state.blocks().globalBlockedException(ClusterBlockLevel.METADATA_READ); return TransportJobTaskAction.selectFirst(tasks, taskOperationFailures, failedNodeExceptions);
} }
@Override
protected Response readTaskResponse(StreamInput in) throws IOException {
return new Response(in);
} }
static void validate(String datafeedId, MlMetadata mlMetadata) { @Override
protected void taskOperation(Request request, StartDatafeedAction.DatafeedTask task, ActionListener<Response> listener) {
task.stop();
listener.onResponse(new Response(true));
}
@Override
protected boolean accumulateExceptions() {
return true;
}
}
static String validateAndReturnNodeId(String datafeedId, MlMetadata mlMetadata, PersistentTasksInProgress tasks) {
DatafeedConfig datafeed = mlMetadata.getDatafeed(datafeedId); DatafeedConfig datafeed = mlMetadata.getDatafeed(datafeedId);
if (datafeed == null) { if (datafeed == null) {
throw new ResourceNotFoundException(Messages.getMessage(Messages.DATAFEED_NOT_FOUND, datafeedId)); throw new ResourceNotFoundException(Messages.getMessage(Messages.DATAFEED_NOT_FOUND, datafeedId));
} }
PersistentTasksInProgress.PersistentTaskInProgress<?> task = MlMetadata.getDatafeedTask(datafeedId, tasks);
if (task == null || task.getStatus() != DatafeedState.STARTED) {
throw new ElasticsearchStatusException("datafeed already stopped, expected datafeed state [{}], but got [{}]",
RestStatus.CONFLICT, DatafeedState.STARTED, DatafeedState.STOPPED);
}
return task.getExecutorNode();
} }
} }

View File

@ -5,7 +5,6 @@
*/ */
package org.elasticsearch.xpack.ml.action; package org.elasticsearch.xpack.ml.action;
import org.elasticsearch.ElasticsearchException;
import org.elasticsearch.ElasticsearchStatusException; import org.elasticsearch.ElasticsearchStatusException;
import org.elasticsearch.action.ActionListener; import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.FailedNodeException; import org.elasticsearch.action.FailedNodeException;
@ -93,14 +92,21 @@ public abstract class TransportJobTaskAction<OperationTask extends Task, Request
@Override @Override
protected Response newResponse(Request request, List<Response> tasks, List<TaskOperationFailure> taskOperationFailures, protected Response newResponse(Request request, List<Response> tasks, List<TaskOperationFailure> taskOperationFailures,
List<FailedNodeException> failedNodeExceptions) { List<FailedNodeException> failedNodeExceptions) {
return selectFirst(tasks, taskOperationFailures, failedNodeExceptions);
}
static <Response extends BaseTasksResponse> Response selectFirst(List<Response> tasks,
List<TaskOperationFailure> taskOperationFailures,
List<FailedNodeException> failedNodeExceptions) {
// no need to accumulate sub responses, since we only perform an operation on one task only // no need to accumulate sub responses, since we only perform an operation on one task only
// not ideal, but throwing exceptions here works, because higher up the stack there is a try-catch block delegating to // not ideal, but throwing exceptions here works, because higher up the stack there is a try-catch block delegating to
// the actionlistener's onFailure // the actionlistener's onFailure
if (tasks.isEmpty()) { if (tasks.isEmpty()) {
if (taskOperationFailures.isEmpty() == false) { if (taskOperationFailures.isEmpty() == false) {
throw wrapThrowable(taskOperationFailures.get(0).getCause()); throw org.elasticsearch.ExceptionsHelper.convertToElastic(taskOperationFailures.get(0).getCause());
} else if (failedNodeExceptions.isEmpty() == false) { } else if (failedNodeExceptions.isEmpty() == false) {
throw wrapThrowable(failedNodeExceptions.get(0).getCause()); throw org.elasticsearch.ExceptionsHelper.convertToElastic(failedNodeExceptions.get(0));
} else { } else {
throw new IllegalStateException("No errors or response"); throw new IllegalStateException("No errors or response");
} }
@ -112,14 +118,6 @@ public abstract class TransportJobTaskAction<OperationTask extends Task, Request
} }
} }
private ElasticsearchException wrapThrowable(Throwable th) {
if (th instanceof ElasticsearchException) {
return (ElasticsearchException) th;
} else {
return new ElasticsearchException(th);
}
}
@Override @Override
protected boolean accumulateExceptions() { protected boolean accumulateExceptions() {
return true; return true;

View File

@ -35,7 +35,6 @@ import org.elasticsearch.xpack.ml.action.UpdateDatafeedAction;
import org.elasticsearch.xpack.ml.action.UpdateJobAction; import org.elasticsearch.xpack.ml.action.UpdateJobAction;
import org.elasticsearch.xpack.ml.action.UpdateModelSnapshotAction; import org.elasticsearch.xpack.ml.action.UpdateModelSnapshotAction;
import org.elasticsearch.xpack.persistent.PersistentActionResponse; import org.elasticsearch.xpack.persistent.PersistentActionResponse;
import org.elasticsearch.xpack.persistent.RemovePersistentTaskAction;
public class MachineLearningClient { public class MachineLearningClient {
@ -139,7 +138,7 @@ public class MachineLearningClient {
client.execute(StartDatafeedAction.INSTANCE, request, listener); client.execute(StartDatafeedAction.INSTANCE, request, listener);
} }
public void stopDatafeed(StopDatafeedAction.Request request, ActionListener<RemovePersistentTaskAction.Response> listener) { public void stopDatafeed(StopDatafeedAction.Request request, ActionListener<StopDatafeedAction.Response> listener) {
client.execute(StopDatafeedAction.INSTANCE, request, listener); client.execute(StopDatafeedAction.INSTANCE, request, listener);
} }

View File

@ -14,7 +14,6 @@ import org.elasticsearch.cluster.service.ClusterService;
import org.elasticsearch.common.component.AbstractComponent; import org.elasticsearch.common.component.AbstractComponent;
import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.unit.TimeValue; import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.common.util.concurrent.AbstractRunnable;
import org.elasticsearch.common.util.concurrent.FutureUtils; import org.elasticsearch.common.util.concurrent.FutureUtils;
import org.elasticsearch.index.mapper.DateFieldMapper; import org.elasticsearch.index.mapper.DateFieldMapper;
import org.elasticsearch.rest.RestStatus; import org.elasticsearch.rest.RestStatus;
@ -25,9 +24,6 @@ import org.elasticsearch.xpack.ml.action.CloseJobAction;
import org.elasticsearch.xpack.ml.action.StartDatafeedAction; import org.elasticsearch.xpack.ml.action.StartDatafeedAction;
import org.elasticsearch.xpack.ml.action.util.QueryPage; import org.elasticsearch.xpack.ml.action.util.QueryPage;
import org.elasticsearch.xpack.ml.datafeed.extractor.DataExtractorFactory; import org.elasticsearch.xpack.ml.datafeed.extractor.DataExtractorFactory;
import org.elasticsearch.xpack.ml.datafeed.extractor.aggregation.AggregationDataExtractorFactory;
import org.elasticsearch.xpack.ml.datafeed.extractor.chunked.ChunkedDataExtractorFactory;
import org.elasticsearch.xpack.ml.datafeed.extractor.scroll.ScrollDataExtractorFactory;
import org.elasticsearch.xpack.ml.job.config.DataDescription; import org.elasticsearch.xpack.ml.job.config.DataDescription;
import org.elasticsearch.xpack.ml.job.config.DefaultFrequency; import org.elasticsearch.xpack.ml.job.config.DefaultFrequency;
import org.elasticsearch.xpack.ml.job.config.Job; import org.elasticsearch.xpack.ml.job.config.Job;
@ -266,34 +262,22 @@ public class DatafeedJobRunner extends AbstractComponent {
public void stop(String source, Exception e) { public void stop(String source, Exception e) {
logger.info("[{}] attempt to stop datafeed [{}] for job [{}]", source, datafeed.getId(), datafeed.getJobId()); logger.info("[{}] attempt to stop datafeed [{}] for job [{}]", source, datafeed.getId(), datafeed.getJobId());
// We need to fork, because:
// 1) We are being called from cluster state update thread and we should return as soon as possible
// 2) We also index into the notifaction index and that is forbidden from the cluster state update thread:
// (Caused by: java.lang.AssertionError: should not be called by a cluster state applier. reason [the applied
// cluster state is not yet available])
threadPool.executor(ThreadPool.Names.GENERIC).submit(new AbstractRunnable() {
@Override
public void onFailure(Exception e) {
logger.warn("failed to stop [{}] datafeed [{}] for job [{}]", source, datafeed.getId(), datafeed.getJobId());
handler.accept(e);
}
@Override
protected void doRun() throws Exception {
if (datafeedJob.stop()) { if (datafeedJob.stop()) {
try {
logger.info("[{}] stopping datafeed [{}] for job [{}]...", source, datafeed.getId(), datafeed.getJobId());
FutureUtils.cancel(future); FutureUtils.cancel(future);
handler.accept(e);
auditor.info(datafeed.getJobId(), Messages.getMessage(Messages.JOB_AUDIT_DATAFEED_STOPPED)); auditor.info(datafeed.getJobId(), Messages.getMessage(Messages.JOB_AUDIT_DATAFEED_STOPPED));
logger.info("[{}] datafeed [{}] for job [{}] has been stopped", source, datafeed.getId(), datafeed.getJobId()); logger.info("[{}] datafeed [{}] for job [{}] has been stopped", source, datafeed.getId(), datafeed.getJobId());
if (autoCloseJob) { if (autoCloseJob) {
closeJob(); closeJob();
} }
} finally {
handler.accept(e);
}
} else { } else {
logger.info("[{}] datafeed [{}] for job [{}] was already stopped", source, datafeed.getId(), datafeed.getJobId()); logger.info("[{}] datafeed [{}] for job [{}] was already stopped", source, datafeed.getId(), datafeed.getJobId());
} }
} }
});
}
private void closeJob() { private void closeJob() {
CloseJobAction.Request closeJobRequest = new CloseJobAction.Request(datafeed.getJobId()); CloseJobAction.Request closeJobRequest = new CloseJobAction.Request(datafeed.getJobId());

View File

@ -7,12 +7,19 @@ package org.elasticsearch.xpack.ml.rest.datafeeds;
import org.elasticsearch.client.node.NodeClient; import org.elasticsearch.client.node.NodeClient;
import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.common.xcontent.XContentBuilder;
import org.elasticsearch.common.xcontent.XContentParser;
import org.elasticsearch.rest.BaseRestHandler; import org.elasticsearch.rest.BaseRestHandler;
import org.elasticsearch.rest.BytesRestResponse;
import org.elasticsearch.rest.RestController; import org.elasticsearch.rest.RestController;
import org.elasticsearch.rest.RestRequest; import org.elasticsearch.rest.RestRequest;
import org.elasticsearch.rest.action.AcknowledgedRestListener; import org.elasticsearch.rest.RestResponse;
import org.elasticsearch.rest.RestStatus;
import org.elasticsearch.rest.action.RestBuilderListener;
import org.elasticsearch.xpack.ml.MachineLearning; import org.elasticsearch.xpack.ml.MachineLearning;
import org.elasticsearch.xpack.ml.action.StopDatafeedAction; import org.elasticsearch.xpack.ml.action.StopDatafeedAction;
import org.elasticsearch.xpack.ml.action.StopDatafeedAction.Response;
import org.elasticsearch.xpack.ml.datafeed.DatafeedConfig; import org.elasticsearch.xpack.ml.datafeed.DatafeedConfig;
import java.io.IOException; import java.io.IOException;
@ -27,8 +34,28 @@ public class RestStopDatafeedAction extends BaseRestHandler {
@Override @Override
protected RestChannelConsumer prepareRequest(RestRequest restRequest, NodeClient client) throws IOException { protected RestChannelConsumer prepareRequest(RestRequest restRequest, NodeClient client) throws IOException {
StopDatafeedAction.Request jobDatafeedRequest = new StopDatafeedAction.Request( String datafeedId = restRequest.param(DatafeedConfig.ID.getPreferredName());
restRequest.param(DatafeedConfig.ID.getPreferredName())); StopDatafeedAction.Request jobDatafeedRequest;
return channel -> client.execute(StopDatafeedAction.INSTANCE, jobDatafeedRequest, new AcknowledgedRestListener<>(channel)); if (restRequest.hasContentOrSourceParam()) {
XContentParser parser = restRequest.contentOrSourceParamParser();
jobDatafeedRequest = StopDatafeedAction.Request.parseRequest(datafeedId, parser);
} else {
jobDatafeedRequest = new StopDatafeedAction.Request(datafeedId);
if (restRequest.hasParam(StopDatafeedAction.TIMEOUT.getPreferredName())) {
TimeValue openTimeout = restRequest.paramAsTime(
StopDatafeedAction.TIMEOUT.getPreferredName(), TimeValue.timeValueSeconds(20));
jobDatafeedRequest.setTimeout(openTimeout);
}
}
return channel -> client.execute(StopDatafeedAction.INSTANCE, jobDatafeedRequest, new RestBuilderListener<Response>(channel) {
@Override
public RestResponse buildResponse(Response response, XContentBuilder builder) throws Exception {
builder.startObject();
builder.field("stopped", response.isStopped());
builder.endObject();
return new BytesRestResponse(RestStatus.OK, builder);
}
});
} }
} }

View File

@ -25,7 +25,6 @@ import org.elasticsearch.xpack.ml.action.StopDatafeedAction;
import org.elasticsearch.xpack.ml.client.MachineLearningClient; import org.elasticsearch.xpack.ml.client.MachineLearningClient;
import org.elasticsearch.xpack.ml.support.BaseMlIntegTestCase; import org.elasticsearch.xpack.ml.support.BaseMlIntegTestCase;
import org.elasticsearch.xpack.persistent.PersistentActionResponse; import org.elasticsearch.xpack.persistent.PersistentActionResponse;
import org.elasticsearch.xpack.persistent.RemovePersistentTaskAction;
import org.junit.Before; import org.junit.Before;
import java.util.Collections; import java.util.Collections;
@ -250,7 +249,7 @@ public class MachineLearningLicensingTests extends BaseMlIntegTestCase {
try (TransportClient client = new TestXPackTransportClient(internalCluster().transportClient().settings())) { try (TransportClient client = new TestXPackTransportClient(internalCluster().transportClient().settings())) {
client.addTransportAddress(internalCluster().getDataNodeInstance(Transport.class).boundAddress().publishAddress()); client.addTransportAddress(internalCluster().getDataNodeInstance(Transport.class).boundAddress().publishAddress());
PlainListenableActionFuture<RemovePersistentTaskAction.Response> listener = new PlainListenableActionFuture<>( PlainListenableActionFuture<StopDatafeedAction.Response> listener = new PlainListenableActionFuture<>(
client.threadPool()); client.threadPool());
new MachineLearningClient(client).stopDatafeed(new StopDatafeedAction.Request("foobar"), listener); new MachineLearningClient(client).stopDatafeed(new StopDatafeedAction.Request("foobar"), listener);
listener.actionGet(); listener.actionGet();

View File

@ -14,7 +14,6 @@ import org.elasticsearch.xpack.ml.job.config.JobState;
import org.elasticsearch.xpack.ml.job.process.autodetect.state.DataCounts; import org.elasticsearch.xpack.ml.job.process.autodetect.state.DataCounts;
import org.elasticsearch.xpack.ml.support.BaseMlIntegTestCase; import org.elasticsearch.xpack.ml.support.BaseMlIntegTestCase;
import org.elasticsearch.xpack.persistent.PersistentActionResponse; import org.elasticsearch.xpack.persistent.PersistentActionResponse;
import org.elasticsearch.xpack.persistent.RemovePersistentTaskAction;
import org.junit.Before; import org.junit.Before;
import java.util.Collections; import java.util.Collections;
@ -120,8 +119,8 @@ public class DatafeedJobsIT extends BaseMlIntegTestCase {
StopDatafeedAction.Request stopDatafeedRequest = new StopDatafeedAction.Request(datafeedConfig.getId()); StopDatafeedAction.Request stopDatafeedRequest = new StopDatafeedAction.Request(datafeedConfig.getId());
try { try {
RemovePersistentTaskAction.Response stopJobResponse = client().execute(StopDatafeedAction.INSTANCE, stopDatafeedRequest).get(); StopDatafeedAction.Response stopJobResponse = client().execute(StopDatafeedAction.INSTANCE, stopDatafeedRequest).get();
assertTrue(stopJobResponse.isAcknowledged()); assertTrue(stopJobResponse.isStopped());
} catch (Exception e) { } catch (Exception e) {
NodesHotThreadsResponse nodesHotThreadsResponse = client().admin().cluster().prepareNodesHotThreads().get(); NodesHotThreadsResponse nodesHotThreadsResponse = client().admin().cluster().prepareNodesHotThreads().get();
int i = 0; int i = 0;

View File

@ -18,6 +18,9 @@ public class OpenJobActionRequestTests extends AbstractStreamableXContentTestCas
if (randomBoolean()) { if (randomBoolean()) {
request.setTimeout(TimeValue.timeValueMillis(randomNonNegativeLong())); request.setTimeout(TimeValue.timeValueMillis(randomNonNegativeLong()));
} }
if (randomBoolean()) {
request.setIgnoreDowntime(randomBoolean());
}
return request; return request;
} }

View File

@ -5,6 +5,7 @@
*/ */
package org.elasticsearch.xpack.ml.action; package org.elasticsearch.xpack.ml.action;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.common.xcontent.XContentParser; import org.elasticsearch.common.xcontent.XContentParser;
import org.elasticsearch.xpack.ml.action.StartDatafeedAction.Request; import org.elasticsearch.xpack.ml.action.StartDatafeedAction.Request;
import org.elasticsearch.xpack.ml.support.AbstractStreamableXContentTestCase; import org.elasticsearch.xpack.ml.support.AbstractStreamableXContentTestCase;
@ -17,6 +18,9 @@ public class StartDatafeedActionRequestTests extends AbstractStreamableXContentT
if (randomBoolean()) { if (randomBoolean()) {
request.setEndTime(randomNonNegativeLong()); request.setEndTime(randomNonNegativeLong());
} }
if (randomBoolean()) {
request.setTimeout(TimeValue.timeValueMillis(randomNonNegativeLong()));
}
return request; return request;
} }

View File

@ -5,23 +5,35 @@
*/ */
package org.elasticsearch.xpack.ml.action; package org.elasticsearch.xpack.ml.action;
import org.elasticsearch.ElasticsearchStatusException;
import org.elasticsearch.ResourceNotFoundException; import org.elasticsearch.ResourceNotFoundException;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.common.xcontent.XContentParser;
import org.elasticsearch.xpack.ml.MlMetadata; import org.elasticsearch.xpack.ml.MlMetadata;
import org.elasticsearch.xpack.ml.action.StopDatafeedAction.Request; import org.elasticsearch.xpack.ml.action.StopDatafeedAction.Request;
import org.elasticsearch.xpack.ml.datafeed.DatafeedConfig; import org.elasticsearch.xpack.ml.datafeed.DatafeedConfig;
import org.elasticsearch.xpack.ml.datafeed.DatafeedState;
import org.elasticsearch.xpack.ml.job.config.Job; import org.elasticsearch.xpack.ml.job.config.Job;
import org.elasticsearch.xpack.ml.support.AbstractStreamableTestCase; import org.elasticsearch.xpack.ml.support.AbstractStreamableXContentTestCase;
import org.elasticsearch.xpack.persistent.PersistentActionRequest;
import org.elasticsearch.xpack.persistent.PersistentTasksInProgress;
import org.elasticsearch.xpack.persistent.PersistentTasksInProgress.PersistentTaskInProgress;
import java.util.Collections;
import static org.elasticsearch.xpack.ml.datafeed.DatafeedJobRunnerTests.createDatafeedConfig; import static org.elasticsearch.xpack.ml.datafeed.DatafeedJobRunnerTests.createDatafeedConfig;
import static org.elasticsearch.xpack.ml.datafeed.DatafeedJobRunnerTests.createDatafeedJob; import static org.elasticsearch.xpack.ml.datafeed.DatafeedJobRunnerTests.createDatafeedJob;
import static org.hamcrest.Matchers.equalTo; import static org.hamcrest.Matchers.equalTo;
public class StopDatafeedActionRequestTests extends AbstractStreamableTestCase<StopDatafeedAction.Request> { public class StopDatafeedActionRequestTests extends AbstractStreamableXContentTestCase<Request> {
@Override @Override
protected Request createTestInstance() { protected Request createTestInstance() {
Request r = new Request(randomAsciiOfLengthBetween(1, 20)); Request request = new Request(randomAsciiOfLengthBetween(1, 20));
return r; if (randomBoolean()) {
request.setTimeout(TimeValue.timeValueMillis(randomNonNegativeLong()));
}
return request;
} }
@Override @Override
@ -29,18 +41,50 @@ public class StopDatafeedActionRequestTests extends AbstractStreamableTestCase<S
return new Request(); return new Request();
} }
@Override
protected Request parseInstance(XContentParser parser) {
return Request.parseRequest(null, parser);
}
public void testValidate() { public void testValidate() {
PersistentTaskInProgress<?> task = new PersistentTaskInProgress<PersistentActionRequest>(1L, StartDatafeedAction.NAME,
new StartDatafeedAction.Request("foo", 0L), false, false, new PersistentTasksInProgress.Assignment("node_id", ""));
task = new PersistentTaskInProgress<>(task, DatafeedState.STARTED);
PersistentTasksInProgress tasks = new PersistentTasksInProgress(1L, Collections.singletonMap(1L, task));
Job job = createDatafeedJob().build(); Job job = createDatafeedJob().build();
MlMetadata mlMetadata1 = new MlMetadata.Builder().putJob(job, false).build(); MlMetadata mlMetadata1 = new MlMetadata.Builder().putJob(job, false).build();
Exception e = expectThrows(ResourceNotFoundException.class, Exception e = expectThrows(ResourceNotFoundException.class,
() -> StopDatafeedAction.validate("foo", mlMetadata1)); () -> StopDatafeedAction.validateAndReturnNodeId("foo", mlMetadata1, tasks));
assertThat(e.getMessage(), equalTo("No datafeed with id [foo] exists")); assertThat(e.getMessage(), equalTo("No datafeed with id [foo] exists"));
DatafeedConfig datafeedConfig = createDatafeedConfig("foo", "job_id").build(); DatafeedConfig datafeedConfig = createDatafeedConfig("foo", "job_id").build();
MlMetadata mlMetadata2 = new MlMetadata.Builder().putJob(job, false) MlMetadata mlMetadata2 = new MlMetadata.Builder().putJob(job, false)
.putDatafeed(datafeedConfig) .putDatafeed(datafeedConfig)
.build(); .build();
StopDatafeedAction.validate("foo", mlMetadata2); StopDatafeedAction.validateAndReturnNodeId("foo", mlMetadata2, tasks);
}
public void testValidate_alreadyStopped() {
PersistentTasksInProgress tasks;
if (randomBoolean()) {
PersistentTaskInProgress<?> task = new PersistentTaskInProgress<PersistentActionRequest>(1L, StartDatafeedAction.NAME,
new StartDatafeedAction.Request("foo", 0L), false, false, new PersistentTasksInProgress.Assignment("node_id", ""));
task = new PersistentTaskInProgress<>(task, DatafeedState.STOPPED);
tasks = new PersistentTasksInProgress(1L, Collections.singletonMap(1L, task));
} else {
tasks = randomBoolean() ? null : new PersistentTasksInProgress(0L, Collections.emptyMap());
}
Job job = createDatafeedJob().build();
DatafeedConfig datafeedConfig = createDatafeedConfig("foo", "job_id").build();
MlMetadata mlMetadata1 = new MlMetadata.Builder()
.putJob(job, false)
.putDatafeed(datafeedConfig)
.build();
Exception e = expectThrows(ElasticsearchStatusException.class,
() -> StopDatafeedAction.validateAndReturnNodeId("foo", mlMetadata1, tasks));
assertThat(e.getMessage(), equalTo("datafeed already stopped, expected datafeed state [started], but got [stopped]"));
} }
} }

View File

@ -306,7 +306,7 @@ public class DatafeedJobIT extends ESRestTestCase {
response = client().performRequest("post", MachineLearning.BASE_PATH + "datafeeds/" + datafeedId + "/_stop"); response = client().performRequest("post", MachineLearning.BASE_PATH + "datafeeds/" + datafeedId + "/_stop");
assertThat(response.getStatusLine().getStatusCode(), equalTo(200)); assertThat(response.getStatusLine().getStatusCode(), equalTo(200));
assertThat(responseEntityToString(response), equalTo("{\"acknowledged\":true}")); assertThat(responseEntityToString(response), equalTo("{\"stopped\":true}"));
client().performRequest("POST", "/_xpack/ml/anomaly_detectors/" + jobId + "/_close"); client().performRequest("POST", "/_xpack/ml/anomaly_detectors/" + jobId + "/_close");

View File

@ -36,7 +36,6 @@ import org.elasticsearch.xpack.ml.job.config.Detector;
import org.elasticsearch.xpack.ml.job.config.Job; import org.elasticsearch.xpack.ml.job.config.Job;
import org.elasticsearch.xpack.ml.job.config.JobState; import org.elasticsearch.xpack.ml.job.config.JobState;
import org.elasticsearch.xpack.ml.job.process.autodetect.state.DataCounts; import org.elasticsearch.xpack.ml.job.process.autodetect.state.DataCounts;
import org.elasticsearch.xpack.persistent.RemovePersistentTaskAction;
import org.junit.After; import org.junit.After;
import java.util.Collections; import java.util.Collections;
@ -193,9 +192,9 @@ public abstract class BaseMlIntegTestCase extends SecurityIntegTestCase {
for (DatafeedConfig datafeed : mlMetadata.getDatafeeds().values()) { for (DatafeedConfig datafeed : mlMetadata.getDatafeeds().values()) {
String datafeedId = datafeed.getId(); String datafeedId = datafeed.getId();
try { try {
RemovePersistentTaskAction.Response stopResponse = StopDatafeedAction.Response stopResponse =
client.execute(StopDatafeedAction.INSTANCE, new StopDatafeedAction.Request(datafeedId)).get(); client.execute(StopDatafeedAction.INSTANCE, new StopDatafeedAction.Request(datafeedId)).get();
assertTrue(stopResponse.isAcknowledged()); assertTrue(stopResponse.isStopped());
} catch (ExecutionException e) { } catch (ExecutionException e) {
// CONFLICT is ok, as it means the datafeed has already stopped, which isn't an issue at all. // CONFLICT is ok, as it means the datafeed has already stopped, which isn't an issue at all.
if (RestStatus.CONFLICT != ExceptionsHelper.status(e.getCause())) { if (RestStatus.CONFLICT != ExceptionsHelper.status(e.getCause())) {

View File

@ -13,6 +13,11 @@
"type": "string", "type": "string",
"required": true, "required": true,
"description": "The ID of the datafeed to stop" "description": "The ID of the datafeed to stop"
},
"timeout": {
"type": "time",
"required": false,
"description": "Controls the time to wait until a datafeed has stopped. Default to 20 seconds"
} }
}, },
"body": null "body": null

View File

@ -145,7 +145,7 @@ public class MlBasicMultiNodeIT extends ESRestTestCase {
response = client().performRequest("post", MachineLearning.BASE_PATH + "datafeeds/" + datafeedId + "/_stop"); response = client().performRequest("post", MachineLearning.BASE_PATH + "datafeeds/" + datafeedId + "/_stop");
assertEquals(200, response.getStatusLine().getStatusCode()); assertEquals(200, response.getStatusLine().getStatusCode());
assertEquals(Collections.singletonMap("acknowledged", true), responseEntityToMap(response)); assertEquals(Collections.singletonMap("stopped", true), responseEntityToMap(response));
response = client().performRequest("post", MachineLearning.BASE_PATH + "anomaly_detectors/" + jobId + "/_close"); response = client().performRequest("post", MachineLearning.BASE_PATH + "anomaly_detectors/" + jobId + "/_close");
assertEquals(200, response.getStatusLine().getStatusCode()); assertEquals(200, response.getStatusLine().getStatusCode());