From bf4da6c6ad0c27c51bf89dc28d177341e671e62f Mon Sep 17 00:00:00 2001 From: Hendrik Muhs Date: Fri, 9 Aug 2019 20:30:11 +0200 Subject: [PATCH] [ML-DataFrame] fix starting a batch data frame after stopping at runtime (#45340) (#45381) fix loading of next checkpoint after data frame transform has been stopped/started within one run closes #45339 --- .../transforms/DataFrameTransformState.java | 9 ------ ...FrameTransformPersistentTasksExecutor.java | 29 +++++++++++-------- 2 files changed, 17 insertions(+), 21 deletions(-) diff --git a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/dataframe/transforms/DataFrameTransformState.java b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/dataframe/transforms/DataFrameTransformState.java index f942f0dd2a9..6cc058e5acd 100644 --- a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/dataframe/transforms/DataFrameTransformState.java +++ b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/dataframe/transforms/DataFrameTransformState.java @@ -151,15 +151,6 @@ public class DataFrameTransformState implements Task.Status, PersistentTaskState return progress; } - /** - * Get the in-progress checkpoint - * - * @return checkpoint in progress or 0 if task/indexer is not active - */ - public long getInProgressCheckpoint() { - return indexerState.equals(IndexerState.INDEXING) ? checkpoint + 1L : 0; - } - public String getReason() { return reason; } diff --git a/x-pack/plugin/data-frame/src/main/java/org/elasticsearch/xpack/dataframe/transforms/DataFrameTransformPersistentTasksExecutor.java b/x-pack/plugin/data-frame/src/main/java/org/elasticsearch/xpack/dataframe/transforms/DataFrameTransformPersistentTasksExecutor.java index 6e9fe01a65e..a75c2d4b022 100644 --- a/x-pack/plugin/data-frame/src/main/java/org/elasticsearch/xpack/dataframe/transforms/DataFrameTransformPersistentTasksExecutor.java +++ b/x-pack/plugin/data-frame/src/main/java/org/elasticsearch/xpack/dataframe/transforms/DataFrameTransformPersistentTasksExecutor.java @@ -146,11 +146,20 @@ public class DataFrameTransformPersistentTasksExecutor extends PersistentTasksEx // <5> load next checkpoint ActionListener getTransformNextCheckpointListener = ActionListener.wrap( nextCheckpoint -> { - indexerBuilder.setNextCheckpoint(nextCheckpoint); + + if (nextCheckpoint.isEmpty()) { + // extra safety: reset position and progress if next checkpoint is empty + // prevents a failure if for some reason the next checkpoint has been deleted + indexerBuilder.setInitialPosition(null); + indexerBuilder.setProgress(null); + } else { + logger.trace("[{}] Loaded next checkpoint [{}] found, starting the task", transformId, + nextCheckpoint.getCheckpoint()); + indexerBuilder.setNextCheckpoint(nextCheckpoint); + } final long lastCheckpoint = stateHolder.get().getCheckpoint(); - logger.trace("[{}] No next checkpoint found, starting the task", transformId); startTask(buildTask, indexerBuilder, lastCheckpoint, startTaskListener); }, error -> { @@ -166,14 +175,10 @@ public class DataFrameTransformPersistentTasksExecutor extends PersistentTasksEx lastCheckpoint -> { indexerBuilder.setLastCheckpoint(lastCheckpoint); - final long nextCheckpoint = stateHolder.get().getInProgressCheckpoint(); - - if (nextCheckpoint > 0) { - transformsConfigManager.getTransformCheckpoint(transformId, nextCheckpoint, getTransformNextCheckpointListener); - } else { - logger.trace("[{}] No next checkpoint found, starting the task", transformId); - startTask(buildTask, indexerBuilder, lastCheckpoint.getCheckpoint(), startTaskListener); - } + logger.trace("[{}] Loaded last checkpoint [{}], looking for next checkpoint", transformId, + lastCheckpoint.getCheckpoint()); + transformsConfigManager.getTransformCheckpoint(transformId, lastCheckpoint.getCheckpoint() + 1, + getTransformNextCheckpointListener); }, error -> { String msg = DataFrameMessages.getMessage(DataFrameMessages.FAILED_TO_LOAD_TRANSFORM_CHECKPOINT, transformId); @@ -201,8 +206,8 @@ public class DataFrameTransformPersistentTasksExecutor extends PersistentTasksEx final long lastCheckpoint = stateHolder.get().getCheckpoint(); if (lastCheckpoint == 0) { - logger.trace("[{}] No checkpoint found, starting the task", transformId); - startTask(buildTask, indexerBuilder, lastCheckpoint, startTaskListener); + logger.trace("[{}] No last checkpoint found, looking for next checkpoint", transformId); + transformsConfigManager.getTransformCheckpoint(transformId, lastCheckpoint + 1, getTransformNextCheckpointListener); } else { logger.trace ("[{}] Restore last checkpoint: [{}]", transformId, lastCheckpoint); transformsConfigManager.getTransformCheckpoint(transformId, lastCheckpoint, getTransformLastCheckpointListener);