diff --git a/plugin/src/main/java/org/elasticsearch/xpack/ml/action/StartDatafeedAction.java b/plugin/src/main/java/org/elasticsearch/xpack/ml/action/StartDatafeedAction.java index 6dd56941b60..74605d2de26 100644 --- a/plugin/src/main/java/org/elasticsearch/xpack/ml/action/StartDatafeedAction.java +++ b/plugin/src/main/java/org/elasticsearch/xpack/ml/action/StartDatafeedAction.java @@ -331,7 +331,7 @@ public class StartDatafeedAction } public void stop(String reason) { - stop(reason, TimeValue.timeValueSeconds(20)); + stop(reason, StopDatafeedAction.DEFAULT_TIMEOUT); } public void stop(String reason, TimeValue timeout) { diff --git a/plugin/src/main/java/org/elasticsearch/xpack/ml/action/StopDatafeedAction.java b/plugin/src/main/java/org/elasticsearch/xpack/ml/action/StopDatafeedAction.java index 352a5ea29a2..fd73cf25818 100644 --- a/plugin/src/main/java/org/elasticsearch/xpack/ml/action/StopDatafeedAction.java +++ b/plugin/src/main/java/org/elasticsearch/xpack/ml/action/StopDatafeedAction.java @@ -54,6 +54,7 @@ public class StopDatafeedAction public static final String NAME = "cluster:admin/xpack/ml/datafeeds/stop"; public static final ParseField TIMEOUT = new ParseField("timeout"); public static final ParseField FORCE = new ParseField("force"); + public static final TimeValue DEFAULT_TIMEOUT = TimeValue.timeValueMinutes(5); private StopDatafeedAction() { super(NAME); @@ -76,7 +77,7 @@ public class StopDatafeedAction static { PARSER.declareString((request, datafeedId) -> request.datafeedId = datafeedId, DatafeedConfig.ID); PARSER.declareString((request, val) -> - request.setTimeout(TimeValue.parseTimeValue(val, TIMEOUT.getPreferredName())), TIMEOUT); + request.setStopTimeout(TimeValue.parseTimeValue(val, TIMEOUT.getPreferredName())), TIMEOUT); PARSER.declareBoolean(Request::setForce, FORCE); } @@ -93,12 +94,12 @@ public class StopDatafeedAction } private String datafeedId; + private TimeValue stopTimeout = DEFAULT_TIMEOUT; private boolean force = false; public Request(String jobId) { this.datafeedId = ExceptionsHelper.requireNonNull(jobId, DatafeedConfig.ID.getPreferredName()); setActions(StartDatafeedAction.NAME); - setTimeout(TimeValue.timeValueSeconds(20)); } Request() { @@ -108,6 +109,14 @@ public class StopDatafeedAction return datafeedId; } + public TimeValue getStopTimeout() { + return stopTimeout; + } + + public void setStopTimeout(TimeValue stopTimeout) { + this.stopTimeout = ExceptionsHelper.requireNonNull(stopTimeout, TIMEOUT.getPreferredName()); + } + public boolean isForce() { return force; } @@ -131,6 +140,7 @@ public class StopDatafeedAction public void readFrom(StreamInput in) throws IOException { super.readFrom(in); datafeedId = in.readString(); + stopTimeout = new TimeValue(in); force = in.readBoolean(); } @@ -138,21 +148,21 @@ public class StopDatafeedAction public void writeTo(StreamOutput out) throws IOException { super.writeTo(out); out.writeString(datafeedId); + stopTimeout.writeTo(out); out.writeBoolean(force); } @Override public int hashCode() { - return Objects.hash(datafeedId, getTimeout()); + return Objects.hash(datafeedId, stopTimeout, force); } @Override public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException { builder.startObject(); builder.field(DatafeedConfig.ID.getPreferredName(), datafeedId); - if (getTimeout() != null) { - builder.field(TIMEOUT.getPreferredName(), getTimeout().getStringRep()); - } + builder.field(TIMEOUT.getPreferredName(), stopTimeout.getStringRep()); + builder.field(FORCE.getPreferredName(), force); builder.endObject(); return builder; } @@ -167,7 +177,7 @@ public class StopDatafeedAction } Request other = (Request) obj; return Objects.equals(datafeedId, other.datafeedId) && - Objects.equals(getTimeout(), other.getTimeout()) && + Objects.equals(stopTimeout, other.stopTimeout) && Objects.equals(force, other.force); } } @@ -254,7 +264,7 @@ public class StopDatafeedAction // This api returns when task has been cancelled, but that doesn't mean the persistent task has been removed from cluster state, // so wait for that to happen here. void waitForDatafeedStopped(long persistentTaskId, Request request, Response response, ActionListener listener) { - persistentTasksService.waitForPersistentTaskStatus(persistentTaskId, Objects::isNull, request.getTimeout(), + persistentTasksService.waitForPersistentTaskStatus(persistentTaskId, Objects::isNull, request.getStopTimeout(), new WaitForPersistentTaskStatusListener() { @Override public void onResponse(PersistentTask task) { @@ -295,7 +305,7 @@ public class StopDatafeedAction @Override protected void taskOperation(Request request, StartDatafeedAction.DatafeedTask task, ActionListener listener) { - task.stop("stop_datafeed (api)", request.getTimeout()); + task.stop("stop_datafeed (api)", request.getStopTimeout()); listener.onResponse(new Response(true)); } diff --git a/plugin/src/main/java/org/elasticsearch/xpack/ml/datafeed/DatafeedJob.java b/plugin/src/main/java/org/elasticsearch/xpack/ml/datafeed/DatafeedJob.java index da2c028134c..eee3630cec3 100644 --- a/plugin/src/main/java/org/elasticsearch/xpack/ml/datafeed/DatafeedJob.java +++ b/plugin/src/main/java/org/elasticsearch/xpack/ml/datafeed/DatafeedJob.java @@ -193,7 +193,12 @@ class DatafeedJob { throw new EmptyDataCountException(); } - client.execute(FlushJobAction.INSTANCE, flushRequest).actionGet(); + // If the datafeed was stopped, then it is possible that by the time + // we call flush the job is closed. Thus, we don't flush unless the + // datafeed is stilll running. + if (isRunning()) { + client.execute(FlushJobAction.INSTANCE, flushRequest).actionGet(); + } } private DataCounts postData(InputStream inputStream, XContentType xContentType) diff --git a/plugin/src/main/java/org/elasticsearch/xpack/ml/datafeed/DatafeedManager.java b/plugin/src/main/java/org/elasticsearch/xpack/ml/datafeed/DatafeedManager.java index 9b541adba8d..0d942bde432 100644 --- a/plugin/src/main/java/org/elasticsearch/xpack/ml/datafeed/DatafeedManager.java +++ b/plugin/src/main/java/org/elasticsearch/xpack/ml/datafeed/DatafeedManager.java @@ -319,7 +319,8 @@ public class DatafeedManager extends AbstractComponent { FutureUtils.cancel(future); auditor.info(datafeed.getJobId(), Messages.getMessage(Messages.JOB_AUDIT_DATAFEED_STOPPED)); handler.accept(e); - logger.info("[{}] datafeed [{}] for job [{}] has been stopped", source, datafeed.getId(), datafeed.getJobId()); + logger.info("[{}] datafeed [{}] for job [{}] has been stopped{}", source, datafeed.getId(), datafeed.getJobId(), + acquired ? "" : ", but there may be pending tasks as the timeout [" + timeout.getStringRep() + "] expired"); if (autoCloseJob) { closeJob(); } diff --git a/plugin/src/main/java/org/elasticsearch/xpack/ml/rest/datafeeds/RestStopDatafeedAction.java b/plugin/src/main/java/org/elasticsearch/xpack/ml/rest/datafeeds/RestStopDatafeedAction.java index f3fa71a459b..db4cba8652f 100644 --- a/plugin/src/main/java/org/elasticsearch/xpack/ml/rest/datafeeds/RestStopDatafeedAction.java +++ b/plugin/src/main/java/org/elasticsearch/xpack/ml/rest/datafeeds/RestStopDatafeedAction.java @@ -42,9 +42,9 @@ public class RestStopDatafeedAction extends BaseRestHandler { } else { jobDatafeedRequest = new StopDatafeedAction.Request(datafeedId); if (restRequest.hasParam(StopDatafeedAction.TIMEOUT.getPreferredName())) { - TimeValue openTimeout = restRequest.paramAsTime( - StopDatafeedAction.TIMEOUT.getPreferredName(), TimeValue.timeValueSeconds(20)); - jobDatafeedRequest.setTimeout(openTimeout); + TimeValue stopTimeout = restRequest.paramAsTime( + StopDatafeedAction.TIMEOUT.getPreferredName(), StopDatafeedAction.DEFAULT_TIMEOUT); + jobDatafeedRequest.setStopTimeout(stopTimeout); } if (restRequest.hasParam(StopDatafeedAction.FORCE.getPreferredName())) { jobDatafeedRequest.setForce( diff --git a/plugin/src/test/java/org/elasticsearch/xpack/ml/action/StopDatafeedActionRequestTests.java b/plugin/src/test/java/org/elasticsearch/xpack/ml/action/StopDatafeedActionRequestTests.java index cfa18fa14ca..a676fbe0c5d 100644 --- a/plugin/src/test/java/org/elasticsearch/xpack/ml/action/StopDatafeedActionRequestTests.java +++ b/plugin/src/test/java/org/elasticsearch/xpack/ml/action/StopDatafeedActionRequestTests.java @@ -32,8 +32,9 @@ public class StopDatafeedActionRequestTests extends AbstractStreamableXContentTe protected Request createTestInstance() { Request request = new Request(randomAlphaOfLengthBetween(1, 20)); if (randomBoolean()) { - request.setTimeout(TimeValue.timeValueMillis(randomNonNegativeLong())); + request.setStopTimeout(TimeValue.timeValueMillis(randomNonNegativeLong())); } + request.setForce(randomBoolean()); return request; }