[ML] Add a 'force' parameter to CloseJob and StopDataFeed endpoints (elastic/x-pack-elasticsearch#710)

If forced, the internal RemovePersistentTasks API is invoked instead of going through
ML.  This will remove the task, which should trigger the task framework to do
necessary cleanup.

At that point, the Delete* APIs interpret a missing task as CLOSED/STOPPED,
so they can be removed regardless of the original state.

Original commit: elastic/x-pack-elasticsearch@bff23c7840
This commit is contained in:
Zachary Tong 2017-03-24 13:10:20 -04:00 committed by GitHub
parent 1d660f373f
commit 4285335dfb
10 changed files with 310 additions and 20 deletions

View File

@ -14,6 +14,7 @@ import org.elasticsearch.action.ActionResponse;
import org.elasticsearch.action.support.ActionFilters;
import org.elasticsearch.action.support.master.MasterNodeRequest;
import org.elasticsearch.action.support.master.TransportMasterNodeAction;
import org.elasticsearch.client.Client;
import org.elasticsearch.client.ElasticsearchClient;
import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.ClusterStateUpdateTask;
@ -45,6 +46,7 @@ import org.elasticsearch.xpack.ml.job.config.JobState;
import org.elasticsearch.xpack.ml.utils.ExceptionsHelper;
import org.elasticsearch.xpack.persistent.PersistentTasksCustomMetaData;
import org.elasticsearch.xpack.persistent.PersistentTasksCustomMetaData.PersistentTask;
import org.elasticsearch.xpack.persistent.RemovePersistentTaskAction;
import java.io.IOException;
import java.util.Date;
@ -75,12 +77,14 @@ public class CloseJobAction extends Action<CloseJobAction.Request, CloseJobActio
public static class Request extends MasterNodeRequest<Request> implements ToXContent {
public static final ParseField TIMEOUT = new ParseField("timeout");
public static final ParseField FORCE = new ParseField("force");
public static ObjectParser<Request, Void> PARSER = new ObjectParser<>(NAME, Request::new);
static {
PARSER.declareString(Request::setJobId, Job.ID);
PARSER.declareString((request, val) ->
request.setTimeout(TimeValue.parseTimeValue(val, TIMEOUT.getPreferredName())), TIMEOUT);
PARSER.declareBoolean(Request::setForce, FORCE);
}
public static Request parseRequest(String jobId, XContentParser parser) {
@ -93,6 +97,7 @@ public class CloseJobAction extends Action<CloseJobAction.Request, CloseJobActio
private String jobId;
private TimeValue timeout = TimeValue.timeValueMinutes(20);
private boolean force = false;
Request() {}
@ -116,6 +121,14 @@ public class CloseJobAction extends Action<CloseJobAction.Request, CloseJobActio
this.timeout = timeout;
}
public boolean isForce() {
return force;
}
public void setForce(boolean force) {
this.force = force;
}
@Override
public ActionRequestValidationException validate() {
return null;
@ -126,6 +139,7 @@ public class CloseJobAction extends Action<CloseJobAction.Request, CloseJobActio
super.readFrom(in);
jobId = in.readString();
timeout = new TimeValue(in);
force = in.readBoolean();
}
@Override
@ -133,6 +147,7 @@ public class CloseJobAction extends Action<CloseJobAction.Request, CloseJobActio
super.writeTo(out);
out.writeString(jobId);
timeout.writeTo(out);
out.writeBoolean(force);
}
@Override
@ -159,7 +174,8 @@ public class CloseJobAction extends Action<CloseJobAction.Request, CloseJobActio
}
Request other = (Request) obj;
return Objects.equals(jobId, other.jobId) &&
Objects.equals(timeout, other.timeout);
Objects.equals(timeout, other.timeout) &&
Objects.equals(force, other.force);
}
}
@ -223,15 +239,17 @@ public class CloseJobAction extends Action<CloseJobAction.Request, CloseJobActio
private final ClusterService clusterService;
private final CloseJobService closeJobService;
private final Client client;
@Inject
public TransportAction(Settings settings, TransportService transportService, ThreadPool threadPool,
ActionFilters actionFilters, IndexNameExpressionResolver indexNameExpressionResolver,
ClusterService clusterService, CloseJobService closeJobService) {
ClusterService clusterService, CloseJobService closeJobService, Client client) {
super(settings, CloseJobAction.NAME, transportService, clusterService, threadPool, actionFilters,
indexNameExpressionResolver, Request::new);
this.clusterService = clusterService;
this.closeJobService = closeJobService;
this.client = client;
}
@Override
@ -246,6 +264,19 @@ public class CloseJobAction extends Action<CloseJobAction.Request, CloseJobActio
@Override
protected void masterOperation(Request request, ClusterState state, ActionListener<Response> listener) throws Exception {
if (request.isForce()) {
forceCloseJob(client, request.getJobId(), state, listener);
} else {
closeJob(request, listener);
}
}
@Override
protected ClusterBlockException checkBlock(Request request, ClusterState state) {
return state.blocks().globalBlockedException(ClusterBlockLevel.METADATA_WRITE);
}
private void closeJob(Request request, ActionListener<Response> listener) {
clusterService.submitStateUpdateTask("closing job [" + request.getJobId() + "]", new ClusterStateUpdateTask() {
@Override
public ClusterState execute(ClusterState currentState) throws Exception {
@ -274,12 +305,26 @@ public class CloseJobAction extends Action<CloseJobAction.Request, CloseJobActio
});
}
@Override
protected ClusterBlockException checkBlock(Request request, ClusterState state) {
return state.blocks().globalBlockedException(ClusterBlockLevel.METADATA_WRITE);
private void forceCloseJob(Client client, String jobId, ClusterState currentState,
ActionListener<Response> listener) {
PersistentTask<?> task = MlMetadata.getJobTask(jobId,
currentState.getMetaData().custom(PersistentTasksCustomMetaData.TYPE));
if (task != null) {
RemovePersistentTaskAction.Request request = new RemovePersistentTaskAction.Request(task.getId());
client.execute(RemovePersistentTaskAction.INSTANCE, request,
ActionListener.wrap(
response -> listener.onResponse(new Response(response.isAcknowledged())),
listener::onFailure));
} else {
String msg = "Requested job [" + jobId + "] be force-closed, but job's task" +
"could not be found.";
logger.warn(msg);
listener.onFailure(new RuntimeException(msg));
}
}
}
static PersistentTask<?> validateAndFindTask(String jobId, ClusterState state) {
MlMetadata mlMetadata = state.metaData().custom(MlMetadata.TYPE);
if (mlMetadata.getJobs().containsKey(jobId) == false) {

View File

@ -17,6 +17,7 @@ import org.elasticsearch.action.support.ActionFilters;
import org.elasticsearch.action.support.tasks.BaseTasksRequest;
import org.elasticsearch.action.support.tasks.BaseTasksResponse;
import org.elasticsearch.action.support.tasks.TransportTasksAction;
import org.elasticsearch.client.Client;
import org.elasticsearch.client.ElasticsearchClient;
import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver;
@ -46,6 +47,7 @@ import org.elasticsearch.xpack.ml.utils.DatafeedStateObserver;
import org.elasticsearch.xpack.ml.utils.ExceptionsHelper;
import org.elasticsearch.xpack.persistent.PersistentTasksCustomMetaData;
import org.elasticsearch.xpack.persistent.PersistentTasksCustomMetaData.PersistentTask;
import org.elasticsearch.xpack.persistent.RemovePersistentTaskAction;
import java.io.IOException;
import java.util.List;
@ -57,6 +59,7 @@ public class StopDatafeedAction
public static final StopDatafeedAction INSTANCE = new StopDatafeedAction();
public static final String NAME = "cluster:admin/ml/datafeeds/stop";
public static final ParseField TIMEOUT = new ParseField("timeout");
public static final ParseField FORCE = new ParseField("force");
private StopDatafeedAction() {
super(NAME);
@ -80,6 +83,7 @@ public class StopDatafeedAction
PARSER.declareString((request, datafeedId) -> request.datafeedId = datafeedId, DatafeedConfig.ID);
PARSER.declareString((request, val) ->
request.setTimeout(TimeValue.parseTimeValue(val, TIMEOUT.getPreferredName())), TIMEOUT);
PARSER.declareBoolean(Request::setForce, FORCE);
}
public static Request fromXContent(XContentParser parser) {
@ -95,6 +99,7 @@ public class StopDatafeedAction
}
private String datafeedId;
private boolean force = false;
public Request(String jobId) {
this.datafeedId = ExceptionsHelper.requireNonNull(jobId, DatafeedConfig.ID.getPreferredName());
@ -109,6 +114,14 @@ public class StopDatafeedAction
return datafeedId;
}
public boolean isForce() {
return force;
}
public void setForce(boolean force) {
this.force = force;
}
@Override
public boolean match(Task task) {
String expectedDescription = "datafeed-" + datafeedId;
@ -124,12 +137,14 @@ public class StopDatafeedAction
public void readFrom(StreamInput in) throws IOException {
super.readFrom(in);
datafeedId = in.readString();
force = in.readBoolean();
}
@Override
public void writeTo(StreamOutput out) throws IOException {
super.writeTo(out);
out.writeString(datafeedId);
out.writeBoolean(force);
}
@Override
@ -158,7 +173,8 @@ public class StopDatafeedAction
}
Request other = (Request) obj;
return Objects.equals(datafeedId, other.datafeedId) &&
Objects.equals(getTimeout(), other.getTimeout());
Objects.equals(getTimeout(), other.getTimeout()) &&
Objects.equals(force, other.force);
}
}
@ -204,26 +220,42 @@ public class StopDatafeedAction
public static class TransportAction extends TransportTasksAction<StartDatafeedAction.DatafeedTask, Request, Response, Response> {
private final Client client;
@Inject
public TransportAction(Settings settings, TransportService transportService, ThreadPool threadPool,
ActionFilters actionFilters, IndexNameExpressionResolver indexNameExpressionResolver,
ClusterService clusterService) {
ClusterService clusterService, Client client) {
super(settings, StopDatafeedAction.NAME, threadPool, clusterService, transportService, actionFilters,
indexNameExpressionResolver, Request::new, Response::new, MachineLearning.THREAD_POOL_NAME);
this.client = client;
}
@Override
protected void doExecute(Task task, Request request, ActionListener<Response> listener) {
ClusterState state = clusterService.state();
MetaData metaData = state.metaData();
MlMetadata mlMetadata = metaData.custom(MlMetadata.TYPE);
PersistentTasksCustomMetaData tasks = metaData.custom(PersistentTasksCustomMetaData.TYPE);
if (request.force) {
PersistentTask<?> datafeedTask = MlMetadata.getDatafeedTask(request.getDatafeedId(), tasks);
if (datafeedTask != null) {
forceStopTask(client, datafeedTask.getId(), listener);
} else {
String msg = "Requested datafeed [" + request.getDatafeedId() + "] be force-stopped, but " +
"datafeed's task could not be found.";
logger.warn(msg);
listener.onFailure(new RuntimeException(msg));
}
} else {
MlMetadata mlMetadata = metaData.custom(MlMetadata.TYPE);
String nodeId = validateAndReturnNodeId(request.getDatafeedId(), mlMetadata, tasks);
request.setNodes(nodeId);
ActionListener<Response> finalListener =
ActionListener.wrap(r -> waitForDatafeedStopped(request, r, listener), listener::onFailure);
super.doExecute(task, request, finalListener);
}
}
// Wait for datafeed to be marked as stopped in cluster state, which means the datafeed persistent task has been removed
// This api returns when task has been cancelled, but that doesn't mean the persistent task has been removed from cluster state,
@ -239,6 +271,15 @@ public class StopDatafeedAction
});
}
private void forceStopTask(Client client, long taskId, ActionListener<Response> listener) {
RemovePersistentTaskAction.Request request = new RemovePersistentTaskAction.Request(taskId);
client.execute(RemovePersistentTaskAction.INSTANCE, request,
ActionListener.wrap(
response -> listener.onResponse(new Response(response.isAcknowledged())),
listener::onFailure));
}
@Override
protected Response newResponse(Request request, List<Response> tasks, List<TaskOperationFailure> taskOperationFailures,
List<FailedNodeException> failedNodeExceptions) {

View File

@ -46,6 +46,10 @@ public class RestStopDatafeedAction extends BaseRestHandler {
StopDatafeedAction.TIMEOUT.getPreferredName(), TimeValue.timeValueSeconds(20));
jobDatafeedRequest.setTimeout(openTimeout);
}
if (restRequest.hasParam(StopDatafeedAction.FORCE.getPreferredName())) {
jobDatafeedRequest.setForce(
restRequest.paramAsBoolean(StopDatafeedAction.FORCE.getPreferredName(), false));
}
}
return channel -> client.execute(StopDatafeedAction.INSTANCE, jobDatafeedRequest, new RestBuilderListener<Response>(channel) {

View File

@ -14,6 +14,7 @@ import org.elasticsearch.rest.RestRequest;
import org.elasticsearch.rest.action.RestToXContentListener;
import org.elasticsearch.xpack.ml.MachineLearning;
import org.elasticsearch.xpack.ml.action.CloseJobAction;
import org.elasticsearch.xpack.ml.action.CloseJobAction.Request;
import org.elasticsearch.xpack.ml.job.config.Job;
import java.io.IOException;
@ -28,9 +29,13 @@ public class RestCloseJobAction extends BaseRestHandler {
@Override
protected RestChannelConsumer prepareRequest(RestRequest restRequest, NodeClient client) throws IOException {
CloseJobAction.Request request = new CloseJobAction.Request(restRequest.param(Job.ID.getPreferredName()));
if (restRequest.hasParam("timeout")) {
request.setTimeout(TimeValue.parseTimeValue(restRequest.param("timeout"), "timeout"));
Request request = new Request(restRequest.param(Job.ID.getPreferredName()));
if (restRequest.hasParam(Request.TIMEOUT.getPreferredName())) {
request.setTimeout(TimeValue.parseTimeValue(
restRequest.param(Request.TIMEOUT.getPreferredName()), Request.TIMEOUT.getPreferredName()));
}
if (restRequest.hasParam(Request.FORCE.getPreferredName())) {
request.setForce(restRequest.paramAsBoolean(Request.FORCE.getPreferredName(), false));
}
return channel -> client.execute(CloseJobAction.INSTANCE, request, new RestToXContentListener<>(channel));
}

View File

@ -88,7 +88,14 @@ public class MlRestTestStateCleaner {
|| e.getMessage().contains("expected job state [opened], but got [closing]")) {
logger.debug("job [" + jobId + "] has already been closed", e);
} else {
logger.warn("failed to close job [" + jobId + "]", e);
logger.warn("failed to close job [" + jobId + "]. Forcing closed.", e);
try {
adminClient.performRequest("POST", "/_xpack/ml/anomaly_detectors/" + jobId + "/_close?force=true");
throw new RuntimeException("Had to resort to force-closing job, something went wrong?");
} catch (Exception e2) {
throw new RuntimeException("Force-closing job [" + jobId + "] failed.", e2);
}
}
}
int statusCode = adminClient.performRequest("DELETE", "/_xpack/ml/anomaly_detectors/" + jobId).getStatusLine().getStatusCode();

View File

@ -253,9 +253,17 @@ public abstract class BaseMlIntegTestCase extends ESIntegTestCase {
assertTrue(stopResponse.isStopped());
} catch (ExecutionException e) {
if (e.getMessage().contains("datafeed already stopped, expected datafeed state [started], but got [stopped]")) {
logger.debug("failed to stop datafeed [" + datafeedId + "]", e);
logger.debug("failed to stop datafeed [" + datafeedId + "], already stopped", e);
} else {
throw new RuntimeException(e);
try {
StopDatafeedAction.Request request = new StopDatafeedAction.Request(datafeedId);
request.setForce(true);
StopDatafeedAction.Response stopResponse = client.execute(StopDatafeedAction.INSTANCE, request).get();
assertTrue(stopResponse.isStopped());
throw new RuntimeException("Had to resort to force-stopping datafeed, something went wrong?");
} catch (Exception e2) {
throw new RuntimeException("Force-stopping datafeed [" + datafeedId + "] failed.", e2);
}
}
}
assertBusy(() -> {
@ -289,7 +297,18 @@ public abstract class BaseMlIntegTestCase extends ESIntegTestCase {
|| e.getMessage().contains("expected job state [opened], but got [closing]")) {
logger.debug("job [" + jobId + "] has already been closed", e);
} else {
throw new RuntimeException(e);
try {
CloseJobAction.Request closeRequest = new CloseJobAction.Request(jobId);
closeRequest.setForce(true);
closeRequest.setTimeout(TimeValue.timeValueSeconds(30L));
CloseJobAction.Response response =
client.execute(CloseJobAction.INSTANCE, closeRequest).get();
assertTrue(response.isClosed());
throw new RuntimeException("Had to resort to force-closing job, something went wrong?");
} catch (Exception e2) {
throw new RuntimeException("Force-closing datafeed [" + jobId + "] failed.", e2);
}
}
}
assertBusy(() -> {

View File

@ -9,6 +9,13 @@
"type": "string",
"required": true,
"description": "The name of the job to close"
}
},
"params": {
"force": {
"type": "boolean",
"required": false,
"description": "True if the job should be forcefully closed"
},
"timeout": {
"type": "time",

View File

@ -13,6 +13,13 @@
"type": "string",
"required": true,
"description": "The ID of the datafeed to stop"
}
},
"params": {
"force": {
"type": "boolean",
"required": false,
"description": "True if the datafeed should be forcefully stopped."
},
"timeout": {
"type": "time",

View File

@ -295,3 +295,123 @@
catch: /Cannot delete job \[datafeed-job\] while datafeed \[test-datafeed-1\] refers to it/
xpack.ml.delete_job:
job_id: datafeed-job
---
"Test close job":
- do:
xpack.ml.put_job:
job_id: farequote
body: >
{
"job_id":"farequote",
"description":"Analysis of response time by airline",
"analysis_config" : {
"bucket_span":"1h",
"detectors" :[{"function":"metric","field_name":"responsetime","by_field_name":"airline"}]
},
"data_description" : {
"format":"xcontent",
"time_field":"time",
"time_format":"epoch"
}
}
- match: { job_id: "farequote" }
- do:
xpack.ml.open_job:
job_id: farequote
- do:
#set the header so we won't randomize it
headers:
Content-Type: application/json
xpack.ml.post_data:
job_id: farequote
body: >
{"airline":"AAL","responsetime":"132.2046","sourcetype":"farequote","time":"1403481600"}
{"airline":"JZA","responsetime":"990.4628","sourcetype":"farequote","time":"1403481700"}
- do:
xpack.ml.flush_job:
job_id: farequote
- match: { flushed: true }
- do:
cluster.state:
metric: [ metadata ]
filter_path: metadata.persistent_tasks
- match: {metadata.persistent_tasks.tasks.0.status.JobState.state: opened}
- do:
xpack.ml.close_job:
job_id: farequote
- match: { closed: true }
- do:
cluster.state:
metric: [ metadata ]
filter_path: metadata.persistent_tasks
- match:
metadata.persistent_tasks.tasks: []
---
"Test force close job":
- do:
xpack.ml.put_job:
job_id: farequote
body: >
{
"job_id":"farequote",
"description":"Analysis of response time by airline",
"analysis_config" : {
"bucket_span":"1h",
"detectors" :[{"function":"metric","field_name":"responsetime","by_field_name":"airline"}]
},
"data_description" : {
"format":"xcontent",
"time_field":"time",
"time_format":"epoch"
}
}
- match: { job_id: "farequote" }
- do:
xpack.ml.open_job:
job_id: farequote
- do:
#set the header so we won't randomize it
headers:
Content-Type: application/json
xpack.ml.post_data:
job_id: farequote
body: >
{"airline":"AAL","responsetime":"132.2046","sourcetype":"farequote","time":"1403481600"}
{"airline":"JZA","responsetime":"990.4628","sourcetype":"farequote","time":"1403481700"}
- do:
xpack.ml.flush_job:
job_id: farequote
- match: { flushed: true }
- do:
cluster.state:
metric: [ metadata ]
filter_path: metadata.persistent_tasks
- match: {metadata.persistent_tasks.tasks.0.status.JobState.state: opened}
- do:
xpack.ml.close_job:
job_id: farequote
force: true
- match: { closed: true }
- do:
cluster.state:
metric: [ metadata ]
filter_path: metadata.persistent_tasks
- match:
metadata.persistent_tasks.tasks: []

View File

@ -57,6 +57,41 @@ setup:
datafeed_id: "datafeed-1"
- match: { datafeeds.0.state: stopped }
---
"Test force stop datafeed":
- do:
xpack.ml.open_job:
job_id: "datafeed-job"
- do:
xpack.ml.start_datafeed:
"datafeed_id": "datafeed-1"
"start": 0
- do:
xpack.ml.get_datafeed_stats:
datafeed_id: "datafeed-1"
- match: { datafeeds.0.state: started }
- do:
cluster.state:
metric: [ metadata ]
filter_path: metadata.persistent_tasks.**.DatafeedState
- match: {metadata.persistent_tasks.tasks.0.status.DatafeedState.state: started}
- do:
xpack.ml.stop_datafeed:
"datafeed_id": "datafeed-1"
force: true
- do:
xpack.ml.get_datafeed_stats:
datafeed_id: "datafeed-1"
- match: { datafeeds.0.state: stopped }
- do:
cluster.state:
metric: [ metadata ]
filter_path: metadata.persistent_tasks.**.DatafeedState
- is_false: metadata.persistent_tasks.tasks.0.status.DatafeedState
---
"Test start datafeed given start is now":
- do: