From a3e7c65ba4407f9bf59c874d6cd43c6dd8b8b317 Mon Sep 17 00:00:00 2001 From: Martijn van Groningen Date: Mon, 24 Apr 2017 11:15:07 +0200 Subject: [PATCH] [ML] Upon task cancel stop datafeed immediately. Original commit: elastic/x-pack-elasticsearch@0401ca3d33812b847fd423339e07450008a2d2d0 --- .../xpack/ml/action/StartDatafeedAction.java | 10 +++++----- .../xpack/ml/datafeed/DatafeedManager.java | 4 ++-- .../xpack/ml/datafeed/DatafeedManagerTests.java | 3 ++- 3 files changed, 9 insertions(+), 8 deletions(-) diff --git a/plugin/src/main/java/org/elasticsearch/xpack/ml/action/StartDatafeedAction.java b/plugin/src/main/java/org/elasticsearch/xpack/ml/action/StartDatafeedAction.java index 8cc7ad45a75..cf26bfc73d1 100644 --- a/plugin/src/main/java/org/elasticsearch/xpack/ml/action/StartDatafeedAction.java +++ b/plugin/src/main/java/org/elasticsearch/xpack/ml/action/StartDatafeedAction.java @@ -393,11 +393,11 @@ public class StartDatafeedAction @Override protected void onCancelled() { - stop(getReasonCancelled()); - } - - public void stop(String reason) { - stop(reason, StopDatafeedAction.DEFAULT_TIMEOUT); + // If the persistent task framework wants us to stop then we should do so immediately and + // we should wait for an existing datafeed import to realize we want it to stop. + // Note that this only applied when task cancel is invoked and stop datafeed api doesn't use this. + // Also stop datafeed api will obey the timeout. + stop(getReasonCancelled(), TimeValue.ZERO); } public void stop(String reason, TimeValue timeout) { diff --git a/plugin/src/main/java/org/elasticsearch/xpack/ml/datafeed/DatafeedManager.java b/plugin/src/main/java/org/elasticsearch/xpack/ml/datafeed/DatafeedManager.java index b209c96be23..dec5fc148e3 100644 --- a/plugin/src/main/java/org/elasticsearch/xpack/ml/datafeed/DatafeedManager.java +++ b/plugin/src/main/java/org/elasticsearch/xpack/ml/datafeed/DatafeedManager.java @@ -108,14 +108,14 @@ public class DatafeedManager extends AbstractComponent { }, handler); } - public synchronized void stopDatafeed(String datafeedId, String reason, TimeValue timeout) { + public void stopDatafeed(String datafeedId, String reason, TimeValue timeout) { Holder holder = runningDatafeeds.remove(datafeedId); if (holder != null) { holder.stop(reason, timeout, null); } } - public synchronized void stopAllDatafeeds(String reason) { + public void stopAllDatafeeds(String reason) { int numDatafeeds = runningDatafeeds.size(); if (numDatafeeds != 0) { logger.info("Closing [{}] datafeeds, because [{}]", numDatafeeds, reason); diff --git a/plugin/src/test/java/org/elasticsearch/xpack/ml/datafeed/DatafeedManagerTests.java b/plugin/src/test/java/org/elasticsearch/xpack/ml/datafeed/DatafeedManagerTests.java index 5ea8c34ca3e..8ec5cb83c75 100644 --- a/plugin/src/test/java/org/elasticsearch/xpack/ml/datafeed/DatafeedManagerTests.java +++ b/plugin/src/test/java/org/elasticsearch/xpack/ml/datafeed/DatafeedManagerTests.java @@ -31,6 +31,7 @@ import org.elasticsearch.xpack.ml.action.PostDataAction; import org.elasticsearch.xpack.ml.action.StartDatafeedAction; import org.elasticsearch.xpack.ml.action.StartDatafeedAction.DatafeedTask; import org.elasticsearch.xpack.ml.action.StartDatafeedActionTests; +import org.elasticsearch.xpack.ml.action.StopDatafeedAction; import org.elasticsearch.xpack.ml.datafeed.extractor.DataExtractor; import org.elasticsearch.xpack.ml.datafeed.extractor.DataExtractorFactory; import org.elasticsearch.xpack.ml.job.config.AnalysisConfig; @@ -341,7 +342,7 @@ public class DatafeedManagerTests extends ESTestCase { verify(threadPool, times(1)).executor(MachineLearning.DATAFEED_THREAD_POOL_NAME); if (cancelled) { - task.stop("test"); + task.stop("test", StopDatafeedAction.DEFAULT_TIMEOUT); verify(handler).accept(null); assertThat(datafeedManager.isRunning(task.getDatafeedId()), is(false)); } else {