diff --git a/plugin/src/main/java/org/elasticsearch/xpack/ml/action/CloseJobAction.java b/plugin/src/main/java/org/elasticsearch/xpack/ml/action/CloseJobAction.java index 600121cd70f..b89a9676db6 100644 --- a/plugin/src/main/java/org/elasticsearch/xpack/ml/action/CloseJobAction.java +++ b/plugin/src/main/java/org/elasticsearch/xpack/ml/action/CloseJobAction.java @@ -14,6 +14,7 @@ import org.elasticsearch.action.ActionResponse; import org.elasticsearch.action.support.ActionFilters; import org.elasticsearch.action.support.master.MasterNodeRequest; import org.elasticsearch.action.support.master.TransportMasterNodeAction; +import org.elasticsearch.client.Client; import org.elasticsearch.client.ElasticsearchClient; import org.elasticsearch.cluster.ClusterState; import org.elasticsearch.cluster.ClusterStateUpdateTask; @@ -45,6 +46,7 @@ import org.elasticsearch.xpack.ml.job.config.JobState; import org.elasticsearch.xpack.ml.utils.ExceptionsHelper; import org.elasticsearch.xpack.persistent.PersistentTasksCustomMetaData; import org.elasticsearch.xpack.persistent.PersistentTasksCustomMetaData.PersistentTask; +import org.elasticsearch.xpack.persistent.RemovePersistentTaskAction; import java.io.IOException; import java.util.Date; @@ -75,12 +77,14 @@ public class CloseJobAction extends Action implements ToXContent { public static final ParseField TIMEOUT = new ParseField("timeout"); + public static final ParseField FORCE = new ParseField("force"); public static ObjectParser PARSER = new ObjectParser<>(NAME, Request::new); static { PARSER.declareString(Request::setJobId, Job.ID); PARSER.declareString((request, val) -> request.setTimeout(TimeValue.parseTimeValue(val, TIMEOUT.getPreferredName())), TIMEOUT); + PARSER.declareBoolean(Request::setForce, FORCE); } public static Request parseRequest(String jobId, XContentParser parser) { @@ -93,6 +97,7 @@ public class CloseJobAction extends Action listener) throws Exception { + if (request.isForce()) { + forceCloseJob(client, 
request.getJobId(), state, listener); + } else { + closeJob(request, listener); + } + } + + @Override + protected ClusterBlockException checkBlock(Request request, ClusterState state) { + return state.blocks().globalBlockedException(ClusterBlockLevel.METADATA_WRITE); + } + + private void closeJob(Request request, ActionListener listener) { clusterService.submitStateUpdateTask("closing job [" + request.getJobId() + "]", new ClusterStateUpdateTask() { @Override public ClusterState execute(ClusterState currentState) throws Exception { @@ -274,10 +305,24 @@ public class CloseJobAction extends Action listener) { + PersistentTask task = MlMetadata.getJobTask(jobId, + currentState.getMetaData().custom(PersistentTasksCustomMetaData.TYPE)); + if (task != null) { + RemovePersistentTaskAction.Request request = new RemovePersistentTaskAction.Request(task.getId()); + client.execute(RemovePersistentTaskAction.INSTANCE, request, + ActionListener.wrap( + response -> listener.onResponse(new Response(response.isAcknowledged())), + listener::onFailure)); + } else { + String msg = "Requested job [" + jobId + "] be force-closed, but job's task " + + "could not be found."; + logger.warn(msg); + listener.onFailure(new RuntimeException(msg)); + } } + } static PersistentTask validateAndFindTask(String jobId, ClusterState state) { diff --git a/plugin/src/main/java/org/elasticsearch/xpack/ml/action/StopDatafeedAction.java b/plugin/src/main/java/org/elasticsearch/xpack/ml/action/StopDatafeedAction.java index b4d77a73d33..01ae43cb892 100644 --- a/plugin/src/main/java/org/elasticsearch/xpack/ml/action/StopDatafeedAction.java +++ b/plugin/src/main/java/org/elasticsearch/xpack/ml/action/StopDatafeedAction.java @@ -17,6 +17,7 @@ import org.elasticsearch.action.support.ActionFilters; import org.elasticsearch.action.support.tasks.BaseTasksRequest; import org.elasticsearch.action.support.tasks.BaseTasksResponse; import org.elasticsearch.action.support.tasks.TransportTasksAction; +import 
org.elasticsearch.client.Client; import org.elasticsearch.client.ElasticsearchClient; import org.elasticsearch.cluster.ClusterState; import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver; @@ -46,6 +47,7 @@ import org.elasticsearch.xpack.ml.utils.DatafeedStateObserver; import org.elasticsearch.xpack.ml.utils.ExceptionsHelper; import org.elasticsearch.xpack.persistent.PersistentTasksCustomMetaData; import org.elasticsearch.xpack.persistent.PersistentTasksCustomMetaData.PersistentTask; +import org.elasticsearch.xpack.persistent.RemovePersistentTaskAction; import java.io.IOException; import java.util.List; @@ -57,6 +59,7 @@ public class StopDatafeedAction public static final StopDatafeedAction INSTANCE = new StopDatafeedAction(); public static final String NAME = "cluster:admin/ml/datafeeds/stop"; public static final ParseField TIMEOUT = new ParseField("timeout"); + public static final ParseField FORCE = new ParseField("force"); private StopDatafeedAction() { super(NAME); @@ -80,6 +83,7 @@ public class StopDatafeedAction PARSER.declareString((request, datafeedId) -> request.datafeedId = datafeedId, DatafeedConfig.ID); PARSER.declareString((request, val) -> request.setTimeout(TimeValue.parseTimeValue(val, TIMEOUT.getPreferredName())), TIMEOUT); + PARSER.declareBoolean(Request::setForce, FORCE); } public static Request fromXContent(XContentParser parser) { @@ -95,6 +99,7 @@ public class StopDatafeedAction } private String datafeedId; + private boolean force = false; public Request(String jobId) { this.datafeedId = ExceptionsHelper.requireNonNull(jobId, DatafeedConfig.ID.getPreferredName()); @@ -109,6 +114,14 @@ public class StopDatafeedAction return datafeedId; } + public boolean isForce() { + return force; + } + + public void setForce(boolean force) { + this.force = force; + } + @Override public boolean match(Task task) { String expectedDescription = "datafeed-" + datafeedId; @@ -124,12 +137,14 @@ public class StopDatafeedAction public void 
readFrom(StreamInput in) throws IOException { super.readFrom(in); datafeedId = in.readString(); + force = in.readBoolean(); } @Override public void writeTo(StreamOutput out) throws IOException { super.writeTo(out); out.writeString(datafeedId); + out.writeBoolean(force); } @Override @@ -158,7 +173,8 @@ public class StopDatafeedAction } Request other = (Request) obj; return Objects.equals(datafeedId, other.datafeedId) && - Objects.equals(getTimeout(), other.getTimeout()); + Objects.equals(getTimeout(), other.getTimeout()) && + Objects.equals(force, other.force); } } @@ -204,25 +220,41 @@ public class StopDatafeedAction public static class TransportAction extends TransportTasksAction { + private final Client client; + @Inject public TransportAction(Settings settings, TransportService transportService, ThreadPool threadPool, ActionFilters actionFilters, IndexNameExpressionResolver indexNameExpressionResolver, - ClusterService clusterService) { + ClusterService clusterService, Client client) { super(settings, StopDatafeedAction.NAME, threadPool, clusterService, transportService, actionFilters, indexNameExpressionResolver, Request::new, Response::new, MachineLearning.THREAD_POOL_NAME); + this.client = client; } @Override protected void doExecute(Task task, Request request, ActionListener listener) { ClusterState state = clusterService.state(); MetaData metaData = state.metaData(); - MlMetadata mlMetadata = metaData.custom(MlMetadata.TYPE); PersistentTasksCustomMetaData tasks = metaData.custom(PersistentTasksCustomMetaData.TYPE); - String nodeId = validateAndReturnNodeId(request.getDatafeedId(), mlMetadata, tasks); - request.setNodes(nodeId); - ActionListener finalListener = - ActionListener.wrap(r -> waitForDatafeedStopped(request, r, listener), listener::onFailure); - super.doExecute(task, request, finalListener); + + if (request.force) { + PersistentTask datafeedTask = MlMetadata.getDatafeedTask(request.getDatafeedId(), tasks); + if (datafeedTask != null) { + 
forceStopTask(client, datafeedTask.getId(), listener); + } else { + String msg = "Requested datafeed [" + request.getDatafeedId() + "] be force-stopped, but " + + "datafeed's task could not be found."; + logger.warn(msg); + listener.onFailure(new RuntimeException(msg)); + } + } else { + MlMetadata mlMetadata = metaData.custom(MlMetadata.TYPE); + String nodeId = validateAndReturnNodeId(request.getDatafeedId(), mlMetadata, tasks); + request.setNodes(nodeId); + ActionListener finalListener = + ActionListener.wrap(r -> waitForDatafeedStopped(request, r, listener), listener::onFailure); + super.doExecute(task, request, finalListener); + } } // Wait for datafeed to be marked as stopped in cluster state, which means the datafeed persistent task has been removed @@ -239,6 +271,15 @@ public class StopDatafeedAction }); } + private void forceStopTask(Client client, long taskId, ActionListener listener) { + RemovePersistentTaskAction.Request request = new RemovePersistentTaskAction.Request(taskId); + + client.execute(RemovePersistentTaskAction.INSTANCE, request, + ActionListener.wrap( + response -> listener.onResponse(new Response(response.isAcknowledged())), + listener::onFailure)); + } + @Override protected Response newResponse(Request request, List tasks, List taskOperationFailures, List failedNodeExceptions) { diff --git a/plugin/src/main/java/org/elasticsearch/xpack/ml/rest/datafeeds/RestStopDatafeedAction.java b/plugin/src/main/java/org/elasticsearch/xpack/ml/rest/datafeeds/RestStopDatafeedAction.java index 676ebc16426..f3fa71a459b 100644 --- a/plugin/src/main/java/org/elasticsearch/xpack/ml/rest/datafeeds/RestStopDatafeedAction.java +++ b/plugin/src/main/java/org/elasticsearch/xpack/ml/rest/datafeeds/RestStopDatafeedAction.java @@ -46,6 +46,10 @@ public class RestStopDatafeedAction extends BaseRestHandler { StopDatafeedAction.TIMEOUT.getPreferredName(), TimeValue.timeValueSeconds(20)); jobDatafeedRequest.setTimeout(openTimeout); } + if 
(restRequest.hasParam(StopDatafeedAction.FORCE.getPreferredName())) { + jobDatafeedRequest.setForce( + restRequest.paramAsBoolean(StopDatafeedAction.FORCE.getPreferredName(), false)); + } } return channel -> client.execute(StopDatafeedAction.INSTANCE, jobDatafeedRequest, new RestBuilderListener(channel) { diff --git a/plugin/src/main/java/org/elasticsearch/xpack/ml/rest/job/RestCloseJobAction.java b/plugin/src/main/java/org/elasticsearch/xpack/ml/rest/job/RestCloseJobAction.java index e647ccdced7..9d09be48a6e 100644 --- a/plugin/src/main/java/org/elasticsearch/xpack/ml/rest/job/RestCloseJobAction.java +++ b/plugin/src/main/java/org/elasticsearch/xpack/ml/rest/job/RestCloseJobAction.java @@ -14,6 +14,7 @@ import org.elasticsearch.rest.RestRequest; import org.elasticsearch.rest.action.RestToXContentListener; import org.elasticsearch.xpack.ml.MachineLearning; import org.elasticsearch.xpack.ml.action.CloseJobAction; +import org.elasticsearch.xpack.ml.action.CloseJobAction.Request; import org.elasticsearch.xpack.ml.job.config.Job; import java.io.IOException; @@ -28,9 +29,13 @@ public class RestCloseJobAction extends BaseRestHandler { @Override protected RestChannelConsumer prepareRequest(RestRequest restRequest, NodeClient client) throws IOException { - CloseJobAction.Request request = new CloseJobAction.Request(restRequest.param(Job.ID.getPreferredName())); - if (restRequest.hasParam("timeout")) { - request.setTimeout(TimeValue.parseTimeValue(restRequest.param("timeout"), "timeout")); + Request request = new Request(restRequest.param(Job.ID.getPreferredName())); + if (restRequest.hasParam(Request.TIMEOUT.getPreferredName())) { + request.setTimeout(TimeValue.parseTimeValue( + restRequest.param(Request.TIMEOUT.getPreferredName()), Request.TIMEOUT.getPreferredName())); + } + if (restRequest.hasParam(Request.FORCE.getPreferredName())) { + request.setForce(restRequest.paramAsBoolean(Request.FORCE.getPreferredName(), false)); } return channel -> 
client.execute(CloseJobAction.INSTANCE, request, new RestToXContentListener<>(channel)); } diff --git a/plugin/src/test/java/org/elasticsearch/xpack/ml/integration/MlRestTestStateCleaner.java b/plugin/src/test/java/org/elasticsearch/xpack/ml/integration/MlRestTestStateCleaner.java index c263bdf66fc..2813cc04448 100644 --- a/plugin/src/test/java/org/elasticsearch/xpack/ml/integration/MlRestTestStateCleaner.java +++ b/plugin/src/test/java/org/elasticsearch/xpack/ml/integration/MlRestTestStateCleaner.java @@ -88,7 +88,14 @@ public class MlRestTestStateCleaner { || e.getMessage().contains("expected job state [opened], but got [closing]")) { logger.debug("job [" + jobId + "] has already been closed", e); } else { - logger.warn("failed to close job [" + jobId + "]", e); + logger.warn("failed to close job [" + jobId + "]. Forcing closed.", e); + + try { + adminClient.performRequest("POST", "/_xpack/ml/anomaly_detectors/" + jobId + "/_close?force=true"); + } catch (Exception e2) { + throw new RuntimeException("Force-closing job [" + jobId + "] failed.", e2); + } + throw new RuntimeException("Had to resort to force-closing job, something went wrong?", e); } } int statusCode = adminClient.performRequest("DELETE", "/_xpack/ml/anomaly_detectors/" + jobId).getStatusLine().getStatusCode(); diff --git a/plugin/src/test/java/org/elasticsearch/xpack/ml/support/BaseMlIntegTestCase.java b/plugin/src/test/java/org/elasticsearch/xpack/ml/support/BaseMlIntegTestCase.java index ecf5294cda2..fb1260dcd00 100644 --- a/plugin/src/test/java/org/elasticsearch/xpack/ml/support/BaseMlIntegTestCase.java +++ b/plugin/src/test/java/org/elasticsearch/xpack/ml/support/BaseMlIntegTestCase.java @@ -253,9 +253,17 @@ public abstract class BaseMlIntegTestCase extends ESIntegTestCase { assertTrue(stopResponse.isStopped()); } catch (ExecutionException e) { if (e.getMessage().contains("datafeed already stopped, expected datafeed state [started], but got [stopped]")) { - logger.debug("failed to stop datafeed [" 
+ datafeedId + "]", e); + logger.debug("failed to stop datafeed [" + datafeedId + "], already stopped", e); } else { - throw new RuntimeException(e); + try { + StopDatafeedAction.Request request = new StopDatafeedAction.Request(datafeedId); + request.setForce(true); + StopDatafeedAction.Response stopResponse = client.execute(StopDatafeedAction.INSTANCE, request).get(); + assertTrue(stopResponse.isStopped()); + } catch (Exception e2) { + throw new RuntimeException("Force-stopping datafeed [" + datafeedId + "] failed.", e2); + } + throw new RuntimeException("Had to resort to force-stopping datafeed, something went wrong?", e); } } assertBusy(() -> { @@ -289,7 +297,18 @@ public abstract class BaseMlIntegTestCase extends ESIntegTestCase { || e.getMessage().contains("expected job state [opened], but got [closing]")) { logger.debug("job [" + jobId + "] has already been closed", e); } else { - throw new RuntimeException(e); + try { + CloseJobAction.Request closeRequest = new CloseJobAction.Request(jobId); + closeRequest.setForce(true); + closeRequest.setTimeout(TimeValue.timeValueSeconds(30L)); + CloseJobAction.Response response = + client.execute(CloseJobAction.INSTANCE, closeRequest).get(); + assertTrue(response.isClosed()); + } catch (Exception e2) { + throw new RuntimeException("Force-closing job [" + jobId + "] failed.", e2); + } + throw new RuntimeException("Had to resort to force-closing job, something went wrong?", e); + } } assertBusy(() -> { diff --git a/plugin/src/test/resources/rest-api-spec/api/xpack.ml.close_job.json b/plugin/src/test/resources/rest-api-spec/api/xpack.ml.close_job.json index fc21e85e69c..8a20b294948 100644 --- a/plugin/src/test/resources/rest-api-spec/api/xpack.ml.close_job.json +++ b/plugin/src/test/resources/rest-api-spec/api/xpack.ml.close_job.json @@ -9,6 +9,13 @@ "type": "string", "required": true, "description": "The name of the job to close" + } + }, + "params": { + "force": { + "type": "boolean", + "required": false, + "description": 
"True if the job should be forcefully closed" }, "timeout": { "type": "time", diff --git a/plugin/src/test/resources/rest-api-spec/api/xpack.ml.stop_datafeed.json b/plugin/src/test/resources/rest-api-spec/api/xpack.ml.stop_datafeed.json index 44071c4da1b..a6f53f6ace1 100644 --- a/plugin/src/test/resources/rest-api-spec/api/xpack.ml.stop_datafeed.json +++ b/plugin/src/test/resources/rest-api-spec/api/xpack.ml.stop_datafeed.json @@ -13,6 +13,13 @@ "type": "string", "required": true, "description": "The ID of the datafeed to stop" + } + }, + "params": { + "force": { + "type": "boolean", + "required": false, + "description": "True if the datafeed should be forcefully stopped." }, "timeout": { "type": "time", diff --git a/plugin/src/test/resources/rest-api-spec/test/ml/jobs_crud.yaml b/plugin/src/test/resources/rest-api-spec/test/ml/jobs_crud.yaml index 491a64e2921..3a24832ec97 100644 --- a/plugin/src/test/resources/rest-api-spec/test/ml/jobs_crud.yaml +++ b/plugin/src/test/resources/rest-api-spec/test/ml/jobs_crud.yaml @@ -295,3 +295,123 @@ catch: /Cannot delete job \[datafeed-job\] while datafeed \[test-datafeed-1\] refers to it/ xpack.ml.delete_job: job_id: datafeed-job +--- +"Test close job": + + - do: + xpack.ml.put_job: + job_id: farequote + body: > + { + "job_id":"farequote", + "description":"Analysis of response time by airline", + "analysis_config" : { + "bucket_span":"1h", + "detectors" :[{"function":"metric","field_name":"responsetime","by_field_name":"airline"}] + }, + "data_description" : { + "format":"xcontent", + "time_field":"time", + "time_format":"epoch" + } + } + - match: { job_id: "farequote" } + + - do: + xpack.ml.open_job: + job_id: farequote + + - do: + #set the header so we won't randomize it + headers: + Content-Type: application/json + xpack.ml.post_data: + job_id: farequote + body: > + {"airline":"AAL","responsetime":"132.2046","sourcetype":"farequote","time":"1403481600"} + 
{"airline":"JZA","responsetime":"990.4628","sourcetype":"farequote","time":"1403481700"} + + + - do: + xpack.ml.flush_job: + job_id: farequote + - match: { flushed: true } + + - do: + cluster.state: + metric: [ metadata ] + filter_path: metadata.persistent_tasks + - match: {metadata.persistent_tasks.tasks.0.status.JobState.state: opened} + + - do: + xpack.ml.close_job: + job_id: farequote + - match: { closed: true } + + - do: + cluster.state: + metric: [ metadata ] + filter_path: metadata.persistent_tasks + - match: + metadata.persistent_tasks.tasks: [] + +--- +"Test force close job": + + - do: + xpack.ml.put_job: + job_id: farequote + body: > + { + "job_id":"farequote", + "description":"Analysis of response time by airline", + "analysis_config" : { + "bucket_span":"1h", + "detectors" :[{"function":"metric","field_name":"responsetime","by_field_name":"airline"}] + }, + "data_description" : { + "format":"xcontent", + "time_field":"time", + "time_format":"epoch" + } + } + - match: { job_id: "farequote" } + + - do: + xpack.ml.open_job: + job_id: farequote + + - do: + #set the header so we won't randomize it + headers: + Content-Type: application/json + xpack.ml.post_data: + job_id: farequote + body: > + {"airline":"AAL","responsetime":"132.2046","sourcetype":"farequote","time":"1403481600"} + {"airline":"JZA","responsetime":"990.4628","sourcetype":"farequote","time":"1403481700"} + + + - do: + xpack.ml.flush_job: + job_id: farequote + - match: { flushed: true } + + - do: + cluster.state: + metric: [ metadata ] + filter_path: metadata.persistent_tasks + - match: {metadata.persistent_tasks.tasks.0.status.JobState.state: opened} + + - do: + xpack.ml.close_job: + job_id: farequote + force: true + - match: { closed: true } + + - do: + cluster.state: + metric: [ metadata ] + filter_path: metadata.persistent_tasks + - match: + metadata.persistent_tasks.tasks: [] diff --git a/plugin/src/test/resources/rest-api-spec/test/ml/start_stop_datafeed.yaml 
b/plugin/src/test/resources/rest-api-spec/test/ml/start_stop_datafeed.yaml index 7bc8a9db7cf..4ea5750b0ff 100644 --- a/plugin/src/test/resources/rest-api-spec/test/ml/start_stop_datafeed.yaml +++ b/plugin/src/test/resources/rest-api-spec/test/ml/start_stop_datafeed.yaml @@ -57,6 +57,41 @@ setup: datafeed_id: "datafeed-1" - match: { datafeeds.0.state: stopped } +--- +"Test force stop datafeed": + - do: + xpack.ml.open_job: + job_id: "datafeed-job" + - do: + xpack.ml.start_datafeed: + "datafeed_id": "datafeed-1" + "start": 0 + - do: + xpack.ml.get_datafeed_stats: + datafeed_id: "datafeed-1" + - match: { datafeeds.0.state: started } + + - do: + cluster.state: + metric: [ metadata ] + filter_path: metadata.persistent_tasks.**.DatafeedState + - match: {metadata.persistent_tasks.tasks.0.status.DatafeedState.state: started} + + - do: + xpack.ml.stop_datafeed: + "datafeed_id": "datafeed-1" + force: true + - do: + xpack.ml.get_datafeed_stats: + datafeed_id: "datafeed-1" + - match: { datafeeds.0.state: stopped } + + - do: + cluster.state: + metric: [ metadata ] + filter_path: metadata.persistent_tasks.**.DatafeedState + - is_false: metadata.persistent_tasks.tasks.0.status.DatafeedState + --- "Test start datafeed given start is now": - do: