[ML] Upon task cancel stop datafeed immediately.
Original commit: elastic/x-pack-elasticsearch@0401ca3d33
This commit is contained in:
parent
bd40dd36c9
commit
a3e7c65ba4
|
@ -393,11 +393,11 @@ public class StartDatafeedAction
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
protected void onCancelled() {
|
protected void onCancelled() {
|
||||||
stop(getReasonCancelled());
|
// If the persistent task framework wants us to stop then we should do so immediately and
|
||||||
}
|
// we should wait for an existing datafeed import to realize we want it to stop.
|
||||||
|
// Note that this only applied when task cancel is invoked and stop datafeed api doesn't use this.
|
||||||
public void stop(String reason) {
|
// Also stop datafeed api will obey the timeout.
|
||||||
stop(reason, StopDatafeedAction.DEFAULT_TIMEOUT);
|
stop(getReasonCancelled(), TimeValue.ZERO);
|
||||||
}
|
}
|
||||||
|
|
||||||
public void stop(String reason, TimeValue timeout) {
|
public void stop(String reason, TimeValue timeout) {
|
||||||
|
|
|
@ -108,14 +108,14 @@ public class DatafeedManager extends AbstractComponent {
|
||||||
}, handler);
|
}, handler);
|
||||||
}
|
}
|
||||||
|
|
||||||
public synchronized void stopDatafeed(String datafeedId, String reason, TimeValue timeout) {
|
public void stopDatafeed(String datafeedId, String reason, TimeValue timeout) {
|
||||||
Holder holder = runningDatafeeds.remove(datafeedId);
|
Holder holder = runningDatafeeds.remove(datafeedId);
|
||||||
if (holder != null) {
|
if (holder != null) {
|
||||||
holder.stop(reason, timeout, null);
|
holder.stop(reason, timeout, null);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
public synchronized void stopAllDatafeeds(String reason) {
|
public void stopAllDatafeeds(String reason) {
|
||||||
int numDatafeeds = runningDatafeeds.size();
|
int numDatafeeds = runningDatafeeds.size();
|
||||||
if (numDatafeeds != 0) {
|
if (numDatafeeds != 0) {
|
||||||
logger.info("Closing [{}] datafeeds, because [{}]", numDatafeeds, reason);
|
logger.info("Closing [{}] datafeeds, because [{}]", numDatafeeds, reason);
|
||||||
|
|
|
@ -31,6 +31,7 @@ import org.elasticsearch.xpack.ml.action.PostDataAction;
|
||||||
import org.elasticsearch.xpack.ml.action.StartDatafeedAction;
|
import org.elasticsearch.xpack.ml.action.StartDatafeedAction;
|
||||||
import org.elasticsearch.xpack.ml.action.StartDatafeedAction.DatafeedTask;
|
import org.elasticsearch.xpack.ml.action.StartDatafeedAction.DatafeedTask;
|
||||||
import org.elasticsearch.xpack.ml.action.StartDatafeedActionTests;
|
import org.elasticsearch.xpack.ml.action.StartDatafeedActionTests;
|
||||||
|
import org.elasticsearch.xpack.ml.action.StopDatafeedAction;
|
||||||
import org.elasticsearch.xpack.ml.datafeed.extractor.DataExtractor;
|
import org.elasticsearch.xpack.ml.datafeed.extractor.DataExtractor;
|
||||||
import org.elasticsearch.xpack.ml.datafeed.extractor.DataExtractorFactory;
|
import org.elasticsearch.xpack.ml.datafeed.extractor.DataExtractorFactory;
|
||||||
import org.elasticsearch.xpack.ml.job.config.AnalysisConfig;
|
import org.elasticsearch.xpack.ml.job.config.AnalysisConfig;
|
||||||
|
@ -341,7 +342,7 @@ public class DatafeedManagerTests extends ESTestCase {
|
||||||
|
|
||||||
verify(threadPool, times(1)).executor(MachineLearning.DATAFEED_THREAD_POOL_NAME);
|
verify(threadPool, times(1)).executor(MachineLearning.DATAFEED_THREAD_POOL_NAME);
|
||||||
if (cancelled) {
|
if (cancelled) {
|
||||||
task.stop("test");
|
task.stop("test", StopDatafeedAction.DEFAULT_TIMEOUT);
|
||||||
verify(handler).accept(null);
|
verify(handler).accept(null);
|
||||||
assertThat(datafeedManager.isRunning(task.getDatafeedId()), is(false));
|
assertThat(datafeedManager.isRunning(task.getDatafeedId()), is(false));
|
||||||
} else {
|
} else {
|
||||||
|
|
Loading…
Reference in New Issue