From 33b4032fab47fe41015316d40f54225f46c5e32c Mon Sep 17 00:00:00 2001 From: Benjamin Trent Date: Thu, 2 May 2019 12:01:59 -0500 Subject: [PATCH] [ML] Correct indexer state on task re-allocation (#41724) (#41751) --- ...FrameTransformPersistentTasksExecutor.java | 22 ++++++++++++++++++- 1 file changed, 21 insertions(+), 1 deletion(-) diff --git a/x-pack/plugin/data-frame/src/main/java/org/elasticsearch/xpack/dataframe/transforms/DataFrameTransformPersistentTasksExecutor.java b/x-pack/plugin/data-frame/src/main/java/org/elasticsearch/xpack/dataframe/transforms/DataFrameTransformPersistentTasksExecutor.java index 01999eff64c..d0f15197c3c 100644 --- a/x-pack/plugin/data-frame/src/main/java/org/elasticsearch/xpack/dataframe/transforms/DataFrameTransformPersistentTasksExecutor.java +++ b/x-pack/plugin/data-frame/src/main/java/org/elasticsearch/xpack/dataframe/transforms/DataFrameTransformPersistentTasksExecutor.java @@ -114,7 +114,7 @@ public class DataFrameTransformPersistentTasksExecutor extends PersistentTasksEx new DataFrameTransformTask.ClientDataFrameIndexerBuilder() .setAuditor(auditor) .setClient(client) - .setIndexerState(transformState == null ? IndexerState.STOPPED : transformState.getIndexerState()) + .setIndexerState(currentIndexerState(transformState)) .setInitialPosition(transformState == null ? null : transformState.getPosition()) // If the state is `null` that means this is a "first run". We can safely assume the // task will attempt to gather the initial progress information @@ -184,6 +184,26 @@ public class DataFrameTransformPersistentTasksExecutor extends PersistentTasksEx transformsConfigManager.getTransformConfiguration(transformId, getTransformConfigListener); } + private static IndexerState currentIndexerState(DataFrameTransformState previousState) { + if (previousState == null) { + return IndexerState.STOPPED; + } + switch(previousState.getIndexerState()){ + // If it is STARTED or INDEXING we want to make sure we revert to started + // Otherwise, the internal indexer will never get scheduled and execute + case STARTED: + case INDEXING: + return IndexerState.STARTED; + // If we are STOPPED, STOPPING, or ABORTING and just started executing on this node, + // then it is safe to say we should be STOPPED + case STOPPED: + case STOPPING: + case ABORTING: + default: + return IndexerState.STOPPED; + } + } + private void markAsFailed(DataFrameTransformTask task, String reason) { CountDownLatch latch = new CountDownLatch(1);