From 4285335dfb0f23df10b60314a48d4bee15e5ae1e Mon Sep 17 00:00:00 2001 From: Zachary Tong Date: Fri, 24 Mar 2017 13:10:20 -0400 Subject: [PATCH] [ML] Add a 'force' parameter to CloseJob and StopDataFeed endpoints (elastic/x-pack-elasticsearch#710) If forced, the internal RemovePersistentTask API is invoked directly instead of going through the normal ML close/stop logic. This removes the job's or datafeed's persistent task, which should trigger the task framework to do the necessary cleanup. At that point, the Delete* APIs interpret a missing task as CLOSED/STOPPED, so the job or datafeed can be deleted regardless of its original state. Original commit: elastic/x-pack-elasticsearch@bff23c7840928b99038fb625f19a364296b2fa38 --- .../xpack/ml/action/CloseJobAction.java | 55 +++++++- .../xpack/ml/action/StopDatafeedAction.java | 57 +++++++-- .../datafeeds/RestStopDatafeedAction.java | 4 + .../xpack/ml/rest/job/RestCloseJobAction.java | 11 +- .../integration/MlRestTestStateCleaner.java | 9 +- .../xpack/ml/support/BaseMlIntegTestCase.java | 25 +++- .../rest-api-spec/api/xpack.ml.close_job.json | 7 + .../api/xpack.ml.stop_datafeed.json | 7 + .../rest-api-spec/test/ml/jobs_crud.yaml | 120 ++++++++++++++++++ .../test/ml/start_stop_datafeed.yaml | 35 +++++ 10 files changed, 310 insertions(+), 20 deletions(-) diff --git a/plugin/src/main/java/org/elasticsearch/xpack/ml/action/CloseJobAction.java b/plugin/src/main/java/org/elasticsearch/xpack/ml/action/CloseJobAction.java index 600121cd70f..b89a9676db6 100644 --- a/plugin/src/main/java/org/elasticsearch/xpack/ml/action/CloseJobAction.java +++ b/plugin/src/main/java/org/elasticsearch/xpack/ml/action/CloseJobAction.java @@ -14,6 +14,7 @@ import org.elasticsearch.action.ActionResponse; import org.elasticsearch.action.support.ActionFilters; import org.elasticsearch.action.support.master.MasterNodeRequest; import org.elasticsearch.action.support.master.TransportMasterNodeAction; +import org.elasticsearch.client.Client; import org.elasticsearch.client.ElasticsearchClient; import
org.elasticsearch.cluster.ClusterState; import org.elasticsearch.cluster.ClusterStateUpdateTask; @@ -45,6 +46,7 @@ import org.elasticsearch.xpack.ml.job.config.JobState; import org.elasticsearch.xpack.ml.utils.ExceptionsHelper; import org.elasticsearch.xpack.persistent.PersistentTasksCustomMetaData; import org.elasticsearch.xpack.persistent.PersistentTasksCustomMetaData.PersistentTask; +import org.elasticsearch.xpack.persistent.RemovePersistentTaskAction; import java.io.IOException; import java.util.Date; @@ -75,12 +77,14 @@ public class CloseJobAction extends Action implements ToXContent { public static final ParseField TIMEOUT = new ParseField("timeout"); + public static final ParseField FORCE = new ParseField("force"); public static ObjectParser PARSER = new ObjectParser<>(NAME, Request::new); static { PARSER.declareString(Request::setJobId, Job.ID); PARSER.declareString((request, val) -> request.setTimeout(TimeValue.parseTimeValue(val, TIMEOUT.getPreferredName())), TIMEOUT); + PARSER.declareBoolean(Request::setForce, FORCE); } public static Request parseRequest(String jobId, XContentParser parser) { @@ -93,6 +97,7 @@ public class CloseJobAction extends Action listener) throws Exception { + if (request.isForce()) { + forceCloseJob(client, request.getJobId(), state, listener); + } else { + closeJob(request, listener); + } + } + + @Override + protected ClusterBlockException checkBlock(Request request, ClusterState state) { + return state.blocks().globalBlockedException(ClusterBlockLevel.METADATA_WRITE); + } + + private void closeJob(Request request, ActionListener listener) { clusterService.submitStateUpdateTask("closing job [" + request.getJobId() + "]", new ClusterStateUpdateTask() { @Override public ClusterState execute(ClusterState currentState) throws Exception { @@ -274,10 +305,24 @@ public class CloseJobAction extends Action listener) { + PersistentTask task = MlMetadata.getJobTask(jobId, + 
currentState.getMetaData().custom(PersistentTasksCustomMetaData.TYPE)); + if (task != null) { + RemovePersistentTaskAction.Request request = new RemovePersistentTaskAction.Request(task.getId()); + client.execute(RemovePersistentTaskAction.INSTANCE, request, + ActionListener.wrap( + response -> listener.onResponse(new Response(response.isAcknowledged())), + listener::onFailure)); + } else { + String msg = "Requested job [" + jobId + "] be force-closed, but job's task " + + "could not be found."; + logger.warn(msg); + listener.onFailure(new RuntimeException(msg)); + } } + } static PersistentTask validateAndFindTask(String jobId, ClusterState state) { diff --git a/plugin/src/main/java/org/elasticsearch/xpack/ml/action/StopDatafeedAction.java b/plugin/src/main/java/org/elasticsearch/xpack/ml/action/StopDatafeedAction.java index b4d77a73d33..01ae43cb892 100644 --- a/plugin/src/main/java/org/elasticsearch/xpack/ml/action/StopDatafeedAction.java +++ b/plugin/src/main/java/org/elasticsearch/xpack/ml/action/StopDatafeedAction.java @@ -17,6 +17,7 @@ import org.elasticsearch.action.support.ActionFilters; import org.elasticsearch.action.support.tasks.BaseTasksRequest; import org.elasticsearch.action.support.tasks.BaseTasksResponse; import org.elasticsearch.action.support.tasks.TransportTasksAction; +import org.elasticsearch.client.Client; import org.elasticsearch.client.ElasticsearchClient; import org.elasticsearch.cluster.ClusterState; import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver; @@ -46,6 +47,7 @@ import org.elasticsearch.xpack.ml.utils.DatafeedStateObserver; import org.elasticsearch.xpack.ml.utils.ExceptionsHelper; import org.elasticsearch.xpack.persistent.PersistentTasksCustomMetaData; import org.elasticsearch.xpack.persistent.PersistentTasksCustomMetaData.PersistentTask; +import org.elasticsearch.xpack.persistent.RemovePersistentTaskAction; import java.io.IOException; import java.util.List; @@ -57,6 +59,7 @@ public class StopDatafeedAction public
static final StopDatafeedAction INSTANCE = new StopDatafeedAction(); public static final String NAME = "cluster:admin/ml/datafeeds/stop"; public static final ParseField TIMEOUT = new ParseField("timeout"); + public static final ParseField FORCE = new ParseField("force"); private StopDatafeedAction() { super(NAME); @@ -80,6 +83,7 @@ public class StopDatafeedAction PARSER.declareString((request, datafeedId) -> request.datafeedId = datafeedId, DatafeedConfig.ID); PARSER.declareString((request, val) -> request.setTimeout(TimeValue.parseTimeValue(val, TIMEOUT.getPreferredName())), TIMEOUT); + PARSER.declareBoolean(Request::setForce, FORCE); } public static Request fromXContent(XContentParser parser) { @@ -95,6 +99,7 @@ public class StopDatafeedAction } private String datafeedId; + private boolean force = false; public Request(String jobId) { this.datafeedId = ExceptionsHelper.requireNonNull(jobId, DatafeedConfig.ID.getPreferredName()); @@ -109,6 +114,14 @@ public class StopDatafeedAction return datafeedId; } + public boolean isForce() { + return force; + } + + public void setForce(boolean force) { + this.force = force; + } + @Override public boolean match(Task task) { String expectedDescription = "datafeed-" + datafeedId; @@ -124,12 +137,14 @@ public class StopDatafeedAction public void readFrom(StreamInput in) throws IOException { super.readFrom(in); datafeedId = in.readString(); + force = in.readBoolean(); } @Override public void writeTo(StreamOutput out) throws IOException { super.writeTo(out); out.writeString(datafeedId); + out.writeBoolean(force); } @Override @@ -158,7 +173,8 @@ public class StopDatafeedAction } Request other = (Request) obj; return Objects.equals(datafeedId, other.datafeedId) && - Objects.equals(getTimeout(), other.getTimeout()); + Objects.equals(getTimeout(), other.getTimeout()) && + Objects.equals(force, other.force); } } @@ -204,25 +220,41 @@ public class StopDatafeedAction public static class TransportAction extends TransportTasksAction { + 
private final Client client; + @Inject public TransportAction(Settings settings, TransportService transportService, ThreadPool threadPool, ActionFilters actionFilters, IndexNameExpressionResolver indexNameExpressionResolver, - ClusterService clusterService) { + ClusterService clusterService, Client client) { super(settings, StopDatafeedAction.NAME, threadPool, clusterService, transportService, actionFilters, indexNameExpressionResolver, Request::new, Response::new, MachineLearning.THREAD_POOL_NAME); + this.client = client; } @Override protected void doExecute(Task task, Request request, ActionListener listener) { ClusterState state = clusterService.state(); MetaData metaData = state.metaData(); - MlMetadata mlMetadata = metaData.custom(MlMetadata.TYPE); PersistentTasksCustomMetaData tasks = metaData.custom(PersistentTasksCustomMetaData.TYPE); - String nodeId = validateAndReturnNodeId(request.getDatafeedId(), mlMetadata, tasks); - request.setNodes(nodeId); - ActionListener finalListener = - ActionListener.wrap(r -> waitForDatafeedStopped(request, r, listener), listener::onFailure); - super.doExecute(task, request, finalListener); + + if (request.force) { + PersistentTask datafeedTask = MlMetadata.getDatafeedTask(request.getDatafeedId(), tasks); + if (datafeedTask != null) { + forceStopTask(client, datafeedTask.getId(), listener); + } else { + String msg = "Requested datafeed [" + request.getDatafeedId() + "] be force-stopped, but " + + "datafeed's task could not be found."; + logger.warn(msg); + listener.onFailure(new RuntimeException(msg)); + } + } else { + MlMetadata mlMetadata = metaData.custom(MlMetadata.TYPE); + String nodeId = validateAndReturnNodeId(request.getDatafeedId(), mlMetadata, tasks); + request.setNodes(nodeId); + ActionListener finalListener = + ActionListener.wrap(r -> waitForDatafeedStopped(request, r, listener), listener::onFailure); + super.doExecute(task, request, finalListener); + } } // Wait for datafeed to be marked as stopped in cluster 
state, which means the datafeed persistent task has been removed @@ -239,6 +271,15 @@ public class StopDatafeedAction }); } + private void forceStopTask(Client client, long taskId, ActionListener listener) { + RemovePersistentTaskAction.Request request = new RemovePersistentTaskAction.Request(taskId); + + client.execute(RemovePersistentTaskAction.INSTANCE, request, + ActionListener.wrap( + response -> listener.onResponse(new Response(response.isAcknowledged())), + listener::onFailure)); + } + @Override protected Response newResponse(Request request, List tasks, List taskOperationFailures, List failedNodeExceptions) { diff --git a/plugin/src/main/java/org/elasticsearch/xpack/ml/rest/datafeeds/RestStopDatafeedAction.java b/plugin/src/main/java/org/elasticsearch/xpack/ml/rest/datafeeds/RestStopDatafeedAction.java index 676ebc16426..f3fa71a459b 100644 --- a/plugin/src/main/java/org/elasticsearch/xpack/ml/rest/datafeeds/RestStopDatafeedAction.java +++ b/plugin/src/main/java/org/elasticsearch/xpack/ml/rest/datafeeds/RestStopDatafeedAction.java @@ -46,6 +46,10 @@ public class RestStopDatafeedAction extends BaseRestHandler { StopDatafeedAction.TIMEOUT.getPreferredName(), TimeValue.timeValueSeconds(20)); jobDatafeedRequest.setTimeout(openTimeout); } + if (restRequest.hasParam(StopDatafeedAction.FORCE.getPreferredName())) { + jobDatafeedRequest.setForce( + restRequest.paramAsBoolean(StopDatafeedAction.FORCE.getPreferredName(), false)); + } } return channel -> client.execute(StopDatafeedAction.INSTANCE, jobDatafeedRequest, new RestBuilderListener(channel) { diff --git a/plugin/src/main/java/org/elasticsearch/xpack/ml/rest/job/RestCloseJobAction.java b/plugin/src/main/java/org/elasticsearch/xpack/ml/rest/job/RestCloseJobAction.java index e647ccdced7..9d09be48a6e 100644 --- a/plugin/src/main/java/org/elasticsearch/xpack/ml/rest/job/RestCloseJobAction.java +++ b/plugin/src/main/java/org/elasticsearch/xpack/ml/rest/job/RestCloseJobAction.java @@ -14,6 +14,7 @@ import 
org.elasticsearch.rest.RestRequest; import org.elasticsearch.rest.action.RestToXContentListener; import org.elasticsearch.xpack.ml.MachineLearning; import org.elasticsearch.xpack.ml.action.CloseJobAction; +import org.elasticsearch.xpack.ml.action.CloseJobAction.Request; import org.elasticsearch.xpack.ml.job.config.Job; import java.io.IOException; @@ -28,9 +29,13 @@ public class RestCloseJobAction extends BaseRestHandler { @Override protected RestChannelConsumer prepareRequest(RestRequest restRequest, NodeClient client) throws IOException { - CloseJobAction.Request request = new CloseJobAction.Request(restRequest.param(Job.ID.getPreferredName())); - if (restRequest.hasParam("timeout")) { - request.setTimeout(TimeValue.parseTimeValue(restRequest.param("timeout"), "timeout")); + Request request = new Request(restRequest.param(Job.ID.getPreferredName())); + if (restRequest.hasParam(Request.TIMEOUT.getPreferredName())) { + request.setTimeout(TimeValue.parseTimeValue( + restRequest.param(Request.TIMEOUT.getPreferredName()), Request.TIMEOUT.getPreferredName())); + } + if (restRequest.hasParam(Request.FORCE.getPreferredName())) { + request.setForce(restRequest.paramAsBoolean(Request.FORCE.getPreferredName(), false)); } return channel -> client.execute(CloseJobAction.INSTANCE, request, new RestToXContentListener<>(channel)); } diff --git a/plugin/src/test/java/org/elasticsearch/xpack/ml/integration/MlRestTestStateCleaner.java b/plugin/src/test/java/org/elasticsearch/xpack/ml/integration/MlRestTestStateCleaner.java index c263bdf66fc..2813cc04448 100644 --- a/plugin/src/test/java/org/elasticsearch/xpack/ml/integration/MlRestTestStateCleaner.java +++ b/plugin/src/test/java/org/elasticsearch/xpack/ml/integration/MlRestTestStateCleaner.java @@ -88,7 +88,14 @@ public class MlRestTestStateCleaner { || e.getMessage().contains("expected job state [opened], but got [closing]")) { logger.debug("job [" + jobId + "] has already been closed", e); } else { - logger.warn("failed to close 
job [" + jobId + "]", e); + logger.warn("failed to close job [" + jobId + "]. Forcing closed.", e); + + try { + adminClient.performRequest("POST", "/_xpack/ml/anomaly_detectors/" + jobId + "/_close?force=true"); + } catch (Exception e2) { + throw new RuntimeException("Force-closing job [" + jobId + "] failed.", e2); + } + throw new RuntimeException("Had to resort to force-closing job, something went wrong?", e); } } int statusCode = adminClient.performRequest("DELETE", "/_xpack/ml/anomaly_detectors/" + jobId).getStatusLine().getStatusCode(); diff --git a/plugin/src/test/java/org/elasticsearch/xpack/ml/support/BaseMlIntegTestCase.java b/plugin/src/test/java/org/elasticsearch/xpack/ml/support/BaseMlIntegTestCase.java index ecf5294cda2..fb1260dcd00 100644 --- a/plugin/src/test/java/org/elasticsearch/xpack/ml/support/BaseMlIntegTestCase.java +++ b/plugin/src/test/java/org/elasticsearch/xpack/ml/support/BaseMlIntegTestCase.java @@ -253,9 +253,17 @@ public abstract class BaseMlIntegTestCase extends ESIntegTestCase { assertTrue(stopResponse.isStopped()); } catch (ExecutionException e) { if (e.getMessage().contains("datafeed already stopped, expected datafeed state [started], but got [stopped]")) { - logger.debug("failed to stop datafeed [" + datafeedId + "]", e); + logger.debug("failed to stop datafeed [" + datafeedId + "], already stopped", e); } else { - throw new RuntimeException(e); + try { + StopDatafeedAction.Request request = new StopDatafeedAction.Request(datafeedId); + request.setForce(true); + StopDatafeedAction.Response stopResponse = client.execute(StopDatafeedAction.INSTANCE, request).get(); + assertTrue(stopResponse.isStopped()); + } catch (Exception e2) { + throw new RuntimeException("Force-stopping datafeed [" + datafeedId + "] failed.", e2); + } + throw new RuntimeException("Had to resort to force-stopping datafeed, something went wrong?", e); } } assertBusy(() -> { @@ -289,7 +297,18 @@ public abstract class BaseMlIntegTestCase extends ESIntegTestCase { ||
e.getMessage().contains("expected job state [opened], but got [closing]")) { logger.debug("job [" + jobId + "] has already been closed", e); } else { - throw new RuntimeException(e); + try { + CloseJobAction.Request closeRequest = new CloseJobAction.Request(jobId); + closeRequest.setForce(true); + closeRequest.setTimeout(TimeValue.timeValueSeconds(30L)); + CloseJobAction.Response response = + client.execute(CloseJobAction.INSTANCE, closeRequest).get(); + assertTrue(response.isClosed()); + } catch (Exception e2) { + throw new RuntimeException("Force-closing job [" + jobId + "] failed.", e2); + } + throw new RuntimeException("Had to resort to force-closing job, something went wrong?", e); + } } assertBusy(() -> { diff --git a/plugin/src/test/resources/rest-api-spec/api/xpack.ml.close_job.json b/plugin/src/test/resources/rest-api-spec/api/xpack.ml.close_job.json index fc21e85e69c..8a20b294948 100644 --- a/plugin/src/test/resources/rest-api-spec/api/xpack.ml.close_job.json +++ b/plugin/src/test/resources/rest-api-spec/api/xpack.ml.close_job.json @@ -9,6 +9,13 @@ "type": "string", "required": true, "description": "The name of the job to close" + } + }, + "params": { + "force": { + "type": "boolean", + "required": false, + "description": "True if the job should be forcefully closed" }, "timeout": { "type": "time", diff --git a/plugin/src/test/resources/rest-api-spec/api/xpack.ml.stop_datafeed.json b/plugin/src/test/resources/rest-api-spec/api/xpack.ml.stop_datafeed.json index 44071c4da1b..a6f53f6ace1 100644 --- a/plugin/src/test/resources/rest-api-spec/api/xpack.ml.stop_datafeed.json +++ b/plugin/src/test/resources/rest-api-spec/api/xpack.ml.stop_datafeed.json @@ -13,6 +13,13 @@ "type": "string", "required": true, "description": "The ID of the datafeed to stop" + } + }, + "params": { + "force": { + "type": "boolean", + "required": false, + "description": "True if the datafeed should be forcefully stopped."
}, "timeout": { "type": "time", diff --git a/plugin/src/test/resources/rest-api-spec/test/ml/jobs_crud.yaml b/plugin/src/test/resources/rest-api-spec/test/ml/jobs_crud.yaml index 491a64e2921..3a24832ec97 100644 --- a/plugin/src/test/resources/rest-api-spec/test/ml/jobs_crud.yaml +++ b/plugin/src/test/resources/rest-api-spec/test/ml/jobs_crud.yaml @@ -295,3 +295,123 @@ catch: /Cannot delete job \[datafeed-job\] while datafeed \[test-datafeed-1\] refers to it/ xpack.ml.delete_job: job_id: datafeed-job +--- +"Test close job": + + - do: + xpack.ml.put_job: + job_id: farequote + body: > + { + "job_id":"farequote", + "description":"Analysis of response time by airline", + "analysis_config" : { + "bucket_span":"1h", + "detectors" :[{"function":"metric","field_name":"responsetime","by_field_name":"airline"}] + }, + "data_description" : { + "format":"xcontent", + "time_field":"time", + "time_format":"epoch" + } + } + - match: { job_id: "farequote" } + + - do: + xpack.ml.open_job: + job_id: farequote + + - do: + #set the header so we won't randomize it + headers: + Content-Type: application/json + xpack.ml.post_data: + job_id: farequote + body: > + {"airline":"AAL","responsetime":"132.2046","sourcetype":"farequote","time":"1403481600"} + {"airline":"JZA","responsetime":"990.4628","sourcetype":"farequote","time":"1403481700"} + + + - do: + xpack.ml.flush_job: + job_id: farequote + - match: { flushed: true } + + - do: + cluster.state: + metric: [ metadata ] + filter_path: metadata.persistent_tasks + - match: {metadata.persistent_tasks.tasks.0.status.JobState.state: opened} + + - do: + xpack.ml.close_job: + job_id: farequote + - match: { closed: true } + + - do: + cluster.state: + metric: [ metadata ] + filter_path: metadata.persistent_tasks + - match: + metadata.persistent_tasks.tasks: [] + +--- +"Test force close job": + + - do: + xpack.ml.put_job: + job_id: farequote + body: > + { + "job_id":"farequote", + "description":"Analysis of response time by airline", + 
"analysis_config" : { + "bucket_span":"1h", + "detectors" :[{"function":"metric","field_name":"responsetime","by_field_name":"airline"}] + }, + "data_description" : { + "format":"xcontent", + "time_field":"time", + "time_format":"epoch" + } + } + - match: { job_id: "farequote" } + + - do: + xpack.ml.open_job: + job_id: farequote + + - do: + #set the header so we won't randomize it + headers: + Content-Type: application/json + xpack.ml.post_data: + job_id: farequote + body: > + {"airline":"AAL","responsetime":"132.2046","sourcetype":"farequote","time":"1403481600"} + {"airline":"JZA","responsetime":"990.4628","sourcetype":"farequote","time":"1403481700"} + + + - do: + xpack.ml.flush_job: + job_id: farequote + - match: { flushed: true } + + - do: + cluster.state: + metric: [ metadata ] + filter_path: metadata.persistent_tasks + - match: {metadata.persistent_tasks.tasks.0.status.JobState.state: opened} + + - do: + xpack.ml.close_job: + job_id: farequote + force: true + - match: { closed: true } + + - do: + cluster.state: + metric: [ metadata ] + filter_path: metadata.persistent_tasks + - match: + metadata.persistent_tasks.tasks: [] diff --git a/plugin/src/test/resources/rest-api-spec/test/ml/start_stop_datafeed.yaml b/plugin/src/test/resources/rest-api-spec/test/ml/start_stop_datafeed.yaml index 7bc8a9db7cf..4ea5750b0ff 100644 --- a/plugin/src/test/resources/rest-api-spec/test/ml/start_stop_datafeed.yaml +++ b/plugin/src/test/resources/rest-api-spec/test/ml/start_stop_datafeed.yaml @@ -57,6 +57,41 @@ setup: datafeed_id: "datafeed-1" - match: { datafeeds.0.state: stopped } +--- +"Test force stop datafeed": + - do: + xpack.ml.open_job: + job_id: "datafeed-job" + - do: + xpack.ml.start_datafeed: + "datafeed_id": "datafeed-1" + "start": 0 + - do: + xpack.ml.get_datafeed_stats: + datafeed_id: "datafeed-1" + - match: { datafeeds.0.state: started } + + - do: + cluster.state: + metric: [ metadata ] + filter_path: metadata.persistent_tasks.**.DatafeedState + - match: 
{metadata.persistent_tasks.tasks.0.status.DatafeedState.state: started} + + - do: + xpack.ml.stop_datafeed: + "datafeed_id": "datafeed-1" + force: true + - do: + xpack.ml.get_datafeed_stats: + datafeed_id: "datafeed-1" + - match: { datafeeds.0.state: stopped } + + - do: + cluster.state: + metric: [ metadata ] + filter_path: metadata.persistent_tasks.**.DatafeedState + - is_false: metadata.persistent_tasks.tasks.0.status.DatafeedState + --- "Test start datafeed given start is now": - do: