From 5c9364d5fe3f51e277f67c16e0a7ba6849d93737 Mon Sep 17 00:00:00 2001 From: Dimitris Athanasiou Date: Sat, 8 Apr 2017 14:44:08 +0100 Subject: [PATCH] [ML] Increase stop datafeed default timeout to 5 minutes (elastic/x-pack-elasticsearch#992) Increase the timeout to give enough time for a datafeed to stop smoothly. This is the second step to avoid hitting the default timeout. The first was ensuring aggregated datafeed is cancellable in a responsive manner. The third and final step will be to apply chunking in aggregated datafeeds in order to shorten the duration of the search, which will make cancellation even more responsive. Relates elastic/x-pack-elasticsearch#803 Original commit: elastic/x-pack-elasticsearch@db642330ec73adf815b4f6d2393fb6415cde0d23 --- .../xpack/ml/action/StartDatafeedAction.java | 2 +- .../xpack/ml/action/StopDatafeedAction.java | 28 +++++++++++++------ .../xpack/ml/datafeed/DatafeedJob.java | 7 ++++- .../xpack/ml/datafeed/DatafeedManager.java | 3 +- .../datafeeds/RestStopDatafeedAction.java | 6 ++-- .../StopDatafeedActionRequestTests.java | 3 +- 6 files changed, 33 insertions(+), 16 deletions(-) diff --git a/plugin/src/main/java/org/elasticsearch/xpack/ml/action/StartDatafeedAction.java b/plugin/src/main/java/org/elasticsearch/xpack/ml/action/StartDatafeedAction.java index 6dd56941b60..74605d2de26 100644 --- a/plugin/src/main/java/org/elasticsearch/xpack/ml/action/StartDatafeedAction.java +++ b/plugin/src/main/java/org/elasticsearch/xpack/ml/action/StartDatafeedAction.java @@ -331,7 +331,7 @@ public class StartDatafeedAction } public void stop(String reason) { - stop(reason, TimeValue.timeValueSeconds(20)); + stop(reason, StopDatafeedAction.DEFAULT_TIMEOUT); } public void stop(String reason, TimeValue timeout) { diff --git a/plugin/src/main/java/org/elasticsearch/xpack/ml/action/StopDatafeedAction.java b/plugin/src/main/java/org/elasticsearch/xpack/ml/action/StopDatafeedAction.java index 352a5ea29a2..fd73cf25818 100644 --- a/plugin/src/main/java/org/elasticsearch/xpack/ml/action/StopDatafeedAction.java +++ b/plugin/src/main/java/org/elasticsearch/xpack/ml/action/StopDatafeedAction.java @@ -54,6 +54,7 @@ public class StopDatafeedAction public static final String NAME = "cluster:admin/xpack/ml/datafeeds/stop"; public static final ParseField TIMEOUT = new ParseField("timeout"); public static final ParseField FORCE = new ParseField("force"); + public static final TimeValue DEFAULT_TIMEOUT = TimeValue.timeValueMinutes(5); private StopDatafeedAction() { super(NAME); @@ -76,7 +77,7 @@ public class StopDatafeedAction static { PARSER.declareString((request, datafeedId) -> request.datafeedId = datafeedId, DatafeedConfig.ID); PARSER.declareString((request, val) -> - request.setTimeout(TimeValue.parseTimeValue(val, TIMEOUT.getPreferredName())), TIMEOUT); + request.setStopTimeout(TimeValue.parseTimeValue(val, TIMEOUT.getPreferredName())), TIMEOUT); PARSER.declareBoolean(Request::setForce, FORCE); } @@ -93,12 +94,12 @@ public class StopDatafeedAction } private String datafeedId; + private TimeValue stopTimeout = DEFAULT_TIMEOUT; private boolean force = false; public Request(String jobId) { this.datafeedId = ExceptionsHelper.requireNonNull(jobId, DatafeedConfig.ID.getPreferredName()); setActions(StartDatafeedAction.NAME); - setTimeout(TimeValue.timeValueSeconds(20)); } Request() { @@ -108,6 +109,14 @@ public class StopDatafeedAction return datafeedId; } + public TimeValue getStopTimeout() { + return stopTimeout; + } + + public void setStopTimeout(TimeValue stopTimeout) { + this.stopTimeout = ExceptionsHelper.requireNonNull(stopTimeout, TIMEOUT.getPreferredName()); + } + public boolean isForce() { return force; } @@ -131,6 +140,7 @@ public class StopDatafeedAction public void readFrom(StreamInput in) throws IOException { super.readFrom(in); datafeedId = in.readString(); + stopTimeout = new TimeValue(in); force = in.readBoolean(); } @@ -138,21 +148,21 @@ public class StopDatafeedAction public void writeTo(StreamOutput out) throws IOException { super.writeTo(out); out.writeString(datafeedId); + stopTimeout.writeTo(out); out.writeBoolean(force); } @Override public int hashCode() { - return Objects.hash(datafeedId, getTimeout()); + return Objects.hash(datafeedId, stopTimeout, force); } @Override public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException { builder.startObject(); builder.field(DatafeedConfig.ID.getPreferredName(), datafeedId); - if (getTimeout() != null) { - builder.field(TIMEOUT.getPreferredName(), getTimeout().getStringRep()); - } + builder.field(TIMEOUT.getPreferredName(), stopTimeout.getStringRep()); + builder.field(FORCE.getPreferredName(), force); builder.endObject(); return builder; } @@ -167,7 +177,7 @@ public class StopDatafeedAction } Request other = (Request) obj; return Objects.equals(datafeedId, other.datafeedId) && - Objects.equals(getTimeout(), other.getTimeout()) && + Objects.equals(stopTimeout, other.stopTimeout) && Objects.equals(force, other.force); } } @@ -254,7 +264,7 @@ public class StopDatafeedAction // This api returns when task has been cancelled, but that doesn't mean the persistent task has been removed from cluster state, // so wait for that to happen here. void waitForDatafeedStopped(long persistentTaskId, Request request, Response response, ActionListener listener) { - persistentTasksService.waitForPersistentTaskStatus(persistentTaskId, Objects::isNull, request.getTimeout(), + persistentTasksService.waitForPersistentTaskStatus(persistentTaskId, Objects::isNull, request.getStopTimeout(), new WaitForPersistentTaskStatusListener() { @Override public void onResponse(PersistentTask task) { @@ -295,7 +305,7 @@ public class StopDatafeedAction @Override protected void taskOperation(Request request, StartDatafeedAction.DatafeedTask task, ActionListener listener) { - task.stop("stop_datafeed (api)", request.getTimeout()); + task.stop("stop_datafeed (api)", request.getStopTimeout()); listener.onResponse(new Response(true)); } diff --git a/plugin/src/main/java/org/elasticsearch/xpack/ml/datafeed/DatafeedJob.java b/plugin/src/main/java/org/elasticsearch/xpack/ml/datafeed/DatafeedJob.java index da2c028134c..eee3630cec3 100644 --- a/plugin/src/main/java/org/elasticsearch/xpack/ml/datafeed/DatafeedJob.java +++ b/plugin/src/main/java/org/elasticsearch/xpack/ml/datafeed/DatafeedJob.java @@ -193,7 +193,12 @@ class DatafeedJob { throw new EmptyDataCountException(); } - client.execute(FlushJobAction.INSTANCE, flushRequest).actionGet(); + // If the datafeed was stopped, then it is possible that by the time + // we call flush the job is closed. Thus, we don't flush unless the + // datafeed is stilll running. + if (isRunning()) { + client.execute(FlushJobAction.INSTANCE, flushRequest).actionGet(); + } } private DataCounts postData(InputStream inputStream, XContentType xContentType) diff --git a/plugin/src/main/java/org/elasticsearch/xpack/ml/datafeed/DatafeedManager.java b/plugin/src/main/java/org/elasticsearch/xpack/ml/datafeed/DatafeedManager.java index 9b541adba8d..0d942bde432 100644 --- a/plugin/src/main/java/org/elasticsearch/xpack/ml/datafeed/DatafeedManager.java +++ b/plugin/src/main/java/org/elasticsearch/xpack/ml/datafeed/DatafeedManager.java @@ -319,7 +319,8 @@ public class DatafeedManager extends AbstractComponent { FutureUtils.cancel(future); auditor.info(datafeed.getJobId(), Messages.getMessage(Messages.JOB_AUDIT_DATAFEED_STOPPED)); handler.accept(e); - logger.info("[{}] datafeed [{}] for job [{}] has been stopped", source, datafeed.getId(), datafeed.getJobId()); + logger.info("[{}] datafeed [{}] for job [{}] has been stopped{}", source, datafeed.getId(), datafeed.getJobId(), + acquired ? "" : ", but there may be pending tasks as the timeout [" + timeout.getStringRep() + "] expired"); if (autoCloseJob) { closeJob(); } diff --git a/plugin/src/main/java/org/elasticsearch/xpack/ml/rest/datafeeds/RestStopDatafeedAction.java b/plugin/src/main/java/org/elasticsearch/xpack/ml/rest/datafeeds/RestStopDatafeedAction.java index f3fa71a459b..db4cba8652f 100644 --- a/plugin/src/main/java/org/elasticsearch/xpack/ml/rest/datafeeds/RestStopDatafeedAction.java +++ b/plugin/src/main/java/org/elasticsearch/xpack/ml/rest/datafeeds/RestStopDatafeedAction.java @@ -42,9 +42,9 @@ public class RestStopDatafeedAction extends BaseRestHandler { } else { jobDatafeedRequest = new StopDatafeedAction.Request(datafeedId); if (restRequest.hasParam(StopDatafeedAction.TIMEOUT.getPreferredName())) { - TimeValue openTimeout = restRequest.paramAsTime( - StopDatafeedAction.TIMEOUT.getPreferredName(), TimeValue.timeValueSeconds(20)); - jobDatafeedRequest.setTimeout(openTimeout); + TimeValue stopTimeout = restRequest.paramAsTime( + StopDatafeedAction.TIMEOUT.getPreferredName(), StopDatafeedAction.DEFAULT_TIMEOUT); + jobDatafeedRequest.setStopTimeout(stopTimeout); } if (restRequest.hasParam(StopDatafeedAction.FORCE.getPreferredName())) { jobDatafeedRequest.setForce( diff --git a/plugin/src/test/java/org/elasticsearch/xpack/ml/action/StopDatafeedActionRequestTests.java b/plugin/src/test/java/org/elasticsearch/xpack/ml/action/StopDatafeedActionRequestTests.java index cfa18fa14ca..a676fbe0c5d 100644 --- a/plugin/src/test/java/org/elasticsearch/xpack/ml/action/StopDatafeedActionRequestTests.java +++ b/plugin/src/test/java/org/elasticsearch/xpack/ml/action/StopDatafeedActionRequestTests.java @@ -32,8 +32,9 @@ public class StopDatafeedActionRequestTests extends AbstractStreamableXContentTe protected Request createTestInstance() { Request request = new Request(randomAlphaOfLengthBetween(1, 20)); if (randomBoolean()) { - request.setTimeout(TimeValue.timeValueMillis(randomNonNegativeLong())); + request.setStopTimeout(TimeValue.timeValueMillis(randomNonNegativeLong())); } + request.setForce(randomBoolean()); return request; }